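Consumer thread example code
This is a consumer thread I wrote for one project, but it looks general enough to reuse, so I am sharing it.
- Creates one thread per partition
- Graceful shutdown through a shutdown hook
- Wait logic to prevent OOM
- Rebalance listener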
public static void initCollect() {
    collectExecutorService = Executors.newCachedThreadPool();
    String partitionNum = PropertiesUtil.getProperty("kafka.partitionNum");
    int partitionLength = Integer.parseInt(partitionNum);
    // Launch one consumer thread per partition of every topic.
    for (String topic : topics) {
        for (int i = 0; i < partitionLength; i++) {
            // The index i only labels the worker; ConsumerThread uses subscribe(),
            // so the group coordinator decides which partitions each consumer gets.
            ConsumerThread worker = new ConsumerThread("client" + i, topic, i);
            workerThreads.add(worker);
            collectExecutorService.execute(worker);
            logger.info("client topic " + topic + " partition " + i + " thread is launched");
        }
    }
    // Register the shutdown hook used for graceful shutdown.
    Runtime.getRuntime().addShutdownHook(new ShutdownThread());
}
This creates one consumer thread (defined below) per partition of each topic.
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.time.Duration;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.*;
public class ConsumerThread implements Runnable {
    private static final Logger logger = LoggerFactory.getLogger(ConsumerThread.class);
    private String consumerId = null;
    private String topic = null;
    private int partitionnum;
    private Properties props = null;
    private KafkaConsumer<String, String> consumer = null;
    private Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<>();
    private QueueInstance topicQueue = null;

    // Commits the offsets tracked so far before partitions are handed to another consumer.
    private class HandleRebalance implements ConsumerRebalanceListener {
        @Override
        public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
            // Nothing to do: consumption resumes from the last committed offsets.
        }

        @Override
        public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
            logger.info("Lost partitions in rebalance. " +
                    "Committing current offsets:" + currentOffsets);
            consumer.commitSync(currentOffsets);
            currentOffsets.clear();
        }
    }

    public ConsumerThread(String id, String topic, int partitionnum) {
        this.consumerId = id + " " + topic;
        this.topic = topic;
        this.partitionnum = partitionnum;
        this.props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, PropertiesUtil.getProperty("kafka.servers"));
        props.put(ConsumerConfig.GROUP_ID_CONFIG, PropertiesUtil.getProperty("kafka.group.id"));
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, PropertiesUtil.getProperty("kafka.offset.reset.config"));
        // Offsets are committed manually in run() and in the rebalance listener.
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        // session.timeout.ms must fall within the broker's
        // group.min.session.timeout.ms / group.max.session.timeout.ms range.
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 600000);
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 600000);
        topicQueue = QueueInstanceGroup.getInstance().getQueueInstance(topic);
        consumer = new KafkaConsumer<>(this.props);
    }

    @Override
    public void run() {
        int interval = 0;
        try {
            consumer.subscribe(Collections.singletonList(this.topic), new HandleRebalance());
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> record : records) {
                    // Enqueue only records whose entrytime is today.
                    if (isTodayRecord(record.value())) {
                        topicQueue.offer(record.value());
                    }
                    // Track the offset of skipped records too, so they are not re-read after a restart.
                    currentOffsets.put(
                            new TopicPartition(record.topic(), record.partition()),
                            new OffsetAndMetadata(record.offset() + 1, null)
                    );
                }
                consumer.commitAsync(currentOffsets, null);
                // Every 1000 polls, log progress and check how full the queue is.
                if (interval >= 1000) {
                    logger.info("[collect] topic : " + this.topic + " :: " + records.count() + " records are committed");
                    if (QueueInstanceGroup.getInstance().getQueueInstance(this.topic).getSize() > 1000000) {
                        // Wait logic to prevent OOM: back off while the queue is too full.
                        logger.info("More than 1,000,000 items are queued; waiting 3 seconds");
                        Thread.sleep(3000);
                    }
                    interval = 0;
                } else {
                    interval++;
                }
            }
        } catch (WakeupException exception) {
            // Expected on shutdown: shutdown() calls consumer.wakeup() to break out of poll().
            logger.info("[collect] " + consumerId + " is shutting down");
        } catch (Exception exception) {
            LogUtil.exceptionErrorLog(exception);
            logger.warn("[collect] " + exception);
        } finally {
            try {
                // Final synchronous commit so no tracked offset is lost.
                consumer.commitSync(currentOffsets);
            } finally {
                consumer.close();
            }
        }
    }
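
    // Called from another thread (the shutdown hook); wakeup() is the thread-safe way to interrupt poll().
    public void shutdown() {
        consumer.wakeup();
    }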
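
    // ObjectMapper is thread-safe once configured, so one shared instance is enough.
    private static final ObjectMapper MAPPER = new ObjectMapper();

    public boolean isTodayRecord(String record) {
        try {
            JsonNode log = MAPPER.readTree(record);
            Instant instant = Instant.ofEpochMilli(Long.parseLong(log.get("entrytime").asText()));
            LocalDateTime ldt = instant.atZone(ZoneId.systemDefault()).toLocalDateTime();
            LocalDateTime now = LocalDateTime.now();
            return ldt.getYear() == now.getYear() && ldt.getDayOfYear() == now.getDayOfYear();
        } catch (Exception e) {
            // Covers malformed JSON as well as a missing or non-numeric entrytime,
            // which would otherwise escape and stop the poll loop.
            logger.warn(String.valueOf(e));
            return true; // On parse failure, process the record for now (needs fixing?)
        }
    }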
}
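The OOM wait logic in `run()` can be looked at on its own. The following is a minimal sketch of the same idea using only the JDK, not the code from this project: `waitWhileFull`, its parameters and the `ConcurrentLinkedQueue` are hypothetical stand-ins for the `QueueInstance` check above. It sleeps in short steps while the queue is over a threshold and gives up after a bounded number of steps.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

public class BackpressureSketch {

    /**
     * Sleeps in steps of sleepMillis while the queue holds more than maxSize
     * items, giving up after maxSleeps steps. Returns the number of sleeps taken.
     * (Hypothetical helper; not part of the original ConsumerThread.)
     */
    static int waitWhileFull(Queue<?> queue, int maxSize, long sleepMillis, int maxSleeps) {
        int sleeps = 0;
        while (queue.size() > maxSize && sleeps < maxSleeps) {
            try {
                Thread.sleep(sleepMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // keep the interrupt visible to the caller
                break;
            }
            sleeps++;
        }
        return sleeps;
    }

    public static void main(String[] args) {
        Queue<String> queue = new ConcurrentLinkedQueue<>();
        for (int i = 0; i < 10; i++) {
            queue.offer("record-" + i);
        }
        // Nothing drains the queue in this demo, so the wait gives up after maxSleeps steps.
        int sleeps = waitWhileFull(queue, 5, 10L, 3);
        System.out.println("slept " + sleeps + " times, queue size " + queue.size());
    }
}
```

Whatever form the wait takes, the total time spent between two `poll()` calls has to stay below `max.poll.interval.ms`, otherwise the consumer is treated as failed and its partitions are rebalanced away. Bounding the number of sleeps, as above, is one way to keep that under control.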
Logic flow
Shutdown hook class diagram (graceful shutdown)
Shutdown hook flow (graceful shutdown)
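The post registers `new ShutdownThread()` as a shutdown hook but does not list that class, so the following is only a sketch of how such a hook could work, not the author's implementation. `Worker` is a hypothetical stand-in for `ConsumerThread` (its `shutdown()` flips a flag where the real class calls `consumer.wakeup()`), and the constructor arguments and the 5-second timeout are assumptions.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class ShutdownHookSketch {

    /** Hypothetical stand-in for ConsumerThread: loops until shutdown() is called. */
    static class Worker implements Runnable {
        private final AtomicBoolean running = new AtomicBoolean(true);
        private final AtomicBoolean cleanedUp = new AtomicBoolean(false);

        @Override
        public void run() {
            try {
                while (running.get()) {
                    Thread.sleep(10); // stands in for consumer.poll(...)
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                cleanedUp.set(true); // stands in for commitSync(...) and close()
            }
        }

        void shutdown() {
            running.set(false); // the real class calls consumer.wakeup() here
        }

        boolean isCleanedUp() {
            return cleanedUp.get();
        }
    }

    /** Assumed shape of the hook: signal every worker, then wait for the pool to drain. */
    static class ShutdownThread extends Thread {
        private final List<Worker> workers;
        private final ExecutorService pool;

        ShutdownThread(List<Worker> workers, ExecutorService pool) {
            this.workers = workers;
            this.pool = pool;
        }

        @Override
        public void run() {
            workers.forEach(Worker::shutdown);
            pool.shutdown(); // accept no new tasks; running workers finish their cleanup
            try {
                if (!pool.awaitTermination(5, TimeUnit.SECONDS)) {
                    pool.shutdownNow(); // interrupt workers that did not stop in time
                }
            } catch (InterruptedException e) {
                pool.shutdownNow();
                Thread.currentThread().interrupt();
            }
            System.out.println("workers cleaned up: "
                    + workers.stream().allMatch(Worker::isCleanedUp));
        }
    }

    public static void main(String[] args) {
        // Daemon threads let this demo JVM begin shutting down when main returns;
        // daemon threads keep running while shutdown hooks execute.
        ExecutorService pool = Executors.newCachedThreadPool(r -> {
            Thread t = new Thread(r);
            t.setDaemon(true);
            return t;
        });
        List<Worker> workers = new ArrayList<>();
        for (int i = 0; i < 3; i++) {
            Worker worker = new Worker();
            workers.add(worker);
            pool.execute(worker);
        }
        Runtime.getRuntime().addShutdownHook(new ShutdownThread(workers, pool));
    }
}
```

The important part is that the hook waits for the workers instead of returning right after signalling them. The JVM halts once all hooks have finished, so without the wait the final `commitSync` and `close()` in each consumer's `finally` block might never run.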