- 개요
- 스파크 스트리밍의 데이터 유실, 중복을 막기 위해 셧다운 시 정상적인 종료가 필요
- 우리의 카프카 컨슈밍, 스트리밍 어플리케이션의 데이터, 유실 중복 가능성은 극히 낮다.
하지만 데이터의 100% 보장과 다른 메세지를 사용하는 경우에 사용하게 될 것을 대비하며 graceful shutdown 을 구현 하였다. - 우리 어플리케이션의 데이터 중복 case 는 s3 업로드 후 commit 중 강제종료 (극히 낮은 ms 단위의 찰라에 걸려야 한다. )
-> offset commit이 안됐기 때문에 다음 실행 시 같은 데이터를 또 가져온다.
- 우리의 카프카 컨슈밍, 스트리밍 어플리케이션의 데이터, 유실 중복 가능성은 극히 낮다.
- 스파크 스트리밍의 데이터 유실, 중복을 막기 위해 셧다운 시 정상적인 종료가 필요
- 방법 1 - 옵션을 넣은 후 드라이버 노드에서 종료 (사용)
- spark 옵션을 설정한다. spark.streaming.stopGracefullyOnShutdown = true
- 옵션 넣고
- 드라이버에서 시그텀 주면 되는데
- 드라이버가 어디 노드에 뜨는 지 랜덤하다
- client mode 로 하면 실행시킨 서버에서 하면된다.
- yarn application -kill 하고 spark UI 에서 kill 하는 거하고 같다 이 경우는 kill -9 이다
- yarn application -kill 하면 아래옵션에 의해 SIGTERM 이 있은 후 250 밀리초 이후 SIGKILL 이 있다.
yarn.nodemanager.sleep-delay-before-sigkill.ms ( default 250ms ) (옵션을 바꿔도 적용이 안된다는 말이 있다.) - 따라서 셧다운 로직 전에 강제종료 될 수 있기 때문에 맞는 종료 방법이 아니다.
- yarn application -kill 하면 아래옵션에 의해 SIGTERM 이 있은 후 250 밀리초 이후 SIGKILL 이 있다.
- yarn application -kill 하고 spark UI 에서 kill 하는 거하고 같다 이 경우는 kill -9 이다
- 드라이버로 들어가서 명령어 주는 방법
- yarn application -list | grep gtm_logs_processor
- application id를 알아 낸다.
- yarn application -status <application Id>
- AM Host 의 값이 driver node 의 주소 이다.
- ssh -i EMR-CLUSTER.key ec2-user@AM Host
- ps -ef | grep <app name>
- kill -15 <PID>
- 이거 두 번 줘야 한다 이유는 spark.yarn.maxAppAttempt 라는 옵션이 2 이기 때문에 자동으로 되살아난다.
- 커맨드를 날리면 약간의 시간 이후에 종료된다.
- spark 옵션을 설정한다. spark.streaming.stopGracefullyOnShutdown = true
- 방법 2 - ssc.stop(true, true), MARKER FILE 활용
- ssc.stop(true, true)
- 라는 구문이 돌아가게 스파크 어플리케이션 내부에 구현한다.
- 두 번째 파라미터가 gracefulshutdown = true 이다.
- ssc.start() 구문 다음에 무한루프로 조건을 체킹하는 로직을 넣어주면 되는데 조건은 marker file 의 유무로 하는 경우가 많다.
- marker file 을 s3 특정 위치에 업로드 하면
- 무한루프 조건문 로직에서 해당 마커파일을 모니터링 하다가 해당파일이 업로드 되었다는 것을 알게되고
- ssc.stop(true, true) 가 동작하기 하는 것이다.
아놔 티스토리 왜 그림이던 코드던 저 아래가서 붙는거지 -_- 열받아...........
- 장점은 셧다운 시키기가 위 방법보다 편하다는 것
- ssc.stop(true, true)
- 방법 3 - jvm shutdown hook
- sys.ShutdownHookThread { ssc.stop(true,true) }
- 위와 같이 셧다운 훅 스레드를 구현 하는 방법이 있지만 데드락 이슈가 있어서 안쓴다고 한다.
ㅁㅇㄹ
sdf
ssc.start()
val checkIntervalMillis = 2 * 1000 * 1
var isStopped = false
while (!isStopped) {
println("check stopped or not")
isStopped = ssc.awaitTerminationOrTimeout(checkIntervalMillis)
if (isStopped)
println("Stopped")
else
println("Running")
checkShutdownMarker
if (!isStopped && stopMarker) {
println("Ready to stop Spark Streaming")
ssc.stop(true, true)
}
}
}
def checkShutdownMarker = {
if (!stopMarker) {
val fs = FileSystem.get(hadoopConf)
stopMarker = fs.exists(shutdownMarkerPath)
}
}
'일 > spark' 카테고리의 다른 글
Spark Streaming 성능 Tuning (0) | 2022.05.24 |
---|---|
Spark streaming :: Kafka Dstream (0) | 2022.02.22 |
Spark #2 :: RDD (0) | 2022.02.05 |
Spark #1 :: Spark 개념, 구조 (0) | 2022.02.05 |