Kafka 0.10 Java客户端TimeoutException:包含1条记录的批处理已过期

我有一个单节点,多(3)个代理Zookeeper / Kafka设置。 我正在使用Kafka 0.10 Java客户端。

我写了以下简单的远程(在与Kafka不同的服务器上)生产者(在代码中我用MYIP替换了我的公共IP地址):

Properties config = new Properties(); try { config.put(ProducerConfig.CLIENT_ID_CONFIG, InetAddress.getLocalHost().getHostName()); config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "MYIP:9092, MYIP:9093, MYIP:9094"); config.put(ProducerConfig.ACKS_CONFIG, "all"); config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer"); producer = new KafkaProducer(config); Schema.Parser parser = new Schema.Parser(); schema = parser.parse(GATEWAY_SCHEMA); recordInjection = GenericAvroCodecs.toBinary(schema); GenericData.Record avroRecord = new GenericData.Record(schema); //Filling in avroRecord (code not here) byte[] bytes = recordInjection.apply(avroRecord); Future future = producer.send(new ProducerRecord(datasetId+"", "testKey", bytes)); RecordMetadata data = future.get(); } catch (Exception e) { e.printStackTrace(); } 

我的3个代理的服务器属性如下所示(在3个不同的服务器属性文件中,broker.id为0,1,2,侦听器为PLAINTEXT://:9092,PLAINTEXT://:9093,PLAINTEXT://:9094和host.name是10.2.0.4,10.2.0.5,10.2.0.6)。 这是第一个服务器属性文件:

 broker.id=0 listeners=PLAINTEXT://:9092 num.network.threads=3 num.io.threads=8 socket.send.buffer.bytes=102400 socket.receive.buffer.bytes=102400 socket.request.max.bytes=104857600 log.dirs=/tmp/kafka1-logs num.partitions=1 num.recovery.threads.per.data.dir=1 log.retention.hours=168 log.segment.bytes=1073741824 log.retention.check.interval.ms=300000 zookeeper.connect=localhost:2181 zookeeper.connection.timeout.ms=6000 

当我执行代码时,我得到以下exception:

 java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Batch containing 1 record(s) expired due to timeout while requesting metadata from brokers for 100101-0 at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.valueOrError(FutureRecordMetadata.java:65) at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:52) at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:25) at com.nr.roles.gateway.GatewayManager.addTransaction(GatewayManager.java:212) at com.nr.roles.gateway.gw.service(gw.java:126) at javax.servlet.http.HttpServlet.service(HttpServlet.java:790) at org.eclipse.jetty.servlet.ServletHolder.handle(ServletHolder.java:821) at org.eclipse.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:583) at org.eclipse.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1158) at org.eclipse.jetty.servlet.ServletHandler.doScope(ServletHandler.java:511) at org.eclipse.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1090) at org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:141) at org.eclipse.jetty.server.handler.HandlerCollection.handle(HandlerCollection.java:109) at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:119) at org.eclipse.jetty.server.Server.handle(Server.java:517) at org.eclipse.jetty.server.HttpChannel.handle(HttpChannel.java:308) at org.eclipse.jetty.server.HttpConnection.onFillable(HttpConnection.java:242) at org.eclipse.jetty.io.AbstractConnection$ReadCallback.succeeded(AbstractConnection.java:261) at org.eclipse.jetty.io.FillInterest.fillable(FillInterest.java:95) at org.eclipse.jetty.io.SelectChannelEndPoint$2.run(SelectChannelEndPoint.java:75) at org.eclipse.jetty.util.thread.strategy.ExecuteProduceConsume.produceAndRun(ExecuteProduceConsume.java:213) at org.eclipse.jetty.util.thread.strategy.ExecuteProduceConsume.run(ExecuteProduceConsume.java:147) at org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:654) at org.eclipse.jetty.util.thread.QueuedThreadPool$3.run(QueuedThreadPool.java:572) at java.lang.Thread.run(Thread.java:745) Caused by: org.apache.kafka.common.errors.TimeoutException: Batch containing 1 record(s) expired due to timeout while requesting metadata from brokers for 100101-0 

有谁知道我错过了什么? 任何帮助,将不胜感激。 非常感谢

我遇到了同样的问题。

您应该更改kafka server.properties以指定IP地址。 例如:

YOUIP :// YOUIP :9093

如果没有,kafka将使用主机名,如果生产者无法获得主机,即使你可以telnet它们也无法向kafka发送消息。

BOOTSTRAP_SERVERS_CONFIG配置中的端口信息不正确(MYIP: 9092 )。

正如您在server.properties中提到的那样,“PLAINTEXT://:9093,PLAINTEXT://:9093,PLAINTEXT://:9094”。

这个答案有一些见解。 您可以增加request.timeout.ms生成器配置,这将允许客户端在到期之前将批次排队更长时间。

您可能还需要查看batch.size和linger.ms配置,并找到适合您情况的最佳配置。