如何将消息发送到托管在IBM MQ集群中的不同队列管理器和主机名中的其他队列

我的基于Apache-camel的应用正在消耗来自IBM队列之一的消息,例如下面是连接工厂的详细信息

hostname=host1000 QManager=QM1000 Port="some port" Channel="common channel" 

Camel流程使用和处理并将响应发送到来自消息头的ReplyQueue。

  from(wmq:queue:) .bean("processBean") .bean("beanToSendMsgToReplyQueue") 

在驼峰标题中,我得到了JMSReplyQueue。 您可以看到它是一个不同的队列管理器,此队列管理器来自不同的主机,但在集群环境中。

 JMSReplyTo = queue://QM1012/TEST.REPLY?targetClient=1 

队列管理器也介于两者之间。 喜欢

 queue:////? 

以下是我在发送邮件时获得的例外情况。

 ERROR o.apache.camel.processor.DefaultErrorHandler:215 - Failed delivery for (MessageId: ID-xxxxxxxxx-0-4 on ExchangeId: ID-xxxxxx-42443-1492594420697-0-1). Exhausted after delivery attempt: 1 caught: org.apache.camel.ResolveEndpointFailedException: Failed to resolve endpoint: wmq://queue://QM1012/TEST.REPLY?targetClient=1 due to: Failed to resolve endpoint: wmq://queue://TAP2001R5/TEST?targetClient=1 due to: There are 1 parameters that couldn't be set on the endpoint. Check the uri if the parameters are spelt correctly and that they are properties of the endpoint. Unknown parameters=[{targetClient=1}]. Processed by failure processor: FatalFallbackErrorHandler[Pipeline[[Channel[sendTo(Endpoint[wmq://queue:BACKOUT_Q])], Channel[DelegateSync[com.xxx.yyy.listener.XXXOnExceptionProcessor@21c66ee4]], Channel[Stop]]]] 

任何人都可以帮我把消息发送到不同的主机管理器队列,这些队列管理器队列在不同的主机中,但都在同一个集群中。 队列管理器名称也在字符串的中间,所以如何解决这个问题。 如果您需要更多详细信息,请与我们联系。

更新-1:尝试使用相同的队列管理器并且没有参数

JMSReplyTo = queue://QM1000/QUEUE_V1低于我得到的exception

 org.springframework.jms.InvalidDestinationException: JMSWMQ2008: Failed to open MQ queue 'QM1000/QUEUE_V1'.; nested exception is com.ibm.msg.client.jms.DetailedInvalidDestinationException: JMSWMQ2008: Failed to open MQ queue 'QM1000/QUEUE_V1'. JMS attempted to perform an MQOPEN, but WebSphere MQ reported an error. Use the linked exception to determine the cause of this error. Check that the specified queue and queue manager are defined correctly.; nested exception is com.ibm.mq.MQException: JMSCMQ0001: WebSphere MQ call failed with compcode '2' ('MQCC_FAILED') reason '2189' ('MQRC_CLUSTER_RESOLUTION_ERROR'). 

更新2

我可以使用普通的javax.jms。*和com.ibm.mq.jms。* api将消息发送到JMSReplyTo,但不能通过Apache camel发送。 来自Camel用户/开发人员组的任何人都可以帮助我使用camel组件处理相同的事情。

 @Override public void process(Exchange exchange) throws Exception { QueueConnection m_connection = this.connectionFactory.createQueueConnection(); //m_connection.start(); boolean transacted = false; QueueSession session = m_connection.createQueueSession(transacted, QueueSession.AUTO_ACKNOWLEDGE); TextMessage outMessage = session.createTextMessage(); outMessage.setText(exchange.getIn().getBody()); MQQueue mq = new MQQueue( "queue://QM1012/TEST.REPLY"); QueueSender queueSender = session.createSender((MQQueue) mq); queueSender.send(outMessage); /* producerTemplate.send("wmq:" + "queue://QM1012/TEST.REPLY", exchange); */ } 

您希望与两个不同的队列管理器进行通信,因此您需要相应地定义两个Camel JMS组件实例。 Camel无法神奇地了解QM1000或QM1012的含义以及如何访问QM。

首先需要两个WMQ QM的两个JMS连接工厂实例。 如何获取这些取决于您的执行环境。 在JEE服务器上,可以在配置后使用JNDI访问连接池。 查看有关如何设置JMS池的appserver文档。 如果您想独立运行,请查看Spring JMS连接缓存或Atomikos(如果您想要XA事务)。 假设QM1000的CF是sourceCF,而QM1012的CF是targetCF。

现在,您可以定义两个Camel JMS组件实例,每个QM一个实例。 将连接工厂注入JMS组件(.setConnectionFactory(…))。 假设您定义了一个ID为“jmssource”的Camel JMS组件,请注入sourceCF。 在JMS组件id“jmstarget”中注入targetCF。 如何做到这一点取决于您的环境(JEE / CDI,Spring,普通Java)。 看看stackoverflow,有例子。

您现在可以使用以下语法在Camel路由上指定Camel JMS生成器和使用者:

 .from("jmssource:INPUT_QUEUE") ... (do some processing) ... .to("jmstarget:QUEUE_V1") 

您不能使用Camel的JMS回复逻辑(使用JMSReplyTo标头)来回复另一个队列管理器。 我认为JMS标准不允许这样做。 您需要通过发送到回复队列来明确地做出回复。

要设置targetClient选项,目标解析器可能很有用:

 import org.springframework.jms.support.destination.DynamicDestinationResolver; import org.springframework.jms.support.destination.DestinationResolver; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.Session; import com.ibm.mq.jms.JMSC; import com.ibm.mq.jms.MQDestination; public class WMQDestinationResolver extends DynamicDestinationResolver implements DestinationResolver { private int targetClient = JMSC.MQJMS_CLIENT_JMS_COMPLIANT; public void setTargetClient(int targetClient) { this.targetClient = targetClient; } public Destination resolveDestinationName(Session session, String destinationName, boolean isPubSubDomain) throws JMSException { Destination destination = super.resolveDestinationName(session, destinationName, isPubSubDomain); if (destination instanceof MQDestination) { MQDestination mqDestination = (MQDestination) destination; mqDestination.setTargetClient(targetClient); } return destination; } } 

首先,感谢大家的支持。 我的用例如下(上面也有)。

使用Apache Camel连接到MQ主机(主机名,队列管理器,端口,通道),并使用来自队列的消息,该队列属于同一主机/ Qmanager。 消息来自replyToQueue(JMSReplyTo)标头值。 ReplyToQueue(JMSReplyTo)的值如下

例如

queue://Different_QueueManager_in_Cluster/TEST.REPLY?mdReadEnabled=true&messageBody=0&mdWriteEnabled=true&XMSC_WMQ_REPLYTO_STYLE=1&targetClient=1

现在的问题是,当连接对象连接到上述主机和队列管理器时,如何将回复消息发送到具有不同队列管理器的不同队列。

注意:所有MQ队列管理器都位于群集环境中。

解决方案1:例如

 form(wmq:queue:INPUT_MSG_Q) .bean(requestProcessor) .bean(responseProcessor) 

Apache Camel默认处理ReplyToQ(JMSReplyTo)。 如果您不想向ReplyToQ(JMSReplyTo)发送回复,请在使用时使用disableReplyTo=true

注意:在使用相同的连接/连接工厂发送到queue://Different_QueueManager_in_Cluster/TEST.REPLY ,MQ集群将检查该消息是否必须转到具有Cluster中指定队列的指定队列管理器。 关于以下参数?mdReadEnabled=true&messageBody=0&mdWriteEnabled=true&XMSC_WMQ_REPLYTO_STYLE=1&targetClient=1 ,Apache Camel能够在不使用任何第3方解析器的情况下自动解析,同时自动回复JMSReplyTo

解决方案2:

使用disableReplyTo=true禁用自动回复,并使用普通javax.jms。*和com.ibm.mq.jms。* api从头部获取队列详细信息并发送消息。 代码如下。

 @Override public void process(Exchange exchange) throws Exception { QueueConnection m_connection = this.connectionFactory.createQueueConnection(); //m_connection.start(); boolean transacted = false; QueueSession session = m_connection.createQueueSession(transacted, QueueSession.AUTO_ACKNOWLEDGE); TextMessage outMessage = session.createTextMessage(); outMessage.setText(exchange.getIn().getBody()); MQQueue mq = new MQQueue( "queue://Different_QueueManager_in_Cluster/TEST.REPLY"); QueueSender queueSender = session.createSender((MQQueue) mq); queueSender.send(outMessage); /* producerTemplate.send("wmq:" + "queue://Different_QueueManager_in_Cluster/TEST.REPLY", exchange); */ } 

对于参数,使用@Sebastian Brandt提到的目的地解析器(post)