Kafka Streams:错误退出的正确方法
我已经成功地获得了一个使用,转换和生成数据的流应用程序,但我注意到,流处理器会定期转换到ERROR
状态,并且该进程将在不退出的情况下坐在那里。
显示我的日志:
All stream threads have died. The instance will be in error state and should be closed.
有没有办法告诉Streams应用程序一旦达到ERROR
状态就退出? 也许是各种监视器线程?
我看到Kafka Streams代码的注释中的引用给需要在应用程序达到此状态时关闭应用程序的用户,但是,我无法在文档中找到提及此任务的内容。
有一个简单的方法来执行此关闭步骤吗?
可能是错误的方式可能关闭错误
我的目的是在KafkaStreams
对象上设置UncaughtExceptionHandler
方法,以执行以下操作:
- 记录错误
- 使用原始
KafkaStreams
对象上的close
方法关闭流
结果是:
- 记录exception消息
-
INFO org.apache.kafka.streams.KafkaStreams ... State transition from ERROR to PENDING_SHUTDOWN
-
INFO org.apache.kafka.streams.processor.internals.StreamThread ... Informed to shut down
然后,不幸的是,这个过程似乎没有退出。
FWIW我觉得这可能是对setUncaughtExceptionHandler
的误用
使用UncaughtExceptionHandler
是正确的。 但是,如果使用处理程序回调调用KafkaStreams#close()
,则可能会遇到死锁。 因此,您应该只设置一个标志,并在回调之外调用#close()
,或者使用close()
和超时。 如果超时到期,则关闭是强制。
- Spark Kafka流媒体问题
- 如何在Spark中将JavaPairInputDStream转换为DataSet / DataFrame
- Kafka制作人发送无效字符
- 避免apache kafka使用者中重复消息的有效策略
- kafka 8和内存 – Java Runtime Environment没有足够的内存来继续
- 如何使用Java中的Structured Streaming从Kafka反序列化记录?
- 无法在jConsole中看到kafka.consumer和kafka.producer mBean
- 从0.7升级到0.8.1.1后生成嵌入式kafka队列时出错
- Kafka 0.8.2.2 – 无法发布消息