Kafka의 레코드 를 컨슈밍한 후 s3 로 적재하는 Spark streaming 의 배치 duration 1분으로 줄여야 하는 미션이 있다.
실험 1
EXECUTOR : 5
EXECUTOR MEM : 3G
EXECUTOR CORE : 4
DRIVER CORE : 1
====
PROCESSING TIME : 2~3m
Line 별 오퍼레이션
124 : JSON을 df로 변환
123 : !rdd.isEmpty 비교
141 : 카프카에 오프셋 커밋
131 : df.write
Stage
131 라인 : 테이블 파티셔닝 하고 업로드하는 구간에서 병목이 일어난다.
위와 같이 대부분의 시간을 s3 업로드에 쓰고 있다.
실험 2
Executor를 10개로 늘렸다
EXECUTOR : 10
EXECUTOR MEM : 3G
EXECUTOR CORE : 4
DRIVER CORE : 1
PROCESSING TIME : 2~3m
Executor 늘리면 core가 많아지면서 task(=partition)수가 많아지고 레코드 수는 같으니까 파일이 더 작아지니까 s3 올리는 게 더 빡쎌 수 도 있어 보여서 확인 해봤는데 쌓이는 parquet 파일의 크기는 같다….?
실험 3
셔플링을 최소화하기 위해 Excutor와 core를 극단적으로 낮추고 다시 돌려본다.
EXECUTOR : 2
EXECUTOR MEM : 3G
EXECUTOR CORE : 1
DRIVER CORE : 1
ST
확실히 느려졌다. 분산처리가 빠르긴 한가보다.. 하긴 테이블 파티셔닝 같은거 하려면…
실험 4
이번에는 Executor를 아주 올려보자
EXECUTOR : 20
EXECUTOR MEM : 3G
EXECUTOR CORE : 3
DRIVER CORE : 1
5 -> 20 으로 올려도 처리시간이 딱히 안오른다.
실험 5
이번에는 core를 올려보자
EXECUTOR : 15
EXECUTOR MEM : 3G
EXECUTOR CORE : 5
DRIVER CORE : 2
원하는 성능이 안나옴
실험 5
이번에는 repartition 을 parititionby 이전에 하는 로직을 심은 후 배포하고 돌려보자.
EXECUTOR : 15
EXECUTOR MEM : 3G
EXECUTOR CORE : 5
Driver mem : 5G
DRIVER CORE : 2
드디어 원하는 성능인 1분 이하가 나왔다 !!
코어수 익스큐터 개수등 통제 못했다
실험 6
오리지날로 맞춰서 다시 돌린다.
EXECUTOR : 5
EXECUTOR MEM : 3G
EXECUTOR CORE : 4
DRIVER CORE : 1
제대로 나왔다.
정합성확인 및 결과 분석
S3 에 적재 되는 파일의 데이터가 확실히 커진 듯 하다
as-is
to-be
확실히 개별 파케이의 크기가 커진게 보인다. repartition 으로 묶기 때문에 그런 것 같다.
이를 통해 s3 업로드 부하가 줄어들어, 업로드 속도가 개선된듯 하다. (s3업로드 진행 시 오버헤드가 줄어들음)
리파티션을 했으니 데이터 적재에 변동 가능성이 있으니 확인 하자
TO-BE 데이터 조회
df1 = spark.read.option("basePath", "s3://***/streamingtuning/").parquet("s3://***streamingtuning/api_version=2/topic=***/action=*/platform=*/year=2022/month=5/day=22/")
df1.printSchema()
df2 = df1.select("log_time","s3_entry_time", "entry_time", "month", "day").orderBy(desc("entry_time"))
df2.show(1000)
df2.count()
결과
스키마
***
카운트
1693141
일부 Value 비교
**
AS-IS 쿼리
df1 = spark.read.option("basePath", "s3://***/gtm/").parquet("s3://***gtm/api_version=2/topic=***/action=*/platform=*/year=2022/month=5/day=22/")
df1.printSchema()
df2 = df1.select("log_time","s3_entry_time", "entry_time", "month", "day").orderBy(desc("entry_time"))
df2.show(1000)
df2.count()
결과
스키마
**
카운트
1693141
Value 비교
***
***는 보안상 가렸다. 결과적으로 정합성에 문제가 없었다!
Skipped stage 라는 것이 있는데 뭔지 알아보겠다.
일단 테스크 수가 200 개이다
35개인 이전 것과 다르다
Job 1023
잡 안에 두개의 스테이지가 있는데
셔플 아웃풋이 일어난다. 아마 repartition 동작
셔플리드가 일어난다. 아마 partitionby, s3 write 가 일어난다.
skip 된 부분은 이미 repartition 과정에서 적절하게 partitioning 되었기 떄문에 스킵하는 것으로 보인다.
repartition 하면서 적절히 shuffle 된 데이터를 partitionby 에서 업로드만 시켜준다.
AS-IS
셔플리드, 셔플 아웃풋이 없다.
빨라진 이유
partitionby 는 각 파티션 ( 코어)에 있는 데이터 조각들을 그대로 (셔플링 없이) partitionby 조건대로 파일분할 하여 업로드 하기 때문에 파일 이 작아지고 많아지기 때문에 s3 업로드 로직에서 성능저하가 일어났다.
하지만 repartition을 미리 해줄 경우 partition 조건에 맞게 셔플링이 일어나고 파일을 병합하므로 s3 업로드할 때 파일의 개수가 줄어들기 때문에 s3 업로드 가 빨라지고 성능이 향상된다.
참고 : https://tantusdata.com/spark-shuffle-case-1-partition-by-and-repartition/
'일 > spark' 카테고리의 다른 글
Spark Streaming Graceful Shut Down (0) | 2022.05.01 |
---|---|
Spark streaming :: Kafka Dstream (0) | 2022.02.22 |
Spark #2 :: RDD (0) | 2022.02.05 |
Spark #1 :: Spark 개념, 구조 (0) | 2022.02.05 |