Kafka Summary
Kafka
1. Kafka 기본 개념
0) MOM (Message Oriented Middleware)
- 메세지를 통해 여러 분산되어 있는 시스템간의 Connector 역할을 진행하여, 서로 실시간 비동기식으로 데이터를 교환할 수 있도록 하는 소프트웨어이다.
- Kafka는 이러한 MOM 소프트웨어 중 하나이며 원천 시스템으로부터 데이터가 발생했을 시 중간에 데이터를 버퍼링하면서 목적지에 안정적으로 전송해주도록 한다.
1) Kafka란?
- Publish-Subscribe 모델로 구현한 분산 메시징 시스템 (메시징 시스템은 중앙에 메시징 시스템 서버를 두고 메시지를 송수신하는 형태의 통신방식)
- 실시간으로 스트림을 게시, 구독, 저장 및 처리할 수 있는 분산 데이터 스트리밍 플랫폼
- Publisher : Producer로 데이터를 보내는 기능
- Subscriber : Consumer로 메시지를 소비하는 기능
2) 왜 Kafka를 쓰는가?
(1) 데이터 유실 방지
- disk에 적재되기 때문에, 불의의 사고로 서버가 다운되었어도 데이터 유실없이 재시작하여 기존 데이터를 안정적 처리 가능
(2) 병렬처리에 의한 데이터 처리율 향상
- disk에 순차적으로 데이터를 적재하기 때문에 Random access 방식보다 훨씬 빠르게 데이터 처리
(3) 클러스터링에 의한 고가용성 서비스
- Scale-out이 가능하여 시스템 확장이 용이. 어떤 하나 혹은 몇개의 서버가 다운되도 서비스 자체가 중단될 일 없이 시스템 운용 가능
3) Kafka의 구성요소
- Kafka cluster : 카프카 서버로 이루어진 클러스터를 말함.
- Broker : 카프카 서버를 의미
- Controller : 파티션 관리를 책임지는 Broker. 만일, 한 Broker에 장애가 발생하였을 때, 그 Broker에 Leader급 파티션이 존재한다면, 다른 follower파티션들중 하나에게 Leader 역할을 재분배 한다.
- Coordinator : Consumer Group의 상태를 확인. Consumer Group내의 Consumer에 장애가 발생하여 매칭된 파티션 데이터를 읽지 못할 경우, 해당 파티션 데이터를 정상 작동하는 다른 Consumer에게 매칭시킨다.
- Topic : 카프카 클러스터에 데이터를 관리할 시 그 기준이 되는 개념. 토픽은 카프카 클러스터에 여러개를 만들 수 있으며 하나의 Topic은 1개 이상의 파티션(Partition)으로 구성
- Partition : 각 토픽 당 데이터를 분산 처리하는 단위. 카프카 에서는 토픽 안에 파티션을 나누어 그 수대로 데이터를 분산 처리. (replica 수 만큼 파티션이 각 서버들에게 복제된다)
- Broker : 카프카 서버를 의미
Zookeeper : 분산 코디네이션 시스템. 카프카 브로커를 하나의 클러스터로 코디네이팅 하는 역할을 진행. → broker의 메타 데이터 관리, 장애 관리, partition leader 선출, broker leader 선출, 기록 등의 기능 진행 → broker의 장애 발생 시 새 controller를 선출하는 역할을 하며, topic의 파티션 수, 특정 설정 등의 메타데이터 관리를 담당
Producer : 데이터를 발생시키고 카프카 클러스터(Kafka Cluster)에 적재
- Option → acks : Topic의 Leader에게 메시지를 보낸 후 요청을 완료하기 전 ack 갯수
- 옵션의 수가 적으면 성능은 좋으나 메시지 손실률 증가
- 반대로, 옵션의 수가 많으면 메시지 손실 가능성은 적어지나 성능은 저하
acks = 0 : 자신이 보낸 메시지에 대한 ack를 기다리지 않고 전송
acks = 1 : 토픽 Leader로 부터 잘 받았는지 확인하고 전송
acks = all : 토픽 Leader와 follower로 부터 잘 받았는지 확인하고 전송
- Consumer Group : 컨슈머의 집합을 구성하는 단위. (group.id 라는 값으로 구분된다) 카프카에서는 컨슈머 그룹으로서 데이터를 처리하며 컨슈머 그룹 내의 컨슈머 수만큼 파티션의 데이터를 분산처리 (참고 : https://www.popit.kr/kafka-consumer-group/)
Case 1) Partition = 4, Consumer = 2 - Consumer 하나가 처리해야 할 파티션 갯수가 2개 이상이다
Case 2) Partition = 4. Consumer = 4 (가장 이상적)
- Consumer와 Partition 갯수가 일치하여 서로 1:1 맵핑 된다
- Case 3) Partition = 4, Consumer = 5
- Partition갯수 보다 Consumer가 더 많기에, 아무것도 하지 않는 Consumer가 발생한다.
3) 파티션 읽기, 쓰기
- Apache Kafka에서의 Read, Write 연산은 카프카 클러스터 내의 Leader 파티션에게만 적용 된다.
- 하늘색으로 칠해진 파티션이 Leader 파티션이며 이 파티션들에게 Producer가 Write를 진행한다.
- Write가 진행되고 나면 업데이트 된 데이터는 각 파티션들의 replica에 복사된다.
- Consumer Group은 서로 관여하지 않고, 본인 Group에 대해서만 처리한다.
- 카프카는 데이터를 순차적으로 디스크에 저장
- 따라서, Producer는 순차적으로 저장된 데이터 뒤에 부이는 append형식으로 write연산을 진행
- 이 때, 파티션들은 각각의 데이터들의 순차적인 집합인 offset으로 구성
- Log : Partition의 한 칸. Key, value, timestamp로 구성
- Offset : Partition의 각 메시지를 식별할 수 있는 유니크 값 (메시지를 소비하는 Consumer가 현재 어느 위치를 읽고 있는지에 대한 위치 값. 0부터 읽음)
2. Kafka QuickStart
1) docker-compose.yml로 kafka 올리기 (Standalone 기준)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
version: "3.9"
services:
zoo:
image: zookeeper:latest
container_name : zoo
hostname: zoo
ports:
- "2181:2181"
environment:
ZOO_MY_ID: 1
ZOO_PORT: 2181
ZOO_SERVERS: server.1=0.0.0.0:2888:3888;2181
volumes:
- zoo_data:/data
- zoo_datalog:/datalog
- zoo_logs:/logs
kafka1:
image: confluentinc/cp-kafka:latest
container_name : kafka1
hostname: kafka1
restart: always
links:
- zoo:zoo
ports:
- "9092:9092"
environment:
# PLAINTEXT://{public ip혹은 hostname}(consumer나 producer에서 접속할 ip혹은 도메인):9092 kafka 브로커를 가리키는 사용 가능 주소로 초기연결시에 클라이언트에 전달되는 메타 데이터
KAFKA_ADVERTISED_LISTENERS: LISTENER_DOCKER_INTERNAL://kafka1:19092,LISTENER_DOCKER_EXTERNAL://nifi:8443
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_DOCKER_INTERNAL:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT
# 브로커 간 통신에 사용할 리스너를 정의. KAFKA_ADVERTISED_LISTENERS 가 여러개인 경우 꼭 사용해야함
KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_DOCKER_INTERNAL
# 브로커의 메타데이터를 주키퍼에 저장하기 위한 위치. 호스트에 이름을 추가하면 {호스트명}:{포트}로 작성. 주키퍼 여러개일때는 여러개 작성
KAFKA_ZOOKEEPER_CONNECT: "zoo:2181"
# broker.id 에 설정되는 정수값(식별자).
KAFKA_BROKER_ID: 1
# log 레벨 설정
KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO"
# default 3. cluster 내 broker에 토픽이 분산되어 저장된다. 싱글노드에서 테스트 할 때는 1로 설정
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
# 토픽이 몇 개의 파티션으로 생성되는지 설정. 기본 값은 1.
KAFKA_NUM_PARTITIONS: 1
# 토픽 자동생성 설정. false로 지정
KAFKA_AUTO_CREATE_TOPICS_ENABLE: "false"
# log 저장 지속시간
KAFKA_LOG_RETENTION_HOURS: 24
# 오프셋을 포함한 토픽 정보가 저장되는 경로. docker volume으로 따로 잡아줘야 한다
KAFKA_LOG_DIRS: /var/lib/kafka/data/kafka-logs
volumes:
- kafka-data1:/var/lib/kafka/data
- kafka-secrets1:/etc/kafka/secrets
- kafka-installed1:/home/appuser
depends_on:
- zoo
volumes :
zoo_data :
external : true
zoo_datalog :
external : true
zoo_logs:
external : true
kafka_data1:
external : true
kafka_secrets1:
external : true
kafka_installed1:
external : true
2) kafka download and Execute
1
2
3
4
5
6
7
8
9
10
11
12
$ cd /kafka
$ wget [https://dlcdn.apache.org/kafka/3.2.0/kafka_2.13-3.2.0.tgz](https://dlcdn.apache.org/kafka/3.2.0/kafka_2.13-3.2.0.tgz)
$ tar -xvzf kafka_2.13-3.2.0.tgz
$ mv kafka_2.13-3.2.0 homekafka
$ cd homekafka
# Start the ZooKeeper service
# Note: Soon, ZooKeeper will no longer be required by Apache Kafka.
$ bin/zookeeper-server-start.sh config/zookeeper.properties
# Start the Kafka broker service
$ bin/kafka-server-start.sh config/server.properties
3) Kafka Topic list
1
$ bin/kafka-topics.sh --list --bootstrap-server localhost:9092
4) Kafka Topic Create
1
2
3
4
5
6
7
8
9
10
$ bin/kafka-topics.sh --create \
--topic quickstart-events \
--bootstrap-server \
localhost:9092
$ bin/kafka-topics.sh --describe \
--topic quickstart-events \
--bootstrap-server localhost:9092
Topic: quickstart-events TopicId: OpKmM8fXSeGzUG3DCvasMQ PartitionCount: 1 ReplicationFactor: 1 Configs: segment.bytes=1073741824
Topic: quickstart-events Partition: 0 Leader: 0 Replicas: 0 Isr: 0
5) Kafka Producer
1
2
3
$ bin/kafka-console-producer.sh \
--bootstrap-server localhost:9092 \
--topic quickstart-events
6) Kafka Consumer
1
2
3
4
$ bin/kafka-console-consumer.sh \
--bootstrap-server localhost:9092 \
--topic quickstart-events \
--group mygroup
7) IMPORT/EXPORT DATA AS STREAMS OF EVENTS WITH KAFKA CONNECT
1
2
3
4
5
6
7
8
9
10
11
12
$ echo "plugin.path=/home/appuser/homekafka/libs/connect-file-3.2.0.jar" >> config/connect-standalone.properties
$ echo -e "foo\nbar" > test.txt
$ bin/connect-standalone.sh config/connect-standalone.properties \
config/connect-file-source.properties \
config/connect-file-sink.properties
$ ls -l *.txt
-rw-r--r-- 1 root root 8 Jun 1 01:29 test.sink.txt
-rw-r--r-- 1 root root 8 Jun 1 01:26 test.txt
$ more test.sink.txt
foo
bar
1
$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic connect-test --from-beginning
1
2
$ echo team >> test.txt
$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic connect-test --from-beginning
8) PROCESS EVENTS WITH KAFKA STREAMS 데모 : https://kafka.apache.org/documentation/streams/quickstart
WordCountClass 코드 : https://github.com/apache/kafka/blob/3.2/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountDemo.java
(1) plaintext-input, wordcount-ouput topic을 생성
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
$ bin/kafka-topics.sh --create \
--bootstrap-server localhost:9092 \
--replication-factor 1 \
--partitions 1 \
--topic streams-plaintext-input
Created topic "streams-plaintext-input".
$ bin/kafka-topics.sh --create \
--bootstrap-server localhost:9092 \
--replication-factor 1 \
--partitions 1 \
--topic streams-wordcount-output \
--config cleanup.policy=compact
Created topic "streams-wordcount-output".
$ bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe
Topic:streams-wordcount-output PartitionCount:1 ReplicationFactor:1 Configs:cleanup.policy=compact,segment.bytes=1073741824
Topic: streams-wordcount-output Partition: 0 Leader: 0 Replicas: 0 Isr: 0
Topic:streams-plaintext-input PartitionCount:1 ReplicationFactor:1 Configs:segment.bytes=1073741824
Topic: streams-plaintext-input Partition: 0 Leader: 0 Replicas: 0 Isr: 0
(2) WordCountDemo 클래스를 실행시킨 후, producer와 consumer를 실행 ex1) “all streams lead to kafka” 문자열 추가
1
2
3
4
5
6
7
8
9
10
11
12
13
$ bin/kafka-run-class.sh org.apache.kafka.streams.examples.wordcount.WordCountDemo
$ bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic streams-plaintext-input
$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
--topic streams-wordcount-output \
--from-beginning \
--formatter kafka.tools.DefaultMessageFormatter \
--property print.key=true \
--property print.value=true \
--property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
--property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer
# WordCountDemo를 작동시키면 입력한 내용들을 Serialize하여 전송가능한 형태로 만든다
# key.deserializer : key의 자료형을 역직렬화하여 객체형태로 만듦
# value.deserializer : value의 자료형을 역직렬화하여 객체형태로 만듦
1
2
3
4
5
6
7
$ bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic streams-plaintext-input
>all streams lead to kafka
all 1
streams 1
lead 1
to 1
kafka 1
ex2) “hello kafka streams” 문자열 추가
1
2
3
4
5
6
7
8
9
10
11
12
13
$ bin/kafka-console-producer.sh \
--bootstrap-server localhost:9092 \
--topic streams-plaintext-input
>all streams lead to kafka
>hello kafka streams
all 1
streams 1
lead 1
to 1
kafka 1
hello 1
kafka 2
streams 2
ex3) “join kafka summit” 문자열 추가
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
$ bin/kafka-console-producer.sh \
--bootstrap-server localhost:9092 \
--topic streams-plaintext-input
>all streams lead to kafka
>hello kafka streams
>join kafka summit
all 1
streams 1
lead 1
to 1
kafka 1
hello 1
kafka 2
streams 2
join 1
kafka 3
summit 1
3. nifi에서 kafka에 메시지 보낸 후 확인
(모든 Kafka 테스트는 standalone Kafka broker 환경에서 진행하였음)
1) confluentinc/cp-kafka 이미지를 이용하여 docker-compose.yml 작성
참고 : https://hub.docker.com/r/confluentinc/cp-kafka
참고2 : https://twowinsh87.github.io/etc/2019/09/28/etc-kafka2019-2/
참고3 : https://sejoung.github.io/2021/04/2021-04-14-kafka_docker/#카프카-kafka-도커로-설치
- 카프카 브로커들은 주키퍼 안에 그들을 등록하고, listeners 설정을 사용하면서 서로 커뮤니케이션을 함. 그래서 본인이 세팅한 listeners 설정대로 모든 내부 클러스터 커뮤니케이션이 발생.
- 그러나, 예를 들어서 내부 네트워크와 외부 IP를 가지는 클라우드에서 본인의 클러스터가 구성된 상황이라면, advertised.listeners(confluent kafka에서는 KAFKA_ADVERTISED_LISTENERS)에 {외부IP}://{외부 전용 포트}를 세팅.
- ex) 내부 IP가 10.xxx.xx.xx이고 포트가 1111, 외부 IP가 10.101.xx.xx이고 포트가 3032이면 KAFKA_ADVERTISED_LISTENERS: LISTENER_DOCKER_INTERNAL://10.xxx.xx.xx:1111, LISTENER_DOCKER_EXTERNAL://10.101.xx.xx:3032
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
# nifi, kafka 컨테이너를 모두 올리는 docker-compose
version: "3.9"
services:
zoo:
image: zookeeper:latest
container_name : zoo
hostname: zoo
restart: always
ports:
- "2181:2181"
environment:
ZOO_MY_ID: 1
ZOO_PORT: 2181
ZOO_SERVERS: server.1=0.0.0.0:2888:3888;2181
volumes:
- zoo_data:/data
- zoo_datalog:/datalog
- zoo_logs:/logs
kafka1:
image: confluentinc/cp-kafka:latest
container_name : kafka1
hostname: kafka1
restart: always
links:
- zoo:zoo
ports:
- "9092:9092"
environment:
KAFKA_ADVERTISED_LISTENERS: LISTENER_DOCKER_INTERNAL://kafka1:19092,LISTENER_DOCKER_EXTERNAL://nifi:8443
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_DOCKER_INTERNAL:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_DOCKER_INTERNAL
KAFKA_ZOOKEEPER_CONNECT: "zoo:2181"
KAFKA_BROKER_ID: 1
KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO"
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_AUTO_CREATE_TOPICS_ENABLE: "false"
KAFKA_LOG_RETENTION_HOURS: 24
KAFKA_LOG_DIRS: /var/lib/kafka/data/kafka-logs
volumes:
- kafka_data1:/var/lib/kafka/data
- kafka_secrets1:/etc/kafka/secrets
- kafka_installed1:/home/appuser
depends_on:
- zoo
nifi:
container_name : nifi
hostname : nifi
image : apache/nifi:latest
ports :
- "8443:8443"
environment :
- NIFI_WEB_HTTP_PORT=8443
- TZ=Asia/Seoul
volumes :
- vol_logs:/opt/nifi/nifi-current/logs
- vol_state:/opt/nifi/nifi-current/state
- vol_db:/opt/nifi/nifi-current/database_repository
- vol_flowfile:/opt/nifi/nifi-current/flowfile_repository
- vol_content:/opt/nifi/nifi-current/content_repository
- vol_provenance:/opt/nifi/nifi-current/provenance_repository
- vol_conf:/opt/nifi/nifi-current/conf
volumes :
zoo_data :
external : true
zoo_datalog :
external : true
zoo_logs :
external : true
kafka_data1:
external : true
kafka_secrets1:
external : true
kafka_installed1:
external : true
vol_logs:
external : true
vol_state:
external : true
vol_db:
external : true
vol_flowfile:
external : true
vol_content:
external : true
vol_provenance:
external : true
vol_conf:
external : true
2) nifi container와 kafka를 연결하여 진행
1
$ ls /home/nifi
3) 220531 이슈
wurstmeister kafka 이미지는 nifi 연결 시에
- ****No Resolvable Bootstrap Urls” Error in Kafka
- org.apache.kafka.common.errors.TimeoutException: Topic quickstart-events not present in metadata after 5000 ms**** 등의 에러가 발생. 즉, 연결 자체에 이슈가 있지만 잠시 보류하고 다른 이미지 사용
1
2
$ git clone [https://github.com/wurstmeister/kafka-docker.git](https://github.com/wurstmeister/kafka-docker.git)
$ cd kafka-docker
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
# 하단 docker-compose.yml은 nifi 연결은 되지 않아서, 다른 image를 이용해 올림
version: "3.9"
services:
zookeeper:
image: wurstmeister/zookeeper
ports:
- "2181:2181"
volumes :
- zookeeper-conf:/opt/zookeeper-3.4.13/conf
- zookeeper-data:/opt/zookeeper-3.4.13/data
kafka:
build: .
ports:
- "9092:9092"
environment:
KAFKA_ADVERTISED_HOST_NAME: localhost
KAFKA_CREATE_TOPICS: "test:1:1"
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
volumes:
- kafka-vol:/var/run/docker.sock
- kafka-dir:/kafka
nifi:
container_name : local-nifi
hostname : local-nifi
image : apache/nifi:latest
ports :
- "8443:8443"
environment :
- NIFI_WEB_HTTP_PORT=8443
- TZ=Asia/Seoul
volumes :
- vol_logs:/opt/nifi/nifi-current/logs
- vol_state:/opt/nifi/nifi-current/state
- vol_db:/opt/nifi/nifi-current/database_repository
- vol_flowfile:/opt/nifi/nifi-current/flowfile_repository
- vol_content:/opt/nifi/nifi-current/content_repository
- vol_provenance:/opt/nifi/nifi-current/provenance_repository
- vol_conf:/opt/nifi/nifi-current/conf
volumes :
vol_logs :
external : true
vol_state :
external : true
vol_db :
external : true
vol_flowfile :
external : true
vol_content :
external : true
vol_provenance :
external : true
vol_conf :
external : true
zookeeper-conf :
external : true
zookeeper-data :
external : true
kafka-dir :
external : true
kafka-vol :
external : true
4. kafka broker 3대 클러스터 셋팅
(kafka1:9092, kafka2:9093, kafka3:9094)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
version: "3.9"
services:
zoo:
image: zookeeper:latest
container_name : zoo
hostname: zoo
ports:
- "2181:2181"
environment:
ZOO_MY_ID: 1
ZOO_PORT: 2181
ZOO_SERVERS: server.1=0.0.0.0:2888:3888;2181
volumes:
- zoo_data:/data
- zoo_datalog:/datalog
- zoo_logs:/logs
kafka1:
image: confluentinc/cp-kafka:latest
container_name : kafka1
hostname: kafka1
links:
- zoo:zoo
ports:
- "9092:9092"
environment:
# PLAINTEXT://{public ip혹은 hostname}(consumer나 producer에서 접속할 ip혹은 도메인):9092 kafka 브로커를 가리키는 사용 가능 주소로 초기연결시에 클라이언트에 전달되는 메타 데이터
KAFKA_ADVERTISED_LISTENERS: LISTENER_DOCKER_INTERNAL://kafka1:19092,LISTENER_DOCKER_EXTERNAL://{docker_host_IP}:9092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_DOCKER_INTERNAL:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT
#브로커 간 통신에 사용할 리스너를 정의. KAFKA_ADVERTISED_LISTENERS 가 여러개인 경우 꼭 사용해야함
KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_DOCKER_INTERNAL
#브로커의 메타데이터를 주키퍼에 저장하기 위한 위치. 호스트에 이름을 추가하면 {호스트명}:{포트}로 작성. 주키퍼 여러개일때는 여러개 작성
KAFKA_ZOOKEEPER_CONNECT: "zoo:2181"
#broker.id 에 설정되는 정수값(식별자).
KAFKA_BROKER_ID: 1
# log 레벨 설정
KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO"
# default 3. cluster 내 broker에 토픽이 분산되어 저장된다. 싱글노드에서 테스트 할 때는 1로 설정
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
#토픽이 몇 개의 파티션으로 생성되는지 설정. 기본 값은 1.
KAFKA_NUM_PARTITIONS: 3
# 토픽 자동생성 설정. false로 지정
KAFKA_AUTO_CREATE_TOPICS_ENABLE: "false"
# log 저장 지속시간
KAFKA_LOG_RETENTION_HOURS: 24
#오프셋을 포함한 토픽 정보가 저장되는 경로. docker volume으로 따로 잡아줘야 한다
KAFKA_LOG_DIRS: /var/lib/kafka/data/kafka-logs
volumes:
- kafka_data1:/var/lib/kafka/data
- kafka_secrets1:/etc/kafka/secrets
- kafka_installed1:/home/appuser
depends_on:
- zoo
kafka2:
image: confluentinc/cp-kafka:latest
container_name : kafka2
hostname: kafka2
links:
- zoo:zoo
ports:
- "9093:9093"
environment:
KAFKA_ADVERTISED_LISTENERS: LISTENER_DOCKER_INTERNAL://kafka2:19093,LISTENER_DOCKER_EXTERNAL://{docker_host_IP}:9093
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_DOCKER_INTERNAL:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_DOCKER_INTERNAL
KAFKA_ZOOKEEPER_CONNECT: "zoo:2181"
KAFKA_BROKER_ID: 2
KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO"
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
KAFKA_NUM_PARTITIONS: 3
KAFKA_AUTO_CREATE_TOPICS_ENABLE: "false"
KAFKA_LOG_RETENTION_HOURS: 24
KAFKA_LOG_DIRS: /var/lib/kafka/data/kafka-logs
volumes:
- kafka_data2:/var/lib/kafka/data
- kafka_secrets2:/etc/kafka/secrets
- kafka_installed2:/home/appuser
depends_on:
- zoo
kafka3:
image: confluentinc/cp-kafka:latest
container_name : kafka3
hostname: kafka3
links:
- zoo:zoo
ports:
- "9094:9094"
environment:
KAFKA_ADVERTISED_LISTENERS: LISTENER_DOCKER_INTERNAL://kafka3:19094,LISTENER_DOCKER_EXTERNAL://{docker_host_IP}:9094
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_DOCKER_INTERNAL:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_DOCKER_INTERNAL
KAFKA_ZOOKEEPER_CONNECT: "zoo:2181"
KAFKA_BROKER_ID: 3
KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO"
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
KAFKA_NUM_PARTITIONS: 3
KAFKA_AUTO_CREATE_TOPICS_ENABLE: "false"
KAFKA_LOG_RETENTION_HOURS: 24
KAFKA_LOG_DIRS: /var/lib/kafka/data/kafka-logs
volumes:
- kafka_data3:/var/lib/kafka/data
- kafka_secrets3:/etc/kafka/secrets
- kafka_installed3:/home/appuser
depends_on:
- zoo
nifi:
container_name : nifi
hostname : nifi
image : apache/nifi:latest
ports :
- "8443:8443"
environment :
- NIFI_WEB_HTTP_PORT=8443
- TZ=Asia/Seoul
volumes :
- vol_logs:/opt/nifi/nifi-current/logs
- vol_state:/opt/nifi/nifi-current/state
- vol_db:/opt/nifi/nifi-current/database_repository
- vol_flowfile:/opt/nifi/nifi-current/flowfile_repository
- vol_content:/opt/nifi/nifi-current/content_repository
- vol_provenance:/opt/nifi/nifi-current/provenance_repository
- vol_conf:/opt/nifi/nifi-current/conf
volumes :
zoo_data :
external : true
zoo_datalog :
external : true
zoo_logs:
external : true
kafka_data1:
external : true
kafka_secrets1:
external : true
kafka_installed1:
external : true
kafka_logs1:
external : true
kafka_data2:
external : true
kafka_secrets2:
external : true
kafka_installed2:
external : true
kafka_data3:
external : true
kafka_secrets3:
external : true
kafka_installed3:
external : true
vol_logs:
external : true
vol_state:
external : true
vol_db:
external : true
vol_flowfile:
external : true
vol_content:
external : true
vol_provenance:
external : true
vol_conf:
external : true
- partition 3개 설정 에서의 “quickstart-events” Topic (이슈 : KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=3 인데, 복제 본이 1개밖에 없음) (직접 topic을 만들 시, —relplication-factor 옵션을 직접 주어서 만드는 것은 정상 작동)
1
2
3
4
$ bin/kafka-topics.sh --describe \
--topic quickstart-events \
--bootstrap-server \
localhost:9092
- Consumer ID와 partition 갯수, CIient ID 확인 가능
1
2
3
4
$ bin/kafka-consumer-groups.sh \
--bootstrap-server kafka1:9092,kafka2:9093,kafka3:9094 \
--describe \
--group mygroup
- mygroup이라는 Consumer Group에 3개의 consumer를 배치 (각 consumer가 각 파티션을 1:1 대응하여 처리)
- mygroup이라는 Consumer Group에 2개의 consumer를 배치 ( 한 consumer가 2개의 파티션을 처리)
- offset 초기화 하고 싶을 땐 아래 명령어
1
2
3
4
$ bin/kafka-consumer-groups.sh \
--bootstrap-server kafka1:9092,kafka2:9093,kafka3:9094 \
--topic quickstart-events --reset-offsets --to-earliest \
--execute --group mygroup
- replica 3, partition 3을 준 topic의 모습
- 각 kafka 브로커마다 replica가 일정한 것을 확인
1
2
3
4
5
6
7
8
9
10
$ bin/kafka-topics.sh --create \
--topic replica-events \
--replication-factor 3 \
--partitions 3 \
--bootstrap-server \
localhost:9092
$ bin/kafka-topics.sh --describe \
--topic replica-events \
--bootstrap-server localhost:9092
2) Partition : Topic에 대한 partition 번호. (현재 partition은 3개로 분할되어 있다. partition 0, partiton 1, partition 2)
3) Leader : Partition의 Leader가 위치한 장소. (broker 1,2,3에 각각 위치해 있음)
4) Replica : partition별 replica가 위치한 장소. (broker 1,2,3에 각각 위치해 있음. replica는 partition당 3개씩 되어 있다.)
5) IsR(In Sync Replica) : partition replica들의 group (ex : part0의 replica 멤버 1,2 / part1의 replica 멤버 2,3/ part2의 replica 멤버 3,1) (다만, replicafactor가 3인데, Isr 2라서 재확인 필요)
- 토픽 삭제 방법
1
2
3
4
5
6
7
8
9
10
11
12
13
# 방법 1
$ bin/kafka-topics.sh --delete \
--topic test-events \
--bootstrap-server kafka1:9092,kafka2:9093,kafka3:9094
# 방법 2 (방법1이 안통할 경우)
# kafka Container 접속
$ /home/appuser/homekafka/config/server.properties
delete.topic.enable = true # 추가
# Zookeeper Container 접속
$ ./zkCli.sh
$ deleteall /brokers/topics/{topic이름}
# 그 후, kafka 재시작
https://jusunglee.tistory.com/entry/토픽-삭제-Topic-delete
https://daddyprogrammer.org/post/12087/apache-kafka-install-by-docker/#Consumer_Group-2
에러
- KAFKA_LISTENER_SECURITY_PROTOCOL_MAP 미 지정시 하단 에러 발생 후 container 종료 java.lang.IllegalArgumentException: Error creating broker listeners from ‘LISTENER_DOCKER_INTERNAL://kafka1:19092,LISTENER_DOCKER_EXTERNAL://{docker_hostIP}:9092’: No security protocol defined for listener LISTENER_DOCKER_INTERNAL
- kafka 1,2,3에 대해서 LISTENER_DOCKER_EXTERNAL을 모두 nifi:8443으로 맞추면, 이미 해당 address가 bind되었다고 표기 후 container 종료 Configured end points nifi:8443 in advertised listeners are already registered by broker 1
4. Kafdrop
- kafka broker 및 topic들의 상태를 UI에서도 확인토록 만든 툴
- ex) docker-compose-kafka.yml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
version: '2'
services:
# wurstmeister/zookeeper:3.4.6
zookeeper:
hostname: zookeeper
container_name: zookeeper
image: wurstmeister/zookeeper:3.4.6
ports:
- "2181:2181"
# wurstmeister/kafka:2.13-2.8.1 - broker 1
kafka1:
hostname: kafka1
container_name: kafka1
image: wurstmeister/kafka:2.13-2.8.1
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_HOST_NAME: kafka1
BOOTSTRAP_SERVERS: kafka1:9092, kafka2:9093, kafka3:9094
KAFKA_CREATE_TOPICS: "test1:1:1"
ports:
- "9092:9092"
depends_on:
- zookeeper
# wurstmeister/kafka:2.13-2.8.1 - broker 2
kafka2:
hostname: kafka2
container_name: kafka2
image: wurstmeister/kafka:2.13-2.8.1
environment:
KAFKA_BROKER_ID: 2
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_HOST_NAME: kafka2
BOOTSTRAP_SERVERS: kafka1:9092, kafka2:9093, kafka3:9094
KAFKA_CREATE_TOPICS: "test2:1:1"
ports:
- "9093:9092"
depends_on:
- zookeeper
# wurstmeister/kafka:2.13-2.8.1 - broker 3
kafka3:
hostname: kafka3
container_name: kafka3
image: wurstmeister/kafka:2.13-2.8.1
environment:
KAFKA_BROKER_ID: 3
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_HOST_NAME: kafka3
BOOTSTRAP_SERVERS: kafka1:9092, kafka2:9093, kafka3:9094
KAFKA_CREATE_TOPICS: "test3:1:1"
ports:
- "9094:9092"
depends_on:
- zookeeper
# obsidiandynamics/kafdrop
kafdrop:
hostname: kafdrop
container_name: kafdrop
image: obsidiandynamics/kafdrop
restart: "always"
ports:
- "9000:9000"
environment:
KAFKA_BROKERCONNECT: "kafka1:9092, kafka2:9092, kafka3:9092"
JVM_OPTS: "-Xms32M -Xmx64M"
depends_on:
- zookeeper
- kafka1
- kafka2
- kafka3
- topic 생성하기
→ Topic name : topic 이름
→ Number of partitions : 파티션 갯수 지정
→ Replication factor : 레플리카(복제본) 갯수 지정
- topic 삭제하기