Apache Kafka client connecting to Apache Zookeeper: EndOfStreamException

I get an error when trying to "consume" messages from Kafka (2.9.2-0.8.1) using a standalone Zookeeper (3.4.5). Below are the source code, the error message, and the Zookeeper log.

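I'm not sure whether the Java libraries are incompatible: I added the dependency kafka_2.9.2 (0.8.1) through Maven, and it automatically resolved the dependencies zkclient (0.3) and zookeeper (3.3.4).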

Consumer source code:

    import java.util.Properties;

    import kafka.consumer.Consumer;
    import kafka.consumer.ConsumerConfig;
    import kafka.javaapi.consumer.ConsumerConnector;

    public class ConsumerTest {

        public static void main(String[] args) {
            try {
                Properties props = new Properties();
                props.put("zookeeper.connect", "192.168.0.1:2181/kafka");
                props.put("group.id", "my-consumer");
                props.put("zookeeper.session.timeout.ms", "400");
                props.put("zookeeper.sync.time.ms", "200");
                props.put("auto.commit.interval.ms", "1000");
                ConsumerConfig config = new ConsumerConfig(props);
                @SuppressWarnings("unused")
                ConsumerConnector consumer = Consumer.createJavaConsumerConnector(config);
            } catch (Exception e) {
                System.out.println(e.getMessage());
                e.printStackTrace();
            }
        }
    }

pom.xml:

    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
      <modelVersion>4.0.0</modelVersion>
      <groupId>test.my</groupId>
      <artifactId>kafka-consumer</artifactId>
      <version>0.0.1-SNAPSHOT</version>
      <dependencies>
        <dependency>
          <groupId>org.apache.kafka</groupId>
          <artifactId>kafka_2.9.2</artifactId>
          <exclusions>
            <exclusion>
              <artifactId>jms</artifactId>
              <groupId>javax.jms</groupId>
            </exclusion>
            <exclusion>
              <artifactId>jmxtools</artifactId>
              <groupId>com.sun.jdmk</groupId>
            </exclusion>
            <exclusion>
              <artifactId>jmxri</artifactId>
              <groupId>com.sun.jmx</groupId>
            </exclusion>
          </exclusions>
        </dependency>
      </dependencies>
      <dependencyManagement>
        <dependencies>
          <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.9.2</artifactId>
            <version>0.8.1</version>
          </dependency>
        </dependencies>
      </dependencyManagement>
    </project>

Exception message and stack trace:

    Unable to connect to zookeeper server within timeout: 400
    org.I0Itec.zkclient.exception.ZkTimeoutException: Unable to connect to zookeeper server within timeout: 400
        at org.I0Itec.zkclient.ZkClient.connect(ZkClient.java:880)
        at org.I0Itec.zkclient.ZkClient.<init>(ZkClient.java:98)
        at org.I0Itec.zkclient.ZkClient.<init>(ZkClient.java:84)
        at kafka.consumer.ZookeeperConsumerConnector.connectZk(ZookeeperConsumerConnector.scala:156)
        at kafka.consumer.ZookeeperConsumerConnector.<init>(ZookeeperConsumerConnector.scala:114)
        at kafka.javaapi.consumer.ZookeeperConsumerConnector.<init>(ZookeeperConsumerConnector.scala:65)
        at kafka.javaapi.consumer.ZookeeperConsumerConnector.<init>(ZookeeperConsumerConnector.scala:67)
        at kafka.consumer.Consumer$.createJavaConsumerConnector(ConsumerConnector.scala:100)
        at kafka.consumer.Consumer.createJavaConsumerConnector(ConsumerConnector.scala)
        at ConsumerTest.main(ConsumerTest.java:23)

Zookeeper log:

    2014-05-06 11:48:11,907 [myid:] - INFO [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxnFactory@197] - Accepted socket connection from /192.168.0.4:52568
    2014-05-06 11:48:11,909 [myid:] - WARN [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxn@349] - caught end of stream exception
    EndOfStreamException: Unable to read additional data from client sessionid 0x0, likely client has closed socket
        at org.apache.zookeeper.server.NIOServerCnxn.doIO(NIOServerCnxn.java:220)
        at org.apache.zookeeper.server.NIOServerCnxnFactory.run(NIOServerCnxnFactory.java:208)
        at java.lang.Thread.run(Thread.java:701)
    2014-05-06 11:48:11,909 [myid:] - INFO [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxn@1001] - Closed socket connection for client /192.168.0.4:52568 (no session established for client)

Note that I can successfully "produce" and "consume" messages from the Kafka nodes using the command-line tools:

    $ sudo -u kafka bin/kafka-console-producer.sh --broker-list 192.168.0.2:9092,192.168.0.3:9092 --topic my-topic
    SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
    SLF4J: Defaulting to no-operation (NOP) logger implementation
    SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
    This is a first message.
    This is a second message.

    $ sudo -u kafka bin/kafka-console-consumer.sh --zookeeper 192.168.0.1:2181/kafka --topic my-topic --from-beginning
    SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
    SLF4J: Defaulting to no-operation (NOP) logger implementation
    SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
    This is a first message.
    This is a second message.

I can even successfully produce messages from a Java client producer.

This may be because your system's configuration does not allow you to connect to a Zookeeper running on another host. Just add the host to your /etc/hosts.

On Windows, open the file C:\Windows\System32\drivers\etc\hosts

and add the line below:

    # 192.168.0.2 rhino.acme.com # source server
    192.168.0.2 --> the IP where Zookeeper is running
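Before or after editing the hosts file, it can help to confirm that the client machine can open a TCP connection to Zookeeper at all. The following is only a minimal sketch using the standard `java.net` classes; the class name `ZkReachability`, the method `canConnect`, and the 2000 ms probe timeout are made up for illustration, and the default host and port are simply the ones from the question's `zookeeper.connect` setting.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper, not part of Kafka or Zookeeper.
public class ZkReachability {

    /** Returns true if a TCP connection to host:port can be opened within timeoutMs. */
    static boolean canConnect(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            // Fails with an IOException on an unknown host, a refused connection, or a timeout.
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Defaults taken from the question; override with: java ZkReachability <host> <port>
        String host = args.length > 0 ? args[0] : "192.168.0.1";
        int port = args.length > 1 ? Integer.parseInt(args[1]) : 2181;
        // 2000 ms is an arbitrary probe timeout chosen for this sketch.
        System.out.println(host + ":" + port + " reachable: " + canConnect(host, port, 2000));
    }
}
```

A successful probe only shows that the port accepts TCP connections; it does not prove that a Zookeeper session can be established within the 400 ms that appears in the exception message.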