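How do I send/receive messages to/from Azure Service Bus using Qpid JMS (qpid-jms-client-0.11.1.jar)?

I am currently looking into how to connect to Azure Service Bus using Qpid JMS (qpid-jms-client-0.11.1.jar).

I created a demo Java application, SimpleSenderReceiver, that connects to a provisioned Azure Service Bus by following this guide ( #link1 ). That code appears to use a "very" old version of the Qpid JMS client (version 0.32). I am now trying to get it working with the latest stable version of Qpid JMS (qpid-jms-client-0.11.1.jar), so far without success. From the Qpid JMS 0.11.1 documentation ( #link2 ) you can see that the way the connection is configured in the properties file differs from the 0.32 version.

  • How do I set the correct AMQP connection string in the properties file?
  • How do I set up the Qpid JMS/Azure Service Bus demo to use the latest stable Qpid version?

I keep running into the following problem: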

    731 [AmqpProvider:(1):[amqps://example-bus.servicebus.windows.net?transport.connectTimeout=60000]] INFO org.apache.qpid.jms.sasl.SaslMechanismFinder - Best match for SASL auth was: SASL-PLAIN
    javax.jms.JMSException: Idle timeout value specified in connection OPEN ('30000 ms') is not supported. Minimum idle timeout is '60000' ms. TrackingId:238849ced1em4cd3a093261372f4fc1e_G21, SystemTracker:gateway6, Timestamp:10/27/2016 8:16:23 AM [condition = amqp:internal-error]
        at org.apache.qpid.jms.provider.amqp.AmqpSupport.convertToException(AmqpSupport.java:150)
        at org.apache.qpid.jms.provider.amqp.AmqpSupport.convertToException(AmqpSupport.java:105)
        at org.apache.qpid.jms.provider.amqp.AmqpAbstractResource.remotelyClosed(AmqpAbstractResource.java:147)
        at org.apache.qpid.jms.provider.amqp.AmqpAbstractResource.processRemoteClose(AmqpAbstractResource.java:251)
        at org.apache.qpid.jms.provider.amqp.AmqpProvider.processUpdates(AmqpProvider.java:771)
        at org.apache.qpid.jms.provider.amqp.AmqpProvider.access$1900(AmqpProvider.java:90)
        at org.apache.qpid.jms.provider.amqp.AmqpProvider$17.run(AmqpProvider.java:699)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
        at java.util.concurrent.FutureTask.run(FutureTask.java:262)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:178)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:292)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)

I have the following properties file, servicebus.properties:

    # servicebus.properties - sample JNDI configuration

    # Register a ConnectionFactory in JNDI using the form:
    # connectionfactory.[jndi_name] = [ConnectionURL]
    connectionfactory.myFactoryLookup = amqps://example-open-bus.servicebus.windows.net?jms.username=somePolicy&jms.password=aM2k3PaZY5jdIkmGKm7G%2FcH%2BUFQaFAgHIYc3dSsuiLI%3D&transport.connectTimeout=6000

    # Register some queues in JNDI using the form
    # queue.[jndi_name] = [physical_name]
    # topic.[jndi_name] = [physical_name]
    queue.myQueueLookup = queue1

I have the following class, SimpleSenderReceiver.java:

    package com.demo.AzureTest;

    import javax.jms.*;
    import javax.naming.Context;
    import javax.naming.InitialContext;
    import java.io.BufferedReader;
    import java.io.InputStreamReader;
    import java.util.Hashtable;
    import java.util.Random;

    public class SimpleSenderReceiver implements MessageListener {
        private static boolean runReceiver = false;
        private Connection connection;
        private Session sendSession;
        private Session receiveSession;
        private MessageProducer sender;
        private MessageConsumer receiver;
        private static Random randomGenerator = new Random();

        public SimpleSenderReceiver() throws Exception {
            // Configure JNDI environment
            Hashtable env = new Hashtable();
            env.put(Context.INITIAL_CONTEXT_FACTORY, "org.apache.qpid.jms.jndi.JmsInitialContextFactory");
            env.put(Context.PROVIDER_URL, "C://PATH//servicebus.properties");
            Context context = new InitialContext(env);

            // Look up ConnectionFactory and Queue
            ConnectionFactory cf = (ConnectionFactory) context.lookup("myFactoryLookup");
            System.out.println("lookup: " + context.lookup("myFactoryLookup"));
            System.out.println("cf:" + cf);
            Destination queue = (Destination) context.lookup("myQueueLookup");
            System.out.println("queue:");

            // Create Connection
            connection = cf.createConnection();
            System.out.println("connection :" + connection);

            // Create sender-side Session and MessageProducer
            sendSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            System.out.println("Session open.");
            sender = sendSession.createProducer(queue);
            System.out.println(sender.getDestination());
            System.out.println("sender:" + sender);

            if (runReceiver) {
                // Create receiver-side Session, MessageConsumer, and MessageListener
                receiveSession = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
                receiver = receiveSession.createConsumer(queue);
                receiver.setMessageListener(this);
                connection.start();
            }
        }

        public static void main(String[] args) {
            try {
                if ((args.length > 0) && args[0].equalsIgnoreCase("sendonly")) {
                    runReceiver = false;
                }

                SimpleSenderReceiver simpleSenderReceiver = new SimpleSenderReceiver();
                System.out.println("Press [enter] to send a message. Type 'exit' + [enter] to quit.");
                BufferedReader commandLine = new java.io.BufferedReader(new InputStreamReader(System.in));

                while (true) {
                    String s = commandLine.readLine();
                    if (s.equalsIgnoreCase("exit")) {
                        simpleSenderReceiver.close();
                        System.exit(0);
                    } else {
                        simpleSenderReceiver.sendMessage();
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

        private void sendMessage() throws JMSException {
            TextMessage message = sendSession.createTextMessage();
            message.setText("Hello from SIS Test AMQP message from Java JMSaaa");
            long randomMessageID = randomGenerator.nextLong() >>> 1;
            message.setStringProperty("TenantId", "klant");
            message.setStringProperty("EventType", "bericht");
            message.setStringProperty("EventTypeVersion", "1.0");
            message.setStringProperty("MessageType", "DocumentMessage");
            message.setStringProperty("OperationType", "Create");
            message.setStringProperty("SourceSystem", "sis_sender");
            message.setStringProperty("EnterpriseKey", "sis_sender-klant-bericht");
            message.setJMSMessageID("ID:" + randomMessageID);
            sender.send(message);
            System.out.println("Sent message with JMSMessageID = " + message.getJMSMessageID());
            System.out.println("Sent message with Text = " + message.getText());
        }

        public void close() throws JMSException {
            connection.close();
        }

        public void onMessage(Message message) {
            try {
                System.out.println("Received message with JMSMessageID = " + message.getJMSMessageID());
                TextMessage txtmessage = (TextMessage) message;
                System.out.println("Received message with Text = " + txtmessage.getText());
                message.acknowledge();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

Maven dependencies:

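    <dependency>
        <groupId>org.apache.qpid</groupId>
        <artifactId>qpid-jms-client</artifactId>
        <version>0.11.1</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-simple</artifactId>
        <version>1.6.2</version>
    </dependency>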

Update

I have gotten further, but I am still somewhat stuck. I updated the connectionfactory property to:

 connectionfactory.myFactoryLookup = connectionfactory.myFactoryLookup = amqps://example-open-bus.servicebus.windows.net?amqp.idleTimeout=150000&jms.username=somePolicy&jms.password=aM2k3PaZY5jdIkmGKm7G%2FcH%2BUFQaFAgHIYc3dSkuiLI%3D 

I now get the following stack trace:

    842 [AmqpProvider:(1):[amqps://example-open-bus-bus.servicebus.windows.net]] INFO org.apache.qpid.jms.sasl.SaslMechanismFinder - Best match for SASL auth was: SASL-PLAIN
    1014 [AmqpProvider:(1):[amqps://example-open-bus-bus.servicebus.windows.net]] INFO org.apache.qpid.jms.JmsConnection - Connection ID:543efe98-3ecc-485e-9f7f-3046c40db0cb:1 connected to remote Broker: amqps://example-open-bus-bus.servicebus.windows.net
    1301 [AmqpProvider:(1):[amqps://example-open-bus-bus.servicebus.windows.net]] WARN org.apache.qpid.jms.provider.amqp.builders.AmqpResourceBuilder - Open of resource:(JmsProducerInfo { ID:546efe78-3ecc-485d-9f6f-3065c40db1ce:1:1:1, destination = klant }) failed: Attempted to perform an unauthorized operation. TrackingId:2950b1ed7a0d4e0a97b0k32b25434ac2_G10, SystemTracker:gateway6, Timestamp:10/27/2016 1:36:21 PM [condition = amqp:unauthorized-access]
    Caught exception, exiting.
    javax.jms.JMSSecurityException: Attempted to perform an unauthorized operation. TrackingId:2890b0ed9a0d4e0a97b1k32b25434ac2_G10, SystemTracker:gateway6, Timestamp:10/27/2016 1:36:21 PM [condition = amqp:unauthorized-access]
        at org.apache.qpid.jms.provider.amqp.AmqpSupport.convertToException(AmqpSupport.java:129)
        at org.apache.qpid.jms.provider.amqp.AmqpSupport.convertToException(AmqpSupport.java:105)
        at org.apache.qpid.jms.provider.amqp.builders.AmqpResourceBuilder.handleClosed(AmqpResourceBuilder.java:167)
        at org.apache.qpid.jms.provider.amqp.builders.AmqpResourceBuilder.processRemoteClose(AmqpResourceBuilder.java:113)
        at org.apache.qpid.jms.provider.amqp.AmqpProvider.processUpdates(AmqpProvider.java:795)
        at org.apache.qpid.jms.provider.amqp.AmqpProvider.access$1900(AmqpProvider.java:90)
        at org.apache.qpid.jms.provider.amqp.AmqpProvider$17.run(AmqpProvider.java:699)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
        at java.util.concurrent.FutureTask.run(FutureTask.java:262)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:178)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:292)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)

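Newer clients enable AMQP heartbeating/idle timeout by default, whereas older clients did not. The client sets a default 60-second timeout, which in turn means that it requests a 30-second (30000 ms) idle timeout value in its AMQP Open frame when connecting to the server. This is in line with the behaviour defined by the specification, where peers advertise half of their actual timeout to help avoid spurious timeouts.

ServiceBus is rejecting the 30000 ms Open frame value and indicating that it requires a value of at least 60000 ms (or possibly 0, which means the timeout is disabled). To achieve this, you need to configure the client with a value of at least 120000 ms, which results in the minimum 60000 ms Open frame idle timeout value that ServiceBus is enforcing (or, again, possibly disable the client's timeout handling by setting it to 0).

You can do this with the "amqp.idleTimeout" URI option, as per http://qpid.apache.org/releases/qpid-jms-0.11.1/docs/index.html#amqp-configuration-options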
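The arithmetic above can be sketched in a few lines of Java. This is only an illustration of the halving rule described in this answer, not code from the Qpid client; the class and method names are made up for the example, and the host name is the placeholder from the question.

```java
public class IdleTimeoutSketch {

    // As described above, the client advertises half of its configured
    // idle timeout in the AMQP Open frame.
    static int advertisedIdleTimeout(int configuredMillis) {
        return configuredMillis / 2;
    }

    // Smallest client setting whose advertised value still meets the
    // minimum that the broker enforces.
    static int minimumClientSetting(int brokerMinimumMillis) {
        return brokerMinimumMillis * 2;
    }

    // Appends the amqp.idleTimeout option to a connection URI, using "?"
    // or "&" depending on whether the URI already has a query string.
    static String withIdleTimeout(String uri, int idleTimeoutMillis) {
        String separator = uri.contains("?") ? "&" : "?";
        return uri + separator + "amqp.idleTimeout=" + idleTimeoutMillis;
    }

    public static void main(String[] args) {
        int brokerMinimum = 60000; // minimum reported in the ServiceBus error

        // Default client setting of 60000 ms is advertised as 30000 ms.
        System.out.println(advertisedIdleTimeout(60000));

        // Setting needed so that the advertised value reaches the minimum.
        int required = minimumClientSetting(brokerMinimum);
        System.out.println(required);

        // Placeholder host name taken from the question.
        System.out.println(withIdleTimeout(
                "amqps://example-open-bus.servicebus.windows.net", required));
    }
}
```

Any value at or above the computed setting should satisfy the check; the 150000 used in the question's update is one such value.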

Edit: I see you figured this out while I was typing my answer.

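The new exception comes from ServiceBus and says that you are not authorized to do what you are attempting. It should be fairly easy to work out which operation that is from where the exception is thrown and caught.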

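Your URI seems fine (though I assume your username is not actually 'somePolicy', and that the doubled connectionfactory.myFactoryLookup = connectionfactory.myFactoryLookup = at the start is a copy-and-paste error). I have not personally used the client with ServiceBus, but I have seen questions from various people who have, so I am not aware of any particular issue that directly prevents the two from working together.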

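I ran into the same security issue mentioned above and spent a while tracking it down, so for anyone else: my problem was caused by the key value specified in the user.password query parameter containing a + character.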

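There is usually an = at the end of the value, which I encoded as %3D in the string, and I encoded the + as %2B. However, if you put a breakpoint at the point where the ConnectionFactory is instantiated and look at the password property, you will see that the = has been decoded correctly, whereas the + has been stripped and replaced by a space, hence the unauthorized access issue.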

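My workaround was simply to regenerate the primary key in Azure so that it no longer contained a + (yuk), but it works. It is possibly a bug in the Qpid library.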
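To make the symptom concrete, here is a small sketch using only the standard java.net.URLEncoder and java.net.URLDecoder classes. It shows how a key is percent-encoded and how a + turns into a space if the value is form-decoded a second time. Whether the Qpid client actually decodes the value twice is an assumption I have not verified; the key below is a made-up example, not a real one, and the class name is hypothetical.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLDecoder;
import java.net.URLEncoder;

public class PasswordEncodingSketch {

    // Percent-encodes a raw key for use as a URI query parameter value.
    static String encode(String rawKey) {
        try {
            return URLEncoder.encode(rawKey, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            throw new IllegalStateException(e); // UTF-8 is always available
        }
    }

    // Form-style decoding: %XX sequences are decoded and a literal '+'
    // is turned into a space.
    static String decode(String value) {
        try {
            return URLDecoder.decode(value, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        String rawKey = "abc/def+ghi="; // hypothetical key, not a real one

        String encoded = encode(rawKey);
        System.out.println("encoded:       " + encoded);

        // One decode restores the original key, including the '+'.
        String decodedOnce = decode(encoded);
        System.out.println("decoded once:  " + decodedOnce);

        // A second decode (the assumed failure mode) loses the '+'.
        String decodedTwice = decode(decodedOnce);
        System.out.println("decoded twice: " + decodedTwice);
    }
}
```

If the URI handling is the culprit, another option that avoids regenerating the key might be to leave the credentials out of the URI and pass them through the standard JMS method ConnectionFactory.createConnection(String userName, String password). I have not tested that against Service Bus.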