IBM MQ消息侦听器

您有没有人知道如何使用IBM MQ创建消息监听器? 我知道如何使用JMS规范来完成它,但我不知道如何为IBM MQ做到这一点。 非常感谢任何链接或指针。

虽然前面的响应者有一个WMQ Java API,但WMQ也支持JMS,所以这里有一些资源可以帮助你入门。

看看这篇文章: IBM WebSphere开发者技术期刊:在WebSphere MQ V6.0上运行独立的Java应用程序

此外,如果您已经安装了完整的WMQ客户端而不仅仅是抓住了jar子,那么您将安装大量示例代码。 默认情况下,这些将存在于C:\ Program Files \ IBM \ WebSphere MQ \ tools \ _Jms或/ opt / mqm / samp中,具体取决于您的平台。

如果您需要WMQ客户端安装媒体,请在此处获取。 请注意,这是WMQ v7客户端,而不是v6客户端。 它与v6 QMgr兼容,但由于v6是截至2011年9月的寿命终止,你应该在v7客户端上进行新的开发,如果可能的话,还应该使用v7 QMgr。 如果双方都是v7,那么可以使用许多function和性能增强function。

如果需要,您可以在此处获取产品手册。

最后,请确保在获得JMSexception时打印链接的exception。 这不是WMQ的事情,而是JMS的事情。 Sun为JMSexception提供了一个多级数据结构,而真正有趣的部分通常是嵌套级别。 这不是什么大问题,可以用几行来实现:

try { . . code that might throw a JMSException . } catch (JMSException je) { System.err.println("caught "+je); Exception e = je.getLinkedException(); if (e != null) { System.err.println("linked exception: "+e); } else { System.err.println("No linked exception found."); } } 

这有助于确定JMS错误与传输错误之间的差异。 例如,JMS安全性错误可能是WMQ 2035,或者它可能是JSSE配置,或者应用程序可能无法访问文件系统中的某些内容。 其中只有一个值得花费大量时间来挖掘WMQ错误日志,并且只有通过打印链接的exception才能判断它是否是那个。

查看IBM Help: 编写WebSphere MQ基础Java应用程序

IBM有一个用于与队列交互的API。 这是他们的样本:

 import com.ibm.mq.*; // Include the WebSphere MQ classes for Java package public class MQSample { private String qManager = "your_Q_manager"; // define name of queue // manager to connect to. private MQQueueManager qMgr; // define a queue manager // object public static void main(String args[]) { new MQSample(); } public MQSample() { try { // Create a connection to the queue manager qMgr = new MQQueueManager(qManager); // Set up the options on the queue we wish to open... // Note. All WebSphere MQ Options are prefixed with MQC in Java. int openOptions = MQC.MQOO_INPUT_AS_Q_DEF | MQC.MQOO_OUTPUT ; // Now specify the queue that we wish to open, // and the open options... MQQueue system_default_local_queue = qMgr.accessQueue("SYSTEM.DEFAULT.LOCAL.QUEUE", openOptions); // Define a simple WebSphere MQ message, and write some text in UTF format.. MQMessage hello_world = new MQMessage(); hello_world.writeUTF("Hello World!"); // specify the message options... MQPutMessageOptions pmo = new MQPutMessageOptions(); // accept the // defaults, // same as MQPMO_DEFAULT // put the message on the queue system_default_local_queue.put(hello_world,pmo); // get the message back again... // First define a WebSphere MQ message buffer to receive the message into.. MQMessage retrievedMessage = new MQMessage(); retrievedMessage.messageId = hello_world.messageId; // Set the get message options... MQGetMessageOptions gmo = new MQGetMessageOptions(); // accept the defaults // same as MQGMO_DEFAULT // get the message off the queue... system_default_local_queue.get(retrievedMessage, gmo); // And prove we have the message by displaying the UTF message text String msgText = retrievedMessage.readUTF(); System.out.println("The message is: " + msgText); // Close the queue... system_default_local_queue.close(); // Disconnect from the queue manager qMgr.disconnect(); } // If an error has occurred in the above, try to identify what went wrong // Was it a WebSphere MQ error? catch (MQException ex) { System.out.println("A WebSphere MQ error occurred : Completion code " + ex.completionCode + " Reason code " + ex.reasonCode); } // Was it a Java buffer space error? catch (java.io.IOException ex) { System.out.println("An error occurred whilst writing to the message buffer: " + ex); } } } // end of sample 

我不确定IBMjar子是否位于基础Maven回购中。 我知道在过去我必须从本地IBM安装中提取它们并将它们放在本地SVN仓库中。 我正在使用以下jar子:

  com.ibm com.ibm.mq 5.3.00 compile   com.ibm com.ibm.mq.pcf 5.3.00 compile   com.ibm com.ibm.mqbind 5.3.00 compile   com.ibm com.ibm.mqjms 5.3.00 compile  

看看上面提供的示例。

特别是在线

 MQGetMessageOptions gmo = new MQGetMessageOptions(); system_default_local_queue.get(retrievedMessage, gmo); 

您可以将get配置为在抛出MQRC_NO_MSG_AVAILABLEexception之前等待指定的时间。 或者你可以永远等待。

 gmo.waitInterval= qTimeout; gmo.options = MQC.MQGMO_WAIT 

因此,您可以创建一个持续查找新消息的线程,然后将它们传递给处理程序。 获取和放置不需要在同一个线程甚至应用程序中。

我希望这有助于回答你的问题。

除现有答案外,重要的一点是:JMS提供MessageListener ,这是一个允许您以异步回调方式接收消息的类。

本机API 没有相同的function! 你必须反复调用get(...)

在获取消息之前的循环中,您可以指定如下

 gmo.options = MQC.MQGMO_WAIT gmo.waitInterval = MQConstants.MQWI_UNLIMITED; 

这使循环将等待,直到队列中有消息。 对我来说,它类似于MessageListerner

以防万一有人像我一样使用MQ Listener google stackoverflow …由于JMS的实现,可能不是答案,但这正是我所寻求的。 像这样的东西:

 MQQueueConnectionFactory cf = new MQQueueConnectionFactory(); MQQueueConnection conn = (MQQueueConnection)cf.createQueueConnection(); MQQueueSession session = (MQQueueSession)conn.createSession(false, 1); Queue queue = session.createQueue("QUEUE"); MQQueueReceiver receiver = (MQQueueReceiver)session.createReceiver(queue); receiver.setMessageListener(new YourListener()); conn.start(); 

YourListener应该实现MessageListener接口,您将收到onMessage(Message msg)方法的消息。

您好,这是使用IBM MQ的消息侦听器的工作示例。 在这里我也使用spring来创建bean等……

 package queue.app; import javax.annotation.PostConstruct; import javax.jms.Message; import javax.jms.MessageListener; import javax.jms.Queue; import javax.jms.QueueConnection; import javax.jms.QueueReceiver; import javax.jms.QueueSession; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.log4j.Logger; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; import com.ibm.mq.jms.MQQueue; import com.ibm.mq.jms.MQQueueConnectionFactory; import com.ibm.msg.client.wmq.WMQConstants; @Component public class QueueConsumer implements MessageListener{ private Logger logger = Logger.getLogger(getClass()); MQQueueConnectionFactory qcf = new MQQueueConnectionFactory(); QueueConnection qc; Queue queue; QueueSession queueSession; QueueReceiver qr; @Value("${jms.hostName}") String jmsHost; @Value("${jms.port}") String jmsPort; @Value("${jms.queue.name}") String QUEUE_NAME; @Value("${jms.queueManager}") String jmsQueueMgr; @Value("${jms.username}") String jmsUserName; @Value("${jms.channel}") String jmsChannel; @PostConstruct public void init() throws Exception{ qcf.setHostName (jmsHost); qcf.setPort (Integer.parseInt(jmsPort)); qcf.setQueueManager (jmsQueueMgr); qcf.setChannel (jmsChannel); qcf.setTransportType (WMQConstants.WMQ_CM_CLIENT); qc = qcf.createQueueConnection (); queue = new MQQueue(QUEUE_NAME); qc.createQueueSession (false, Session.AUTO_ACKNOWLEDGE); queueSession = qc.createQueueSession (false, Session.AUTO_ACKNOWLEDGE); qr = queueSession.createReceiver(queue); qr.setMessageListener(this); qc.start(); } @Override public void onMessage(Message message) { logger.info("Inside On Message..."); long t1 = System.currentTimeMillis(); logger.info("Message consumed at ...."+t1); try{ if(message instanceof TextMessage) { logger.info("String message recieved >> "+((TextMessage) message).getText()); } }catch(Exception e){ e.printStackTrace(); } } } 

以下是我的依赖…

  com.sun.messaging.mq fscontext 4.2 test   com.ibm jms 1.0   org.springframework spring-jms 3.2.17.RELEASE   com.ibm com.ibm.mq 1.0   com.ibm com.ibm.mq.allclient 1.0   com.ibm com.ibm.mq.jmqi 1.0   com.ibm com.ibm.mqjms 1.0