일/kafka9 Kafka Consumer Thread 예제 컨슈머 스레드 예제 코드 컨슈머 스레드인데 general 하게 사용 가능할 것 같아 공유 한다. 파티션 갯수만큼 스레드를 생성 셧다운 훅을 통한 graceful shutdown 구현 OOM 방지를 위한 wait logic 리밸런스 리스너 구현 public static void initCollect () { collectExecutorService = Executors.newCachedThreadPool(); String partitionNum = PropertiesUtil.getProperty("kafka.partitionNum"); int partitionLength = Integer.parseInt(partitionNum); for(String topic :topics) { for (int i = 0.. 2022. 2. 23. Kafka lag evaluation 각 서비스의 role Burrow : kafka 로부터 offset등의 데이터를 수집, API response 로 데이터 전달 Telegraf : Burrow 에 request 를 통해서 데이터를 받아, ES 에 적재 (index 설정 ) es: telegraf 로 부터 받은 데이터 적재 grafana : es의 데이터 조회 및 시각화 Grafana Data source 설정 데이터를 받아오기 위해 DATA source 설정 URL : 데이터가 적재된 ES의 주소 Index name : es의 index 이름 (RDB의 테이블 개념) Time field name : 시간정보가 있는 field (column 개념) grafana 데이터 확인 Explorer → datasource : burrow_es_da.. 2022. 2. 22. Kafka Lag Monitoring System Kafka 의 lag(프로듀서 offset, 컨슈머 offset 간의 차이) 을 모니터링 하여 이상 감지를 할 수 있는 시스템을 구축하는 법. Kafka - burrow - telegraf - Elasticsearch - Grafana, Kibana 각 서비스의 role Burrow : kafka 로부터 offset등의 데이터를 수집, API response 로 데이터 전달 Telegraf : Burrow 에 request 를 통해서 데이터를 받아, ES 에 적재 (index 설정 ) es: telegraf 로 부터 받은 데이터 적재 grafana : es의 데이터 조회 및 시각화 Zookeeper cluster 설치 # mkdir ~/local # cd ~/local # wget https://down.. 2021. 9. 26. 카프카 프로듀서 프로듀서란 ? 카프카 브로커로 보내주는 역할을 하는 것 1. ProducerRecord 객체를 Serializer 를 사용하여 전송에 적합하게 직렬화 한다. 2. 파티셔너가 해당 레코드가 저장될 파티션을 결정 짓는다. 3. 파티션이 결정되면 프로듀서가 저장될 토픽과 레코드를 알게 된다. 4. 레코드가 해당하는 토픽의 파티션에 저장된다. 5. 브로커는 수신된 레코드의 메세지를 처리한 후 응답을 전송한다. 6. 성공하면 RecordMetadata 객체를 반환하고 실패하면 에러를 반환하고, 프로듀서는 리트라이를 설정에 따라하게 된다. 카프카 프로듀서 만들기 ProducerRecord 의 메세지를 2021. 9. 26. Kafka 리밸런싱 리스너 동작 test 리밸런스 리스너를 달았을 때 실재로 컨슈머 커밋 및 이어서 컨슈밍이 중복/손실 없이 일어나는지 실제 로그를 보며 테스트 해보았다. 그리고 리밸런스 리스너의 효용성에 대해 정리해보았다. 0. 테스트 환경 및 방법 현 커밋 방식 : 폴링 루프 완료 이후 어싱크 커밋 데이터 프로듀싱 : 퍼포먼스 프로듀싱 툴 사용 ./kafka-producer-perf-test.sh --topic test --record-size 10 --num-records 9999999999 --producer- props bootstrap.server=[브로커]:9092 --throughput 300 그라파나 랙 정보 확인 : http://브로커:3000/d/5nhADrDWR/kafka-lag?orgId=1&refresh=30s 1. 리.. 2021. 9. 4. Kafka 성능측정 툴 아래와 같이 카프카 빌트인 툴로 퍼포먼스 TEST 가능 PRODUCER TEST [root@f6fb6ac1b445 bin]# ./kafka-producer-perf-test.sh --topic test --record-size 1000 --num-records 5000 -- producer-props bootstrap.servers=n1:9092,n2:9092,n3:9092 --throughput 100 501 records sent, 100.2 records/sec (0.10 MB/sec), 2.7 ms avg latency, 335.0 ms max latency. 501 records sent, 100.2 records/sec (0.10 MB/sec), 0.8 ms avg latency, 6.0 ms.. 2021. 9. 4. Kafka consumer 개발 1. Properties 생성 private Properties props = new Properties(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, PropertiesUtil.getProperty("kafka.servers")); props.put(ConsumerConfig.GROUP_ID_CONFIG, PropertiesUtil.getProperty("kafka.group.id")); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,.. 2021. 9. 4. Kafka Consumer 개념 :: 컨슈머, 컨슈머그룹, 리밸런싱 카프카는 프로듀서 / 커슈머가 핵심이다. 프로듀서 : 데이터를 카프카에 넣어줌 컨슈머 : 데이터를 카프카에서 가져옴 카프카는 사실 쉽게말하면 걍 버퍼 다. 여기서 컨슈머를 정리해본다. 컨슈머와 컨슈머 그룹 컨슈머에 가장 큰 컨슈머와 컨슈머 그룹을 이해해야 한다. 위딩에서 짐작이 되겠지만 컨슈머들을 묶고 있는 것이 컨슈머 그룹이다. 컨슈머에서 가져가는 병목을 줄이기 위해 여러 컨슈머를 실행해 둘 수 있다. 이 여러 컨슈머들이 가져가는 데이터들의 오프셋을 공유하는 것이 컨슈머 그룹이다. 같은 컨슈머 그룹에 속한 컨슈머들은 오프셋들을 공유하며, 토픽의 파티션을 분배해서 컨슈밍 한다. 아래 그림괴 같이 컨슈머하나는 여러 파티션에 연결되어 컨슈밍 하는 것이 가능하다. 하지만 파티션 입장에서는 각 컨슈머 그룹의 컨.. 2021. 8. 28. kafka 요약 메세지 발행과 구독하기 메세지 발행/구독 시스템에서는 데이터를 발행자가 구독자에게 직접 보내지 않는다. 대신 메세지를 구분해서 발행구독 시스템 에게 보내면 구독자가 특정 메세지를 구독 하게 된다. 발행된 메세지를 저장하고 중계하는 역할을 브로커라고 한다. 초기의 발행 구독 시스템의 발전과정 phase 1 초기 프론트엔드 서버에서 대시보드를 보여주는 메트릭 서버로 직접 데이터를 밀어 주는 방식 단순하기 때문에문제가 없다 phase 2 데시보드를 보여주는 서버 이외에 데이터 분석 서버등 데이터를 수신받기 위한 서버 요구가 추가 된다면 아래와 같이 아키텍쳐가 복잡해지고 운영에 문제가 생긴다. phase 3 모든 어플리케이션의 메트릭을 하나의 애플리케이션이 수신하게 하고 하나의 서버로 제공하면 어떤 시스템에서도.. 2021. 8. 19. 이전 1 다음