
Kafka Lag Monitoring System

by blair2dev 2021. 9. 26.

How to build a system that monitors Kafka lag (the gap between the producer offset and the consumer offset) and uses it to detect anomalies. (A quick way to check this number by hand follows the role summary below.)

 

Kafka → Burrow → Telegraf → Elasticsearch → Grafana / Kibana

System architecture

Role of each service

Burrow: collects offsets and related data from Kafka and exposes them through its HTTP API responses.

Telegraf: requests that data from Burrow and loads it into Elasticsearch (with the index settings shown below).

Elasticsearch: stores the data received from Telegraf.
Grafana / Kibana: query and visualize the data in Elasticsearch.
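
For reference, the per-partition lag that Burrow tracks is essentially the number the stock Kafka tooling reports: LOG-END-OFFSET (latest produced offset) minus CURRENT-OFFSET (the group's last committed offset), shown in the LAG column. A quick manual check, assuming the testgroup consumer group created later in this post:

# ~/local/kafka_2.13-2.6.0/bin/kafka-consumer-groups.sh --bootstrap-server n1:9092 --group testgroup --describe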

 

 

ZooKeeper cluster installation

# mkdir ~/local
# cd ~/local
# wget https://downloads.apache.org/zookeeper/zookeeper-3.6.2/apache-zookeeper-3.6.2-bin.tar.gz
# tar xvfz apache-zookeeper-3.6.2-bin.tar.gz

ZooKeeper node 1 (n1)
# mkdir -p /root/local/lib/zookeeper
# vi ~/local/lib/zookeeper/myid
1
# vi ~/local/apache-zookeeper-3.6.2-bin/conf/zoo.cfg
tickTime=2000
dataDir=/root/local/lib/zookeeper
clientPort=2182
initLimit=20
syncLimit=5
server.1=n1:2777:3777
server.2=n2:2777:3777
server.3=n3:2777:3777

ZooKeeper node 2 (n2)
# mkdir -p ~/local/lib/zookeeper
# vi ~/local/lib/zookeeper/myid
2
# vi ~/local/apache-zookeeper-3.6.2-bin/conf/zoo.cfg
tickTime=2000
dataDir=/root/local/lib/zookeeper
clientPort=2182
initLimit=20
syncLimit=5
server.1=n1:2777:3777
server.2=n2:2777:3777
server.3=n3:2777:3777

ZooKeeper node 3 (n3)
# mkdir -p ~/local/lib/zookeeper
# vi ~/local/lib/zookeeper/myid
3
# vi ~/local/apache-zookeeper-3.6.2-bin/conf/zoo.cfg
tickTime=2000
dataDir=/root/local/lib/zookeeper
clientPort=2182
initLimit=20
syncLimit=5
server.1=n1:2777:3777
server.2=n2:2777:3777
server.3=n3:2777:3777

Start / stop ZooKeeper (on each node):
# ~/local/apache-zookeeper-3.6.2-bin/bin/zkServer.sh start
# ~/local/apache-zookeeper-3.6.2-bin/bin/zkServer.sh stop
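
After starting all three nodes, verify that the ensemble actually formed; one node should report itself as leader and the other two as followers:

# ~/local/apache-zookeeper-3.6.2-bin/bin/zkServer.sh status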

 

Kafka cluster installation

# mkdir ~/local
# cd ~/local
# wget https://downloads.apache.org/kafka/2.6.0/kafka_2.13-2.6.0.tgz
# tar xvfz kafka_2.13-2.6.0.tgz

Kafka broker 1 (n1)
# vi ~/local/kafka_2.13-2.6.0/config/server.properties
broker.id=0
listeners=PLAINTEXT://:9092
advertised.listeners=PLAINTEXT://n1:9092
zookeeper.connect=n1:2182,n2:2182,n3:2182/test
delete.topic.enable = true

Kafka broker 2 (n2)
# vi ~/local/kafka_2.13-2.6.0/config/server.properties
broker.id=1
listeners=PLAINTEXT://:9092
advertised.listeners=PLAINTEXT://n2:9092
zookeeper.connect=n1:2182,n2:2182,n3:2182/test
delete.topic.enable = true

Kafka broker 3 (n3)
# vi ~/local/kafka_2.13-2.6.0/config/server.properties
broker.id=2
listeners=PLAINTEXT://:9092
advertised.listeners=PLAINTEXT://n3:9092
zookeeper.connect=n1:2182,n2:2182,n3:2182/test
delete.topic.enable = true


Start / stop each broker:
# ~/local/kafka_2.13-2.6.0/bin/kafka-server-start.sh ~/local/kafka_2.13-2.6.0/config/server.properties &
# ~/local/kafka_2.13-2.6.0/bin/kafka-server-stop.sh
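
Once all three brokers are up, they should be registered in ZooKeeper under the /test chroot set in zookeeper.connect; broker IDs 0, 1 and 2 should appear:

# ~/local/kafka_2.13-2.6.0/bin/zookeeper-shell.sh n1:2182/test ls /brokers/ids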
Create a topic:
# ~/local/kafka_2.13-2.6.0/bin/kafka-topics.sh --create --bootstrap-server n1:9092,n2:9092,n3:9092 --replication-factor 3 --partitions 3 --topic lab1
List topics:
# ~/local/kafka_2.13-2.6.0/bin/kafka-topics.sh --list --bootstrap-server n1:9092,n2:9092,n3:9092
Console producer / consumer and consumer-group commands:
# ~/local/kafka_2.13-2.6.0/bin/kafka-console-producer.sh --bootstrap-server n1:9092,n2:9092,n3:9092 --topic lab1
# ~/local/kafka_2.13-2.6.0/bin/kafka-console-consumer.sh --bootstrap-server n1:9092,n2:9092,n3:9092 --topic lab1 --from-beginning
# ~/local/kafka_2.13-2.6.0/bin/kafka-console-consumer.sh --bootstrap-server n1:9092,n2:9092,n3:9092 --topic lab1 --group testgroup --from-beginning
# ~/local/kafka_2.13-2.6.0/bin/kafka-consumer-groups.sh --bootstrap-server n1:9092,n2:9092,n3:9092 --list
# ~/local/kafka_2.13-2.6.0/bin/kafka-consumer-groups.sh --bootstrap-server n1:9092,n2:9092,n3:9092 --group testgroup --describe
# ~/local/kafka_2.13-2.6.0/bin/kafka-consumer-groups.sh --bootstrap-server n1:9092,n2:9092,n3:9092 --group testgroup --topic lab1 --reset-offsets --to-earliest --execute
# ~/local/kafka_2.13-2.6.0/bin/kafka-consumer-groups.sh --bootstrap-server n1:9092,n2:9092,n3:9092 --group testgroup --topic lab1:1 --reset-offsets --to-offset 10 --execute
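
To see lag actually move on the dashboards later, it helps to generate some test traffic; the perf-test producer bundled with Kafka can push records into lab1 (a sketch, adjust the record count and size as needed):

# ~/local/kafka_2.13-2.6.0/bin/kafka-producer-perf-test.sh --topic lab1 --num-records 100000 --record-size 100 --throughput -1 --producer-props bootstrap.servers=n1:9092,n2:9092,n3:9092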

 

Burrow installation

# mkdir ~/local
# cd ~/local


Install Go
# wget https://golang.org/dl/go1.15.3.linux-amd64.tar.gz
# tar -C /usr/local -xzf go1.15.3.linux-amd64.tar.gz
# vi ~/.bash_profile
PATH=$PATH:/usr/local/go/bin
export PATH
# source ~/.bash_profile
# go version


Build Burrow
# cd ~/local
# git clone https://github.com/linkedin/Burrow.git
# cd ~/local/Burrow
# go mod tidy
# go install


# cp ~/local/Burrow/config/burrow.toml ~/go/bin/
# vi ~/go/bin/burrow.toml
[general]
pidfile="burrow.pid"
stdout-logfile="burrow.out"
access-control-allow-origin="*"

[logging]
filename="logs/burrow.log"
level="info"
maxsize=100
maxbackups=30
maxage=10
use-localtime=false
use-compression=true

[zookeeper]
servers=[ "n1:2182", "n2:2182", "n3:2182" ]
timeout=6
root-path="/burrow"

[client-profile.test]
client-id="burrow-test"
kafka-version="0.10.0"

[cluster.local]
class-name="kafka"
servers=[ "n1:9092", "n2:9092", "n3:9092" ]
client-profile="test"
topic-refresh=120
offset-refresh=30

[consumer.local]
class-name="kafka"
cluster="local"
servers=[ "n1:9092", "n2:9092", "n3:9092" ]
client-profile="test"
group-denylist="^(console-consumer-|python-kafka-consumer-|quick-).*$"
group-allowlist=""

[consumer.local_zk]
class-name="kafka_zk"
cluster="local"
servers=[ "n1:2182", "n2:2182", "n3:2182" ]
zookeeper-path="/kafka-cluster"
zookeeper-timeout=30
group-denylist="^(console-consumer-|python-kafka-consumer-|quick-).*$"
group-allowlist=""

[httpserver.default]
address=":8000"

[storage.default]
class-name="inmemory"
workers=20
intervals=15
expire-group=604800
min-distance=1

# ~/go/bin/Burrow &
# ~/go/bin/Burrow --config-dir "Conf Path"

Burrow health check and cluster detail:
http://n1:8000/burrow/admin
http://n1:8000/v3/kafka/local

Consumer group list:
http://n1:8000/v3/kafka/local/consumer

Topic list:
http://n1:8000/v3/kafka/local/topic
Topic detail (lab1):
http://n1:8000/v3/kafka/local/topic/lab1
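
The per-group endpoints are the ones that matter for lag monitoring, and they are what Telegraf's burrow input polls. A quick check by hand for the testgroup consumer group created earlier (adjust the paths if your Burrow version's HTTP API differs; the /lag form returns the full per-partition detail):

# curl -s http://n1:8000/v3/kafka/local/consumer/testgroup/status
# curl -s http://n1:8000/v3/kafka/local/consumer/testgroup/lag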

 

Telegraf 

# mkdir ~/local
# cd ~/local
# wget https://dl.influxdata.com/telegraf/releases/telegraf-1.16.2-1.x86_64.rpm
# sudo yum localinstall telegraf-1.16.2-1.x86_64.rpm
# vi /etc/telegraf/telegraf.conf

[[inputs.burrow]]
  servers = ["http://n1:8000"]
  topics_exclude = [ "__consumer_offsets" ]
  groups_exclude = ["console-*"]
  
[[outputs.elasticsearch]]
  urls = [ "http://n1:9200", "http://n2:9200" ]
  timeout = "5s"
  enable_sniffer = false
  health_check_interval = "10s"
  index_name = "burrow-%Y.%m.%d"
  manage_template = false
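
Before handing Telegraf over to systemd, the configuration can be dry-run once; --test gathers the burrow metrics a single time and prints them to stdout without writing to Elasticsearch:

# telegraf --config /etc/telegraf/telegraf.conf --test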
 
Manage Telegraf with systemctl:
# systemctl enable telegraf
# systemctl start telegraf
# systemctl stop telegraf
# systemctl status telegraf
systemd unit file path:
# vi /etc/systemd/system/multi-user.target.wants/telegraf.service
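
Once the service is running, confirm that the daily burrow-* indices are being created in Elasticsearch:

# curl -s 'http://n1:9200/_cat/indices/burrow-*?v'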

 

 

 

Grafana 

# mkdir ~/local
# cd ~/local
# wget https://dl.grafana.com/oss/release/grafana-7.3.2-1.x86_64.rpm
# sudo yum install grafana-7.3.2-1.x86_64.rpm

Dashboard to import into the Grafana web admin UI (http://n1:3000):
https://github.com/AndersonChoi/kafka-lag-dashboard

Manage Grafana with systemctl:
# systemctl enable grafana-server
# systemctl start grafana-server
# systemctl stop grafana-server
# systemctl status grafana-server
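
The dashboard above expects an Elasticsearch data source pointing at the burrow-* indices. It can be added through the web UI, or as a rough sketch via Grafana's HTTP API; the data-source name burrow-es is arbitrary, and timeField assumes Telegraf's default @timestamp field:

# curl -s -u admin:admin -X POST http://n1:3000/api/datasources -H 'Content-Type: application/json' -d '{"name":"burrow-es","type":"elasticsearch","access":"proxy","url":"http://n1:9200","database":"burrow-*","jsonData":{"timeField":"@timestamp","esVersion":70}}'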

 

 

Kibana

Installation (official guide): https://www.elastic.co/guide/en/kibana/7.9/rpm.html#rpm-repo

# vi /etc/yum.repos.d/kibana.repo
[kibana-7.x]
name=Kibana repository for 7.x packages
baseurl=https://artifacts.elastic.co/packages/7.x/yum
gpgcheck=1
gpgkey=https://artifacts.elastic.co/GPG-KEY-elasticsearch
enabled=1
autorefresh=1
type=rpm-md
# sudo yum install kibana

# vi /etc/kibana/kibana.yml
server.port: 5601
server.host: "n1"
elasticsearch.hosts: ["http://n1:9200", "http://n2:9200"]
logging.dest: /var/log/kibana/kibana.log

# systemctl enable kibana
# vi /etc/systemd/system/kibana.service
[Unit]
Description=Kibana
[Service]
Type=simple
ExecStart=/usr/share/kibana/bin/kibana --allow-root -c /etc/kibana/kibana.yml
Restart=on-failure
RestartSec=3
StartLimitBurst=3
StartLimitInterval=60
WorkingDirectory=/

[Install]
WantedBy=multi-user.target
# systemctl daemon-reload

# systemctl start kibana

# netstat -an | grep 5601
Web UI:
http://n1:5601
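
Besides opening the browser, Kibana's status API is a quick sanity check; it should report an overall green state once Kibana has connected to Elasticsearch:

# curl -s http://n1:5601/api/status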

 
