JMS – 从一个消费者到多个消费者

我有一个JMS客户端,它生成消息并通过JMS队列发送给它的唯一消费者。

我想要的是不止一个消费者获得这些消息。 我想到的第一件事就是将队列转换为主题,因此当前和新的消费者可以订阅并获得传递给所有消息的相同消息。

这显然涉及在生产者和消费者方面修改当前客户端代码。

我还想看看其他选项,比如创建第二个队列,这样我就不必修改现有的消费者了。 我相信这种方法有一些优点(如果我错了,请纠正我)平衡两个不同队列之间的负载而不是一个,这可能会对性能产生积极影响。

我想就你可能会看到的这些选项和缺点/专业人士提出建议。 任何反馈都非常感谢。

你有一些选择,如你所说。

如果将其转换为主题以获得相同的效果,则需要使消费者成为持久消费者。 如果您的消费者不活着,那么队列提供的一件事就是持久性。 这取决于您使用的MQ系统。

如果您想坚持使用队列,您将为每个使用者创建一个队列,并为将侦听原始队列的调度程序创建一个队列。

Producer -> Queue_Original <- Dispatcher -> Queue_Consumer_1 <- Consumer_1 -> Queue_Consumer_2 <- Consumer_2 -> Queue_Consumer_3 <- Consumer_3 

主题的优点

  • 更容易动态添加新的消费者。 所有消费者都将获得新消息而无需任何工作。
  • 您可以创建循环主题,以便Consumer_1将收到消息,然后是Consumer_2,然后是Consumer_3
  • 消费者可以推送新消息,而不必查询队列,使其成为被动的。

主题的缺点

  • 除非您的代理支持此配置,否则消息不会持久。 如果消费者下线并返回,除非设置了持久性消费者,否则可能会丢失消息。
  • 很难让Consumer_1和Consumer_2接收消息而不是Consumer_3。 使用Dispatcher和Queues,Dispatcher无法将消息放入Consumer_3的队列中。

队列的优点

  • 在消费者删除消息之前,消息是持久的
  • 调度程序可以通过不将消息放入相应的消费者队列来过滤哪些消费者获得哪些消息。 这可以通过filter来完成主题。

队列的缺点

  • 需要创建其他队列以支持多个消费者。 在动态环境中,这不会有效。

在开发消息传递系统时,我更喜欢主题,因为它给了我最大的权力,但是当你已经在使用队列时,它会要求你改变系统的工作方式来实现主题。

具有多个消费者的队列系统的设计与实现

 Producer -> Queue_Original <- Dispatcher -> Queue_Consumer_1 <- Consumer_1 -> Queue_Consumer_2 <- Consumer_2 -> Queue_Consumer_3 <- Consumer_3 

资源

请记住,还有其他需要处理的事情,例如问题exception处理,重新连接到连接和队列,如果你失去连接等等。这只是为了让你知道如何完成我的工作描述。

在一个真实的系统中,我可能不会在第一个exception时退出。 我会允许系统继续尽可能地运行并记录错误。 正如在此代码中所示,如果将消息放入单个消费者队列中,则整个调度程序将停止。

Dispatcher.java

 /* * To change this template, choose Tools | Templates * and open the template in the editor. */ package stackoverflow_4615895; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageProducer; import javax.jms.Queue; import javax.jms.QueueConnection; import javax.jms.QueueConnectionFactory; import javax.jms.QueueSession; import javax.jms.Session; public class Dispatcher { private static long QUEUE_WAIT_TIME = 1000; private boolean mStop = false; private QueueConnectionFactory mFactory; private String mSourceQueueName; private String[] mConsumerQueueNames; /** * Create a dispatcher * @param factory * The QueueConnectionFactory in which new connections, session, and consumers * will be created. This is needed to ensure the connection is associated * with the correct thread. * @param source * * @param consumerQueues */ public Dispatcher( QueueConnectionFactory factory, String sourceQueue, String[] consumerQueues) { mFactory = factory; mSourceQueueName = sourceQueue; mConsumerQueueNames = consumerQueues; } public void start() { Thread thread = new Thread(new Runnable() { public void run() { Dispatcher.this.run(); } }); thread.setName("Queue Dispatcher"); thread.start(); } public void stop() { mStop = true; } private void run() { QueueConnection connection = null; MessageProducer producer = null; MessageConsumer consumer = null; QueueSession session = null; try { // Setup connection and queues for receiving the messages connection = mFactory.createQueueConnection(); session = connection.createQueueSession(false, Session.DUPS_OK_ACKNOWLEDGE); Queue sourceQueue = session.createQueue(mSourceQueueName); consumer = session.createConsumer(sourceQueue); // Create a null producer allowing us to send messages // to any queue. producer = session.createProducer(null); // Create the destination queues based on the consumer names we // were given. Queue[] destinationQueues = new Queue[mConsumerQueueNames.length]; for (int index = 0; index < mConsumerQueueNames.length; ++index) { destinationQueues[index] = session.createQueue(mConsumerQueueNames[index]); } connection.start(); while (!mStop) { // Only wait QUEUE_WAIT_TIME in order to give // the dispatcher a chance to see if it should // quit Message m = consumer.receive(QUEUE_WAIT_TIME); if (m == null) { continue; } // Take the message we received and put // it in each of the consumers destination // queues for them to process for (Queue q : destinationQueues) { producer.send(q, m); } } } catch (JMSException ex) { // Do wonderful things here } finally { if (producer != null) { try { producer.close(); } catch (JMSException ex) { } } if (consumer != null) { try { consumer.close(); } catch (JMSException ex) { } } if (session != null) { try { session.close(); } catch (JMSException ex) { } } if (connection != null) { try { connection.close(); } catch (JMSException ex) { } } } } } 

Main.java

  QueueConnectionFactory factory = ...; Dispatcher dispatcher = new Dispatcher( factory, "Queue_Original", new String[]{ "Consumer_Queue_1", "Consumer_Queue_2", "Consumer_Queue_3"}); dispatcher.start(); 

您可能不必修改代码; 这取决于你如何写它。

例如,如果您的代码使用MessageProducer而不是QueueSender发送消息,那么它将适用于主题和队列。 同样,如果您使用MessageConsumer而不是QueueReceiver

从本质上讲,在JMS应用程序中使用非特定接口与JMS系统进行交互是很好的做法,例如MessageProducerMessageConsumerDestination等。如果是这种情况,那么它只是一个“仅仅”的配置问题。