Signal a rollback from a JMS MessageListener

I've been working with JMS and ActiveMQ, and everything is working wonders. I am not using Spring, nor can I.

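The interface javax.jms.MessageListener has only one method, onMessage. An implementation may throw an exception. If it does, I consider the message not properly processed, and it needs to be retried. So I need ActiveMQ to wait for a little while and then retry. In other words, I need the thrown exception to roll back the JMS transaction.

How can I accomplish such behaviour?

Maybe there is some configuration in ActiveMQ that I wasn't able to find.

Or... maybe I could do away with registering MessageListeners on consumers and consume the messages myself, in a loop like this: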

    while (true) {
        // ... some administrative stuff like ...
        session = connection.createSession(true, Session.SESSION_TRANSACTED);
        try {
            // receive(timeout) returns null if nothing arrives in time
            Message m = receiver.receive(1000L);
            if (m != null) {
                theMessageListener.onMessage(m);
            }
            session.commit();
        } catch (Exception e) {
            session.rollback();
            Thread.sleep(someTimeDefinedSomewhereElse);
        }
        // ... some more administrative stuff
    }

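running in a couple of threads, instead of registering the listener.

Or... I could somehow decorate / AOP / byte-manipulate the MessageListeners so that they do this themselves.

Which route would you take, and why?

Note: I don't have full control over the MessageListener code.

EDIT: A proof-of-concept test: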

    @Test
    @Ignore("Interactive test, just a proof of concept")
    public void transaccionConListener() throws Exception {
        final AtomicInteger atomicInteger = new AtomicInteger(0);

        // Embedded in-memory broker
        BrokerService brokerService = new BrokerService();
        String bindAddress = "vm://localhost";
        brokerService.addConnector(bindAddress);
        brokerService.setPersistenceAdapter(new MemoryPersistenceAdapter());
        brokerService.setUseJmx(false);
        brokerService.start();

        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(bindAddress);
        RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
        redeliveryPolicy.setInitialRedeliveryDelay(500);
        redeliveryPolicy.setBackOffMultiplier(2);
        redeliveryPolicy.setUseExponentialBackOff(true);
        redeliveryPolicy.setMaximumRedeliveries(2);
        activeMQConnectionFactory.setRedeliveryPolicy(redeliveryPolicy);
        activeMQConnectionFactory.setUseRetroactiveConsumer(true);
        activeMQConnectionFactory.setClientIDPrefix("ID");

        PooledConnectionFactory pooledConnectionFactory = new PooledConnectionFactory(activeMQConnectionFactory);
        pooledConnectionFactory.start();

        Connection connection = pooledConnectionFactory.createConnection();
        Session session = connection.createSession(false, Session.DUPS_OK_ACKNOWLEDGE);
        Queue helloQueue = session.createQueue("Hello");
        MessageConsumer consumer = session.createConsumer(helloQueue);
        consumer.setMessageListener(new MessageListener() {
            @Override
            public void onMessage(Message message) {
                TextMessage textMessage = (TextMessage) message;
                try {
                    switch (atomicInteger.getAndIncrement()) {
                        case 0:
                            System.out.println("OK, first message received " + textMessage.getText());
                            message.acknowledge();
                            break;
                        case 1:
                            System.out.println("NOPE, second must be retried " + textMessage.getText());
                            throw new RuntimeException("I failed, aaaaah");
                        case 2:
                            System.out.println("OK, second message received " + textMessage.getText());
                            message.acknowledge();
                    }
                } catch (JMSException e) {
                    e.printStackTrace(System.out);
                }
            }
        });
        connection.start();

        {
            // A client sends two messages...
            Connection connection1 = pooledConnectionFactory.createConnection();
            // Create the producer session on the producer's own connection
            Session session1 = connection1.createSession(false, Session.AUTO_ACKNOWLEDGE);
            connection1.start();
            MessageProducer producer = session1.createProducer(helloQueue);
            producer.send(session1.createTextMessage("Hello World 1"));
            producer.send(session1.createTextMessage("Hello World 2"));
            producer.close();
            session1.close();
            connection1.stop();
            connection1.close();
        }

        JOptionPane.showInputDialog("I will wait, you watch the log...");

        consumer.close();
        session.close();
        connection.stop();
        connection.close();
        pooledConnectionFactory.stop();
        brokerService.stop();

        assertEquals(3, atomicInteger.get());
    }

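If you want to use SESSION_TRANSACTED as your acknowledgement mode, then you need to set up a RedeliveryPolicy on your Connection/ConnectionFactory. This page on the ActiveMQ website also contains some good information about what you might need to do.

Since you aren't using Spring, you can set up a RedeliveryPolicy with something similar to the following code (taken from one of the above links):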

    RedeliveryPolicy policy = connection.getRedeliveryPolicy();
    policy.setInitialRedeliveryDelay(500);
    policy.setBackOffMultiplier(2);
    policy.setUseExponentialBackOff(true);
    policy.setMaximumRedeliveries(2);
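To get a feel for what those settings mean, here is a rough, library-free model of an exponential back-off schedule. It is only a sketch of the idea, not ActiveMQ's actual implementation: the class name `BackOffSchedule` and its `delays` method are made up for illustration, and details such as collision avoidance or a maximum redelivery delay are deliberately left out.

```java
import java.util.ArrayList;
import java.util.List;

/** Simplified model of an exponential back-off schedule; not ActiveMQ code. */
final class BackOffSchedule {

    /**
     * Returns the delay in milliseconds before each redelivery attempt,
     * starting at initialDelayMs and multiplying by multiplier after each attempt.
     */
    static List<Long> delays(long initialDelayMs, double multiplier, int maxRedeliveries) {
        List<Long> result = new ArrayList<>();
        long delay = initialDelayMs;
        for (int attempt = 0; attempt < maxRedeliveries; attempt++) {
            result.add(delay);
            delay = (long) (delay * multiplier);
        }
        return result;
    }

    public static void main(String[] args) {
        // Same values as the policy above: 500 ms initial delay, multiplier 2, 2 redeliveries.
        System.out.println(delays(500, 2.0, 2)); // prints [500, 1000]
    }
}
```

Under this model, a message that keeps failing would be retried after roughly 500 ms and then 1000 ms before the broker gives up on it.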

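EDIT: Taking the code snippet you added, the following shows how this works with transactions. Try this code with the Session.rollback() call commented out and you'll see that using SESSION_TRANSACTED and Session.commit/rollback works as expected: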

    @Test
    public void test() throws Exception {
        final AtomicInteger atomicInteger = new AtomicInteger(0);

        // Embedded in-memory broker
        BrokerService brokerService = new BrokerService();
        String bindAddress = "vm://localhost";
        brokerService.addConnector(bindAddress);
        brokerService.setPersistenceAdapter(new MemoryPersistenceAdapter());
        brokerService.setUseJmx(false);
        brokerService.start();

        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(bindAddress);
        RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
        redeliveryPolicy.setInitialRedeliveryDelay(500);
        redeliveryPolicy.setBackOffMultiplier(2);
        redeliveryPolicy.setUseExponentialBackOff(true);
        redeliveryPolicy.setMaximumRedeliveries(2);
        activeMQConnectionFactory.setRedeliveryPolicy(redeliveryPolicy);
        activeMQConnectionFactory.setUseRetroactiveConsumer(true);
        activeMQConnectionFactory.setClientIDPrefix("ID");

        PooledConnectionFactory pooledConnectionFactory = new PooledConnectionFactory(activeMQConnectionFactory);
        pooledConnectionFactory.start();

        Connection connection = pooledConnectionFactory.createConnection();
        // Transacted session: commit() acknowledges, rollback() triggers redelivery
        final Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
        Queue helloQueue = session.createQueue("Hello");
        MessageConsumer consumer = session.createConsumer(helloQueue);
        consumer.setMessageListener(new MessageListener() {
            public void onMessage(Message message) {
                TextMessage textMessage = (TextMessage) message;
                try {
                    switch (atomicInteger.getAndIncrement()) {
                        case 0:
                            System.out.println("OK, first message received " + textMessage.getText());
                            session.commit();
                            break;
                        case 1:
                            System.out.println("NOPE, second must be retried " + textMessage.getText());
                            session.rollback();
                            throw new RuntimeException("I failed, aaaaah");
                        case 2:
                            System.out.println("OK, second message received " + textMessage.getText());
                            session.commit();
                    }
                } catch (JMSException e) {
                    e.printStackTrace(System.out);
                }
            }
        });
        connection.start();

        {
            // A client sends two messages...
            Connection connection1 = pooledConnectionFactory.createConnection();
            // Create the producer session on the producer's own connection
            Session session1 = connection1.createSession(false, Session.AUTO_ACKNOWLEDGE);
            connection1.start();
            MessageProducer producer = session1.createProducer(helloQueue);
            producer.send(session1.createTextMessage("Hello World 1"));
            producer.send(session1.createTextMessage("Hello World 2"));
            producer.close();
            session1.close();
            connection1.stop();
            connection1.close();
        }

        JOptionPane.showInputDialog("I will wait, you watch the log...");

        consumer.close();
        session.close();
        connection.stop();
        connection.close();
        pooledConnectionFactory.stop();

        assertEquals(3, atomicInteger.get());
    }


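You need to set the acknowledgment mode to Session.CLIENT_ACKNOWLEDGE. With it, the client acknowledges a consumed message by calling the message's acknowledge method.

    QueueSession session = connection.createQueueSession(false, Session.CLIENT_ACKNOWLEDGE);

Then, after processing the message, you need to call the Message.acknowledge() method in order to remove that message.

    Message message = ...;
    // Processing message
    message.acknowledge();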

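If your session is transacted, then the "acknowledgeMode" is ignored anyway. So just leave your session transacted and use session.rollback and session.commit to commit or roll back your transaction.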
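Since the asker cannot change the listener code, one way to apply this is to wrap the existing listener in a decorator that does the commit or rollback. The following is only a sketch of that pattern: `Listener` and `TxSession` are hypothetical stand-ins for javax.jms.MessageListener and the commit/rollback part of javax.jms.Session, used so the example runs without the JMS jar. With the real API, commit() and rollback() throw JMSException, which the wrapper would also have to handle.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for javax.jms.MessageListener (String payload for brevity). */
interface Listener {
    void onMessage(String message);
}

/** Hypothetical stand-in for the commit/rollback part of javax.jms.Session. */
interface TxSession {
    void commit();
    void rollback();
}

/** Wraps a listener so the session commits on success and rolls back on failure. */
final class TransactionalListener implements Listener {
    private final Listener delegate;
    private final TxSession session;

    TransactionalListener(Listener delegate, TxSession session) {
        this.delegate = delegate;
        this.session = session;
    }

    @Override
    public void onMessage(String message) {
        try {
            delegate.onMessage(message);
            session.commit();
        } catch (RuntimeException e) {
            // Processing failed: roll back so the message can be redelivered.
            session.rollback();
        }
    }
}

final class TransactionalListenerDemo {
    /** Records which session calls the wrapper makes for a good and a bad message. */
    static List<String> run() {
        List<String> calls = new ArrayList<>();
        TxSession session = new TxSession() {
            @Override public void commit() { calls.add("commit"); }
            @Override public void rollback() { calls.add("rollback"); }
        };
        Listener flaky = message -> {
            if (message.contains("bad")) {
                throw new IllegalStateException("processing failed");
            }
        };
        Listener wrapped = new TransactionalListener(flaky, session);
        wrapped.onMessage("good message");
        wrapped.onMessage("bad message");
        return calls;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [commit, rollback]
    }
}
```

The wrapper swallows the exception after rolling back. Whether to rethrow or log it instead is a design choice that depends on how failures should be reported in your application.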