ACTIVEMQ-发布者订阅者hello world示例

有两个程序:订阅者和发布者…订阅者能够将消息放到主题上并成功发送消息。 当我在浏览器上检查activemq服务器时,它显示1 msg排队。 但是当我运行消费者代码时,它没有收到消息

这是生产者代码:

import javax.jms.*; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; public class producer { private static String url = ActiveMQConnection.DEFAULT_BROKER_URL; public static void main(String[] args) throws JMSException { ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url); Connection connection = connectionFactory.createConnection(); connection.start(); // JMS messages are sent and received using a Session. We will // create here a non-transactional session object. If you want // to use transactions you should set the first parameter to 'true' Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Topic topic = session.createTopic("testt"); MessageProducer producer = session.createProducer(topic); // We will send a small text message saying 'Hello' TextMessage message = session.createTextMessage(); message.setText("HELLO JMS WORLD"); // Here we are sending the message! producer.send(message); System.out.println("Sent message '" + message.getText() + "'"); connection.close(); } } 

运行此代码后,控制台的输出为:

 26 Jan, 2012 2:30:04 PM org.apache.activemq.transport.failover.FailoverTransport doReconnect INFO: Successfully connected to tcp://localhost:61616 Sent message 'HELLO JMS WORLD' 

这是消费者代码:

 import javax.jms.*; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; public class consumer { // URL of the JMS server private static String url = ActiveMQConnection.DEFAULT_BROKER_URL; // Name of the topic from which we will receive messages from = " testt" public static void main(String[] args) throws JMSException { // Getting JMS connection from the server ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url); Connection connection = connectionFactory.createConnection(); connection.start(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Topic topic = session.createTopic("testt"); MessageConsumer consumer = session.createConsumer(topic); MessageListener listner = new MessageListener() { public void onMessage(Message message) { try { if (message instanceof TextMessage) { TextMessage textMessage = (TextMessage) message; System.out.println("Received message" + textMessage.getText() + "'"); } } catch (JMSException e) { System.out.println("Caught:" + e); e.printStackTrace(); } } }; consumer.setMessageListener(listner); connection.close(); } } 

运行此代码后,它没有显示任何内容。 有人可以帮我解决这个问题吗?

您的问题是您的消费者正在运行,然后立即关闭。

尝试将此添加到您的消费者:

  consumer.setMessageListener(listner); try { System.in.read(); } catch (IOException e) { e.printStackTrace(); } connection.close(); 

这将等到你在停止之前按下一个键。

其他需要考虑的事项:

  • 使用finally块进行关闭
  • Java命名约定鼓励在类的第一个字母处使用大写

主要问题(除了快速关闭应用程序)是您发送到主题。 主题不保留消息,因此如果您运行生成然后运行使用者的应用程序,则消费者将不会收到任何内容,因为它在发送消息时未订阅该主题。 如果您修复了关闭问题,然后在一个终端中运行使用者,然后运行生产者,那么您应该看到消费者收到的消息。 如果您想要保留邮件,那么您需要使用一个队列来保留邮件,直到有人使用它为止。

一些:

  • 使用队列而不是主题。 如果没有可用的消费者,主题中的消息将被丢弃,它们不会持久存在。
  • 设置消息监听器后添加connection.start()。 在正确设置所有消费者/生产者时,您应该开始连接。
  • 在再次关闭连接之前等待一段时间。

该主题可能是您最重要的失败原因。

你的生产者类是正确的。 它运行顺利。

但是,您的消费者不正确,您必须对其进行修改。

  • 首先,在创建连接对象后添加setClientID(“any_string_value”) ;

    例如: Connection connection = connectionFactory.createConnection(); // need to setClientID value, any string value you wish connection.setClientID("12345"); Connection connection = connectionFactory.createConnection(); // need to setClientID value, any string value you wish connection.setClientID("12345");

  • 其次,使用createDurableSubscriber()方法而不是createConsumer()来通过主题传输消息。

    MessageConsumer consumer = session.createDurableSubscriber(topic,"SUB1234");

这是修改过的comsumer类:

 package mq.test; import javax.jms.*; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; public class consumer { // URL of the JMS server private static String url = ActiveMQConnection.DEFAULT_BROKER_URL; // Name of the topic from which we will receive messages from = " testt" public static void main(String[] args) throws JMSException { // Getting JMS connection from the server ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url); Connection connection = connectionFactory.createConnection(); // need to setClientID value, any string value you wish connection.setClientID("12345"); try{ connection.start(); }catch(Exception e){ System.err.println("NOT CONNECTED!!!"); } Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Topic topic = session.createTopic("test_data"); //need to use createDurableSubscriber() method instead of createConsumer() for topic // MessageConsumer consumer = session.createConsumer(topic); MessageConsumer consumer = session.createDurableSubscriber(topic, "SUB1234"); MessageListener listner = new MessageListener() { public void onMessage(Message message) { try { if (message instanceof TextMessage) { TextMessage textMessage = (TextMessage) message; System.out.println("Received message" + textMessage.getText() + "'"); } } catch (JMSException e) { System.out.println("Caught:" + e); e.printStackTrace(); } } }; consumer.setMessageListener(listner); //connection.close(); } } 

现在,您的代码将成功运行。