본문 바로가기
일/kafka

Kafka consumer 개발

by blair2dev 2021. 9. 4.

1. Properties 생성 

private Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, PropertiesUtil.getProperty("kafka.servers"));
props.put(ConsumerConfig.GROUP_ID_CONFIG, PropertiesUtil.getProperty("kafka.group.id"));
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, PropertiesUtil.getProperty("kafka.offset.reset.config"));
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 600000);
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 600000);
topicQueue = QueueInstanceGroup.getInstance().getQueueInstance(topic);
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);

프로듀서 객체 생성과 유사하게 속성 객체 (properties) 생성한 후 KafkaConsumer 객체로 전달하여 생성한다. 세게의 필수속정을 알아보자

bootstrap.servers는 서버에 연결하기위한 속성이고 key.deserializer와 value.deserializer에는 역직렬 처리기를 지정한다. 이외에도 컨슈머그룹을 나타내는 group.id 도 필요하다.

 

2. 토픽 구독하기 

consumer.subscribe(Collections.singletonList("customerCountries"));

 

컨슈머를 생성한 다음에는 하나 이상의 토픽을 구독 해야 한다. customerCountries 토픽을 엘리먼트로 갖는 리스트 생성하고 subscribe 메소드로 전달한다.

정규표현식을 매개변수로 전달하여 subscribe()메소드를 호출할 수도 있다.
정규표현식을 쓰면 다수의 토픽이 일치될 수 있고 다수의 토픽들의 데이터를 처리해야하는 애플리케이션 에서 정규 표현식을 쓰면 좋다. 카프카와 다른 시스템간에 데이터를 복제하는 애플리케이션 에서 정규표현식을 사용한 다수의 토픽구독을 가장 많이 사용한다.

 

 

3. 폴링 루프
컨슈머API의 핵심은 서버로 부터 연속적으로 많은 데이터를 읽기 위해 폴링(polling)하는 루프에 있다. 구독 요청이 정상 처리 되면 그다음 폴링루프에서 데이터를 읽는 데 필요한 작업을 처리한다.

 

poll()메서드에서 데이터 읽는 것 외에도 여러 일을 한다

 

-> 새로운 컨슈머에서 최초로 poll() 호출할 떄는 GroupCoordinator 를 찾음

-> 컨슈머그룹에 추가 

-> 해당 컨슈머에 할당된 파티션 내역을 받는다.

-> 이전 버전에서는 하트비트도 날려줬는데 버전 업 되면서 하트비트 스레드가 별도로 생김

 

try {
            consumer.subscribe(Collections.singletonList(this.topic), new HandleRebalance());
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                for ( ConsumerRecord<String, String> record : records ) {
                    topicQueue.offer(record.value());
                    currentOffsets.put(
                            new TopicPartition(record.topic(), record.partition()),
                            new OffsetAndMetadata(record.offset() + 1, null)
                    );
                }
                consumer.commitAsync(currentOffsets, null);
            }
        } catch (Exception exception) {
            LogUtil.exceptionErrorLog(exception);
            logger.warn("[collect] "+exception);
        } finally {
            try {
                consumer.commitSync(currentOffsets);
            }finally {
                consumer.close();
            }
        }

 

4. 리밸런싱 리스너

컨슈머는 클린업 처리가 필요한 두가지 경우가 있다.

-> 종료되기 전

-> 리밸런싱 전 

 

위 같은경우에 컨슈머가 파티션의 소유권을 잃게 되므로 처리했던 마지막 메세지의 오프셋을 커밋해야 하며, 사용하던 파일핸들, 디비연결 등도 닫아야 한다.

 

ConsumerRebalanceListner 인터페이스를 구현하는 객체를 subscribe()의 인자로 추가한다. 이후 인터페이스의 아래 메소드를 구현하면 된다. 

public void onPartitionsRevoked(Collection<TopicPartition> partitions)

이 메서드는 리밸런싱이 시작되기 전에 컨슈머가 메세지 소비를 중단한 후 호출된다.

이 메서드에서 오프셋을 커밋해야 한다. 그래야 리밸런싱 후 새로운 컨슈머가 어디부터 읽어야 할지 알 수 있다.

public void onPartitionsAssigned(Collection<TopicPartition> partitions)
이 메서드는 파티션이 브로커에 할당된 후에, 컨슈머가 파티션을 새로 할당받아 메세지 소비 시작하기 전에 호출됨

 

    private class HandleRebalance implements ConsumerRebalanceListener {
        public void onPartitionsAssigned(Collection<TopicPartition> partitions) {

        }

        public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
            logger.info("Lost partitions in rebalance. " +
                    "Committing current offsets:" + currentOffsets);
            consumer.commitSync(currentOffsets);
            currentOffsets.clear();
        }
    }

 

 

' > kafka' 카테고리의 다른 글

카프카 프로듀서  (0) 2021.09.26
Kafka 리밸런싱 리스너 동작 test  (0) 2021.09.04
Kafka 성능측정 툴  (0) 2021.09.04
Kafka Consumer 개념 :: 컨슈머, 컨슈머그룹, 리밸런싱  (0) 2021.08.28
kafka 요약  (0) 2021.08.19