ActiveMQ和嵌入式代理

编辑:改述问题:

我想使用ActiveMQ作为我的服务器和客户端应用程序之间的信使服务。

我试图在服务器中设置一个嵌入式代理(即不是一个单独的进程)来处理生成的消息供我的客户端使用。 此队列是持久的。

经纪人初始化如下:

BrokerService broker = new BrokerService(); KahaPersistenceAdapter adaptor = new KahaPersistenceAdapter(); adaptor.setDirectory(new File("activemq")); broker.setPersistenceAdapter(adaptor); broker.setUseJmx(true); broker.addConnector("tcp://localhost:61616"); broker.start(); 

经过修修补补后,我最终得到的服务器部分是:

 public static class HelloWorldProducer implements Runnable { public void run() { try { ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost"); // apparently the vm part is all i need Connection connection = connectionFactory.createConnection(); connection.start(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Destination destination = session.createQueue("TEST.FOO"); MessageProducer producer = session.createProducer(destination); producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); String text = "Hello world! From: " + Thread.currentThread().getName() + " : " + this.hashCode(); TextMessage message = session.createTextMessage(text); System.out.println("Sent message: "+ message.hashCode() + " : " + Thread.currentThread().getName()); producer.send(message); session.close(); connection.close(); } catch (Exception e) { System.out.println("Caught: " + e); e.printStackTrace(); } } } 

客户端非常相似,看起来像这样:

 public static class HelloWorldConsumer implements Runnable, ExceptionListener { public void run() { try { ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost"); Connection connection = connectionFactory.createConnection(); // exception happens here... connection.start(); connection.setExceptionListener(this); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Destination destination = session.createQueue("TEST.FOO"); MessageConsumer consumer = session.createConsumer(destination); Message message = consumer.receive(1000); if (message instanceof TextMessage) { TextMessage textMessage = (TextMessage) message; String text = textMessage.getText(); System.out.println("*****Received: " + text); } else { System.out.println("*****Received obj: " + message); } consumer.close(); session.close(); connection.close(); } catch (Exception e) { System.out.println("Caught: " + e); e.printStackTrace(); } } 

main方法只是在一个线程中启动它们中的每一个以开始生成/接收消息。

…但是我在每个线程的开头遇到以下问题:

 2013-01-24 07:54:31,271 INFO [org.apache.activemq.broker.BrokerService] Using Persistence Adapter: AMQPersistenceAdapter(activemq-data/localhost) 2013-01-24 07:54:31,281 INFO [org.apache.activemq.store.amq.AMQPersistenceAdapter] AMQStore starting using directory: activemq-data/localhost 2013-01-24 07:54:31,302 INFO [org.apache.activemq.kaha.impl.KahaStore] Kaha Store using data directory activemq-data/localhost/kr-store/state 2013-01-24 07:54:31,339 INFO [org.apache.activemq.store.amq.AMQPersistenceAdapter] Active data files: [] 2013-01-24 07:54:31,445 DEBUG [org.apache.activemq.broker.jmx.ManagementContext] Probably not using JRE 1.4: mx4j.tools.naming.NamingService 2013-01-24 07:54:31,450 DEBUG [org.apache.activemq.broker.jmx.ManagementContext] Failed to create local registry java.rmi.server.ExportException: internal error: ObjID already in use at sun.rmi.transport.ObjectTable.putTarget(ObjectTable.java:186) at sun.rmi.transport.Transport.exportObject(Transport.java:92) at sun.rmi.transport.tcp.TCPTransport.exportObject(TCPTransport.java:247) at sun.rmi.transport.tcp.TCPEndpoint.exportObject(TCPEndpoint.java:411) at sun.rmi.transport.LiveRef.exportObject(LiveRef.java:147)  

看起来消息是成功生成和消费的(我之前发布的其他问题已经解决),但上面的例外令我担心。

编辑:在经纪人关闭期间,我现在也受到以下问题的欢迎:

 2013-01-25 08:40:17,486 DEBUG [org.apache.activemq.transport.failover.FailoverTransport] Transport failed with the following exception: java.io.EOFException at java.io.DataInputStream.readInt(DataInputStream.java:392) at org.apache.activemq.openwire.OpenWireFormat.unmarshal(OpenWireFormat.java:269) at org.apache.activemq.transport.tcp.TcpTransport.readCommand(TcpTransport.java:210) at org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:202) at org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:185) at java.lang.Thread.run(Thread.java:722) 

您可以通过多种方式将代理嵌入到代码中,其中大部分都记录在此处 。 您可能希望尝试升级您的版本,因为您使用的内容似乎很老,因为它默认为现在已弃用的AMQ商店而不是较新的KahaDB商店。 您可能遇到问题,因为客户端线程之间的竞争使用了可能竞争在VM代理中创建的不同连接工厂。 如果在生产者上设置create = false选项并确保在此之后启动消费者线程可以解决问题,或者您可以提前创建VM代理并向两个线程添加create = false,这可能会起作用。

 BrokerService broker = new BrokerService(); // configure the broker broker.setBrokerName("localhost"); broker.setUseJmx(false); broker.start(); 

然后在客户端代码中通过此连接工厂配置进行附加。

 ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("vm://localhost?create=false"); 

当我运行你的代码时,我得到以下exception:

 javax.jms.JMSException: Could not connect to broker URL: tcp://localhost. Reason java.lang.IllegalArgumentException: port out of range:-1 

您的代理正在运行并侦听端口61616,因此任何尝试连接到代理的客户端都需要在其URL中具有该端口。

客户端代码尝试连接到localhost但未提及它必须连接的端口。 生产者和消费者代码都需要修复。

 ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost"); 

 ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616"); 

修复端口后,我能够运行您的代码。