본문 바로가기
일/spark

Spark Streaming Graceful Shut Down

by blair2dev 2022. 5. 1.
  1. 개요
    1. 스파크 스트리밍의 데이터 유실중복을 막기 위해 셧다운  정상적인 종료가 필요
      1. 우리의 카프카 컨슈밍, 스트리밍 어플리케이션의 데이터, 유실 중복 가능성은 극히 낮다.
        하지만 데이터의 100% 보장과 다른 메세지를 사용하는 경우에 사용하게  것을 대비하며 graceful shutdown  구현 하였다
      2. 우리 어플리케이션의 데이터 중복 case  s3 업로드  commit  강제종료 (극히 낮은 ms 단위의 찰라에 걸려야 한다. )
        -> 
        offset commit 안됐기 때문에 다음 실행  같은 데이터를  가져온다
  2. 방법 1 - 옵션을 넣은  드라이버 노드에서 종료  (사용
    1. spark 옵션을 설정한다 spark.streaming.stopGracefullyOnShutdown = true 
      1. 옵션 넣고 
      2. 드라이버에서 시그텀 주면 되는데 
        1. 드라이버가 어디 노드에 뜨는  랜덤하다
        2. client mode  하면 실행시킨 서버에서 하면된다
          1. yarn application -kill 하고 spark UI  에서 kill 하는 거하고 같다  경우는 kill -9 이다 
            1. yarn application -kill 하면 아래옵션에 의해  SIGTERM  있은  250 밀리초 이후 SIGKILL  있다.
              yarn.nodemanager.sleep-delay-before-sigkill.ms ( default 250ms ) (옵션을 바꿔도 적용이 안된다는 말이 있다.)
            2. 따라서 셧다운 로직 전에 강제종료   있기 때문에 맞는 종료 방법이 아니다
        3. 드라이버로 들어가서 명령어 주는 방법 
          1. yarn application -list | grep gtm_logs_processor
          2. application id 알아 낸다
          3. yarn application -status <application Id>
          4. AM Host  값이 driver node  주소 이다.
          5. ssh -i EMR-CLUSTER.key ec2-user@AM Host
          6. ps -ef | grep <app name>
          7. kill -15 <PID> 
            1. 이거   줘야 한다  이유는 spark.yarn.maxAppAttempt 라는 옵션이 2 이기 때문에 자동으로 되살아난다.
            2. 커맨드를 날리면 약간의 시간 이후에 종료된다
  3. 방법 2 - ssc.stop(true, true), MARKER FILE 활용
    • ssc.stop(true, true)
      1. 라는 구문이 돌아가게 스파크 어플리케이션 내부에 구현한다
      2.  번째 파라미터가 gracefulshutdown = true 이다
      3. ssc.start() 구문 다음에 무한루프로 조건을 체킹하는 로직을 넣어주면 되는데 조건은 marker file  유무로 하는 경우가 많다
        1. marker file  s3 특정 위치에 업로드 하면 
        2. 무한루프 조건문 로직에서 해당 마커파일을 모니터링 하다가 해당파일이 업로드 되었다는 것을 알게되고 
        3. ssc.stop(true, true)  동작하기 하는 것이다

    • 아놔 티스토리 왜 그림이던 코드던 저 아래가서 붙는거지 -_- 열받아...........
       
      1. 장점은 셧다운 시키기가  방법보다 편하다는  
  4. 방법 3 - jvm shutdown hook
    1. sys.ShutdownHookThread { ssc.stop(true,true) }
    2. 위와 같이 셧다운  스레드를 구현 하는 방법이 있지만 데드락 이슈가 있어서 안쓴다고 한다
ㅁㅇㄹ
sdf
ssc.start()
    val checkIntervalMillis = 2 * 1000 * 1
    var isStopped = false

    while (!isStopped) {
      println("check stopped or not")
      isStopped = ssc.awaitTerminationOrTimeout(checkIntervalMillis)
      if (isStopped)
        println("Stopped")
      else
        println("Running")
      checkShutdownMarker
      if (!isStopped && stopMarker) {
        println("Ready to stop Spark Streaming")
        ssc.stop(true, true)
      }
    }
  }

  def checkShutdownMarker = {
    if (!stopMarker) {
      val fs = FileSystem.get(hadoopConf)
      stopMarker = fs.exists(shutdownMarkerPath)
    }
  }

 

 

' > spark' 카테고리의 다른 글

Spark Streaming 성능 Tuning  (0) 2022.05.24
Spark streaming :: Kafka Dstream  (0) 2022.02.22
Spark #2 :: RDD  (0) 2022.02.05
Spark #1 :: Spark 개념, 구조  (0) 2022.02.05