Spring集成 – Queue / Poller似乎在没有任何动作的情况下耗尽线程池
我有一个Spring集成应用程序,附加到AMQP代理。
我想从amqp队列接收消息,并更新db记录。
为了提高性能,我有一个工作池允许多个更新同时发生。
我有以下配置:
如果我开始运行,没有在AMQP频道上处理的入站消息,我很快就会看到thredpool耗尽,并开始拒绝。
这是日志:
[Thu Apr 2013 23:41:51.153] DEBUG [] (org.springframework.amqp.rabbit.listener.BlockingQueueConsumer:185) - Retrieving delivery for Consumer: tag=[amq.ctag-w4qPp60jVEQOIEovR4cERv], channel=Cached Rabbit Channel: AMQChannel(amqp://guest@127.0.0.1:5672/,1), acknowledgeMode=AUTO local queue size=0 [Thu Apr 2013 23:41:51.160] DEBUG [] (org.springframework.amqp.rabbit.listener.BlockingQueueConsumer:185) - Retrieving delivery for Consumer: tag=[amq.ctag-Q3Lq4R9g9E8WBNVLYzaFmq], channel=Cached Rabbit Channel: AMQChannel(amqp://guest@127.0.0.1:5672/,2), acknowledgeMode=AUTO local queue size=0 [Thu Apr 2013 23:41:51.166] DEBUG [] (org.springframework.amqp.rabbit.listener.BlockingQueueConsumer:185) - Retrieving delivery for Consumer: tag=[amq.ctag-w8bg7ltEV2mot8QXDPCmfK], channel=Cached Rabbit Channel: AMQChannel(amqp://guest@127.0.0.1:5672/,3), acknowledgeMode=AUTO local queue size=0 [Thu Apr 2013 23:41:51.170] DEBUG [] (org.springframework.amqp.rabbit.listener.BlockingQueueConsumer:185) - Retrieving delivery for Consumer: tag=[amq.ctag-A-0KdqhFjpc-Hvjmv7aZAc], channel=Cached Rabbit Channel: AMQChannel(amqp://guest@127.0.0.1:5672/,4), acknowledgeMode=AUTO local queue size=0 [Thu Apr 2013 23:41:51.180] DEBUG [] (org.springframework.integration.endpoint.PollingConsumer:71) - Received no Message during the poll, returning 'false' [Thu Apr 2013 23:41:51.180] DEBUG [] (org.springframework.integration.endpoint.PollingConsumer:71) - Received no Message during the poll, returning 'false' [Thu Apr 2013 23:41:51.199] DEBUG [] (org.springframework.integration.endpoint.PollingConsumer:71) - Received no Message during the poll, returning 'false' [Thu Apr 2013 23:41:51.200] DEBUG [] (org.springframework.integration.endpoint.PollingConsumer:71) - Received no Message during the poll, returning 'false' [Thu Apr 2013 23:41:51.220] DEBUG [] (org.springframework.integration.endpoint.PollingConsumer:71) - Received no Message during the poll, returning 'false'
很快,线程池开始拒绝执行:
[Thu Apr 2013 23:47:15.363] ERROR [] (org.springframework.integration.handler.LoggingHandler:126) - org.springframework.core.task.TaskRejectedException: Executor [java.util.concurrent.ThreadPoolExecutor@6ff3cb0e] did not accept task: org.springframework.integration.util.ErrorHandlingTaskExecutor$1@78615c8b at org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor.execute(ThreadPoolTaskExecutor.java:244) at org.springframework.integration.util.ErrorHandlingTaskExecutor.execute(ErrorHandlingTaskExecutor.java:49) at org.springframework.integration.endpoint.AbstractPollingEndpoint$Poller.run(AbstractPollingEndpoint.java:231) at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:53) at org.springframework.scheduling.concurrent.ReschedulingRunnable.run(ReschedulingRunnable.java:81) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:439) at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303) at java.util.concurrent.FutureTask.run(FutureTask.java:138) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:98) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:206) at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908) at java.lang.Thread.run(Thread.java:680) Caused by: java.util.concurrent.RejectedExecutionException at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:1768) at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:767) at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:658) at org.springframework.sched
uling.concurrent.ThreadPoolTaskExecutor.execute(ThreadPoolTaskExecutor.java:241)… 12个其他
我怀疑罪魁祸首在于: BlockingQueueConsumer
– 指示消息的每个轮询都会阻塞线程,直到消息到达…导致线程池快速耗尽。
配置它的正确方法是什么?
而不是使用QueueChannel
和poller,为什么不简单地增加入站适配器上的concurrent-consumers
属性?
Specify the number of concurrent consumers to create. Default is 1. Raising the number of concurrent consumers is recommended in order to scale the consumption of messages coming in from a queue. However, note that any ordering guarantees are lost once multiple consumers are registered. In general, stick with 1 consumer for low-volume queues.
并删除
和
。
另外,我总是建议在日志中包含线程名称(log4J为%t
); 它使调试线程问题变得更容易。
编辑:
使用轮询器,您的线程用完的原因是轮询器的默认receive-timeout
为1秒。 您每50ms调度一个线程,但每个线程在QueueChannel
等待1秒钟。 最终你的任务队列填满了。
为避免这种情况,如果您希望继续使用此技术,只需在
上将receive-timeout
设置为0
– 但在适配器中使用更高的并发性更有效,因为没有轮询或切换到另一个线程。
看起来我需要一个桥接器来映射amqp入站队列(它是一个发布/子样式队列)和一个队列通道。
这似乎是很多代码来完成一项相当简单的任务 – 所以如果有人有更好的解决方案或改进建议,我很乐意看到它们。