Kafka

kafka-console-consumer 在 Docker 中沒有收到消息

  • May 1, 2020

Kafka 消費者工具未連接到 Docker 中的集群

命令

docker-compose -f docker-compose.yaml up --force-recreate --remove-orphans 

Docker Compose 命令調出 3 個虛擬機:zookeeper, kafka, kafka-create-topics. 第三個 VM 生成一個新主題topic並將消息推送到該主題並嘗試使用。前兩個命令似乎可以工作並且沒有發出任何錯誤。然而kafka-console-consumer失敗了,似乎不管使用什麼配置參數

  • cub kafka-ready- 成功
  • kafka-topics- 成功
  • kafka-console-producer- 成功
  • kafka-console-consumer- 每次都失敗

錯誤

kafka-create-topics_1  | [2019-10-07 21:03:00,556] WARN [Consumer clientId=consumer-1, groupId=console-consumer-58118] Connection to node -1 (/172.21.0.3:9092) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)     
kafka-create-topics_1  | [2019-10-07 20:58:24,047] ERROR Error processing message, terminating consumer process:  (kafka.tools.ConsoleConsumer$)                                                         
kafka-create-topics_1  | org.apache.kafka.common.errors.TimeoutException                                                                                                                                 
kafka-create-topics_1  | Processed a total of 0 messages  

兩個文件

kafka-sample-generator.sh
docker-compose.yaml

Dockerfiledocker-compose.yaml

version: '2'
services:

 zookeeper:
   image: confluentinc/cp-zookeeper:5.3.1
   hostname: zookeeper
   ports:
     - '2181:2181'
   environment:
     ZOOKEEPER_CLIENT_PORT: 2181

 kafka:
   image: confluentinc/cp-enterprise-kafka:5.3.1
   hostname: kafka
   ports:
     - '9092:9092'
   depends_on:
     - zookeeper
   environment:
     KAFKA_BROKER_ID: 1
     KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
     KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
     KAFKA_AUTO_CREATE_TOPICS_ENABLE: "false"
     KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter
     CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: kafka:9092
     CONFLUENT_METRICS_ENABLE: 'true'
     CONFLUENT_SUPPORT_CUSTOMER_ID: 'anonymous'

 kafka-create-topics:
   image: confluentinc/cp-enterprise-kafka:5.3.1
   depends_on:
     - zookeeper
     - kafka
   hostname: kafka-create-topics
   volumes:
     - './kafka-sample-generator.sh:/bin/kafka-sample-generator.sh'
   command: "/bin/kafka-sample-generator.sh"
   environment:
     KAFKA_TOPIC: topic
     KAFKA_BROKER: kafka

範例生成器腳本kafka-sample-generator.sh

#!/bin/sh

HOST=`host $KAFKA_BROKER | awk '/has address/ { print $4 ; exit }'`

## Wait until Kafka is ready then create demo topic
echo 'Waiting for Kafka to be ready...'
cub kafka-ready -b $HOST:9092 1 20 && \
sleep 1

echo "Creating Topic [$HOST:9092 <topic:'$KAFKA_TOPIC'>]"
kafka-topics --create                   \
            --topic $KAFKA_TOPIC       \ 
            --if-not-exists            \
            --zookeeper zookeeper:2181 \
            --partitions 1             
            #--replication-factor 1
sleep 1

echo "Availalbe Topics"
kafka-topics --list --zookeeper zookeeper:2181
sleep 1

## Emit sample data stream
while true
   do echo "Sending Data [$HOST:9092 <topic:'$KAFKA_TOPIC'>]"
   for i in `seq 1 10`;
       do echo "$HOST"
       DATA="{\"data\":\"sample-data-$i\"}"
       echo "$DATA"
       kafka-console-producer       \
           --broker-list $HOST:9092 \
           --topic $KAFKA_TOPIC     \
           "$DATA"
   done
   sleep 1.0


   echo ''
   echo "Receiving Data [$HOST:9092 <topic:'$KAFKA_TOPIC'>]"

   ## FAILES HERE
   ## FAILES HERE
   ## FAILES HERE
   kafka-console-consumer                     \
       --bootstrap-server $HOST:9092          \
       --topic $KAFKA_TOPIC                  #\
       #--partition 0                         \
       #--from-beginning                      \
       #--max-messages 1                      \
       #--timeout-ms 45000                    \
       #--skip-message-on-error
done

這些是我為使其正常工作所做的更改:

  • 禁用指標(它堅持嘗試創建一個複制因子為 3 的主題)
  • 將 kafka-topics create 命令更改為使用複制因子 1。
  • 通過 STDIN 將消息數據傳遞給 kafka-console-producer
  • kafka-console-consumer 從一開始最多接收 10 條消息,因此它不會阻塞等待新消息
version: '2'
services:

 zookeeper:
   image: confluentinc/cp-zookeeper:5.3.1
   hostname: zookeeper
   ports:
     - '2181:2181'
   environment:
     ZOOKEEPER_CLIENT_PORT: 2181

 kafka:
   image: confluentinc/cp-enterprise-kafka:5.3.1
   hostname: kafka
   ports:
     - '9092:9092'
   depends_on:
     - zookeeper
   environment:
     KAFKA_BROKER_ID: 1
     KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
     KAFKA_ADVERTISED_LISTENERS: LISTENER_BOB://kafka:9092
     KAFKA_AUTO_CREATE_TOPICS_ENABLE: "false"
     CONFLUENT_METRICS_ENABLE: 'false'
     CONFLUENT_SUPPORT_CUSTOMER_ID: 'anonymous'
     KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_BOB:PLAINTEXT
     KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_BOB  
     KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

 kafka-create-topics:
   image: confluentinc/cp-enterprise-kafka:5.3.1
   depends_on:
     - zookeeper
     - kafka
   hostname: kafka-create-topics
   volumes:
     - './kafka-sample-generator.sh:/bin/kafka-sample-generator.sh'
   command: "/bin/kafka-sample-generator.sh"
   environment:
     KAFKA_TOPIC: topic
     KAFKA_BROKER: kafka
#!/bin/sh

HOST=`host $KAFKA_BROKER | awk '/has address/ { print $4 ; exit }'`

## Wait until Kafka is ready then create demo topic
echo 'Waiting for Kafka to be ready...'
cub kafka-ready -b $HOST:9092 1 20 && \
sleep 1

echo "Creating Topic [$HOST:9092 <topic:'$KAFKA_TOPIC'>]"
kafka-topics --create  --topic $KAFKA_TOPIC   --if-not-exists --zookeeper zookeeper:2181 --partitions 1  --replication-factor 1
sleep 1

echo "Availalbe Topics"
kafka-topics --list --zookeeper zookeeper:2181
sleep 1

## Emit sample data stream
while true
   do echo "Sending Data [$HOST:9092 <topic:'$KAFKA_TOPIC'>]"
   for i in `seq 1 10`;
   do

       echo "$HOST"
       echo "$DATA"

       DATA="{\"data\":\"sample-data-$i\"}"

       echo "$DATA" | kafka-console-producer   \
           --broker-list $HOST:9092            \
           --topic $KAFKA_TOPIC     
   done
   sleep 1.0

   echo ''
   echo "Receiving Data [$HOST:9092 <topic:'$KAFKA_TOPIC'>]"

kafka-console-consumer              \
   --bootstrap-server $HOST:9092   \
   --topic $KAFKA_TOPIC            \
   --from-beginning                \
   --max-messages 10

done

引用自:https://unix.stackexchange.com/questions/545612