Kafka
kafka-console-consumer 在 Docker 中沒有收到消息
Kafka 消費者工具未連接到 Docker 中的集群
命令
docker-compose -f docker-compose.yaml up --force-recreate --remove-orphans
Docker Compose 命令調出 3 個虛擬機:
zookeeper
,kafka
,kafka-create-topics
. 第三個 VM 生成一個新主題topic
並將消息推送到該主題並嘗試使用。前兩個命令似乎可以工作並且沒有發出任何錯誤。然而kafka-console-consumer
失敗了,似乎不管使用什麼配置參數。
cub kafka-ready
- 成功kafka-topics
- 成功kafka-console-producer
- 成功kafka-console-consumer
- 每次都失敗錯誤
kafka-create-topics_1 | [2019-10-07 21:03:00,556] WARN [Consumer clientId=consumer-1, groupId=console-consumer-58118] Connection to node -1 (/172.21.0.3:9092) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient) kafka-create-topics_1 | [2019-10-07 20:58:24,047] ERROR Error processing message, terminating consumer process: (kafka.tools.ConsoleConsumer$) kafka-create-topics_1 | org.apache.kafka.common.errors.TimeoutException kafka-create-topics_1 | Processed a total of 0 messages
兩個文件
kafka-sample-generator.sh docker-compose.yaml
Dockerfile
docker-compose.yaml
version: '2' services: zookeeper: image: confluentinc/cp-zookeeper:5.3.1 hostname: zookeeper ports: - '2181:2181' environment: ZOOKEEPER_CLIENT_PORT: 2181 kafka: image: confluentinc/cp-enterprise-kafka:5.3.1 hostname: kafka ports: - '9092:9092' depends_on: - zookeeper environment: KAFKA_BROKER_ID: 1 KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092 KAFKA_AUTO_CREATE_TOPICS_ENABLE: "false" KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: kafka:9092 CONFLUENT_METRICS_ENABLE: 'true' CONFLUENT_SUPPORT_CUSTOMER_ID: 'anonymous' kafka-create-topics: image: confluentinc/cp-enterprise-kafka:5.3.1 depends_on: - zookeeper - kafka hostname: kafka-create-topics volumes: - './kafka-sample-generator.sh:/bin/kafka-sample-generator.sh' command: "/bin/kafka-sample-generator.sh" environment: KAFKA_TOPIC: topic KAFKA_BROKER: kafka
範例生成器腳本
kafka-sample-generator.sh
#!/bin/sh HOST=`host $KAFKA_BROKER | awk '/has address/ { print $4 ; exit }'` ## Wait until Kafka is ready then create demo topic echo 'Waiting for Kafka to be ready...' cub kafka-ready -b $HOST:9092 1 20 && \ sleep 1 echo "Creating Topic [$HOST:9092 <topic:'$KAFKA_TOPIC'>]" kafka-topics --create \ --topic $KAFKA_TOPIC \ --if-not-exists \ --zookeeper zookeeper:2181 \ --partitions 1 #--replication-factor 1 sleep 1 echo "Availalbe Topics" kafka-topics --list --zookeeper zookeeper:2181 sleep 1 ## Emit sample data stream while true do echo "Sending Data [$HOST:9092 <topic:'$KAFKA_TOPIC'>]" for i in `seq 1 10`; do echo "$HOST" DATA="{\"data\":\"sample-data-$i\"}" echo "$DATA" kafka-console-producer \ --broker-list $HOST:9092 \ --topic $KAFKA_TOPIC \ "$DATA" done sleep 1.0 echo '' echo "Receiving Data [$HOST:9092 <topic:'$KAFKA_TOPIC'>]" ## FAILES HERE ## FAILES HERE ## FAILES HERE kafka-console-consumer \ --bootstrap-server $HOST:9092 \ --topic $KAFKA_TOPIC #\ #--partition 0 \ #--from-beginning \ #--max-messages 1 \ #--timeout-ms 45000 \ #--skip-message-on-error done
這些是我為使其正常工作所做的更改:
- 禁用指標(它堅持嘗試創建一個複制因子為 3 的主題)
- 將 kafka-topics create 命令更改為使用複制因子 1。
- 通過 STDIN 將消息數據傳遞給 kafka-console-producer
- kafka-console-consumer 從一開始最多接收 10 條消息,因此它不會阻塞等待新消息
version: '2' services: zookeeper: image: confluentinc/cp-zookeeper:5.3.1 hostname: zookeeper ports: - '2181:2181' environment: ZOOKEEPER_CLIENT_PORT: 2181 kafka: image: confluentinc/cp-enterprise-kafka:5.3.1 hostname: kafka ports: - '9092:9092' depends_on: - zookeeper environment: KAFKA_BROKER_ID: 1 KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 KAFKA_ADVERTISED_LISTENERS: LISTENER_BOB://kafka:9092 KAFKA_AUTO_CREATE_TOPICS_ENABLE: "false" CONFLUENT_METRICS_ENABLE: 'false' CONFLUENT_SUPPORT_CUSTOMER_ID: 'anonymous' KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_BOB:PLAINTEXT KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_BOB KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 kafka-create-topics: image: confluentinc/cp-enterprise-kafka:5.3.1 depends_on: - zookeeper - kafka hostname: kafka-create-topics volumes: - './kafka-sample-generator.sh:/bin/kafka-sample-generator.sh' command: "/bin/kafka-sample-generator.sh" environment: KAFKA_TOPIC: topic KAFKA_BROKER: kafka
#!/bin/sh HOST=`host $KAFKA_BROKER | awk '/has address/ { print $4 ; exit }'` ## Wait until Kafka is ready then create demo topic echo 'Waiting for Kafka to be ready...' cub kafka-ready -b $HOST:9092 1 20 && \ sleep 1 echo "Creating Topic [$HOST:9092 <topic:'$KAFKA_TOPIC'>]" kafka-topics --create --topic $KAFKA_TOPIC --if-not-exists --zookeeper zookeeper:2181 --partitions 1 --replication-factor 1 sleep 1 echo "Availalbe Topics" kafka-topics --list --zookeeper zookeeper:2181 sleep 1 ## Emit sample data stream while true do echo "Sending Data [$HOST:9092 <topic:'$KAFKA_TOPIC'>]" for i in `seq 1 10`; do echo "$HOST" echo "$DATA" DATA="{\"data\":\"sample-data-$i\"}" echo "$DATA" | kafka-console-producer \ --broker-list $HOST:9092 \ --topic $KAFKA_TOPIC done sleep 1.0 echo '' echo "Receiving Data [$HOST:9092 <topic:'$KAFKA_TOPIC'>]" kafka-console-consumer \ --bootstrap-server $HOST:9092 \ --topic $KAFKA_TOPIC \ --from-beginning \ --max-messages 10 done