본문 바로가기
일/spark

Spark Streaming 성능 Tuning

by blair2dev 2022. 5. 24.

Kafka의 레코드 를 컨슈밍한 후 s3 로 적재하는 Spark streaming 의 배치 duration 1분으로 줄여야 하는 미션이 있다. 

 

 

 

 


 실험 1

EXECUTOR : 5

EXECUTOR MEM : 3G

EXECUTOR CORE : 4

DRIVER CORE : 1 

====

PROCESSING TIME : 2~3m 

 

Line 별 오퍼레이션 

124 : JSON df로 변환 

123 : !rdd.isEmpty 비교

141 : 카프카에 오프셋 커밋 

131 : df.write 

 

Stage

131 라인 :  테이블 파티셔닝 하고 업로드하는 구간에서 병목이 일어난다. 

위와 같이 대부분의 시간을 s3 업로드에 쓰고 있다. 

 

 


 

 실험 2

Executor 10개로 늘렸다  

 

EXECUTOR : 10

EXECUTOR MEM : 3G

EXECUTOR CORE : 4 

DRIVER CORE : 1 

PROCESSING TIME : 2~3m 

Executor 늘리면 core가 많아지면서 task(=partition)수가 많아지고 레코드 수는 같으니까 파일이 더 작아지니까 s3 올리는 게 더 빡쎌 수 도 있어 보여서 확인 해봤는데 쌓이는 parquet 파일의 크기는 같다….? 

 

 


 실험 3

셔플링을 최소화하기 위해 Excutor core를 극단적으로 낮추고 다시 돌려본다. 

 

EXECUTOR : 2

EXECUTOR MEM : 3G

EXECUTOR CORE : 1 

DRIVER CORE : 1 

 ST

확실히 느려졌다. 분산처리가 빠르긴 한가보다.. 하긴 테이블 파티셔닝 같은거 하려면

 

 


 실험 4

이번에는 Executor를 아주 올려보자  

 

EXECUTOR : 20

EXECUTOR MEM : 3G

EXECUTOR CORE : 3

DRIVER CORE : 1 

5 -> 20 으로 올려도 처리시간이 딱히 안오른다. 

 

 


 실험 5

이번에는 core를 올려보자   

 

EXECUTOR : 15

EXECUTOR MEM : 3G

EXECUTOR CORE : 5

DRIVER CORE : 2 

원하는 성능이 안나옴 

 

 


 실험 5

이번에는 repartition  parititionby 이전에 하는 로직을 심은 후 배포하고 돌려보자.

EXECUTOR : 15

EXECUTOR MEM : 3G

EXECUTOR CORE : 5

Driver mem : 5G

DRIVER CORE : 2 

 

드디어 원하는 성능인 1분 이하가 나왔다 !!

코어수 익스큐터 개수등 통제 못했다 

 

 


 실험 6

오리지날로 맞춰서 다시 돌린다. 

 

EXECUTOR : 5

EXECUTOR MEM : 3G

EXECUTOR CORE : 4 

DRIVER CORE : 1 

제대로 나왔다.

 

 

 

 

 


 정합성확인 및 결과 분석

S3 에 적재 되는 파일의 데이터가 확실히 커진 듯 하다

 

as-is

 

 

to-be

확실히 개별 파케이의 크기가 커진게 보인다. repartition 으로 묶기 때문에 그런 것 같다. 

이를 통해 s3 업로드 부하가 줄어들어, 업로드 속도가 개선된듯 하다. (s3업로드 진행 오버헤드가 줄어들음)

 

 

리파티션을 했으니 데이터 적재에 변동 가능성이 있으니 확인 하자  

 

TO-BE 데이터 조회

df1 = spark.read.option("basePath", "s3://***/streamingtuning/").parquet("s3://***streamingtuning/api_version=2/topic=***/action=*/platform=*/year=2022/month=5/day=22/")

 

df1.printSchema()

df2 = df1.select("log_time","s3_entry_time", "entry_time", "month", "day").orderBy(desc("entry_time"))

df2.show(1000)

df2.count()

 

결과 

스키마

***

 

카운트

1693141

 

일부 Value 비교 

**

 

AS-IS 쿼리 

df1 = spark.read.option("basePath", "s3://***/gtm/").parquet("s3://***gtm/api_version=2/topic=***/action=*/platform=*/year=2022/month=5/day=22/")

 

df1.printSchema()

df2 = df1.select("log_time","s3_entry_time", "entry_time", "month", "day").orderBy(desc("entry_time"))

df2.show(1000)

df2.count()

 

결과

스키마

**

 

카운트 

1693141

 

Value 비교

***

 

***는 보안상 가렸다. 결과적으로 정합성에 문제가 없었다!

 

 

 

 

Skipped stage 라는 것이 있는데 뭔지 알아보겠다. 

일단 테스크 수가 200 개이다 

35개인 이전 것과 다르다 

Job 1023 


잡 안에 두개의 스테이지가 있는데 

셔플 아웃풋이 일어난다.  아마 repartition 동작

셔플리드가 일어난다. 아마 partitionby, s3 write 가 일어난다. 

 

skip 된 부분은 이미 repartition 과정에서 적절하게 partitioning 되었기 떄문에 스킵하는 것으로 보인다.

repartition 하면서 적절히 shuffle 된 데이터를 partitionby 에서 업로드만 시켜준다. 

 


AS-IS

셔플리드, 셔플 아웃풋이 없다. 

 

 

빨라진 이유

 

partitionby 는 각 파티션 ( 코어)에 있는 데이터 조각들을 그대로 (셔플링 없이) partitionby 조건대로 파일분할 하여 업로드 하기 때문에 파일 이 작아지고 많아지기 때문에 s3 업로드 로직에서 성능저하가 일어났다. 

 

하지만 repartition을 미리 해줄 경우 partition 조건에 맞게 셔플링이 일어나고 파일을 병합하므로 s3 업로드할 때 파일의 개수가 줄어들기 때문에 s3 업로드 가 빨라지고 성능이 향상된다. 

 

참고 : https://tantusdata.com/spark-shuffle-case-1-partition-by-and-repartition/

 

 

 

 

 

 

 

 

' > spark' 카테고리의 다른 글

Spark Streaming Graceful Shut Down  (0) 2022.05.01
Spark streaming :: Kafka Dstream  (0) 2022.02.22
Spark #2 :: RDD  (0) 2022.02.05
Spark #1 :: Spark 개념, 구조  (0) 2022.02.05