본문 바로가기

35

Spark Streaming 성능 Tuning Kafka의 레코드 를 컨슈밍한 후 s3 로 적재하는 Spark streaming 의 배치 duration 1분으로 줄여야 하는 미션이 있다. 실험 1 EXECUTOR : 5 EXECUTOR MEM : 3G EXECUTOR CORE : 4 DRIVER CORE : 1 ==== PROCESSING TIME : 2~3m Line 별 오퍼레이션 124 : JSON을 df로 변환 123 : !rdd.isEmpty 비교 141 : 카프카에 오프셋 커밋 131 : df.write Stage 131 라인 : 테이블 파티셔닝 하고 업로드하는 구간에서 병목이 일어난다. 위와 같이 대부분의 시간을 s3 업로드에 쓰고 있다. 실험 2 Executor를 10개로 늘렸다 EXECUTOR : 10 EXECUTOR MEM : 3G.. 2022. 5. 24.
Spark Streaming Graceful Shut Down 개요 스파크 스트리밍의 데이터 유실, 중복을 막기 위해 셧다운 시 정상적인 종료가 필요 우리의 카프카 컨슈밍, 스트리밍 어플리케이션의 데이터, 유실 중복 가능성은 극히 낮다. 하지만 데이터의 100% 보장과 다른 메세지를 사용하는 경우에 사용하게 될 것을 대비하며 graceful shutdown 을 구현 하였다. 우리 어플리케이션의 데이터 중복 case 는 s3 업로드 후 commit 중 강제종료 (극히 낮은 ms 단위의 찰라에 걸려야 한다. ) -> offset commit이 안됐기 때문에 다음 실행 시 같은 데이터를 또 가져온다. 방법 1 - 옵션을 넣은 후 드라이버 노드에서 종료 (사용) spark 옵션을 설정한다. spark.streaming.stopGracefullyOnShutdown = tru.. 2022. 5. 1.
AWS Glue 활용기 #4 :: Glue Crawler 트러블 슈팅 2/2 이전 포스팅 의 결과에 따라 특정 칼럼을 활용하여 버전을 파티셔닝 하고 각각 자동으로 테이블을 생성 하기로 하였다. ai 팀에서 api_version 이라는 칼럼에 스키마 버전을 표기해 주기로 하였다. 해당 칼럼으로 파티셔닝 (spark streaming 코드 수정) 해당 칼럼 위치 기준 단일 스키마 크롤링 크롤러가 칼럼의 값 마다 테이블 생성 함 ai 팀에서 버전 표기를 놓친 경우(실수로) 이러한 경우에만 아래 방법을 적용한다. 수동 athena DDL 로 새로운 테이블 따로 생성 헤서 바뀐 스키마로 조회가 가능 하도록 한다. 크롤러에 스키마 변경 옵션 "새 열만 추가"로 바꾼다. 새로운 테이블로 조회한다. 다시 스키마 변경되는 경우에 복구한다. CREATE EXTERNAL TABLE `TableNam.. 2022. 5. 1.
AWS Glue 활용기 #4 :: Glue Crawler 트러블 슈팅 1/2 Gluw crawler 는 s3 데이터를 분석하여 글루 데이터 카탈로그 스키마를 만들어 내는 서비스 이다. 문제점 gtm 수집 쪽에서 스키마를 변경할 경우 s3 에 기존 데이터와 스키마가 혼재되어 아테나 쿼리할 때 에러가 발생한다. 필드의 데이터 타입을 변경 하였을 때 크롤러를 돌려도 변경감지가 안돼고 기존의 데이터 타입으로 인식한다. 이를 해결하고 정책 수립 한 테이블에서 혼재된 스키마 활용가능한지 한 테이블에서 사용하지 못한다면 대체 방법 ? 1. 연구 1 : 스키마가 혼재 되어 있을 때 기존 테이블에서 인식하여 한 테이블로 처리가 가능한지 확인 1.1 먼저 데이터 타입을 변경한 것이 반영되지 않는 것을 해결해본다. 1.1.1 크롤러 옵션 변경을 통해 시도 crawl new folders only -.. 2022. 5. 1.
AWS Glue 활용기 #3 :: Glue workflow 1. 워크플로 생성 크롤러가 완료되면 글루 잡을 실행 시키는 워크 플로 만들기 워크플로 추가 선택 2. 워크 플로의 이름을 정해준다. 나머지는 빈칸으로 두면 된다. 3. 생성한 워크 플로를 선택한 후 하단의 트리거 추가를 클릭한다. 4. 새로 추가 탭에서 트리거 이름을 정한 후 온디멘드 (수동 실행) 5. 그래프 탭에서 노드 추가를 선택한다. 6. 크롤러 탭에서 크롤러를 선택한다. 7. 그래프 탭에서 작업 -> 트리거 추가를 클릭 8. 이름을 정해주고 트리거 유형은 event, 트리거 로직은 "모든 감시 이후에 시작"을 선택한다. 9. 그래프에 아래와 같이 나타난다. 왼편에 노드 추가를 선택한다. 10. 크롤러 탭에서 크롤러를 선택한 후 추가를 선택한다. 11. 오른 편에 노드 추가를 클릭 12. 작업 .. 2022. 3. 27.
AWS Glue 활용기 #2 :: Glue job 개발 개요 Spark streaming의 마이크로 배치로 과하게 많이 쌓인 parquet file merge 하는 프로세스 개발 개념 glue 는 spark context 를 래핑하는 glue context 라는 것이 있다. glue job 에서는 아래 dataframe 변환 과정을 거친다. glue dynamic frame 으로 데이터를 가져온다. spark dataframe 으로 변환 후 데이터 처리를 한다. glue dynamic frame 으로 변환하여 저장 스크립트 import sys from awsglue.transforms import * from awsglue.utils import getResolvedOptions from pyspark.context import SparkContext fro.. 2022. 3. 27.
AWS Glue 활용기 #1 :: Architect 개요 Spark Streaming 에서 마이크로 배치(https://blair2dev.tistory.com/29)로 쌓이는 parquet 파일들의 갯수가 너무 많아서 daily로 merge 하여 다른 테이블로 일 단위로 적재하는 프로세스 개발 요청이 있었다. AWS 에서 제공하는 glue 를 활용하여 문제를 해결해보기로 하였다. 설계 2.1 daily 스케쥴링으로 글루 크롤러가 마이크로 배치가 쌓이는 stream-source 에서 소스 데이터 카탈로그 테이블을 정의한다. 2.2 크롤러가 완료되면 parquet merge 프로세스가 개발된 Glue job 을 실행시킨다. 2.3 daily merge된 parquet 들이 stream-target s3 로 저장된다. 2.4 글루잡이 완료되면 크롤러가 발동되고.. 2022. 3. 27.
Kafka Consumer Thread 예제 컨슈머 스레드 예제 코드 컨슈머 스레드인데 general 하게 사용 가능할 것 같아 공유 한다. 파티션 갯수만큼 스레드를 생성 셧다운 훅을 통한 graceful shutdown 구현 OOM 방지를 위한 wait logic 리밸런스 리스너 구현 public static void initCollect () { collectExecutorService = Executors.newCachedThreadPool(); String partitionNum = PropertiesUtil.getProperty("kafka.partitionNum"); int partitionLength = Integer.parseInt(partitionNum); for(String topic :topics) { for (int i = 0.. 2022. 2. 23.
Spark streaming :: Kafka Dstream 스파크 스트리밍 짧은 주기의 배치 처리를 통해 이전 배치와 다음 배치 사이의 데이터 크기를 최소화 하는 방법 각각의 배치에 생성된 데이터가 하나의 RDD 로 처리됨 일정 주기 마다 새로운 RDD를 읽어 와서 이전에 생성했던 RDD 와 연결해서 필요한 처리를 어플리케이션이 종료될 때까지 무한히 반복한다. 스트리밍컨텍스트 스파크 스트리밍을 사용하기 위해 스트리밍 컨텍스트 인스턴스를 먼저 생성해야 한다. 스파크를 사용할 때 이전 버전의 spark context 와 현 버전의 spark session 을 사용하듯 JavaStreamingContext ssc = new JavaStreamingContext( conf, new Duration(batchDuration));​ 또한 SparkConf 객체를 생성 해서.. 2022. 2. 22.
Kafka lag evaluation 각 서비스의 role Burrow : kafka 로부터 offset등의 데이터를 수집, API response 로 데이터 전달 Telegraf : Burrow 에 request 를 통해서 데이터를 받아, ES 에 적재 (index 설정 ) es: telegraf 로 부터 받은 데이터 적재 grafana : es의 데이터 조회 및 시각화 Grafana Data source 설정 데이터를 받아오기 위해 DATA source 설정 URL : 데이터가 적재된 ES의 주소 Index name : es의 index 이름 (RDB의 테이블 개념) Time field name : 시간정보가 있는 field (column 개념) grafana 데이터 확인 Explorer → datasource : burrow_es_da.. 2022. 2. 22.
Java 비동기 서버 Trouble Shooting :: Heap MEM, Thread Pool Stress Test 컨슈밍 방법을 earliest 로 바꾸고 15일간 데이터를 한번에 땡겨 오도록 한다. 현상 관찰 카프카 Offset commit with offsets {A.ETM.book-8=OffsetAndMetadata{offset=11364522, leaderEpoch=null, metadata=''}} failed org. apache.kafka.clients.consumer.RetriableCommitFailedException: Offset commit failed with a retriable exception. You should retry committing the latest consumed offsets. Caused by: org.apache.kafka.common.error.. 2022. 2. 21.
java 동시성 제어 2022. 2. 9.
Spark #2 :: RDD 1. RDD 1. 스파크에서 쓰는 기본 데이터 구조 2. 스파크는 내부에서 처리하는 데이터 들을 모두 RDD 타입으로 한다. 3. RDD 의 특징 1. Immutable 변형 불가 1. 변형 불가이기 때문에 생성과정을 되짚으면 데이터가 꺠져도 복구할 수 있다. 2. 여러 노드에 분산됨 3. 다수의 파티션으로 관리됨 1. 효율적으로 클러스터 노드에 분산됨 4. RDD 생성 1. 외부 (s3, hdfs) 에서 데이터를 로딩할 때 2. 코드에서 생성되는 데이터를 저장할 때 5. RDD 연산 의 두가지 타입 1. Transformation 1. RDD 에서 새로운 RDD 를 생성한다. 1. filter 2. map 2. Action 1. RDD에서 다른 타입의 데이터를 뽑아냄 1. count() 2. colle.. 2022. 2. 5.
Spark #1 :: Spark 개념, 구조 하둡의 문제점 대용량 일괄 배치 에느 효과적이지만 디스크 기반 이므로 많은 입출력 , 네트워크 트래픽 발생 실시간 데이터 처리에 단점 비동기적으로 발생하는 데이터 처리에 비효율적 반복작업에 약함 하둡 문제점 해결방안 데이터 처리 방식을 disk 방식에서 memory 방식으로 전환 최초 데이터 로드와 최종결과 저장시에만 disk 사용 중간결과는 memory 활용 disk IO -> MR -> disk IO -> MR -> …. disk IO -> MR -> memory -> MR -> memory -> … 인메모리 기반의 데이터 처리 SW -> SPARK Spark SQL SQL 기반으로 Spark Streaming 실시간 데이터 처리 MLlib 머신러닝 처리 GraphX Spark core 스파크의 분산.. 2022. 2. 5.
Amazon EMR 구축 #2 :: 프로비저닝 아키텍처 구성 Airflow , edge node airflow 를 통해 spark job submit이 가능하도록 따로 인스턴스를 두었다. EMR Cluster master 와 core를 하나씩 두고 task 노드를 spot 으로 하여 처리량에 따라 최소 비용으로 scalble 이 가능하도록 auto scaling 을 설정 Bastion 외부에서 ssh 를 통해 작업이 가능하도록 해준다. NAT gateway private 망의 노드들이 패키지 설치 등을 위해 외부 망에 접근이 필요하므로 outbound 를 허용해준다. EFS 소스 등 파일시스템 공유를 위해 EMR 클러스터 생성 네트워크 인터페이스(eni) 생성 마스터 노드의 IP를 고정하기 위해서 IP 를 홀딩 해두는 목적으로 사용한다. EMR 이 .. 2022. 2. 1.
Amazon EMR 구축 #1 :: 개념 정리 짤막 history... 팀에서 사용중이던 cloudera 솔루션이 사용 요금을 올리기로 정해지면서 Spark 워크로드를 aws EMR 로 마이그레이션해야 하는 미션이 생겼다. EMR 이란? AWS 는 맵리듀스 를 위한 클라우드 인프라 프로비저닝을 위한 서비스를 만들어 두었고, 그것을 Elastic Map Reduce 라고 이름지었다. 이 EMR 은 사용자의 설정에 따라 scalable 하게 인스턴스를 프로비저닝 하면서 맵리듀스를 사용하기 위한 성능 최적화를 쉽게 이룰 수 있게 해준다. EMR은 하둡 구조의 인프라를 활용하는 대부분의 app 을 사용할 수 있도록 프로비저닝 부트스트랩 단계에서 자동 설치 해준다. EMR 구성 노드 EMR 클러스터를 구축하려면 노드에 대한 개념부터 인지하고 있어야 한다. -.. 2022. 2. 1.
Java Spring Boot Multi Threading @Async Annotation을 활용하면 비동기 메소드를 손쉽게 작성할 수 있다. @Async 어노테이션이 선언된 메소드는 비동기 메소드로 동작하게 된다. (AOP) @Async 어노테이션이 선언된 메소드는 리턴 타입에 따라 내부적으로 상이하게 동작한다. void : 별도의 쓰레드로 실행되며, 리턴이 없으므로 결과를 기다리지 않고 다음 로직이 수행된다. ThreadPoolTaskExcutor에 의해 자동으로 스레드를 새로 생성해서 비동기 블록 로직을 수행한다. logger.info(“start”) @Async public void t1 (){ logger.info(“start async logic”) Thread.sleep(1000); logger.info(“end async logic”) } logg.. 2022. 1. 11.
AWS DMS Consistency Checking Application Concept aurora DB 와 s3 의 정합성을 체크 하는 애플리케이션이 필요하다. =============================================================== aws doc에 뭔가 있다… https://docs.aws.amazon.com/ko_kr/dms/latest/userguide/CHAP_Validating.html 정확히 마이그레이션 됐는지 데이터 검증함 데이터 전체 로드한 후 검증 시작 증분 변경 사항이 일어날 때 비교 (그럼 계속하겠다는건데? ) 원본, 타겟 행간 비교 후 불일치 보고 Data validation works with the following databases wherever AWS DMS supports them as source and targe.. 2022. 1. 11.
Spring Boot JPA #6 POST 이번에는 hits, likes 칼럼을 추가하여 디비에 넣어본다. 위와 같이 @Column 어노테이션을 붙여서 객체를 정의해준다. 컨트롤러에 hits, likes 를 추가한 후 여기서 @RequestBody 는 post request json body를 객체로 매핑해준다. jpaRepository를 상속한 noticeRepository에 요청들어온 데이터를 notice 객체로 매핑하여 저장한다. GET 데이터를 요청하는 GET API 를 받아주는 method를 만들어 보자 먼저 디비생성하고 데이터를 넣어 준다. 데이터가 유지되도록 ddl-auto : none, generate-ddl: true 로 설정을 바꾼다. 위와 같이 메소드 작성 @PathVariable 은 Get 요청 주소에서 받아주는 파.. 2022. 1. 9.