Tag: spring kafka

带有transactionIdPrefix的DefaultKafkaProducerFactory会在引导服务器关闭时等待

HY, 我正在使用spring-kafka 1.3.0.RELEASE创建一个事务生成器。 当引导服务器关闭时,DefaultKafkaProducerFactory会无休止地等待,直到引导服务器启动。 我究竟做错了什么 ? 我可以设置超时和/或其他类似属性吗? 这是我重现场景的代码示例: public static void main(String[] args) { final DefaultKafkaProducerFactory producerFactory = new DefaultKafkaProducerFactory(producerConfigs()); producerFactory.setTransactionIdPrefix(“transactionIdPrefix”); final Producer producer = producerFactory.createProducer(); System.out.println(“Created producer:” + producer); } private static Map producerConfigs() { final Map props = new HashMap(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, “192.168.56.1:9092”); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, “org.apache.kafka.common.serialization.StringSerializer”); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, “org.apache.kafka.common.serialization.StringSerializer”); props.put(ProducerConfig.RETRIES_CONFIG, 1); props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1); props.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 1000); props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, […]

Kafurn in Kubernetes – 将协调员标记为团体死亡

我对Kubernetes很新,并想用它来设置Kafka和zookeeper。 我能够使用StatefulSets在Kubernetes中设置Apache Kafka和Zookeeper。 我按照这个来构建我的清单文件。 我制作了1张kafka和zookeeper的复制品,并且还使用了持久卷。 所有pod都在运行并准备就绪。 我尝试通过指定nodePort(30010)来公开kafka并使用Service 。 看起来这会将kafka暴露给外界,在那里他们可以向kafka经纪人发送消息并从中消费。 但是在我的Java应用程序中,我创建了一个使用者并将bootstrapServer添加为:30010 ,显示了以下日志: INFO oakcciAbstractCoordinator – Discovered coordinator kafka-0.kafka-hs.default.svc.cluster.local:9093 (id: 2147483647 rack: null) for group workerListener. INFO oakcciAbstractCoordinator – Marking the coordinator kafka-0.kafka-hs.default.svc.cluster.local:9093 (id: 2147483647 rack: null) dead for group workerListener 有趣的是,当我使用kubectl命令测试集群时,我能够生成和使用消息: kubectl run -ti –image=gcr.io/google_containers/kubernetes-kafka:1.0-10.2.1 produce –restart=Never –rm \ — kafka-console-producer.sh –topic test –broker-list kafka-0.kafka-hs.default.svc.cluster.local:9093 done; […]