
Kafka Consumer Thread Example

by blair2dev 2022. 2. 23.

Consumer thread example code

This is a consumer thread, but it seems generic enough to reuse elsewhere, so I'm sharing it.

  • Spawns one thread per partition
  • Graceful shutdown via a shutdown hook
  • Wait logic to prevent OOM
  • Rebalance listener implementation
public static void initCollect() {
    collectExecutorService = Executors.newCachedThreadPool();
    String partitionNum = PropertiesUtil.getProperty("kafka.partitionNum");
    int partitionLength = Integer.parseInt(partitionNum);

    // spawn one consumer thread per partition for every topic
    for (String topic : topics) {
        for (int i = 0; i < partitionLength; i++) {
            ConsumerThread worker = new ConsumerThread("client" + i, topic, i);
            workerThreads.add(worker);
            collectExecutorService.execute(worker);
            logger.info("client topic " + topic + " partition " + i + " thread is launched");
        }
    }
    // register the shutdown hook for graceful shutdown
    Runtime.getRuntime().addShutdownHook(new ShutdownThread());
}
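
initCollect() refers to a few members that are not shown in the excerpt (collectExecutorService, workerThreads, topics, PropertiesUtil, ShutdownThread). Below is a minimal sketch of how the enclosing class might be declared, purely as an assumption to make the snippet self-contained; the class name CollectorMain and the "kafka.topics" property key are made up for illustration.

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutorService;

// Hypothetical enclosing class: only the members that initCollect() touches.
public class CollectorMain {

    private static final Logger logger = LoggerFactory.getLogger(CollectorMain.class);

    // topic list; assumed here to come from a comma-separated "kafka.topics" property
    private static final List<String> topics =
            Arrays.asList(PropertiesUtil.getProperty("kafka.topics").split(","));

    // executor running the consumer threads, plus the handles the shutdown hook needs
    private static ExecutorService collectExecutorService;
    private static final List<ConsumerThread> workerThreads = new ArrayList<>();

    public static void initCollect() {
        // ... as shown above ...
    }
}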

The consumer thread defined below is created for every topic, one per partition.

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Duration;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.*;

public class ConsumerThread implements Runnable {

    private static final Logger logger = LoggerFactory.getLogger(ConsumerThread.class);

    private String consumerId = null;
    private String topic = null;
    private int partitionnum;
    private Properties props = null;
    private KafkaConsumer<String, String> consumer = null;
    private Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<>();
    private QueueInstance topicQueue = null;


    private class HandleRebalance implements ConsumerRebalanceListener {
        public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
            // nothing to do on assignment; offsets are tracked as records are processed
        }

        public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
            logger.info("Lost partitions in rebalance. " +
                    "Committing current offsets:" + currentOffsets);
            consumer.commitSync(currentOffsets);
            currentOffsets.clear();
        }
    }

    public ConsumerThread(String id, String topic, int partitionnum) {
        this.consumerId = id + " " + topic;
        this.topic = topic;
        // stored for reference only; partitions are assigned by the consumer group via subscribe() in run()
        this.partitionnum = partitionnum;
        this.props = new Properties();

        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, PropertiesUtil.getProperty("kafka.servers"));
        props.put(ConsumerConfig.GROUP_ID_CONFIG, PropertiesUtil.getProperty("kafka.group.id"));
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, PropertiesUtil.getProperty("kafka.offset.reset.config"));
        // offsets are committed manually: commitAsync in the poll loop, commitSync on shutdown
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 600000);
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 600000);
        topicQueue = QueueInstanceGroup.getInstance().getQueueInstance(topic);

        consumer = new KafkaConsumer<String, String>(this.props);
    }

    @Override
    public void run() {
        int interval = 0;
        try {
            consumer.subscribe(Collections.singletonList(this.topic), new HandleRebalance());
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> record : records) {
                    if (!isTodayRecord(record.value())) continue; // skip records whose entrytime is not today
                    topicQueue.offer(record.value());
                    currentOffsets.put(
                            new TopicPartition(record.topic(), record.partition()),
                            new OffsetAndMetadata(record.offset() + 1, null)
                    );
                }
                consumer.commitAsync(currentOffsets, null);

                if (interval >= 1000) {
                    logger.info("[collect] topic : " + this.topic + " ::  " + records.count() + " records are committed");
                    // crude back-pressure: if the queue has piled up, pause before polling again
                    if (QueueInstanceGroup.getInstance().getQueueInstance(this.topic).getSize() > 1000000) {
                        logger.info("more than 1,000,000 records queued; waiting 3 seconds");
                        Thread.sleep(3000);
                    }
                    interval = 0;
                }
                else interval++;
            }
        } catch (WakeupException e) {
            // expected when shutdown() calls consumer.wakeup(); fall through to commit and close
        } catch (Exception exception) {
            LogUtil.exceptionErrorLog(exception);
            logger.warn("[collect] " + exception);
        } finally {
            try {
                consumer.commitSync(currentOffsets);
            } finally {
                consumer.close();
            }
        }
    }
    public void shutdown() {
        // wakeup() makes the blocking poll() throw WakeupException, which ends the loop in run()
        consumer.wakeup();
    }

    public boolean isTodayRecord(String record) {
        ObjectMapper mapper = new ObjectMapper();
        JsonNode log;
        try {
            log = mapper.readTree(record);
        } catch (Exception e) {
            logger.warn(String.valueOf(e));
            return true; // on parse failure, process the record anyway (needs revisiting?)
        }
        Instant instant = Instant.ofEpochMilli(Long.parseLong(log.get("entrytime").asText()));
        LocalDateTime ldt = instant.atZone(ZoneId.systemDefault()).toLocalDateTime();
        LocalDateTime now = LocalDateTime.now();
        return ldt.getYear() == now.getYear() && ldt.getDayOfYear() == now.getDayOfYear();
    }
}
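
QueueInstance and QueueInstanceGroup are the author's own classes and are not shown in this post. Here is a minimal sketch, assuming one in-memory queue per topic that exposes exactly the calls used above (offer, getSize); the poll() method is added only to suggest how a downstream worker would drain it, and everything about these classes is guesswork.

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical stand-in for the author's per-topic queue.
public class QueueInstance {
    private final ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<>();

    public void offer(String value) {
        queue.offer(value);
    }

    public String poll() {
        return queue.poll();
    }

    public int getSize() {
        // size() is O(n) on ConcurrentLinkedQueue; a LinkedBlockingQueue or an
        // explicit AtomicInteger counter would make the 1,000,000 check cheaper.
        return queue.size();
    }
}

// Hypothetical singleton holding one queue per topic.
class QueueInstanceGroup {
    private static final QueueInstanceGroup INSTANCE = new QueueInstanceGroup();
    private final Map<String, QueueInstance> queues = new ConcurrentHashMap<>();

    private QueueInstanceGroup() {
    }

    public static QueueInstanceGroup getInstance() {
        return INSTANCE;
    }

    public QueueInstance getQueueInstance(String topic) {
        return queues.computeIfAbsent(topic, t -> new QueueInstance());
    }
}

With a queue like this, the Thread.sleep(3000) in run() acts as crude back-pressure: when whatever drains the queue falls a million records behind, the polling thread pauses instead of letting the heap grow. That is the OOM-prevention wait logic from the list at the top.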

Logic flow

Shutdown hook class diagram (graceful shutdown)

Shutdown hook flow (graceful shutdown)
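
The ShutdownThread registered in initCollect() is not shown in the post either. A rough sketch of what the hook could look like, assuming it is a static nested class of the same class as initCollect() (so it can see workerThreads and collectExecutorService) and simply wakes every consumer up, then waits for the executor to drain:

// Hypothetical shutdown hook, declared next to initCollect().
static class ShutdownThread extends Thread {
    @Override
    public void run() {
        // wake every consumer: the blocking poll() throws WakeupException and the loop exits
        for (ConsumerThread worker : workerThreads) {
            worker.shutdown();
        }
        collectExecutorService.shutdown();
        try {
            // give each thread's finally block time to commitSync() and close()
            if (!collectExecutorService.awaitTermination(30, java.util.concurrent.TimeUnit.SECONDS)) {
                collectExecutorService.shutdownNow();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

The order matters for a graceful shutdown: wakeup() first so the threads leave poll(), then awaitTermination() so the JVM does not exit before the final commitSync() and close() have run.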
