带有transactionIdPrefix的DefaultKafkaProducerFactory会在引导服务器关闭时等待

HY,

我正在使用spring-kafka 1.3.0.RELEASE创建一个事务生成器。 当引导服务器关闭时,DefaultKafkaProducerFactory会无休止地等待,直到引导服务器启动。

我究竟做错了什么 ? 我可以设置超时和/或其他类似属性吗?

这是我重现场景的代码示例:

public static void main(String[] args) { final DefaultKafkaProducerFactory producerFactory = new DefaultKafkaProducerFactory(producerConfigs()); producerFactory.setTransactionIdPrefix("transactionIdPrefix"); final Producer producer = producerFactory.createProducer(); System.out.println("Created producer:" + producer); } private static Map producerConfigs() { final Map props = new HashMap(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.56.1:9092"); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); props.put(ProducerConfig.RETRIES_CONFIG, 1); props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1); props.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 1000); props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 1000); props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 1000); return props; } 

它是由工厂在创建生成器后调用initTransactions()上的initTransactions()引起的,例如,如果没有足够的代理来支持事务日志复制因子。

我不知道为什么超时不适用于该操作。

我们可能会更改工厂以推迟initTransactions()直到第一个beginTransaction() – 但这只会将问题推向下游。

我测试了kafka 1.0.0客户端(可以与1.3.1或更高版本一起使用 – 目前为1.3.2)并且它仍然存在问题。 我认为应该尊重TRANSACTION_TIMEOUT_CONFIG但似乎没有。

我建议在Kafka JIRA上开一个问题。