无法让ActiveMQ重新发送我的消息

我有一个用Java编写的单线程ActiveMQ使用者。 我所要做的就是从队列中接收()一个消息,尝试将其发送到Web服务,如果成功则确认()它。 如果Web服务调用失败,我希望消息保留在队列中并在超时后重新发送。

它或多或少都在工作,除了重发部分:每次重新启动我的消费者时,它会为每个仍然在队列中的消息收到一条消息,但是在发送它们之后,消息永远不会被重新发送。

我的代码看起来像:

public boolean init() throws JMSException, FileNotFoundException, IOException { ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user, password, url); RedeliveryPolicy policy = new RedeliveryPolicy(); policy.setInitialRedeliveryDelay(500); policy.setBackOffMultiplier(2); policy.setUseExponentialBackOff(true); connectionFactory.setRedeliveryPolicy(policy); connectionFactory.setUseRetroactiveConsumer(true); // ???? Connection connection = connectionFactory.createConnection(); connection.setExceptionListener(this); connection.start(); session = connection.createSession(transacted, ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE); destination = session.createQueue(subject); //??? consumer = session.createConsumer(destination); //consumer.setMessageListener(this); // message listener had same behaviour } private void process() { while(true) { System.out.println("Waiting..."); try { Message message = consumer.receive(); onMessage(message); } catch (JMSException e) { e.printStackTrace(); } try { Thread.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } } } @Override public void onMessage(Message message) { System.out.println("onMessage"); messagesReceived++; if (message instanceof TextMessage) { try { TextMessage txtMsg = (TextMessage) message; String msg = txtMsg.getText(); if(!client.sendMessage(msg)) { System.out.println("Webservice call failed. Keeping message"); //message. } else { message.acknowledge(); } if (transacted) { if ((messagesReceived % batch) == 0) { System.out.println("Commiting transaction for last " + batch + " messages; messages so far = " + messagesReceived); session.commit(); } } } catch (JMSException e) { e.printStackTrace(); } } } 

我目前没有使用交易(也许我应该这样做?)。

我确定我错过了一些简单的东西,很快就会拍打我的额头,但我似乎无法弄清楚这是怎么回事。 谢谢!


编辑:我自己也不能回答这个问题:

好的,经过一些实验,事实certificate交易是实现这一目标的唯一方法。 这是新代码:

 public boolean init() throws JMSException, FileNotFoundException, IOException { ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user, password, url); RedeliveryPolicy policy = new RedeliveryPolicy(); policy.setInitialRedeliveryDelay(1000L); policy.setMaximumRedeliveries(RedeliveryPolicy.NO_MAXIMUM_REDELIVERIES); connectionFactory.setRedeliveryPolicy(policy); connectionFactory.setUseRetroactiveConsumer(true); Connection connection = connectionFactory.createConnection(); connection.setExceptionListener(this); connection.start(); session = connection.createSession(transacted, ActiveMQSession.CLIENT_ACKNOWLEDGE); destination = session.createQueue(subject); consumer = session.createConsumer(destination); } @Override public void onMessage(Message message) { System.out.println("onMessage"); messagesReceived++; if (message instanceof TextMessage) { try { TextMessage txtMsg = (TextMessage) message; String msg = txtMsg.getText(); if(client.sendMessage(msg)) { if(transacted) { System.out.println("Call succeeded - committing message"); session.commit(); } //message.acknowledge(); } else { if(transacted) { System.out.println("Webservice call failed. Rolling back message"); session.rollback(); } } } catch (JMSException e) { e.printStackTrace(); } } } 

现在,重新传送策略中指定的消息每1000毫秒重新发送一次。

希望这有助于其他人! 🙂

您不必使用事务,CLIENT_ACK / Session.recover()也可以使用…

发生以下任何情况时,会将邮件重新传递给客户端:

  • 使用事务会话并调用rollback()。
  • 在调用commit之前关闭事务会话。
  • 会话正在使用CLIENT_ACKNOWLEDGE并调用Session.recover()。

请参阅http://activemq.apache.org/message-redelivery-and-dlq-handling.html