본문 바로가기
일/spark

Spark streaming :: Kafka Dstream

by blair2dev 2022. 2. 22.
  • 스파크 스트리밍 
    • 짧은 주기의 배치 처리를 통해 이전 배치와 다음 배치 사이의 데이터 크기를 최소화 하는 방법 
    • 각각의 배치에 생성된 데이터가 하나의 RDD 로 처리됨
    • 일정 주기 마다 새로운 RDD를 읽어 와서 이전에 생성했던 RDD 와 연결해서 필요한 처리를 어플리케이션이 종료될 때까지 무한히 반복한다.
      • 스트리밍컨텍스트
        • 스파크 스트리밍을 사용하기 위해 스트리밍 컨텍스트 인스턴스를 먼저 생성해야 한다. 스파크를 사용할 때 이전 버전의 spark context 와 현 버전의 spark session 을 사용하듯
          JavaStreamingContext ssc = new JavaStreamingContext( conf, new Duration(batchDuration));​
        • 또한 SparkConf 객체를 생성 해서 인자로 넣어주어야 한다. 
          SparkConf conf = new SparkConf().setAppName("gtm_streaming").setMaster(setMaster);​
          여기서 set master 에 들어가는 인자는 local[*] 와 같은 인자 값인데 인자 당 의미하는 바는 아래 공식문서에 있다.
          https://spark.apache.org/docs/latest/submitting-applications.html
        • 스파크 스트리밍은 아래와 같이 여러 솔루션에서 연속적인 데이터를 얻어낼 수 있도록 라이브러리가 개발되어있고 이를 사용하면 편하다.

 

 

  • 카프카 스파크 스트리밍과 Dstream 
    • 카프카 컨슈머로서 스파크 스트리밍을 사용하기 위해 카프카 프로퍼티를 만들어야 한다. 
    • Map<String,Object> kafkaParams = new HashMap<String,Object>();
              kafkaParams.put("bootstrap.servers", property.getProperty("kafka.servers"));
              kafkaParams.put("key.deserializer", StringDeserializer.class.getName());
              kafkaParams.put("value.deserializer", StringDeserializer.class.getName());
              kafkaParams.put("group.id", property.getProperty("kafka.consumer.group"));
              kafkaParams.put("auto.offset.reset", property.getProperty("kafka.consumer.offset.reset"));
              kafkaParams.put("enable.auto.commit", false);
              String topicsPattern = property.getProperty("kafka.topics");
    • Spark Dstream (Discretized Streams)
      • 스파크는 Dstream 이라는 데이터모델을 사용한다. 이는 연속적인 데이터를 나타내는 추상 모델이다. 
      • Dstream 은 일정한 시간 간격 사이에 새로 생성된 데이터를 모아서 한꺼번에 처리하는 로직이다. 
      • 일정시간마다 데이터를 모아서 RDD 만드는데 이 RDD 컬렉션을 Dstream 이라고 한다. 
      • Collection<String> topics = Arrays.asList(property.getProperty("kafka.topics"));
                JavaInputDStream<ConsumerRecord<String,String>> stream = KafkaUtils.createDirectStream(
                        ssc, LocationStrategies.PreferConsistent(), ConsumerStrategies.<String,String>SubscribePattern(topicPat, kafkaParams));
      • 위와 같이 kafka 를 활용하는 경우 topic regex 패턴과 kafka 설정, 위에서 생성한 Spark 스트리밍 컨텍스트를 인자로 넣어준다.
      • 이후 Dstream 에 들어있는 RDD의 record 들 마다 operation 을 적용할 수 있다.

      • stream.map(raw-> {// 스키마 변형 로직 예시
                        ObjectMapper mapper = new ObjectMapper();
                        ObjectNode result_json = mapper.createObjectNode();
        
                        JsonNode temp_json = mapper.readTree(raw.value());
                        Iterator<Map.Entry<String, JsonNode>> iter = temp_json.fields();
        
                        while ( iter.hasNext() ) {// 필드마다 키밸류를 가져와서 result_json 에 넣음
                            Map.Entry<String, JsonNode> item = iter.next();
        
                            result_json.put(item.getKey(), item.getValue());
                            //field 한개 씩 record에 put 한다
                        }
                        return result_json.toString();
                })​
         
  • foreachRDD
    • dstream.foreachRDD 는 외부로 데이터를 전달하는 강력한 메서드이다. 
    • Dstream에 포함된 rdd에 직접 접근해서 필요한 작업 가능
    • 여기서 실행하는 작업은 개별 클러스터 노드의 익스큐터에서 실행된다. 드라이버에서 실행할 로직인지 개별 익스큐터에서 rdd로 처리할 로직인지 구분해서 개발해야 한다. 
    • 데이터 프레임 작업을 하기 위해 메서드 내부에서 rdd를 사용해 데이터프레임을 생성할 수 있다. 이때 스파크 세션이 중복 생성 되는 것을 방지하기 위해 builder 의 getOrCreate() 메소드를 사용한다.
      .foreachRDD( rdd -> {// 이쪽은 아웃풋 오퍼레이션 dstream의 각 rdd 처리
                  try{
      
                      SparkSession spark = SparkSession.builder().config(rdd.rdd().sparkContext().getConf()).getOrCreate();
                      spark.sparkContext().hadoopConfiguration().set("fs.s3a.access.key", property.getProperty("s3.access.key"));
                      spark.sparkContext().hadoopConfiguration().set("fs.s3a.secret.key", property.getProperty("s3.secret.key"));
      
                      if (!rdd.isEmpty()) { // rdd는 레코드들의 집합
                          Dataset<Row> df = spark.read().json(rdd.rdd());
                          logger.info("################    S3 load start");
      
                          df = df.withColumn("s3EntryTime", functions.lit(String.valueOf(LocalDateTime.now())));
                          df.printSchema();
                          df.write()
                                  .partitionBy("topic", "year", "month", "day")
                                  .option("mergeSchema", true)
                                  .mode(SaveMode.Append)
                                  .parquet(property.getProperty("s3.save.dir"));
      
                      }
                  }catch (Exception exception) {
                      logger.info("[collect] "+exception);
                  }
              });​
  • Kafka Offset commit 
    • 스파크 스트리밍 Dstream 에서 exactly-once(정확히 한번) commit을 달성하기 위한 정책은 공식 문서에서 세 가지를 추천한다. 
      • spark check point 
        • 장점 : 설정하기 쉽다.
        • 단점 : 코드 변경이 있을 경우 체크 포인트를 리커버 해줘야 함
      • kafka offset topic 을 활용
        • 장점 : 코드가 변경되어도 상관 없음, 구현이 용이
        • 단점 : kafka 자체가 transactional 하지 않음 따라서 failure 상황에서 100% 보장이 안됨
      • 자신만의 스토리지 이용 
        • 장점 : transactional 한 스토리지를 사용하면 문제 발생 확률이 낮다. 
        • 단점 : 구현, 운영하는 것이 번거롭고, 성능 문제 가능성이 있다. 
    • 위 세 가지 방법 중 kafka offset topic 활용방식을 택했다. 
stream.foreachRDD(rdd -> {
  OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();

  // some time later, after outputs have completed
  ((CanCommitOffsets) stream.inputDStream()).commitAsync(offsetRanges);
});

 

 

    • 배치시간 튜닝
      • s3 업로드 하는데 걸리는 시간 <= 배치 만드는 시간 duration
        • 거꾸로 되면 배치만드는 것이 점차 쌓이고 메모리    있다. 
        • 업로드 시간과 최적의 시간으로 맞춰야 
    • shutdown hook 안만든 이유 
      • 받은  까지 s3  업로드 하는 로직을 만들어야  
        • 컨슈머 에서 받은  까지 끊어서 rdd 생성 힘듬
          • 왜냐면 rdd  스팍 dstream 에서 배치 시간 단위로 컨슈밍해서 짤라서 알아서 만들어 주기 때문에 받은 곳까지의 rdd 만들어서 map() 하고 s3  업로드 하는 로직을 따로 만들어야 하는데  이렇게 하려면 dstream  못쓰고 커스텀으로 새로 만들어야   같음
        • shutdown hook  들어와서 프로세스가 불시 중지되어도 데이터 중복/누락  확률이 극히 낮음
          • rdd  번들로 만든  rdd 자체를 s3  통으로 업로드. 업로드 완료되면 offset range 통으로 커밋한다. 
            • 따라서 컨슈밍 중간에 끊겨도 어차피 s3 올라간 데이터가 없음 
            • 다시 이전 오프셋부터 중복으로 받아도 s3 에는 데이터 중복이 일어나지 않는다.
          • 중복/누락될 가능성이 있는 case 
            • s3 업로드 완료  commit  종료
              •  다음 로직인 offset commit  일어나지 않기 때문에 중복 가능성 있다
              • 참고: s3 업로드 중간에 프로세스 중지 되어도 중복 없음
              • s3 tmp 쪽에 쌓다가 한번에 머지
              • tmp 쌓다가 끊기면 삭제되고 리셋됨
              • 다시 처음부터 올리기 때문에 중복/손실 없음

' > spark' 카테고리의 다른 글

Spark Streaming 성능 Tuning  (0) 2022.05.24
Spark Streaming Graceful Shut Down  (0) 2022.05.01
Spark #2 :: RDD  (0) 2022.02.05
Spark #1 :: Spark 개념, 구조  (0) 2022.02.05