- Spark Streaming 성능 Tuning Kafka의 레코드 를 컨슈밍한 후 s3 로 적재하는 Spark streaming 의 배치 duration 1분으로 줄여야 하는 미션이 있다. 실험 1 EXECUTOR : 5 EXECUTOR MEM : 3G EXECUTOR CORE : 4 DRIVER CORE : 1 ==== PROCESSING TIME : 2~3m Line 별 오퍼레이션 124 : JSON을 df로 변환 123 : !rdd.isEmpty 비교 141 : 카프카에 오프셋 커밋 131 : df.write Stage 131 라인 : 테이블 파티셔닝 하고 업로드하는 구간에서 병목이 일어난다. 위와 같이 대부분의 시간을 s3 업로드에 쓰고 있다. 실험 2 Executor를 10개로 늘렸다 EXECUTOR : 10 EXECUTOR MEM : 3G.. 2022.05.24
- Spark Streaming Graceful Shut Down 개요 스파크 스트리밍의 데이터 유실, 중복을 막기 위해 셧다운 시 정상적인 종료가 필요 우리의 카프카 컨슈밍, 스트리밍 어플리케이션의 데이터, 유실 중복 가능성은 극히 낮다. 하지만 데이터의 100% 보장과 다른 메세지를 사용하는 경우에 사용하게 될 것을 대비하며 graceful shutdown 을 구현 하였다. 우리 어플리케이션의 데이터 중복 case 는 s3 업로드 후 commit 중 강제종료 (극히 낮은 ms 단위의 찰라에 걸려야 한다. ) -> offset commit이 안됐기 때문에 다음 실행 시 같은 데이터를 또 가져온다. 방법 1 - 옵션을 넣은 후 드라이버 노드에서 종료 (사용) spark 옵션을 설정한다. spark.streaming.stopGracefullyOnShutdown = tru.. 2022.05.01
- AWS Glue 활용기 #4 :: Glue Crawler 트러블 슈팅 2/2 이전 포스팅 의 결과에 따라 특정 칼럼을 활용하여 버전을 파티셔닝 하고 각각 자동으로 테이블을 생성 하기로 하였다. ai 팀에서 api_version 이라는 칼럼에 스키마 버전을 표기해 주기로 하였다. 해당 칼럼으로 파티셔닝 (spark streaming 코드 수정) 해당 칼럼 위치 기준 단일 스키마 크롤링 크롤러가 칼럼의 값 마다 테이블 생성 함 ai 팀에서 버전 표기를 놓친 경우(실수로) 이러한 경우에만 아래 방법을 적용한다. 수동 athena DDL 로 새로운 테이블 따로 생성 헤서 바뀐 스키마로 조회가 가능 하도록 한다. 크롤러에 스키마 변경 옵션 "새 열만 추가"로 바꾼다. 새로운 테이블로 조회한다. 다시 스키마 변경되는 경우에 복구한다. CREATE EXTERNAL TABLE `TableNam.. 2022.05.01
- AWS Glue 활용기 #4 :: Glue Crawler 트러블 슈팅 1/2 Gluw crawler 는 s3 데이터를 분석하여 글루 데이터 카탈로그 스키마를 만들어 내는 서비스 이다. 문제점 gtm 수집 쪽에서 스키마를 변경할 경우 s3 에 기존 데이터와 스키마가 혼재되어 아테나 쿼리할 때 에러가 발생한다. 필드의 데이터 타입을 변경 하였을 때 크롤러를 돌려도 변경감지가 안돼고 기존의 데이터 타입으로 인식한다. 이를 해결하고 정책 수립 한 테이블에서 혼재된 스키마 활용가능한지 한 테이블에서 사용하지 못한다면 대체 방법 ? 1. 연구 1 : 스키마가 혼재 되어 있을 때 기존 테이블에서 인식하여 한 테이블로 처리가 가능한지 확인 1.1 먼저 데이터 타입을 변경한 것이 반영되지 않는 것을 해결해본다. 1.1.1 크롤러 옵션 변경을 통해 시도 crawl new folders only -.. 2022.05.01
- AWS Glue 활용기 #3 :: Glue workflow 1. 워크플로 생성 크롤러가 완료되면 글루 잡을 실행 시키는 워크 플로 만들기 워크플로 추가 선택 2. 워크 플로의 이름을 정해준다. 나머지는 빈칸으로 두면 된다. 3. 생성한 워크 플로를 선택한 후 하단의 트리거 추가를 클릭한다. 4. 새로 추가 탭에서 트리거 이름을 정한 후 온디멘드 (수동 실행) 5. 그래프 탭에서 노드 추가를 선택한다. 6. 크롤러 탭에서 크롤러를 선택한다. 7. 그래프 탭에서 작업 -> 트리거 추가를 클릭 8. 이름을 정해주고 트리거 유형은 event, 트리거 로직은 "모든 감시 이후에 시작"을 선택한다. 9. 그래프에 아래와 같이 나타난다. 왼편에 노드 추가를 선택한다. 10. 크롤러 탭에서 크롤러를 선택한 후 추가를 선택한다. 11. 오른 편에 노드 추가를 클릭 12. 작업 .. 2022.03.27
- AWS Glue 활용기 #2 :: Glue job 개발 개요 Spark streaming의 마이크로 배치로 과하게 많이 쌓인 parquet file merge 하는 프로세스 개발 개념 glue 는 spark context 를 래핑하는 glue context 라는 것이 있다. glue job 에서는 아래 dataframe 변환 과정을 거친다. glue dynamic frame 으로 데이터를 가져온다. spark dataframe 으로 변환 후 데이터 처리를 한다. glue dynamic frame 으로 변환하여 저장 스크립트 import sys from awsglue.transforms import * from awsglue.utils import getResolvedOptions from pyspark.context import SparkContext fro.. 2022.03.27
- AWS Glue 활용기 #1 :: Architect 개요 Spark Streaming 에서 마이크로 배치(https://blair2dev.tistory.com/29)로 쌓이는 parquet 파일들의 갯수가 너무 많아서 daily로 merge 하여 다른 테이블로 일 단위로 적재하는 프로세스 개발 요청이 있었다. AWS 에서 제공하는 glue 를 활용하여 문제를 해결해보기로 하였다. 설계 2.1 daily 스케쥴링으로 글루 크롤러가 마이크로 배치가 쌓이는 stream-source 에서 소스 데이터 카탈로그 테이블을 정의한다. 2.2 크롤러가 완료되면 parquet merge 프로세스가 개발된 Glue job 을 실행시킨다. 2.3 daily merge된 parquet 들이 stream-target s3 로 저장된다. 2.4 글루잡이 완료되면 크롤러가 발동되고.. 2022.03.27
- Kafka Consumer Thread 예제 컨슈머 스레드 예제 코드 컨슈머 스레드인데 general 하게 사용 가능할 것 같아 공유 한다. 파티션 갯수만큼 스레드를 생성 셧다운 훅을 통한 graceful shutdown 구현 OOM 방지를 위한 wait logic 리밸런스 리스너 구현 public static void initCollect () { collectExecutorService = Executors.newCachedThreadPool(); String partitionNum = PropertiesUtil.getProperty("kafka.partitionNum"); int partitionLength = Integer.parseInt(partitionNum); for(String topic :topics) { for (int i = 0.. 2022.02.23
- Spark streaming :: Kafka Dstream 스파크 스트리밍 짧은 주기의 배치 처리를 통해 이전 배치와 다음 배치 사이의 데이터 크기를 최소화 하는 방법 각각의 배치에 생성된 데이터가 하나의 RDD 로 처리됨 일정 주기 마다 새로운 RDD를 읽어 와서 이전에 생성했던 RDD 와 연결해서 필요한 처리를 어플리케이션이 종료될 때까지 무한히 반복한다. 스트리밍컨텍스트 스파크 스트리밍을 사용하기 위해 스트리밍 컨텍스트 인스턴스를 먼저 생성해야 한다. 스파크를 사용할 때 이전 버전의 spark context 와 현 버전의 spark session 을 사용하듯 JavaStreamingContext ssc = new JavaStreamingContext( conf, new Duration(batchDuration)); 또한 SparkConf 객체를 생성 해서.. 2022.02.22
- Kafka lag evaluation 각 서비스의 role Burrow : kafka 로부터 offset등의 데이터를 수집, API response 로 데이터 전달 Telegraf : Burrow 에 request 를 통해서 데이터를 받아, ES 에 적재 (index 설정 ) es: telegraf 로 부터 받은 데이터 적재 grafana : es의 데이터 조회 및 시각화 Grafana Data source 설정 데이터를 받아오기 위해 DATA source 설정 URL : 데이터가 적재된 ES의 주소 Index name : es의 index 이름 (RDB의 테이블 개념) Time field name : 시간정보가 있는 field (column 개념) grafana 데이터 확인 Explorer → datasource : burrow_es_da.. 2022.02.22
- Java 비동기 서버 Trouble Shooting :: Heap MEM, Thread Pool Stress Test 컨슈밍 방법을 earliest 로 바꾸고 15일간 데이터를 한번에 땡겨 오도록 한다. 현상 관찰 카프카 Offset commit with offsets {A.ETM.book-8=OffsetAndMetadata{offset=11364522, leaderEpoch=null, metadata=''}} failed org. apache.kafka.clients.consumer.RetriableCommitFailedException: Offset commit failed with a retriable exception. You should retry committing the latest consumed offsets. Caused by: org.apache.kafka.common.error.. 2022.02.21
- java 동시성 제어 2022.02.09
- Spark #2 :: RDD 1. RDD 1. 스파크에서 쓰는 기본 데이터 구조 2. 스파크는 내부에서 처리하는 데이터 들을 모두 RDD 타입으로 한다. 3. RDD 의 특징 1. Immutable 변형 불가 1. 변형 불가이기 때문에 생성과정을 되짚으면 데이터가 꺠져도 복구할 수 있다. 2. 여러 노드에 분산됨 3. 다수의 파티션으로 관리됨 1. 효율적으로 클러스터 노드에 분산됨 4. RDD 생성 1. 외부 (s3, hdfs) 에서 데이터를 로딩할 때 2. 코드에서 생성되는 데이터를 저장할 때 5. RDD 연산 의 두가지 타입 1. Transformation 1. RDD 에서 새로운 RDD 를 생성한다. 1. filter 2. map 2. Action 1. RDD에서 다른 타입의 데이터를 뽑아냄 1. count() 2. colle.. 2022.02.05
- Spark #1 :: Spark 개념, 구조 하둡의 문제점 대용량 일괄 배치 에느 효과적이지만 디스크 기반 이므로 많은 입출력 , 네트워크 트래픽 발생 실시간 데이터 처리에 단점 비동기적으로 발생하는 데이터 처리에 비효율적 반복작업에 약함 하둡 문제점 해결방안 데이터 처리 방식을 disk 방식에서 memory 방식으로 전환 최초 데이터 로드와 최종결과 저장시에만 disk 사용 중간결과는 memory 활용 disk IO -> MR -> disk IO -> MR -> …. disk IO -> MR -> memory -> MR -> memory -> … 인메모리 기반의 데이터 처리 SW -> SPARK Spark SQL SQL 기반으로 Spark Streaming 실시간 데이터 처리 MLlib 머신러닝 처리 GraphX Spark core 스파크의 분산.. 2022.02.05
- Amazon EMR 구축 #2 :: 프로비저닝 아키텍처 구성 Airflow , edge node airflow 를 통해 spark job submit이 가능하도록 따로 인스턴스를 두었다. EMR Cluster master 와 core를 하나씩 두고 task 노드를 spot 으로 하여 처리량에 따라 최소 비용으로 scalble 이 가능하도록 auto scaling 을 설정 Bastion 외부에서 ssh 를 통해 작업이 가능하도록 해준다. NAT gateway private 망의 노드들이 패키지 설치 등을 위해 외부 망에 접근이 필요하므로 outbound 를 허용해준다. EFS 소스 등 파일시스템 공유를 위해 EMR 클러스터 생성 네트워크 인터페이스(eni) 생성 마스터 노드의 IP를 고정하기 위해서 IP 를 홀딩 해두는 목적으로 사용한다. EMR 이 .. 2022.02.01
- Amazon EMR 구축 #1 :: 개념 정리 짤막 history... 팀에서 사용중이던 cloudera 솔루션이 사용 요금을 올리기로 정해지면서 Spark 워크로드를 aws EMR 로 마이그레이션해야 하는 미션이 생겼다. EMR 이란? AWS 는 맵리듀스 를 위한 클라우드 인프라 프로비저닝을 위한 서비스를 만들어 두었고, 그것을 Elastic Map Reduce 라고 이름지었다. 이 EMR 은 사용자의 설정에 따라 scalable 하게 인스턴스를 프로비저닝 하면서 맵리듀스를 사용하기 위한 성능 최적화를 쉽게 이룰 수 있게 해준다. EMR은 하둡 구조의 인프라를 활용하는 대부분의 app 을 사용할 수 있도록 프로비저닝 부트스트랩 단계에서 자동 설치 해준다. EMR 구성 노드 EMR 클러스터를 구축하려면 노드에 대한 개념부터 인지하고 있어야 한다. -.. 2022.02.01
- Java Spring Boot Multi Threading @Async Annotation을 활용하면 비동기 메소드를 손쉽게 작성할 수 있다. @Async 어노테이션이 선언된 메소드는 비동기 메소드로 동작하게 된다. (AOP) @Async 어노테이션이 선언된 메소드는 리턴 타입에 따라 내부적으로 상이하게 동작한다. void : 별도의 쓰레드로 실행되며, 리턴이 없으므로 결과를 기다리지 않고 다음 로직이 수행된다. ThreadPoolTaskExcutor에 의해 자동으로 스레드를 새로 생성해서 비동기 블록 로직을 수행한다. logger.info(“start”) @Async public void t1 (){ logger.info(“start async logic”) Thread.sleep(1000); logger.info(“end async logic”) } logg.. 2022.01.11
- AWS DMS Consistency Checking Application Concept aurora DB 와 s3 의 정합성을 체크 하는 애플리케이션이 필요하다. =============================================================== aws doc에 뭔가 있다… https://docs.aws.amazon.com/ko_kr/dms/latest/userguide/CHAP_Validating.html 정확히 마이그레이션 됐는지 데이터 검증함 데이터 전체 로드한 후 검증 시작 증분 변경 사항이 일어날 때 비교 (그럼 계속하겠다는건데? ) 원본, 타겟 행간 비교 후 불일치 보고 Data validation works with the following databases wherever AWS DMS supports them as source and targe.. 2022.01.11
- Spring Boot JPA #6 POST 이번에는 hits, likes 칼럼을 추가하여 디비에 넣어본다. 위와 같이 @Column 어노테이션을 붙여서 객체를 정의해준다. 컨트롤러에 hits, likes 를 추가한 후 여기서 @RequestBody 는 post request json body를 객체로 매핑해준다. jpaRepository를 상속한 noticeRepository에 요청들어온 데이터를 notice 객체로 매핑하여 저장한다. GET 데이터를 요청하는 GET API 를 받아주는 method를 만들어 보자 먼저 디비생성하고 데이터를 넣어 준다. 데이터가 유지되도록 ddl-auto : none, generate-ddl: true 로 설정을 바꾼다. 위와 같이 메소드 작성 @PathVariable 은 Get 요청 주소에서 받아주는 파.. 2022.01.09
- AWS Code Deploy + Jenkins + GitLab AWS Code Deploy 사용자 -> 젠킨스 -> s3 배포, code deploy 실행 -> codeDeploy 가 배포그룹에 등록된 ec2 에 설치된 에이전트 를 실행 -> codeDeploy 에이전트가 (s3 -> ec2) 복사 진행 -> codeDeploy 에이전트가 appspec.yml 실행 -> appspec.yml 에 정의된 스크립트 실행 -> 배포 완료 Jenkins 아이템 만들기 codeDeploy 는 appspec.yml 을 통해 배포 위치, 실행할 스크립트를 관리한다. 프로젝트 트리의 최상단에 위치 시킨다. 서버에 ${DMSCONSISTENCY_DEPLOY_HOME}/src 이 경로가 존재 해야 함 sudo su - env 에 DMSCONSISTENCY_DEPLOY_HOME 나와야.. 2022.01.04
- Spring Boot JPA #5 h2 디비랑 연결하는 실습 h2?? h2 디비는 서버의 램을 사용하는 인메모리 디비이다. 따라서 서버 어플리케이션을 재시작하면 데이터가 사라진다. application.properties 여기 파일을 application.yml 로 바꾸고 아래 내용 복붙 실서비스 때는 아래 내용으로 바꾸고 쓴다. jpa: hibernate: ddl-auto: none generate-ddl: false https://deep-dive-dev.tistory.com/31 참고 h2는 기본적으로 create-drop이 디폴트이다. application.yml server: port:8083 spring: h2: console: enabled: true path: /h2-console datasource: url: jdbc:h.. 2021.12.12
- AWS DMS :: from aurora mySQL to S3 DMS는 데이터베이스의 데이터를 빠르고 안전하게 aws 로 마이그레이션 할 수 있도록 도와준다. DMS 는 이기종 간 마이그레이션도 지원한다. 여러 소스에서 s3 로 복제하여 데이터레이크 솔루션을 구축할 수도 있다. 여기서는 오로라 디비의 데이터 증분을 지속적으로 복제하는 DMS 를 구축 해본다. DMS 는 네 가지를 생성하면 된다. 복제 인스턴스 복제를 수행할 컴퓨팅 파워이다. 타겟 엔드포인트 데이터를 보낼 장소를 연결 소스 엔드포인트 데이터를 가져올 장소를 연결 태스크 복제 작업을 정의한다. 복제 인스턴스 생성 이름과 인스턴스 타입, 엔진 버전을 선택한다. 엔진은 3.4.1 이상으로 만들어야 s3 파티션 별 복제가 가능하다. 있어야 하는 VPC 를 선택하고 AZ 를선택한다. (운영은 다중 AZ) 엔드.. 2021.12.06
- Spring boot JPA #4 공지사항이 목록 요청처리 API 만들기 lombok은 보통 4가지의 아규먼트를 사용한다. @AllArgsConstructor @NoArgsConstructor @Builder @Data @NoArgsConstructor 이 애노테이션은 파라미터가 없는 생성자를 생성합니다. @AllArgsConstructor 이 애노테이션은 클래스에 존재하는 모든 필드에 대한 생성자를 자동으로 생성해줍니다. @Builder 빌더 패턴을 사용할 수 있게 해준다 빌더패턴 : https://jdm.kr/blog/217 위와 같이 빌더 패턴으로 객체를 만들어 줄 수 있다. 데이터 갯수 리턴 리턴 타입이 int 이던 String 이던 api 에서 반환할 떄는 똑같이 문자열로 반환된다. Post mapping 어노테이션 @Reque.. 2021.11.28
- Spring Boot JPA #3 게시판 - 공지사항 게시판 목록 요청 처리 API 작성 intelij 로 바꿈.. 설정은 이클립스와 같이 프로젝트를 만들어 주면 된다. 이클립스의 경우 메소드 옆에 콩버튼을 누르면 api 요청용 파일이 생성되고 옆에 플레이버튼을 누르면 api 요청 및 응담 폼이 출력된다. 공지사항 데이터 받아줄 모델(객체) 설정한다. lombok 으로 @data annotation 을 넣으면 getter setter 를 굳이 안 작성해도 자동으로 잡아준다. 객체를 리턴하는 메소드를 작성하고 Getmapping 어노테이션을 붙여주면 get 리퀘스트 시 이와 같은 응답을 받을 수 있다. 2021.11.21
- Spring Boot JPA #2 기본 주소 매핑 메소드 first-url 주소로 받고, GET 방식을 쓰는 메소드를 만듬 해당주소로 접속해보면 이처럼 로그인 창이 뜨는데 spring security 를 넣었기 때문이다. 따라서 해당 부분에 대한 수정 필요 spring Security 관련 클래스를 하나 생성하여 모든 리퀘스트에 대한 퍼미션을 주도록 위와같이 설정한다. http://localhost:8083/first-url 요청을 보내면 매핑되는 것을 확일할 수 있다. http://localhost:8083/helloworld string을 리턴하는 api 를 만들기 위해서는 @ResponceBody 라는 어노테이션이 필요하다 기본적으로 스프링 부트는 페이지를 리턴하기 때문에 이에 따른 어노테이션이 필요하다. ——————————————.. 2021.11.21
- Spring Boot JPA #1 환경 구축 eclipse 설치 - 구글 eclipse marketplace - sts 설치 project 생성 - spring boot/spring starter project 프로젝트가 만들어지면 실행을 해본다. 그럼 신기하게도 was 가 자동으로 세팅 및 실행되면 서버가 뜬다. 애플리케이션 실행 시 port 8080 already in use 뜨는 경우 해당프로세스를 죽이거나 -> lsof -i tcp:8080 kill pid 컨피그 수정해서 포트를 바꿈 2021.11.21
- Kafka Lag Monitoring System Kafka 의 lag(프로듀서 offset, 컨슈머 offset 간의 차이) 을 모니터링 하여 이상 감지를 할 수 있는 시스템을 구축하는 법. Kafka - burrow - telegraf - Elasticsearch - Grafana, Kibana 각 서비스의 role Burrow : kafka 로부터 offset등의 데이터를 수집, API response 로 데이터 전달 Telegraf : Burrow 에 request 를 통해서 데이터를 받아, ES 에 적재 (index 설정 ) es: telegraf 로 부터 받은 데이터 적재 grafana : es의 데이터 조회 및 시각화 Zookeeper cluster 설치 # mkdir ~/local # cd ~/local # wget https://down.. 2021.09.26
- 카프카 프로듀서 프로듀서란 ? 카프카 브로커로 보내주는 역할을 하는 것 1. ProducerRecord 객체를 Serializer 를 사용하여 전송에 적합하게 직렬화 한다. 2. 파티셔너가 해당 레코드가 저장될 파티션을 결정 짓는다. 3. 파티션이 결정되면 프로듀서가 저장될 토픽과 레코드를 알게 된다. 4. 레코드가 해당하는 토픽의 파티션에 저장된다. 5. 브로커는 수신된 레코드의 메세지를 처리한 후 응답을 전송한다. 6. 성공하면 RecordMetadata 객체를 반환하고 실패하면 에러를 반환하고, 프로듀서는 리트라이를 설정에 따라하게 된다. 카프카 프로듀서 만들기 ProducerRecord 의 메세지를 2021.09.26
- Kafka 리밸런싱 리스너 동작 test 리밸런스 리스너를 달았을 때 실재로 컨슈머 커밋 및 이어서 컨슈밍이 중복/손실 없이 일어나는지 실제 로그를 보며 테스트 해보았다. 그리고 리밸런스 리스너의 효용성에 대해 정리해보았다. 0. 테스트 환경 및 방법 현 커밋 방식 : 폴링 루프 완료 이후 어싱크 커밋 데이터 프로듀싱 : 퍼포먼스 프로듀싱 툴 사용 ./kafka-producer-perf-test.sh --topic test --record-size 10 --num-records 9999999999 --producer- props bootstrap.server=[브로커]:9092 --throughput 300 그라파나 랙 정보 확인 : http://브로커:3000/d/5nhADrDWR/kafka-lag?orgId=1&refresh=30s 1. 리.. 2021.09.04
- Kafka 성능측정 툴 아래와 같이 카프카 빌트인 툴로 퍼포먼스 TEST 가능 PRODUCER TEST [root@f6fb6ac1b445 bin]# ./kafka-producer-perf-test.sh --topic test --record-size 1000 --num-records 5000 -- producer-props bootstrap.servers=n1:9092,n2:9092,n3:9092 --throughput 100 501 records sent, 100.2 records/sec (0.10 MB/sec), 2.7 ms avg latency, 335.0 ms max latency. 501 records sent, 100.2 records/sec (0.10 MB/sec), 0.8 ms avg latency, 6.0 ms.. 2021.09.04