Post

Kafka Summary

Kafka

1. Kafka 기본 개념

0) MOM (Message Oriented Middleware)

  • 메세지를 통해 여러 분산되어 있는 시스템간의 Connector 역할을 진행하여, 서로 실시간 비동기식으로 데이터를 교환할 수 있도록 하는 소프트웨어이다.
  • Kafka는 이러한 MOM 소프트웨어 중 하나이며 원천 시스템으로부터 데이터가 발생했을 시 중간에 데이터를 버퍼링하면서 목적지에 안정적으로 전송해주도록 한다.

1) Kafka란?

  • Publish-Subscribe 모델로 구현한 분산 메시징 시스템 (메시징 시스템은 중앙에 메시징 시스템 서버를 두고 메시지를 송수신하는 형태의 통신방식)
  • 실시간으로 스트림을 게시, 구독, 저장 및 처리할 수 있는 분산 데이터 스트리밍 플랫폼
  • Publisher : Producer로 데이터를 보내는 기능
  • Subscriber : Consumer로 메시지를 소비하는 기능

Untitled

2) 왜 Kafka를 쓰는가?

(1) 데이터 유실 방지

  • disk에 적재되기 때문에, 불의의 사고로 서버가 다운되었어도 데이터 유실없이 재시작하여 기존 데이터를 안정적 처리 가능

(2) 병렬처리에 의한 데이터 처리율 향상

  • disk에 순차적으로 데이터를 적재하기 때문에 Random access 방식보다 훨씬 빠르게 데이터 처리

(3) 클러스터링에 의한 고가용성 서비스

  • Scale-out이 가능하여 시스템 확장이 용이. 어떤 하나 혹은 몇개의 서버가 다운되도 서비스 자체가 중단될 일 없이 시스템 운용 가능

3) Kafka의 구성요소

Untitled 1

  • Kafka cluster : 카프카 서버로 이루어진 클러스터를 말함.
    • Broker : 카프카 서버를 의미
      • Controller : 파티션 관리를 책임지는 Broker. 만일, 한 Broker에 장애가 발생하였을 때, 그 Broker에 Leader급 파티션이 존재한다면, 다른 follower파티션들중 하나에게 Leader 역할을 재분배 한다.
      • Coordinator : Consumer Group의 상태를 확인. Consumer Group내의 Consumer에 장애가 발생하여 매칭된 파티션 데이터를 읽지 못할 경우, 해당 파티션 데이터를 정상 작동하는 다른 Consumer에게 매칭시킨다.
    • Topic : 카프카 클러스터에 데이터를 관리할 시 그 기준이 되는 개념. 토픽은 카프카 클러스터에 여러개를 만들 수 있으며 하나의 Topic은 1개 이상의 파티션(Partition)으로 구성
    • Partition : 각 토픽 당 데이터를 분산 처리하는 단위. 카프카 에서는 토픽 안에 파티션을 나누어 그 수대로 데이터를 분산 처리. (replica 수 만큼 파티션이 각 서버들에게 복제된다)
  • Zookeeper : 분산 코디네이션 시스템. 카프카 브로커를 하나의 클러스터로 코디네이팅 하는 역할을 진행. → broker의 메타 데이터 관리, 장애 관리, partition leader 선출, broker leader 선출, 기록 등의 기능 진행 → broker의 장애 발생 시 새 controller를 선출하는 역할을 하며, topic의 파티션 수, 특정 설정 등의 메타데이터 관리를 담당

    Untitled 2

  • Producer : 데이터를 발생시키고 카프카 클러스터(Kafka Cluster)에 적재

    • Option → acks : Topic의 Leader에게 메시지를 보낸 후 요청을 완료하기 전 ack 갯수
    • 옵션의 수가 적으면 성능은 좋으나 메시지 손실률 증가
    • 반대로, 옵션의 수가 많으면 메시지 손실 가능성은 적어지나 성능은 저하
      • acks = 0 : 자신이 보낸 메시지에 대한 ack를 기다리지 않고 전송

      • acks = 1 : 토픽 Leader로 부터 잘 받았는지 확인하고 전송

      • acks = all : 토픽 Leader와 follower로 부터 잘 받았는지 확인하고 전송

  • Consumer Group : 컨슈머의 집합을 구성하는 단위. (group.id 라는 값으로 구분된다) 카프카에서는 컨슈머 그룹으로서 데이터를 처리하며 컨슈머 그룹 내의 컨슈머 수만큼 파티션의 데이터를 분산처리 (참고 : https://www.popit.kr/kafka-consumer-group/)
    • Case 1) Partition = 4, Consumer = 2 - Consumer 하나가 처리해야 할 파티션 갯수가 2개 이상이다 Untitled 3

    • Case 2) Partition = 4. Consumer = 4 (가장 이상적)

      • Consumer와 Partition 갯수가 일치하여 서로 1:1 맵핑 된다

    Untitled 4

    • Case 3) Partition = 4, Consumer = 5
      • Partition갯수 보다 Consumer가 더 많기에, 아무것도 하지 않는 Consumer가 발생한다.

    Untitled 5

3) 파티션 읽기, 쓰기

Untitled 6

Untitled 7

  • Apache Kafka에서의 Read, Write 연산은 카프카 클러스터 내의 Leader 파티션에게만 적용 된다.
  • 하늘색으로 칠해진 파티션이 Leader 파티션이며 이 파티션들에게 Producer가 Write를 진행한다.
  • Write가 진행되고 나면 업데이트 된 데이터는 각 파티션들의 replica에 복사된다.
  • Consumer Group은 서로 관여하지 않고, 본인 Group에 대해서만 처리한다.

Untitled 8

  • 카프카는 데이터를 순차적으로 디스크에 저장
  • 따라서, Producer는 순차적으로 저장된 데이터 뒤에 부이는 append형식으로 write연산을 진행
  • 이 때, 파티션들은 각각의 데이터들의 순차적인 집합인 offset으로 구성
  • Log : Partition의 한 칸. Key, value, timestamp로 구성
  • Offset : Partition의 각 메시지를 식별할 수 있는 유니크 값 (메시지를 소비하는 Consumer가 현재 어느 위치를 읽고 있는지에 대한 위치 값. 0부터 읽음)

2. Kafka QuickStart

1) docker-compose.yml로 kafka 올리기 (Standalone 기준)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
version: "3.9"
services:
  zoo:
    image: zookeeper:latest
    container_name : zoo
    hostname: zoo
    ports:
    - "2181:2181"
    environment:
      ZOO_MY_ID: 1
      ZOO_PORT: 2181
      ZOO_SERVERS: server.1=0.0.0.0:2888:3888;2181
    volumes:
      - zoo_data:/data
      - zoo_datalog:/datalog
			- zoo_logs:/logs
  kafka1:
    image: confluentinc/cp-kafka:latest
    container_name : kafka1
    hostname: kafka1
		restart: always
    links:
      - zoo:zoo
    ports:
      - "9092:9092"
    environment:
		  # PLAINTEXT://{public ip혹은 hostname}(consumer나 producer에서 접속할 ip혹은 도메인):9092 kafka 브로커를 가리키는 사용 가능 주소로 초기연결시에 클라이언트에 전달되는 메타 데이터
      KAFKA_ADVERTISED_LISTENERS: LISTENER_DOCKER_INTERNAL://kafka1:19092,LISTENER_DOCKER_EXTERNAL://nifi:8443
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_DOCKER_INTERNAL:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT
      # 브로커 간 통신에 사용할 리스너를 정의. KAFKA_ADVERTISED_LISTENERS 가 여러개인 경우 꼭 사용해야함
		  KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_DOCKER_INTERNAL
		  # 브로커의 메타데이터를 주키퍼에 저장하기 위한 위치. 호스트에 이름을 추가하면 {호스트명}:{포트}로 작성. 주키퍼 여러개일때는 여러개 작성
      KAFKA_ZOOKEEPER_CONNECT: "zoo:2181"
		  # broker.id 에 설정되는 정수값(식별자).
      KAFKA_BROKER_ID: 1
      # log 레벨 설정
      KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO"
		  # default 3. cluster 내 broker에 토픽이 분산되어 저장된다. 싱글노드에서 테스트 할 때는 1로 설정
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
		  # 토픽이 몇 개의 파티션으로 생성되는지 설정. 기본 값은 1.
      KAFKA_NUM_PARTITIONS: 1
		  # 토픽 자동생성 설정. false로 지정
		  KAFKA_AUTO_CREATE_TOPICS_ENABLE: "false"
		  # log 저장 지속시간
      KAFKA_LOG_RETENTION_HOURS: 24
		  # 오프셋을 포함한 토픽 정보가 저장되는 경로. docker volume으로 따로 잡아줘야 한다
		  KAFKA_LOG_DIRS: /var/lib/kafka/data/kafka-logs
    volumes:
      - kafka-data1:/var/lib/kafka/data
      - kafka-secrets1:/etc/kafka/secrets
      - kafka-installed1:/home/appuser
    depends_on:
      - zoo
volumes :
  zoo_data :
    external : true
  zoo_datalog :
    external : true
  zoo_logs:
    external : true
  kafka_data1:
    external : true
  kafka_secrets1:
    external : true
  kafka_installed1:
    external : true

2) kafka download and Execute

1
2
3
4
5
6
7
8
9
10
11
12
$ cd /kafka
$ wget [https://dlcdn.apache.org/kafka/3.2.0/kafka_2.13-3.2.0.tgz](https://dlcdn.apache.org/kafka/3.2.0/kafka_2.13-3.2.0.tgz)
$ tar -xvzf kafka_2.13-3.2.0.tgz
$ mv kafka_2.13-3.2.0 homekafka
$ cd homekafka

# Start the ZooKeeper service
# Note: Soon, ZooKeeper will no longer be required by Apache Kafka.
$ bin/zookeeper-server-start.sh config/zookeeper.properties

# Start the Kafka broker service
$ bin/kafka-server-start.sh config/server.properties

3) Kafka Topic list

1
$ bin/kafka-topics.sh --list --bootstrap-server localhost:9092

Untitled 9

4) Kafka Topic Create

1
2
3
4
5
6
7
8
9
10
$ bin/kafka-topics.sh --create \
--topic quickstart-events \
--bootstrap-server \
localhost:9092
$ bin/kafka-topics.sh --describe \
--topic quickstart-events \
--bootstrap-server localhost:9092
Topic: quickstart-events        TopicId: OpKmM8fXSeGzUG3DCvasMQ PartitionCount: 1       ReplicationFactor: 1    Configs: segment.bytes=1073741824
        Topic: quickstart-events        Partition: 0    Leader: 0       Replicas: 0     Isr: 0

Untitled 10

5) Kafka Producer

1
2
3
$ bin/kafka-console-producer.sh \
--bootstrap-server localhost:9092 \
--topic quickstart-events

Untitled 11

6) Kafka Consumer

1
2
3
4
$ bin/kafka-console-consumer.sh \
--bootstrap-server localhost:9092 \
--topic quickstart-events \
--group mygroup

Untitled 12

7) IMPORT/EXPORT DATA AS STREAMS OF EVENTS WITH KAFKA CONNECT

1
2
3
4
5
6
7
8
9
10
11
12
$ echo "plugin.path=/home/appuser/homekafka/libs/connect-file-3.2.0.jar" >> config/connect-standalone.properties
$ echo -e "foo\nbar" > test.txt
$ bin/connect-standalone.sh config/connect-standalone.properties \
config/connect-file-source.properties \
config/connect-file-sink.properties
$ ls -l *.txt
-rw-r--r-- 1 root root 8 Jun  1 01:29 test.sink.txt
-rw-r--r-- 1 root root 8 Jun  1 01:26 test.txt

$ more test.sink.txt
foo
bar

Untitled 13

1
$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic connect-test --from-beginning

Untitled 14

1
2
$ echo team >> test.txt
$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic connect-test --from-beginning

Untitled 15

8) PROCESS EVENTS WITH KAFKA STREAMS 데모 : https://kafka.apache.org/documentation/streams/quickstart

WordCountClass 코드 : https://github.com/apache/kafka/blob/3.2/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountDemo.java

(1) plaintext-input, wordcount-ouput topic을 생성

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
$ bin/kafka-topics.sh --create \
    --bootstrap-server localhost:9092 \
    --replication-factor 1 \
    --partitions 1 \
    --topic streams-plaintext-input
Created topic "streams-plaintext-input".

$ bin/kafka-topics.sh --create \
    --bootstrap-server localhost:9092 \
    --replication-factor 1 \
    --partitions 1 \
    --topic streams-wordcount-output \
    --config cleanup.policy=compact
Created topic "streams-wordcount-output".

$ bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe
Topic:streams-wordcount-output	PartitionCount:1	ReplicationFactor:1	Configs:cleanup.policy=compact,segment.bytes=1073741824
	Topic: streams-wordcount-output	Partition: 0	Leader: 0	Replicas: 0	Isr: 0
Topic:streams-plaintext-input	PartitionCount:1	ReplicationFactor:1	Configs:segment.bytes=1073741824
	Topic: streams-plaintext-input	Partition: 0	Leader: 0	Replicas: 0	Isr: 0

Untitled 16

(2) WordCountDemo 클래스를 실행시킨 후, producer와 consumer를 실행 ex1) “all streams lead to kafka 문자열 추가

1
2
3
4
5
6
7
8
9
10
11
12
13
$ bin/kafka-run-class.sh org.apache.kafka.streams.examples.wordcount.WordCountDemo
$ bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic streams-plaintext-input
$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
    --topic streams-wordcount-output \
    --from-beginning \
    --formatter kafka.tools.DefaultMessageFormatter \
    --property print.key=true \
    --property print.value=true \
    --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
    --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer
# WordCountDemo를 작동시키면 입력한 내용들을 Serialize하여 전송가능한 형태로 만든다
# key.deserializer : key의 자료형을 역직렬화하여 객체형태로 만듦
# value.deserializer : value의 자료형을 역직렬화하여 객체형태로 만듦
1
2
3
4
5
6
7
$ bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic streams-plaintext-input
>all streams lead to kafka
all     1
streams 1
lead    1
to      1
kafka   1

Untitled 17

ex2) “hello kafka streams 문자열 추가

1
2
3
4
5
6
7
8
9
10
11
12
13
$ bin/kafka-console-producer.sh \
--bootstrap-server localhost:9092 \
--topic streams-plaintext-input
>all streams lead to kafka
>hello kafka streams
all     1
streams 1
lead    1
to      1
kafka   1
hello   1
kafka   2
streams 2

Untitled 18

ex3) “join kafka summit 문자열 추가

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
$ bin/kafka-console-producer.sh \
--bootstrap-server localhost:9092 \
--topic streams-plaintext-input
>all streams lead to kafka
>hello kafka streams
>join kafka summit
all     1
streams 1
lead    1
to      1
kafka   1
hello   1
kafka   2
streams 2
join    1
kafka   3
summit  1

Untitled 19

3. nifi에서 kafka에 메시지 보낸 후 확인

(모든 Kafka 테스트는 standalone Kafka broker 환경에서 진행하였음)

1) confluentinc/cp-kafka 이미지를 이용하여 docker-compose.yml 작성

참고 : https://hub.docker.com/r/confluentinc/cp-kafka

참고2 : https://twowinsh87.github.io/etc/2019/09/28/etc-kafka2019-2/

참고3 : https://sejoung.github.io/2021/04/2021-04-14-kafka_docker/#카프카-kafka-도커로-설치

  • 카프카 브로커들은 주키퍼 안에 그들을 등록하고, listeners 설정을 사용하면서 서로 커뮤니케이션을 함. 그래서 본인이 세팅한 listeners 설정대로 모든 내부 클러스터 커뮤니케이션이 발생.
  • 그러나, 예를 들어서 내부 네트워크와 외부 IP를 가지는 클라우드에서 본인의 클러스터가 구성된 상황이라면, advertised.listeners(confluent kafka에서는 KAFKA_ADVERTISED_LISTENERS)에 {외부IP}://{외부 전용 포트}를 세팅.
  • ex) 내부 IP가 10.xxx.xx.xx이고 포트가 1111, 외부 IP가 10.101.xx.xx이고 포트가 3032이면 KAFKA_ADVERTISED_LISTENERS: LISTENER_DOCKER_INTERNAL://10.xxx.xx.xx:1111, LISTENER_DOCKER_EXTERNAL://10.101.xx.xx:3032
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
# nifi, kafka 컨테이너를 모두 올리는 docker-compose
version: "3.9"
services:
  zoo:
    image: zookeeper:latest
    container_name : zoo
    hostname: zoo
    restart: always
    ports:
    - "2181:2181"
    environment:
      ZOO_MY_ID: 1
      ZOO_PORT: 2181
      ZOO_SERVERS: server.1=0.0.0.0:2888:3888;2181
    volumes:
      - zoo_data:/data
      - zoo_datalog:/datalog
			- zoo_logs:/logs
  kafka1:
    image: confluentinc/cp-kafka:latest
    container_name : kafka1
    hostname: kafka1
    restart: always
    links:
      - zoo:zoo
    ports:
      - "9092:9092"
    environment:
      KAFKA_ADVERTISED_LISTENERS: LISTENER_DOCKER_INTERNAL://kafka1:19092,LISTENER_DOCKER_EXTERNAL://nifi:8443
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_DOCKER_INTERNAL:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_DOCKER_INTERNAL
      KAFKA_ZOOKEEPER_CONNECT: "zoo:2181"
      KAFKA_BROKER_ID: 1
      KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO"
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "false"
      KAFKA_LOG_RETENTION_HOURS: 24
			KAFKA_LOG_DIRS: /var/lib/kafka/data/kafka-logs
    volumes:
      - kafka_data1:/var/lib/kafka/data
      - kafka_secrets1:/etc/kafka/secrets
      - kafka_installed1:/home/appuser

    depends_on:
      - zoo
  nifi:
    container_name : nifi
    hostname : nifi
    image : apache/nifi:latest
    ports :
      - "8443:8443"
    environment :
      - NIFI_WEB_HTTP_PORT=8443
      - TZ=Asia/Seoul
    volumes :
      - vol_logs:/opt/nifi/nifi-current/logs
      - vol_state:/opt/nifi/nifi-current/state
      - vol_db:/opt/nifi/nifi-current/database_repository
      - vol_flowfile:/opt/nifi/nifi-current/flowfile_repository
      - vol_content:/opt/nifi/nifi-current/content_repository
      - vol_provenance:/opt/nifi/nifi-current/provenance_repository
      - vol_conf:/opt/nifi/nifi-current/conf

volumes :
  zoo_data :
    external : true
  zoo_datalog :
    external : true
	zoo_logs :
		external : true
  kafka_data1:
    external : true
  kafka_secrets1:
    external : true
  kafka_installed1:
    external : true
  vol_logs:
    external : true
  vol_state:
    external : true
  vol_db:
    external : true
  vol_flowfile:
    external : true
  vol_content:
    external : true
  vol_provenance:
    external : true
  vol_conf:
    external : true

2) nifi container와 kafka를 연결하여 진행

Untitled 20

Untitled 21

Untitled 22

Untitled 23

Untitled 24

1
$ ls /home/nifi

Untitled 25

Untitled 26

3) 220531 이슈

wurstmeister kafka 이미지는 nifi 연결 시에

  • ****No Resolvable Bootstrap Urls” Error in Kafka
  • org.apache.kafka.common.errors.TimeoutException: Topic quickstart-events not present in metadata after 5000 ms**** 등의 에러가 발생. 즉, 연결 자체에 이슈가 있지만 잠시 보류하고 다른 이미지 사용
1
2
$ git clone [https://github.com/wurstmeister/kafka-docker.git](https://github.com/wurstmeister/kafka-docker.git)
$ cd kafka-docker
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
# 하단 docker-compose.yml은 nifi 연결은 되지 않아서, 다른 image를 이용해 올림
version: "3.9"
services:
  zookeeper:
    image: wurstmeister/zookeeper
    ports:
      - "2181:2181"
    volumes :
      - zookeeper-conf:/opt/zookeeper-3.4.13/conf
      - zookeeper-data:/opt/zookeeper-3.4.13/data
  kafka:
    build: .
    ports:
      - "9092:9092"
    environment:
      KAFKA_ADVERTISED_HOST_NAME: localhost
      KAFKA_CREATE_TOPICS: "test:1:1"
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
    volumes:
      - kafka-vol:/var/run/docker.sock
      - kafka-dir:/kafka
  nifi:
    container_name : local-nifi
    hostname : local-nifi
    image : apache/nifi:latest
    ports :
      - "8443:8443"
    environment :
      - NIFI_WEB_HTTP_PORT=8443
      - TZ=Asia/Seoul
    volumes :
      - vol_logs:/opt/nifi/nifi-current/logs
      - vol_state:/opt/nifi/nifi-current/state
      - vol_db:/opt/nifi/nifi-current/database_repository
      - vol_flowfile:/opt/nifi/nifi-current/flowfile_repository
      - vol_content:/opt/nifi/nifi-current/content_repository
      - vol_provenance:/opt/nifi/nifi-current/provenance_repository
      - vol_conf:/opt/nifi/nifi-current/conf

volumes :
  vol_logs :
    external : true
  vol_state :
    external : true
  vol_db :
    external : true
  vol_flowfile :
    external : true
  vol_content :
    external : true
  vol_provenance :
    external : true
  vol_conf :
    external : true
  zookeeper-conf :
    external : true
  zookeeper-data :
    external : true
  kafka-dir :
    external : true
  kafka-vol :
    external : true

4. kafka broker 3대 클러스터 셋팅

(kafka1:9092, kafka2:9093, kafka3:9094)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
version: "3.9"
services:
  zoo:
    image: zookeeper:latest
    container_name : zoo
    hostname: zoo
    ports:
    - "2181:2181"
    environment:
      ZOO_MY_ID: 1
      ZOO_PORT: 2181
      ZOO_SERVERS: server.1=0.0.0.0:2888:3888;2181
    volumes:
      - zoo_data:/data
      - zoo_datalog:/datalog
      - zoo_logs:/logs
  kafka1:
    image: confluentinc/cp-kafka:latest
    container_name : kafka1
    hostname: kafka1
    links:
      - zoo:zoo
    ports:
      - "9092:9092"
    environment:
      # PLAINTEXT://{public ip혹은 hostname}(consumer나 producer에서 접속할 ip혹은 도메인):9092 kafka 브로커를 가리키는 사용 가능 주소로 초기연결시에 클라이언트에 전달되는 메타 데이터
      KAFKA_ADVERTISED_LISTENERS: LISTENER_DOCKER_INTERNAL://kafka1:19092,LISTENER_DOCKER_EXTERNAL://{docker_host_IP}:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_DOCKER_INTERNAL:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT
      #브로커 간 통신에 사용할 리스너를 정의. KAFKA_ADVERTISED_LISTENERS 가 여러개인 경우 꼭 사용해야함
      KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_DOCKER_INTERNAL
      #브로커의 메타데이터를 주키퍼에 저장하기 위한 위치. 호스트에 이름을 추가하면 {호스트명}:{포트}로 작성. 주키퍼 여러개일때는 여러개 작성
      KAFKA_ZOOKEEPER_CONNECT: "zoo:2181"
      #broker.id 에 설정되는 정수값(식별자).
      KAFKA_BROKER_ID: 1
      # log 레벨 설정
      KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO"
      # default 3. cluster 내 broker에 토픽이 분산되어 저장된다. 싱글노드에서 테스트 할 때는 1로 설정
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
      #토픽이 몇 개의 파티션으로 생성되는지 설정. 기본 값은 1.
      KAFKA_NUM_PARTITIONS: 3
      # 토픽 자동생성 설정. false로 지정
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "false"
      # log 저장 지속시간
      KAFKA_LOG_RETENTION_HOURS: 24
      #오프셋을 포함한 토픽 정보가 저장되는 경로. docker volume으로 따로 잡아줘야 한다
      KAFKA_LOG_DIRS: /var/lib/kafka/data/kafka-logs
    volumes:
      - kafka_data1:/var/lib/kafka/data
      - kafka_secrets1:/etc/kafka/secrets
      - kafka_installed1:/home/appuser
    depends_on:
      - zoo
  kafka2:
    image: confluentinc/cp-kafka:latest
    container_name : kafka2
    hostname: kafka2
    links:
      - zoo:zoo
    ports:
      - "9093:9093"
    environment:
      KAFKA_ADVERTISED_LISTENERS: LISTENER_DOCKER_INTERNAL://kafka2:19093,LISTENER_DOCKER_EXTERNAL://{docker_host_IP}:9093
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_DOCKER_INTERNAL:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_DOCKER_INTERNAL
      KAFKA_ZOOKEEPER_CONNECT: "zoo:2181"
      KAFKA_BROKER_ID: 2
      KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO"
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
      KAFKA_NUM_PARTITIONS: 3
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "false"
      KAFKA_LOG_RETENTION_HOURS: 24
      KAFKA_LOG_DIRS: /var/lib/kafka/data/kafka-logs
    volumes:
      - kafka_data2:/var/lib/kafka/data
      - kafka_secrets2:/etc/kafka/secrets
      - kafka_installed2:/home/appuser
    depends_on:
      - zoo

  kafka3:
    image: confluentinc/cp-kafka:latest
    container_name : kafka3
    hostname: kafka3
    links:
      - zoo:zoo
    ports:
      - "9094:9094"
    environment:
      KAFKA_ADVERTISED_LISTENERS: LISTENER_DOCKER_INTERNAL://kafka3:19094,LISTENER_DOCKER_EXTERNAL://{docker_host_IP}:9094
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_DOCKER_INTERNAL:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_DOCKER_INTERNAL
      KAFKA_ZOOKEEPER_CONNECT: "zoo:2181"
      KAFKA_BROKER_ID: 3
      KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO"
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
      KAFKA_NUM_PARTITIONS: 3
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "false"
      KAFKA_LOG_RETENTION_HOURS: 24
      KAFKA_LOG_DIRS: /var/lib/kafka/data/kafka-logs
    volumes:
      - kafka_data3:/var/lib/kafka/data
      - kafka_secrets3:/etc/kafka/secrets
      - kafka_installed3:/home/appuser
    depends_on:
      - zoo

  nifi:
    container_name : nifi
    hostname : nifi
    image : apache/nifi:latest
    ports :
      - "8443:8443"
    environment :
      - NIFI_WEB_HTTP_PORT=8443
      - TZ=Asia/Seoul
    volumes :
      - vol_logs:/opt/nifi/nifi-current/logs
      - vol_state:/opt/nifi/nifi-current/state
      - vol_db:/opt/nifi/nifi-current/database_repository
      - vol_flowfile:/opt/nifi/nifi-current/flowfile_repository
      - vol_content:/opt/nifi/nifi-current/content_repository
      - vol_provenance:/opt/nifi/nifi-current/provenance_repository
      - vol_conf:/opt/nifi/nifi-current/conf

volumes :
  zoo_data :
    external : true
  zoo_datalog :
    external : true
  zoo_logs:
    external : true
  kafka_data1:
    external : true
  kafka_secrets1:
    external : true
  kafka_installed1:
    external : true
  kafka_logs1:
    external : true
  kafka_data2:
    external : true
  kafka_secrets2:
    external : true
  kafka_installed2:
    external : true
  kafka_data3:
    external : true
  kafka_secrets3:
    external : true
  kafka_installed3:
    external : true
  vol_logs:
    external : true
  vol_state:
    external : true
  vol_db:
    external : true
  vol_flowfile:
    external : true
  vol_content:
    external : true
  vol_provenance:
    external : true
  vol_conf:
    external : true
  • partition 3개 설정 에서의 “quickstart-events” Topic (이슈 : KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=3 인데, 복제 본이 1개밖에 없음) (직접 topic을 만들 시, —relplication-factor 옵션을 직접 주어서 만드는 것은 정상 작동)
1
2
3
4
$ bin/kafka-topics.sh --describe \
--topic quickstart-events \
--bootstrap-server \
localhost:9092

Untitled 27

  • Consumer ID와 partition 갯수, CIient ID 확인 가능
1
2
3
4
$ bin/kafka-consumer-groups.sh \
--bootstrap-server kafka1:9092,kafka2:9093,kafka3:9094 \
--describe \
--group mygroup

Untitled 28

  • mygroup이라는 Consumer Group에 3개의 consumer를 배치 (각 consumer가 각 파티션을 1:1 대응하여 처리)

Untitled 29

Untitled 30

  • mygroup이라는 Consumer Group에 2개의 consumer를 배치 ( 한 consumer가 2개의 파티션을 처리)

Untitled 31

Untitled 32

  • offset 초기화 하고 싶을 땐 아래 명령어
1
2
3
4
$ bin/kafka-consumer-groups.sh \
--bootstrap-server kafka1:9092,kafka2:9093,kafka3:9094 \
--topic quickstart-events --reset-offsets --to-earliest \
--execute --group mygroup

Untitled 33

Untitled 34

  • replica 3, partition 3을 준 topic의 모습
  • 각 kafka 브로커마다 replica가 일정한 것을 확인
1
2
3
4
5
6
7
8
9
10
$ bin/kafka-topics.sh --create \
--topic replica-events \
--replication-factor 3 \
--partitions 3 \
--bootstrap-server \
localhost:9092

$ bin/kafka-topics.sh --describe \
--topic replica-events \
--bootstrap-server localhost:9092

Untitled 35 1) Topic : 토픽 이름

2) Partition : Topic에 대한 partition 번호. (현재 partition은 3개로 분할되어 있다. partition 0, partiton 1, partition 2)

3) Leader : Partition의 Leader가 위치한 장소. (broker 1,2,3에 각각 위치해 있음)

4) Replica : partition별 replica가 위치한 장소. (broker 1,2,3에 각각 위치해 있음. replica는 partition당 3개씩 되어 있다.)

5) IsR(In Sync Replica) : partition replica들의 group (ex : part0의 replica 멤버 1,2 / part1의 replica 멤버 2,3/ part2의 replica 멤버 3,1) (다만, replicafactor가 3인데, Isr 2라서 재확인 필요)

  • 토픽 삭제 방법
1
2
3
4
5
6
7
8
9
10
11
12
13
# 방법 1
$ bin/kafka-topics.sh --delete \
--topic test-events \
--bootstrap-server kafka1:9092,kafka2:9093,kafka3:9094

# 방법 2 (방법1이 안통할 경우)
# kafka Container 접속
$ /home/appuser/homekafka/config/server.properties
delete.topic.enable = true # 추가
# Zookeeper Container 접속
$ ./zkCli.sh
$ deleteall /brokers/topics/{topic이름}
# 그 후, kafka 재시작

https://jusunglee.tistory.com/entry/토픽-삭제-Topic-delete

https://daddyprogrammer.org/post/12087/apache-kafka-install-by-docker/#Consumer_Group-2

에러

  • KAFKA_LISTENER_SECURITY_PROTOCOL_MAP 미 지정시 하단 에러 발생 후 container 종료 java.lang.IllegalArgumentException: Error creating broker listeners from ‘LISTENER_DOCKER_INTERNAL://kafka1:19092,LISTENER_DOCKER_EXTERNAL://{docker_hostIP}:9092’: No security protocol defined for listener LISTENER_DOCKER_INTERNAL
  • kafka 1,2,3에 대해서 LISTENER_DOCKER_EXTERNAL을 모두 nifi:8443으로 맞추면, 이미 해당 address가 bind되었다고 표기 후 container 종료 Configured end points nifi:8443 in advertised listeners are already registered by broker 1

4. Kafdrop

  • kafka broker 및 topic들의 상태를 UI에서도 확인토록 만든 툴
  • ex) docker-compose-kafka.yml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
version: '2'

services:

  # wurstmeister/zookeeper:3.4.6
  zookeeper:
    hostname: zookeeper
    container_name: zookeeper
    image: wurstmeister/zookeeper:3.4.6
    ports:
      - "2181:2181"

  # wurstmeister/kafka:2.13-2.8.1 - broker 1
  kafka1:
    hostname: kafka1
    container_name: kafka1
    image: wurstmeister/kafka:2.13-2.8.1
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_HOST_NAME: kafka1
      BOOTSTRAP_SERVERS: kafka1:9092, kafka2:9093, kafka3:9094
      KAFKA_CREATE_TOPICS: "test1:1:1"
    ports:
      - "9092:9092"
    depends_on:
      - zookeeper

  # wurstmeister/kafka:2.13-2.8.1 - broker 2
  kafka2:
    hostname: kafka2
    container_name: kafka2
    image: wurstmeister/kafka:2.13-2.8.1
    environment:
      KAFKA_BROKER_ID: 2
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_HOST_NAME: kafka2
      BOOTSTRAP_SERVERS: kafka1:9092, kafka2:9093, kafka3:9094
      KAFKA_CREATE_TOPICS: "test2:1:1"
    ports:
      - "9093:9092"
    depends_on:
      - zookeeper

  # wurstmeister/kafka:2.13-2.8.1 - broker 3
  kafka3:
    hostname: kafka3
    container_name: kafka3
    image: wurstmeister/kafka:2.13-2.8.1
    environment:
      KAFKA_BROKER_ID: 3
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_HOST_NAME: kafka3
      BOOTSTRAP_SERVERS: kafka1:9092, kafka2:9093, kafka3:9094
      KAFKA_CREATE_TOPICS: "test3:1:1"
    ports:
      - "9094:9092"
    depends_on:
      - zookeeper

  # obsidiandynamics/kafdrop
  kafdrop:
    hostname: kafdrop
    container_name: kafdrop
    image: obsidiandynamics/kafdrop
    restart: "always"
    ports:
      - "9000:9000"
    environment:
      KAFKA_BROKERCONNECT: "kafka1:9092, kafka2:9092, kafka3:9092"
      JVM_OPTS: "-Xms32M -Xmx64M"
    depends_on:
      - zookeeper
      - kafka1
      - kafka2
      - kafka3

Untitled 36

  • topic 생성하기

Untitled 37

Untitled 38

→ Topic name : topic 이름

→ Number of partitions : 파티션 갯수 지정

→ Replication factor : 레플리카(복제본) 갯수 지정

Untitled 39

  • topic 삭제하기

Untitled 40

This post is licensed under CC BY 4.0 by the author.