With a rebalance listener attached, I checked against the actual logs whether consumer commits and the consumption that follows really happen without duplication or loss.
I also summarized how useful a rebalance listener actually is.
0. Test environment and method
Current commit scheme: async commit after each polling loop completes (a minimal sketch of this loop follows right after this setup list)
Data producing: producer performance tool — ./kafka-producer-perf-test.sh --topic test --record-size 10 --num-records 9999999999 --producer-props bootstrap.servers=[브로커]:9092 --throughput 300
Grafana lag dashboard: http://브로커:3000/d/5nhADrDWR/kafka-lag?orgId=1&refresh=30s
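For reference, here is a minimal sketch of the poll-and-commit pattern described above (an assumed reconstruction, not the actual com.tacademy.SimpleConsumer source; the broker address is a placeholder, while the group id and topic come from the logs):

```java
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class SimpleConsumerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "BROKER:9092"); // placeholder
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "UBAS_CONSUMER_GROUP001");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); // manual commit only

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("testlag"));
            while (true) {
                // one batch = one poll: process every record, then commit asynchronously
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println(record.offset()); // the offset lines seen in the logs
                }
                if (!records.isEmpty()) {
                    consumer.commitAsync(); // the "commit!!!" point in the logs
                }
            }
        }
    }
}
```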
1. Without a rebalance listener
a. One thread consuming 3 partitions → a process with two consumer threads joins later: offsets pick up where they left off, with no loss or duplication.
Offsets continue without loss (the green section).
- The broker starts the rebalance (the heartbeat thread already knows about it)
- The consumer thread finishes processing everything from its most recent poll
- Commit
- On the next poll it notices the rebalance, commits the offsets of all its partitions, and resumes consuming after the rebalance completes
-> Log
16:12:31.928 [main] INFO com.tacademy.SimpleConsumer - 313167
16:12:31.957 [main] INFO com.tacademy.SimpleConsumer - 313168
16:12:31.985 [main] DEBUG org.apache.kafka.clients.consumer.KafkaConsumer - [Consumer clientId=consumer- UBAS_CONSUMER_GROUP001-1, groupId=UBAS_CONSUMER_GROUP001] Committing offsets: {testlag- 0=OffsetAndMetadata{offset=313169, leaderEpoch=0, metadata=''}, testlag-1=OffsetAndMetadata{offset=301356, leaderEpoch=0, metadata=''}, testlag-2=OffsetAndMetadata{offset=353377, leaderEpoch=0, metadata=''}}
16:12:31.985 [main] DEBUG org.apache.kafka.clients.Metadata - [Consumer clientId=consumer- UBAS_CONSUMER_GROUP001-1, groupId=UBAS_CONSUMER_GROUP001] Not replacing existing epoch 0 with new epoch 0 for partition testlag-0
16:12:31.985 [main] DEBUG org.apache.kafka.clients.Metadata - [Consumer clientId=consumer- UBAS_CONSUMER_GROUP001-1, groupId=UBAS_CONSUMER_GROUP001] Not replacing existing epoch 0 with new epoch 0 for partition testlag-1
16:12:31.985 [main] DEBUG org.apache.kafka.clients.Metadata - [Consumer clientId=consumer- UBAS_CONSUMER_GROUP001-1, groupId=UBAS_CONSUMER_GROUP001] Not replacing existing epoch 0 with new epoch 0 for partition testlag-2
16:12:31.986 [main] INFO com.tacademy.SimpleConsumer - @@@@@@@@@@@@@@@@@@@ commit!!!
16:12:31.986 [main] DEBUG org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-UBAS_CONSUMER_GROUP001-1, groupId=UBAS_CONSUMER_GROUP001] Executing onJoinPrepare with generation 59 and memberId consumer-UBAS_CONSUMER_GROUP001-1-6b1238f9-c9b7-4a06-9e36-d868c2f7de70
16:12:31.986 [main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer- UBAS_CONSUMER_GROUP001-1, groupId=UBAS_CONSUMER_GROUP001] Revoke previously assigned partitions testlag-0, testlag-1, testlag-2
16:12:31.986 [main] DEBUG org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-UBAS_CONSUMER_GROUP001-1, groupId=UBAS_CONSUMER_GROUP001] Disabling heartbeat thread
16:12:31.986 [main] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer- UBAS_CONSUMER_GROUP001-1, groupId=UBAS_CONSUMER_GROUP001] (Re-)joining group
16:12:31.986 [main] DEBUG org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-UBAS_CONSUMER_GROUP001-1, groupId=UBAS_CONSUMER_GROUP001] Joining group with current subscription: [testlag]
16:12:31.986 [main] DEBUG org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-UBAS_CONSUMER_GROUP001-1, groupId=UBAS_CONSUMER_GROUP001] Sending JoinGroup (JoinGroupRequestData(groupId='UBAS_CONSUMER_GROUP001', sessionTimeoutMs=10000, rebalanceTimeoutMs=300000, memberId='consumer-UBAS_CONSUMER_GROUP001-1-6b1238f9-c9b7-4a06-9e36- d868c2f7de70', groupInstanceId=null, protocolType='consumer', protocols=[JoinGroupRequestProtocol(name='range', metadata=[0, 1, 0, 0, 0, 1, 0, 7, 116, 101, 115, 116, 108, 97, 103, -1, -1, -1, -1, 0, 0, 0, 0])])) to coordinator [브로커1]: 9092 (id: 2147483647 rack: null)
16:12:31.987 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=consumer- UBAS_CONSUMER_GROUP001-1, groupId=UBAS_CONSUMER_GROUP001] Not returning fetched records for partition testlag-0 since it is no longer assigned
16:12:31.987 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=consumer- UBAS_CONSUMER_GROUP001-1, groupId=UBAS_CONSUMER_GROUP001] Ignoring fetched records for partition testlag-2 since it no longer has valid position
16:12:31.987 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=consumer- UBAS_CONSUMER_GROUP001-1, groupId=UBAS_CONSUMER_GROUP001] Ignoring fetched records for partition testlag-1 since it no longer has valid position
16:12:31.989 [main] DEBUG org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-UBAS_CONSUMER_GROUP001-1, groupId=UBAS_CONSUMER_GROUP001] Committed offset 353377 for partition testlag-2
16:12:31.989 [main] DEBUG org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-UBAS_CONSUMER_GROUP001-1, groupId=UBAS_CONSUMER_GROUP001] Committed offset 313169 for partition testlag-0
16:12:31.989 [main] DEBUG org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-UBAS_CONSUMER_GROUP001-1, groupId=UBAS_CONSUMER_GROUP001] Committed offset 301356 for partition testlag-1
16:12:31.989 [main] DEBUG org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-UBAS_CONSUMER_GROUP001-1, groupId=UBAS_CONSUMER_GROUP001] Received successful JoinGroup response: JoinGroupResponseData(throttleTimeMs=0, errorCode=0, generationId=60, protocolType='consumer', protocolName='range', leader='consumer-UBAS_CONSUMER_GROUP001-1-6b1238f9-c9b7-4a06-9e36-d868c2f7de70', memberId='consumer-UBAS_CONSUMER_GROUP001-1-6b1238f9-c9b7-4a06-9e36-d868c2f7de70', members= [JoinGroupResponseMember(memberId='consumer-UBAS_CONSUMER_GROUP001-1-72bc9360-4fdf-4804-bbe8- 8fcff4a523fc', groupInstanceId=null, metadata=[0, 1, 0, 0, 0, 1, 0, 7, 116, 101, 115, 116, 108, 97, 103, -1, -1, -1, -1, 0, 0, 0, 0]), JoinGroupResponseMember(memberId='consumer-UBAS_CONSUMER_GROUP001-1-6b1238f9-c9b7-4a06-9e36- d868c2f7de70', groupInstanceId=null, metadata=[0, 1, 0, 0, 0, 1, 0, 7, 116, 101, 115, 116, 108, 97, 103, -1, -1, -1, -1, 0, 0, 0, 0]), JoinGroupResponseMember(memberId='consumer-UBAS_CONSUMER_GROUP001-2-05ebb3d5-65d9-440f-9dca- 1c1d63e7e7e8', groupInstanceId=null, metadata=[0, 1, 0, 0, 0, 1, 0, 7, 116, 101, 115, 116, 108, 97, 103, -1, -1, -1, -1, 0, 0, 0, 0])])
16:12:31.989 [main] DEBUG org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-UBAS_CONSUMER_GROUP001-1, groupId=UBAS_CONSUMER_GROUP001] Performing assignment using strategy range with subscriptions {consumer-UBAS_CONSUMER_GROUP001-1-72bc9360-4fdf-4804-bbe8-8fcff4a523fc=org. apache.kafka.clients.consumer.ConsumerPartitionAssignor$Subscription@44c03695, consumer- UBAS_CONSUMER_GROUP001-1-6b1238f9-c9b7-4a06-9e36-d868c2f7de70=org.apache.kafka.clients.consumer. ConsumerPartitionAssignor$Subscription@7e6f74c, consumer-UBAS_CONSUMER_GROUP001-2-05ebb3d5-65d9-440f-9dca- 1c1d63e7e7e8=org.apache.kafka.clients.consumer.ConsumerPartitionAssignor$Subscription@dd05255}
16:12:31.989 [main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer- UBAS_CONSUMER_GROUP001-1, groupId=UBAS_CONSUMER_GROUP001] Finished assignment for group at generation 60: {consumer-UBAS_CONSUMER_GROUP001-1-72bc9360-4fdf-4804-bbe8-8fcff4a523fc=Assignment(partitions=[testlag-1]), consumer-UBAS_CONSUMER_GROUP001-1-6b1238f9-c9b7-4a06-9e36-d868c2f7de70=Assignment(partitions=[testlag-0]), consumer-UBAS_CONSUMER_GROUP001-2-05ebb3d5-65d9-440f-9dca-1c1d63e7e7e8=Assignment(partitions=[testlag-2])}
16:12:31.989 [main] DEBUG org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-UBAS_CONSUMER_GROUP001-1, groupId=UBAS_CONSUMER_GROUP001] Sending leader SyncGroup to coordinator [브로커1]:9092 (id: 2147483647 rack: null) at generation Generation{generationId=60, memberId='consumer-UBAS_CONSUMER_GROUP001-1-6b1238f9-c9b7-4a06-9e36-d868c2f7de70', protocol='range'}: SyncGroupRequestData(groupId='UBAS_CONSUMER_GROUP001', generationId=60, memberId='consumer- UBAS_CONSUMER_GROUP001-1-6b1238f9-c9b7-4a06-9e36-d868c2f7de70', groupInstanceId=null, protocolType='consumer', protocolName='range', assignments=[SyncGroupRequestAssignment(memberId='consumer- UBAS_CONSUMER_GROUP001-1-72bc9360-4fdf-4804-bbe8-8fcff4a523fc', assignment=[0, 1, 0, 0, 0, 1, 0, 7, 116, 101, 115, 116, 108, 97, 103, 0, 0, 0, 1, 0, 0, 0, 1, -1, -1, -1, -1]), SyncGroupRequestAssignment(memberId='consumer- UBAS_CONSUMER_GROUP001-1-6b1238f9-c9b7-4a06-9e36-d868c2f7de70', assignment=[0, 1, 0, 0, 0, 1, 0, 7, 116, 101, 115, 116, 108, 97, 103, 0, 0, 0, 1, 0, 0, 0, 0, -1, -1, -1, -1]), SyncGroupRequestAssignment(memberId='consumer- UBAS_CONSUMER_GROUP001-2-05ebb3d5-65d9-440f-9dca-1c1d63e7e7e8', assignment=[0, 1, 0, 0, 0, 1, 0, 7, 116, 101, 115, 116, 108, 97, 103, 0, 0, 0, 1, 0, 0, 0, 2, -1, -1, -1, -1])])
16:12:31.992 [main] DEBUG org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-UBAS_CONSUMER_GROUP001-1, groupId=UBAS_CONSUMER_GROUP001] Received successful SyncGroup response: org.apache.kafka.common.requests.SyncGroupResponse@6a78afa0
16:12:31.992 [main] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer- UBAS_CONSUMER_GROUP001-1, groupId=UBAS_CONSUMER_GROUP001] Successfully joined group with generation 60
16:12:31.992 [main] DEBUG org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-UBAS_CONSUMER_GROUP001-1, groupId=UBAS_CONSUMER_GROUP001] Enabling heartbeat thread
16:12:31.992 [main] DEBUG org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-UBAS_CONSUMER_GROUP001-1, groupId=UBAS_CONSUMER_GROUP001] Executing onJoinComplete with generation 60 and memberId consumer-UBAS_CONSUMER_GROUP001-1-6b1238f9-c9b7-4a06-9e36-d868c2f7de70
16:12:31.992 [main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer- UBAS_CONSUMER_GROUP001-1, groupId=UBAS_CONSUMER_GROUP001] Adding newly assigned partitions: testlag-0
16:12:31.992 [main] DEBUG org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-UBAS_CONSUMER_GROUP001-1, groupId=UBAS_CONSUMER_GROUP001] Fetching committed offsets for partitions: [testlag-0]
16:12:31.995 [main] DEBUG org.apache.kafka.clients.Metadata - [Consumer clientId=consumer- UBAS_CONSUMER_GROUP001-1, groupId=UBAS_CONSUMER_GROUP001] Not replacing existing epoch 0 with new epoch 0 for partition testlag-0
16:12:31.995 [main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer- UBAS_CONSUMER_GROUP001-1, groupId=UBAS_CONSUMER_GROUP001] Setting offset for partition testlag-0 to the committed offset FetchPosition{offset=313169, offsetEpoch=Optional[0], currentLeader=LeaderAndEpoch{leader=Optional [브로커2]:9092 (id: 1 rack: null)], epoch=0}}
16:12:32.104 [main] DEBUG org.apache.kafka.clients.consumer.internals.OffsetsForLeaderEpochClient - [Consumer clientId=consumer-UBAS_CONSUMER_GROUP001-1, groupId=UBAS_CONSUMER_GROUP001] Handling OffsetsForLeaderEpoch response for testlag-0. Got offset 1048110 for epoch 0
16:12:32.105 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=consumer- UBAS_CONSUMER_GROUP001-1, groupId=UBAS_CONSUMER_GROUP001] Added READ_UNCOMMITTED fetch request for partition testlag-0 at position FetchPosition{offset=313169, offsetEpoch=Optional[0], currentLeader=LeaderAndEpoch {leader=Optional[[브로커2]:9092 (id: 1 rack: null)], epoch=0}} to node [브로커2]:9092 (id: 1 rack: null)
16:12:32.105 [main] DEBUG org.apache.kafka.clients.FetchSessionHandler - [Consumer clientId=consumer- UBAS_CONSUMER_GROUP001-1, groupId=UBAS_CONSUMER_GROUP001] Built incremental fetch (sessionId=1975151249, epoch=1) for node 1. Added 0 partition(s), altered 1 partition(s), removed 0 partition(s) out of 1 partition(s)
16:12:32.105 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=consumer- UBAS_CONSUMER_GROUP001-1, groupId=UBAS_CONSUMER_GROUP001] Sending READ_UNCOMMITTED IncrementalFetchRequest(toSend=(testlag-0), toForget=(), implied=()) to broker [브로커2]:9092 (id: 1 rack: null)
16:12:32.290 [main] DEBUG org.apache.kafka.clients.FetchSessionHandler - [Consumer clientId=consumer- UBAS_CONSUMER_GROUP001-1, groupId=UBAS_CONSUMER_GROUP001] Node 0 sent an incremental fetch response with throttleTimeMs = 1 for session 1975151249 with 1 response partition(s)
16:12:32.290 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=consumer- UBAS_CONSUMER_GROUP001-1, groupId=UBAS_CONSUMER_GROUP001] Fetch READ_UNCOMMITTED at offset 313169 for partition testlag-0 returned fetch data (error=NONE, highWaterMark=1048110, lastStableOffset = 1048110, logStartOffset = 0, preferredReadReplica = absent, abortedTransactions = null, recordsSizeInBytes=1048576)
16:12:32.295 [main] INFO com.tacademy.SimpleConsumer - 313169
16:12:32.323 [main] INFO com.tacademy.SimpleConsumer - 313170
16:12:32.349 [main] INFO com.tacademy.SimpleConsumer - 313171
[2021-05-25 16:12:31] INFO pool-2-thread-2 @@@@@@@@@@@@@@@@@@@ commit!!!
[2021-05-25 16:12:31] INFO pool-2-thread-1 [Consumer clientId=consumer-UBAS_CONSUMER_GROUP001-1, groupId=UBAS_CONSUMER_GROUP001] Successfully joined group with generation 60
[2021-05-25 16:12:31] INFO pool-2-thread-2 [Consumer clientId=consumer-UBAS_CONSUMER_GROUP001-2, groupId=UBAS_CONSUMER_GROUP001] Successfully joined group with generation 60
[2021-05-25 16:12:31] INFO pool-2-thread-2 [Consumer clientId=consumer-UBAS_CONSUMER_GROUP001-2, groupId=UBAS_CONSUMER_GROUP001] Adding newly assigned partitions: testlag-2
[2021-05-25 16:12:31] INFO pool-2-thread-1 [Consumer clientId=consumer-UBAS_CONSUMER_GROUP001-1, groupId=UBAS_CONSUMER_GROUP001] Adding newly assigned partitions: testlag-1
[2021-05-25 16:12:32] INFO pool-2-thread-1 [Consumer clientId=consumer-UBAS_CONSUMER_GROUP001-1, groupId=UBAS_CONSUMER_GROUP001] Setting offset for partition testlag-1 to the committed offset FetchPosition{offset=301356, offsetEpoch=Optional[0], currentLeader=LeaderAndEpoch{leader=[브로커3]:9092 (id: 2 rack: null), epoch=0}}
[2021-05-25 16:12:32] INFO pool-2-thread-2 [Consumer clientId=consumer-UBAS_CONSUMER_GROUP001-2, groupId=UBAS_CONSUMER_GROUP001] Setting offset for partition testlag-2 to the committed offset FetchPosition{offset=353377, offsetEpoch=Optional[0], currentLeader=LeaderAndEpoch{leader=[브로커1]:9092 (id: 0 rack: null), epoch=0}}
b. Running three consumers, then killing the process that owns two of them
In this case the offsets do not continue cleanly; the records consumed after the last commit will likely be processed again (duplicates).
-> Log
18:07:52.736 [kafka-coordinator-heartbeat-thread | UBAS_CONSUMER_GROUP001] DEBUG org.apache.kafka.clients. consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-UBAS_CONSUMER_GROUP001-1, groupId=UBAS_CONSUMER_GROUP001] Committed offset 463606 for partition testlag-2
18:07:52.736 [kafka-coordinator-heartbeat-thread | UBAS_CONSUMER_GROUP001] DEBUG org.apache.kafka.clients. consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-UBAS_CONSUMER_GROUP001-1, groupId=UBAS_CONSUMER_GROUP001] Committed offset 439694 for partition testlag-0
18:07:52.736 [kafka-coordinator-heartbeat-thread | UBAS_CONSUMER_GROUP001] DEBUG org.apache.kafka.clients. consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-UBAS_CONSUMER_GROUP001-1, groupId=UBAS_CONSUMER_GROUP001] Committed offset 394214 for partition testlag-1
[2021-05-25 18:06:03] INFO pool-2-thread-2 464008
[2021-05-25 18:06:03] INFO pool-2-thread-2 500
[2021-05-25 18:06:03] INFO pool-2-thread-1 394609
[2021-05-25 18:06:03] INFO pool-2-thread-1 500
2. Implementing a rebalance listener
- Implementation referenced the blog below. If currentOffsets is not reset during a rebalance, the consumer will try to commit against the partitions it held before the change, so the map must be cleared on every rebalance.
https://blog.voidmainvoid.net/264?category=698302
- With the listener implemented, the batch finishes and is committed, the consumer thread notices the in-progress rebalance on its next poll, and the rebalance listener runs → consumer joins, leaves, and partition additions all show the same result as above (a sketch of such a listener is shown right below).
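For concreteness, a minimal sketch of such a listener, following the pattern from the referenced blog (class and method names here are hypothetical, not the exact test code): the poll loop records the next offset per partition, and onPartitionsRevoked commits and clears that map right before the rebalance.

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class CommitOnRebalanceListener implements ConsumerRebalanceListener {
    private final KafkaConsumer<?, ?> consumer;
    // updated from the poll loop: next offset to read, per partition
    private final Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<>();

    public CommitOnRebalanceListener(KafkaConsumer<?, ?> consumer) {
        this.consumer = consumer;
    }

    // call this for every record processed in the poll loop
    public void track(String topic, int partition, long offset) {
        currentOffsets.put(new TopicPartition(topic, partition), new OffsetAndMetadata(offset + 1));
    }

    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        // fires right before the rebalance: commit whatever has been processed so far
        System.out.println("Lost partitions in rebalance. Committing current offsets: " + currentOffsets);
        consumer.commitSync(currentOffsets);
        // reset, otherwise the next commit would target partitions this consumer no longer owns
        currentOffsets.clear();
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        System.out.println("Assigned partitions: " + partitions);
    }
}
```

It would be registered with consumer.subscribe(Collections.singletonList("testlag"), listener), and track(...) called for each record inside the poll loop.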
-> Log
[2021-05-27 13:27:52] INFO main 3787205
[2021-05-27 13:27:52] DEBUG main [Consumer clientId=consumer-UBAS_CONSUMER_GROUP001-1, groupId=UBAS_CONSUMER_GROUP001] Committing offsets: {testlag-2=OffsetAndMetadata{offset=3787206, leaderEpoch=null, metadata=''}}
[2021-05-27 13:27:52] INFO main @@@@@@@@@@@@@@@@@@@ commit!!!
[2021-05-27 13:27:52] DEBUG ead | UBAS_CONSUMER_GROUP001 [Consumer clientId=consumer-UBAS_CONSUMER_GROUP001-1, groupId=UBAS_CONSUMER_GROUP001] Committed offset 3787206 for partition testlag-2
[2021-05-27 13:27:52] DEBUG ead | UBAS_CONSUMER_GROUP001 [Consumer clientId=consumer-UBAS_CONSUMER_GROUP001- 1, groupId=UBAS_CONSUMER_GROUP001] Sending Heartbeat request with generation 230 and member id consumer- UBAS_CONSUMER_GROUP001-1-ba787a73-02fd-4e93-b59a-0ef7accd5d83 to coordinator [브로커1]:9092 (id: 2147483647 rack: null)
[2021-05-27 13:27:52] DEBUG main [Consumer clientId=consumer-UBAS_CONSUMER_GROUP001-1, groupId=UBAS_CONSUMER_GROUP001] Executing onJoinPrepare with generation 230 and memberId consumer- UBAS_CONSUMER_GROUP001-1-ba787a73-02fd-4e93-b59a-0ef7accd5d83
[2021-05-27 13:27:52] INFO main [Consumer clientId=consumer-UBAS_CONSUMER_GROUP001-1, groupId=UBAS_CONSUMER_GROUP001] Revoke previously assigned partitions testlag-2
Lost partitions in rebalance. Committing current offsets:{testlag-2=OffsetAndMetadata{offset=3787206, leaderEpoch=null, metadata=''}}
[2021-05-27 13:27:52] INFO main ########################## 리스너 발동 현재 오프셋 커밋 ################ {testlag-2=OffsetAndMetadata{offset=3787206, leaderEpoch=null, metadata=''}}
[2021-05-27 13:27:52] DEBUG main [Consumer clientId=consumer-UBAS_CONSUMER_GROUP001-1, groupId=UBAS_CONSUMER_GROUP001] Disabling heartbeat thread
[2021-05-27 13:27:52] INFO main [Consumer clientId=consumer-UBAS_CONSUMER_GROUP001-1, groupId=UBAS_CONSUMER_GROUP001] (Re-)joining group
[2021-05-27 13:27:52] DEBUG main [Consumer clientId=consumer-UBAS_CONSUMER_GROUP001-1, groupId=UBAS_CONSUMER_GROUP001] Joining group with current subscription: [testlag]
[2021-05-27 13:27:52] DEBUG main [Consumer clientId=consumer-UBAS_CONSUMER_GROUP001-1, groupId=UBAS_CONSUMER_GROUP001] Sending JoinGroup (JoinGroupRequestData(groupId='UBAS_CONSUMER_GROUP001', sessionTimeoutMs=10000, rebalanceTimeoutMs=300000, memberId='consumer-UBAS_CONSUMER_GROUP001-1-ba787a73-02fd- 4e93-b59a-0ef7accd5d83', groupInstanceId=null, protocolType='consumer', protocols=[JoinGroupRequestProtocol(name='range', metadata=[0, 1, 0, 0, 0, 1, 0, 7, 116, 101, 115, 116, 108, 97, 103, -1, -1, -1, -1, 0, 0, 0, 0])])) to coordinator [브로커1]:9092 (id: 2147483647 rack: null)
[2021-05-27 13:27:52] DEBUG main [Consumer clientId=consumer-UBAS_CONSUMER_GROUP001-1, groupId=UBAS_CONSUMER_GROUP001] Not returning fetched records for partition testlag-2 since it is no longer assigned
[2021-05-27 13:27:52] INFO main [Consumer clientId=consumer-UBAS_CONSUMER_GROUP001-1, groupId=UBAS_CONSUMER_GROUP001] Attempt to heartbeat failed since group is rebalancing
[2021-05-27 13:27:52] DEBUG main [Consumer clientId=consumer-UBAS_CONSUMER_GROUP001-1, groupId=UBAS_CONSUMER_GROUP001] Received successful JoinGroup response: JoinGroupResponseData(throttleTimeMs=0, errorCode=0, generationId=231, protocolType='consumer', protocolName='range', leader='consumer-UBAS_CONSUMER_GROUP001- 1-ba787a73-02fd-4e93-b59a-0ef7accd5d83', memberId='consumer-UBAS_CONSUMER_GROUP001-1-ba787a73-02fd-4e93-b59a- 0ef7accd5d83', members=[JoinGroupResponseMember(memberId='consumer-UBAS_CONSUMER_GROUP001-1-ba787a73-02fd- 4e93-b59a-0ef7accd5d83', groupInstanceId=null, metadata=[0, 1, 0, 0, 0, 1, 0, 7, 116, 101, 115, 116, 108, 97, 103, -1, -1, -1, -1, 0, 0, 0, 0])])
[2021-05-27 13:27:52] DEBUG main [Consumer clientId=consumer-UBAS_CONSUMER_GROUP001-1, groupId=UBAS_CONSUMER_GROUP001] Performing assignment using strategy range with subscriptions {consumer- UBAS_CONSUMER_GROUP001-1-ba787a73-02fd-4e93-b59a-0ef7accd5d83=org.apache.kafka.clients.consumer. ConsumerPartitionAssignor$Subscription@4da4253}
[2021-05-27 13:27:52] INFO main [Consumer clientId=consumer-UBAS_CONSUMER_GROUP001-1, groupId=UBAS_CONSUMER_GROUP001] Finished assignment for group at generation 231: {consumer- UBAS_CONSUMER_GROUP001-1-ba787a73-02fd-4e93-b59a-0ef7accd5d83=Assignment(partitions=[testlag-0, testlag-1, testlag-2, testlag-3])}
[2021-05-27 13:27:52] DEBUG main [Consumer clientId=consumer-UBAS_CONSUMER_GROUP001-1, groupId=UBAS_CONSUMER_GROUP001] Sending leader SyncGroup to coordinator [브로커1]:9092 (id: 2147483647 rack: null) at generation Generation{generationId=231, memberId='consumer-UBAS_CONSUMER_GROUP001-1-ba787a73-02fd-4e93-b59a- 0ef7accd5d83', protocol='range'}: SyncGroupRequestData(groupId='UBAS_CONSUMER_GROUP001', generationId=231, memberId='consumer-UBAS_CONSUMER_GROUP001-1-ba787a73-02fd-4e93-b59a-0ef7accd5d83', groupInstanceId=null, protocolType='consumer', protocolName='range', assignments=[SyncGroupRequestAssignment(memberId='consumer- UBAS_CONSUMER_GROUP001-1-ba787a73-02fd-4e93-b59a-0ef7accd5d83', assignment=[0, 1, 0, 0, 0, 1, 0, 7, 116, 101, 115, 116, 108, 97, 103, 0, 0, 0, 4, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 2, 0, 0, 0, 3, -1, -1, -1, -1])])
[2021-05-27 13:27:52] DEBUG main [Consumer clientId=consumer-UBAS_CONSUMER_GROUP001-1, groupId=UBAS_CONSUMER_GROUP001] Received successful SyncGroup response: org.apache.kafka.common.requests. SyncGroupResponse@3972a855
[2021-05-27 13:27:52] INFO main [Consumer clientId=consumer-UBAS_CONSUMER_GROUP001-1, groupId=UBAS_CONSUMER_GROUP001] Successfully joined group with generation 231
[2021-05-27 13:27:52] DEBUG main [Consumer clientId=consumer-UBAS_CONSUMER_GROUP001-1, groupId=UBAS_CONSUMER_GROUP001] Enabling heartbeat thread
[2021-05-27 13:27:52] DEBUG main [Consumer clientId=consumer-UBAS_CONSUMER_GROUP001-1, groupId=UBAS_CONSUMER_GROUP001] Executing onJoinComplete with generation 231 and memberId consumer- UBAS_CONSUMER_GROUP001-1-ba787a73-02fd-4e93-b59a-0ef7accd5d83
[2021-05-27 13:27:52] INFO main [Consumer clientId=consumer-UBAS_CONSUMER_GROUP001-1, groupId=UBAS_CONSUMER_GROUP001] Adding newly assigned partitions: testlag-0, testlag-1, testlag-2, testlag-3
[2021-05-27 13:27:52] INFO main ########################## 리스너 발동 파티션 어싸인 ################{}
[2021-05-27 13:27:52] DEBUG main [Consumer clientId=consumer-UBAS_CONSUMER_GROUP001-1, groupId=UBAS_CONSUMER_GROUP001] Fetching committed offsets for partitions: [testlag-0, testlag-1, testlag-2, testlag-3]
[2021-05-27 13:27:52] INFO main [Consumer clientId=consumer-UBAS_CONSUMER_GROUP001-1, groupId=UBAS_CONSUMER_GROUP001] Setting offset for partition testlag-2 to the committed offset FetchPosition{offset=3787206, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[[브로커1]:9092 (id: 0 rack: null)], epoch=0}}
[2021-05-27 13:27:52] INFO main [Consumer clientId=consumer-UBAS_CONSUMER_GROUP001-1, groupId=UBAS_CONSUMER_GROUP001] Setting offset for partition testlag-3 to the committed offset FetchPosition{offset=57249, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[[브로커2]:9092 (id: 1 rack: null)], epoch=0}}
[2021-05-27 13:27:52] INFO main [Consumer clientId=consumer-UBAS_CONSUMER_GROUP001-1, groupId=UBAS_CONSUMER_GROUP001] Setting offset for partition testlag-0 to the committed offset FetchPosition{offset=3756455, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[[브로커2]:9092 (id: 1 rack: null)], epoch=0}}
[2021-05-27 13:27:52] INFO main [Consumer clientId=consumer-UBAS_CONSUMER_GROUP001-1, groupId=UBAS_CONSUMER_GROUP001] Setting offset for partition testlag-1 to the committed offset FetchPosition{offset=3684522, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[[브로커3]:9092 (id: 2 rack: null)], epoch=0}}
[2021-05-27 13:27:52] DEBUG main [Consumer clientId=consumer-UBAS_CONSUMER_GROUP001-1, groupId=UBAS_CONSUMER_GROUP001] Added READ_UNCOMMITTED fetch request for partition testlag-2 at position FetchPosition {offset=3787206, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[[브로커1]:9092 (id: 0 rack: null)], epoch=0}} to node [브로커1]:9092 (id: 0 rack: null)
[2021-05-27 13:27:52] DEBUG main [Consumer clientId=consumer-UBAS_CONSUMER_GROUP001-1, groupId=UBAS_CONSUMER_GROUP001] Added READ_UNCOMMITTED fetch request for partition testlag-3 at position FetchPosition {offset=57249, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[[브로커2]:9092 (id: 1 rack: null)], epoch=0}} to node [브로커2]:9092 (id: 1 rack: null)
[2021-05-27 13:27:52] DEBUG main [Consumer clientId=consumer-UBAS_CONSUMER_GROUP001-1, groupId=UBAS_CONSUMER_GROUP001] Added READ_UNCOMMITTED fetch request for partition testlag-0 at position FetchPosition {offset=3756455, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[[브로커2]:9092 (id: 1 rack: null)], epoch=0}} to node[브로커2]:9092 (id: 1 rack: null)
[2021-05-27 13:27:52] DEBUG main [Consumer clientId=consumer-UBAS_CONSUMER_GROUP001-1, groupId=UBAS_CONSUMER_GROUP001] Added READ_UNCOMMITTED fetch request for partition testlag-1 at position FetchPosition {offset=3684522, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[[브로커3]:9092 (id: 2 rack: null)], epoch=0}} to node [브로커3]:9092 (id: 2 rack: null)
[2021-05-27 13:27:52] DEBUG main [Consumer clientId=consumer-UBAS_CONSUMER_GROUP001-1, groupId=UBAS_CONSUMER_GROUP001] Built incremental fetch (sessionId=1417316140, epoch=2) for node 0. Added 0 partition(s), altered 1 partition(s), removed 0 partition(s) out of 1 partition(s)
[2021-05-27 13:27:52] DEBUG main [Consumer clientId=consumer-UBAS_CONSUMER_GROUP001-1, groupId=UBAS_CONSUMER_GROUP001] Built incremental fetch (sessionId=1151513355, epoch=1) for node 1. Added 0 partition(s), altered 2 partition(s), removed 0 partition(s) out of 2 partition(s)
[2021-05-27 13:27:52] DEBUG main [Consumer clientId=consumer-UBAS_CONSUMER_GROUP001-1, groupId=UBAS_CONSUMER_GROUP001] Built incremental fetch (sessionId=2079003617, epoch=1) for node 2. Added 0 partition(s), altered 1 partition(s), removed 0 partition(s) out of 1 partition(s)
[2021-05-27 13:27:52] DEBUG main [Consumer clientId=consumer-UBAS_CONSUMER_GROUP001-1, groupId=UBAS_CONSUMER_GROUP001] Sending READ_UNCOMMITTED IncrementalFetchRequest(toSend=(testlag-2), toForget=(), implied=()) to broker [브로커1]:9092 (id: 0 rack: null)
[2021-05-27 13:27:52] DEBUG main [Consumer clientId=consumer-UBAS_CONSUMER_GROUP001-1, groupId=UBAS_CONSUMER_GROUP001] Sending READ_UNCOMMITTED IncrementalFetchRequest(toSend=(testlag-3, testlag-0), toForget=(), implied=()) to broker [브로커2]:9092 (id: 1 rack: null)
[2021-05-27 13:27:52] DEBUG main [Consumer clientId=consumer-UBAS_CONSUMER_GROUP001-1, groupId=UBAS_CONSUMER_GROUP001] Sending READ_UNCOMMITTED IncrementalFetchRequest(toSend=(testlag-1), toForget=(), implied=()) to broker [브로커3]:9092 (id: 2 rack: null)
[2021-05-27 13:27:52] DEBUG main [Consumer clientId=consumer-UBAS_CONSUMER_GROUP001-1, groupId=UBAS_CONSUMER_GROUP001] Node 0 sent an incremental fetch response with throttleTimeMs = 2 for session 2079003617 with 1 response partition(s)
[2021-05-27 13:27:52] DEBUG main [Consumer clientId=consumer-UBAS_CONSUMER_GROUP001-1, groupId=UBAS_CONSUMER_GROUP001] Fetch READ_UNCOMMITTED at offset 3684522 for partition testlag-1 returned fetch data (error=NONE, highWaterMark=4055088, lastStableOffset = 4055088, logStartOffset = 0, preferredReadReplica = absent, abortedTransactions = null, recordsSizeInBytes=1048576)
[2021-05-27 13:27:52] INFO main 3684522
[2021-05-27 13:27:52] DEBUG ead | UBAS_CONSUMER_GROUP001 [Consumer clientId=consumer-UBAS_CONSUMER_GROUP001- 1, groupId=UBAS_CONSUMER_GROUP001] Node 0 sent an incremental fetch response with throttleTimeMs = 0 for session 1417316140 with 1 response partition(s)
[2021-05-27 13:27:52] DEBUG ead | UBAS_CONSUMER_GROUP001 [Consumer clientId=consumer-UBAS_CONSUMER_GROUP001- 1, groupId=UBAS_CONSUMER_GROUP001] Fetch READ_UNCOMMITTED at offset 3787206 for partition testlag-2 returned fetch data (error=NONE, highWaterMark=4063859, lastStableOffset = 4063859, logStartOffset = 0, preferredReadReplica = absent, abortedTransactions = null, recordsSizeInBytes=1048576)
[2021-05-27 13:27:53] DEBUG ead | UBAS_CONSUMER_GROUP001 [Consumer clientId=consumer-UBAS_CONSUMER_GROUP001- 1, groupId=UBAS_CONSUMER_GROUP001] Node 0 sent an incremental fetch response with throttleTimeMs = 1 for session 1151513355 with 2 response partition(s)
[2021-05-27 13:27:53] DEBUG ead | UBAS_CONSUMER_GROUP001 [Consumer clientId=consumer-UBAS_CONSUMER_GROUP001- 1, groupId=UBAS_CONSUMER_GROUP001] Fetch READ_UNCOMMITTED at offset 57249 for partition testlag-3 returned fetch data (error=NONE, highWaterMark=192088, lastStableOffset = 192088, logStartOffset = 0, preferredReadReplica = absent, abortedTransactions = null, recordsSizeInBytes=1048576)
[2021-05-27 13:27:53] DEBUG ead | UBAS_CONSUMER_GROUP001 [Consumer clientId=consumer-UBAS_CONSUMER_GROUP001- 1, groupId=UBAS_CONSUMER_GROUP001] Fetch READ_UNCOMMITTED at offset 3756455 for partition testlag-0 returned fetch data (error=NONE, highWaterMark=4057267, lastStableOffset = 4057267, logStartOffset = 0, preferredReadReplica = absent, abortedTransactions = null, recordsSizeInBytes=1048576)
[2021-05-27 13:27:53] INFO main 3684523
3. Summary
a. With a rebalance listener attached, does the consumer commit the current offsets and rebalance immediately, without finishing the current batch?
-> If it doesn't, the listener seems pointless for the consumer join/leave case: even without one, the rebalance only starts after the batch is finished and the offsets are committed, so the offsets line up anyway.
-> Test result: it does not rebalance immediately. The other consumers wait until the currently fetched batch is fully processed and committed, and only then start consuming from the committed offsets.
- The rebalance listener itself works correctly. Order: process all fetched data -> normal commit -> listener commit right before the rebalance starts.
b. Do offsets line up when the process is killed?
-> For this case it seems we would need to commit the current offsets when the WakeupException thrown via wakeup() is caught.
-> Even when the process was killed, the current batch was fully processed and a normal commit happened before the rebalance started.
-> The WakeupException approach only works on SIGTERM. IntelliJ's stop button sends SIGTERM followed by SIGKILL (so in the earlier test the WakeupException happened to be handled in that brief window).
-> On a real server, use kill -TERM PID. Whether committing the current offsets immediately on the SIGTERM-triggered WakeupException actually prevents duplicates still needs to be tested (a sketch of this shutdown path follows below).
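A minimal sketch of that SIGTERM path (an assumed structure, not the author's exact code): a JVM shutdown hook calls wakeup(), the blocked poll() throws WakeupException, and the consumer commits synchronously before closing. A SIGKILL never reaches this path, which is why the listener cannot help there.

```java
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.serialization.StringDeserializer;

public class GracefulShutdownConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "BROKER:9092"); // placeholder
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "UBAS_CONSUMER_GROUP001");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        Thread mainThread = Thread.currentThread();

        // SIGTERM (kill -TERM, not SIGKILL) runs JVM shutdown hooks:
        // wake the blocked poll() so the loop below can exit cleanly.
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            consumer.wakeup();
            try {
                mainThread.join(); // wait until the main loop has committed and closed
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }));

        try {
            consumer.subscribe(Collections.singletonList("testlag"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println(record.offset());
                }
                consumer.commitAsync();
            }
        } catch (WakeupException e) {
            // expected on shutdown; fall through to the final commit
        } finally {
            consumer.commitSync(); // last chance to persist offsets before leaving the group
            consumer.close();
        }
    }
}
```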
c. So what is the point of implementing a rebalance listener?
- For consumers joining or leaving: no benefit (the batch finishes and the poll loop's commit runs anyway, so there is no loss or duplication)
- Adding or changing partitions: same as above
- On consumer SIGKILL: also no benefit, since no exception is delivered and the current offsets cannot be committed
- If the application auto-commits instead of committing after each batch, the listener can commit earlier than the auto-commit interval (otherwise another poll could run before the commit)
- If offsets are managed in an external DB, a rebalance listener is mandatory
- On SIGTERM it may be useful (needs testing on Linux)
- With async commit, the next poll proceeds even when a commit fails, so a rebalance can start without the offset ever being committed; therefore a rebalance listener is a must when committing asynchronously (see the callback sketch below)
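As a small illustration of that last point, the bare commitAsync() in the loop sketches above could use the callback overload so a failed commit is at least visible (a sketch with hypothetical names; the real safety net is still the commitSync() in onPartitionsRevoked):

```java
import java.util.Map;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public final class CommitCallbacks {
    private CommitCallbacks() {}

    // Drop-in replacement for a bare consumer.commitAsync() call: a failed
    // async commit is only reported here and the loop keeps polling, so
    // without a rebalance listener the offset might never be committed
    // before the next rebalance starts.
    public static void commitAsyncLogged(KafkaConsumer<?, ?> consumer) {
        consumer.commitAsync((Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) -> {
            if (exception != null) {
                System.err.println("Async commit failed for " + offsets + ": " + exception);
                // no retry here: a stale retry could overwrite a newer successful commit
            }
        });
    }
}
```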