Kafka – 如何在Producer类中获取失败的消息详细信息
Kafka允许通过Producer(KafkaProducer)类的以下方法发送异步消息:
public java.util.concurrent.Future send(ProducerRecord record) public java.util.concurrent.Future send(ProducerRecord record, Callback callback)
成功可以通过处理
1) Future
对象或
2)回调调用的onCompletion
方法。 完整方法签名和onCompletion
使用如下( 取自kafka docs )
`
ProducerRecord record = new ProducerRecord("the-topic", key, value); producer.send(record, new Callback() { public void onCompletion(RecordMetadata metadata, Exception e) { if(e != null) e.printStackTrace(); System.out.println("The offset of the record we just sent is: " + metadata.offset()); } });
虽然故障需要通过传递给onCompletion
方法的Exception e
来处理
到目前为止,每件事看起来都不错。
但是如果我做对了,可以从exception
或e
对象获得的任何合理信息都是stacktrace和exception message。 我在这里指出的是, e
不包含发送的实际记录的任何信息。 或者换句话说,它不包含对发送给kafka代理的实际record
的引用。 因此,如果未成功发送record
,生产者可以进行哪些有用的处理或处理。 真的不多。 为什么我这样说 – 理想情况下我想在某个地方记录失败的消息,然后尝试重新发送它。 但是由于框架提供的信息很少( e
),我觉得这是不可能的。
有人可以指出我是对还是错?
您可以轻松地创建一个接收producerRecord
作为构造函数参数的回调。 因此,在onCompletion
有一个例外,你可以完全了解生产者记录,甚至尝试再次发送它。
我处理了同样的问题。 创建了一个回调,它获取了producerRecord
和一个使用执行程序服务再次发送记录的回调处理程序。 所以最终,我可以容忍任何数量的失败(例如网络问题或kafka失败),并从中恢复。