- 스파크 스트리밍
- 짧은 주기의 배치 처리를 통해 이전 배치와 다음 배치 사이의 데이터 크기를 최소화 하는 방법
- 각각의 배치에 생성된 데이터가 하나의 RDD 로 처리됨
- 일정 주기 마다 새로운 RDD를 읽어 와서 이전에 생성했던 RDD 와 연결해서 필요한 처리를 어플리케이션이 종료될 때까지 무한히 반복한다.
- 스트리밍컨텍스트
- 스파크 스트리밍을 사용하기 위해 스트리밍 컨텍스트 인스턴스를 먼저 생성해야 한다. 스파크를 사용할 때 이전 버전의 spark context 와 현 버전의 spark session 을 사용하듯
JavaStreamingContext ssc = new JavaStreamingContext( conf, new Duration(batchDuration));
- 또한 SparkConf 객체를 생성 해서 인자로 넣어주어야 한다.
여기서 set master 에 들어가는 인자는 local[*] 와 같은 인자 값인데 인자 당 의미하는 바는 아래 공식문서에 있다.SparkConf conf = new SparkConf().setAppName("gtm_streaming").setMaster(setMaster);
https://spark.apache.org/docs/latest/submitting-applications.html - 스파크 스트리밍은 아래와 같이 여러 솔루션에서 연속적인 데이터를 얻어낼 수 있도록 라이브러리가 개발되어있고 이를 사용하면 편하다.
- 스파크 스트리밍을 사용하기 위해 스트리밍 컨텍스트 인스턴스를 먼저 생성해야 한다. 스파크를 사용할 때 이전 버전의 spark context 와 현 버전의 spark session 을 사용하듯
- 스트리밍컨텍스트
- 카프카 스파크 스트리밍과 Dstream
- 카프카 컨슈머로서 스파크 스트리밍을 사용하기 위해 카프카 프로퍼티를 만들어야 한다.
-
Map<String,Object> kafkaParams = new HashMap<String,Object>(); kafkaParams.put("bootstrap.servers", property.getProperty("kafka.servers")); kafkaParams.put("key.deserializer", StringDeserializer.class.getName()); kafkaParams.put("value.deserializer", StringDeserializer.class.getName()); kafkaParams.put("group.id", property.getProperty("kafka.consumer.group")); kafkaParams.put("auto.offset.reset", property.getProperty("kafka.consumer.offset.reset")); kafkaParams.put("enable.auto.commit", false); String topicsPattern = property.getProperty("kafka.topics");
- Spark Dstream (Discretized Streams)
- 스파크는 Dstream 이라는 데이터모델을 사용한다. 이는 연속적인 데이터를 나타내는 추상 모델이다.
- Dstream 은 일정한 시간 간격 사이에 새로 생성된 데이터를 모아서 한꺼번에 처리하는 로직이다.
- 일정시간마다 데이터를 모아서 RDD 만드는데 이 RDD 컬렉션을 Dstream 이라고 한다.
-
Collection<String> topics = Arrays.asList(property.getProperty("kafka.topics")); JavaInputDStream<ConsumerRecord<String,String>> stream = KafkaUtils.createDirectStream( ssc, LocationStrategies.PreferConsistent(), ConsumerStrategies.<String,String>SubscribePattern(topicPat, kafkaParams));
- 위와 같이 kafka 를 활용하는 경우 topic regex 패턴과 kafka 설정, 위에서 생성한 Spark 스트리밍 컨텍스트를 인자로 넣어준다.
- 이후 Dstream 에 들어있는 RDD의 record 들 마다 operation 을 적용할 수 있다.
-
stream.map(raw-> {// 스키마 변형 로직 예시 ObjectMapper mapper = new ObjectMapper(); ObjectNode result_json = mapper.createObjectNode(); JsonNode temp_json = mapper.readTree(raw.value()); Iterator<Map.Entry<String, JsonNode>> iter = temp_json.fields(); while ( iter.hasNext() ) {// 필드마다 키밸류를 가져와서 result_json 에 넣음 Map.Entry<String, JsonNode> item = iter.next(); result_json.put(item.getKey(), item.getValue()); //field 한개 씩 record에 put 한다 } return result_json.toString(); })
- foreachRDD
- dstream.foreachRDD 는 외부로 데이터를 전달하는 강력한 메서드이다.
- Dstream에 포함된 rdd에 직접 접근해서 필요한 작업 가능
- 여기서 실행하는 작업은 개별 클러스터 노드의 익스큐터에서 실행된다. 드라이버에서 실행할 로직인지 개별 익스큐터에서 rdd로 처리할 로직인지 구분해서 개발해야 한다.
- 데이터 프레임 작업을 하기 위해 메서드 내부에서 rdd를 사용해 데이터프레임을 생성할 수 있다. 이때 스파크 세션이 중복 생성 되는 것을 방지하기 위해 builder 의 getOrCreate() 메소드를 사용한다.
.foreachRDD( rdd -> {// 이쪽은 아웃풋 오퍼레이션 dstream의 각 rdd 처리 try{ SparkSession spark = SparkSession.builder().config(rdd.rdd().sparkContext().getConf()).getOrCreate(); spark.sparkContext().hadoopConfiguration().set("fs.s3a.access.key", property.getProperty("s3.access.key")); spark.sparkContext().hadoopConfiguration().set("fs.s3a.secret.key", property.getProperty("s3.secret.key")); if (!rdd.isEmpty()) { // rdd는 레코드들의 집합 Dataset<Row> df = spark.read().json(rdd.rdd()); logger.info("################ S3 load start"); df = df.withColumn("s3EntryTime", functions.lit(String.valueOf(LocalDateTime.now()))); df.printSchema(); df.write() .partitionBy("topic", "year", "month", "day") .option("mergeSchema", true) .mode(SaveMode.Append) .parquet(property.getProperty("s3.save.dir")); } }catch (Exception exception) { logger.info("[collect] "+exception); } });
- Kafka Offset commit
- 스파크 스트리밍 Dstream 에서 exactly-once(정확히 한번) commit을 달성하기 위한 정책은 공식 문서에서 세 가지를 추천한다.
- spark check point
- 장점 : 설정하기 쉽다.
- 단점 : 코드 변경이 있을 경우 체크 포인트를 리커버 해줘야 함
- kafka offset topic 을 활용
- 장점 : 코드가 변경되어도 상관 없음, 구현이 용이
- 단점 : kafka 자체가 transactional 하지 않음 따라서 failure 상황에서 100% 보장이 안됨
- 자신만의 스토리지 이용
- 장점 : transactional 한 스토리지를 사용하면 문제 발생 확률이 낮다.
- 단점 : 구현, 운영하는 것이 번거롭고, 성능 문제 가능성이 있다.
- spark check point
- 위 세 가지 방법 중 kafka offset topic 활용방식을 택했다.
- 스파크 스트리밍 Dstream 에서 exactly-once(정확히 한번) commit을 달성하기 위한 정책은 공식 문서에서 세 가지를 추천한다.
stream.foreachRDD(rdd -> {
OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
// some time later, after outputs have completed
((CanCommitOffsets) stream.inputDStream()).commitAsync(offsetRanges);
});
- 배치시간 튜닝
- s3 업로드 하는데 걸리는 시간 <= 배치 만드는 시간 duration
- 거꾸로 되면 배치만드는 것이 점차 쌓이고 메모리 풀 될 수 있다.
- 업로드 시간과 최적의 시간으로 맞춰야 함
- s3 업로드 하는데 걸리는 시간 <= 배치 만드는 시간 duration
- shutdown hook 안만든 이유
- 받은 곳 까지 s3 를 업로드 하는 로직을 만들어야 함
- 컨슈머 에서 받은 곳 까지 끊어서 rdd 생성 힘듬
- 왜냐면 rdd 를 스팍 dstream 에서 배치 시간 단위로 컨슈밍해서 짤라서 알아서 만들어 주기 때문에 받은 곳까지의 rdd를 만들어서 map() 하고 s3 에 업로드 하는 로직을 따로 만들어야 하는데 이렇게 하려면 dstream 못쓰고 커스텀으로 새로 만들어야 할 것 같음
- shutdown hook 이 들어와서 프로세스가 불시 중지되어도 데이터 중복/누락 될 확률이 극히 낮음
- rdd 를 번들로 만든 후 rdd 자체를 s3 에 통으로 업로드. 업로드 완료되면 offset range 통으로 커밋한다.
- 따라서 컨슈밍 중간에 끊겨도 어차피 s3에 올라간 데이터가 없음
- 다시 이전 오프셋부터 중복으로 받아도 s3 에는 데이터 중복이 일어나지 않는다.
- 중복/누락될 가능성이 있는 case
- s3 업로드 완료 후 commit 중 종료
- 그 다음 로직인 offset commit 이 일어나지 않기 때문에 중복 가능성 있다
- 참고: s3 업로드 중간에 프로세스 중지 되어도 중복 없음
- s3 tmp 쪽에 쌓다가 한번에 머지
- tmp 쌓다가 끊기면 삭제되고 리셋됨
- 다시 처음부터 올리기 때문에 중복/손실 없음
- s3 업로드 완료 후 commit 중 종료
- rdd 를 번들로 만든 후 rdd 자체를 s3 에 통으로 업로드. 업로드 완료되면 offset range 통으로 커밋한다.
- 컨슈머 에서 받은 곳 까지 끊어서 rdd 생성 힘듬
- 받은 곳 까지 s3 를 업로드 하는 로직을 만들어야 함
'일 > spark' 카테고리의 다른 글
Spark Streaming 성능 Tuning (0) | 2022.05.24 |
---|---|
Spark Streaming Graceful Shut Down (0) | 2022.05.01 |
Spark #2 :: RDD (0) | 2022.02.05 |
Spark #1 :: Spark 개념, 구조 (0) | 2022.02.05 |