Tag: amqp

如何从Qpid JMS(qpid-jms-client-0.11.1.jar)发送/接收来自Azure Service Bus的消息?

我目前正在研究如何使用Qpid JMS(qpid-jms-client-0.11.1.jar)连接到Azure Service Bus。 我创建了一个Demo Java应用程序SimpleSenderReceiver,它使用以下指南( #link1 )连接到已配置的Azure Service Bus。 此代码似乎使用Qpid JMS客户端(版本0.32)的“非常”旧版本。 我现在正试图让它与Qpid JMS的最新稳定版本(qpid-jms-client-0.11.1.jar)一起使用,到目前为止我还没有成功。 通过Qpid JMS 0.11.1的文档#link2 ,您可以看到属性文件中的连接方法与0.32版本中的连接方式不同。 如何在属性文件中设置正确的连接amqp连接字符串 ? 如何设置de Qpid JMS-Azure Service Bus Demo以使用最新的Qpid稳定版本 ? 我一直在运行以下问题: 731 [AmqpProvider:(1):[amqps://example-bus.servicebus.windows.net?transport.connectTimeout=60000]] INFO org.apache.qpid.jms.sasl.SaslMechanismFinder – Best match for SASL auth was: SASL-PLAIN javax.jms.JMSException: Idle timeout value specified in connection OPEN (‘30000 ms’) is not supported. Minimum idle […]

Spring-AMQP重新排队消息计数基于JVM吗?

我正在寻找rabbitmq文档,似乎rabbitmq不处理消息重新传递计数。 如果我要手动ACK / NACK消息,我需要将重试计数保留在内存中(例如,通过使用correlationId作为映射中的唯一键),或者通过在消息中设置我自己的标头,并重新传送它(因此把它放在队列的末尾) 然而,这是弹簧处理的情况。 具体来说,我指的是RetryInterceptorBuilder.stateful()。maxAttempts(x)。 这个计数是特定于JVM的,还是以某种方式操纵消息? 例如,我有一个部署到2台服务器的Web应用程序,maxAttempts设置为5.总重新传输计数是否可能是5-9,具体取决于重新传递和重新处理的顺序。服务器?

从Java / Spring中检索RabbitMQ队列中未确认消息的数量

有没有办法返回未确认的消息数? 我使用此代码来获取队列中的消息数: DeclareOk declareOk = amqpAdmin.getRabbitTemplate().execute( new ChannelCallback() { public DeclareOk doInRabbit(Channel channel) throws Exception { return channel.queueDeclarePassive(name); } }); return declareOk.getMessageCount(); 但我想知道未确认消息的数量。 我已经看到RabbitMQ管理工具包含了这些信息(对于每个队列,它给出了Ready / Unacked和Total消息的数量),我想必须有一种从Java / Spring中检索它的方法。 谢谢 UPDATE Oks,似乎没有办法以编程方式完成,因为配置/队列的列表不是AMPQ的一部分。 可以启用管理插件并查询有关队列的REST Web服务(以及其他内容)。 更多信息: http://www.rabbitmq.com/management.html

使用RabbitMQ(Java客户端),有没有办法确定消费期间网络连接是否关闭?

我正在使用Java客户端在RHEL 5.3上使用RabbitMQ。 我有2个节点(机器)。 Node1使用Java帮助程序类QueueingConsumer消耗来自Node2上队列的消息。 QueueingConsumer consumer = new QueueingConsumer(channel); channel.basicConsume(“MyQueueOnNode2”, noAck, consumer); while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); … Process message – delivery.getBody() } 如果接口在Node1或Node2上关闭(例如ifconfig eth1 down),则客户端(上面)永远不会知道网络不再存在。 RabbitMQ是否在Java客户端上提供某种类型的配置,可用于确定连接是否已消失。 在Node2上关闭RabbitMQ服务器将触发ShutdownSignalException,可以捕获该应用程序并且应用程序可以进入重新连接循环。 但是,关闭接口不会导致任何类型的exception发生,因此代码将永远等待consumer.nextDelivery()。 我也尝试过使用此调用的超时版本。 例如 QueueingConsumer consumer = new QueueingConsumer(channel); channel.basicConsume(“MyQueueOnNode2”, noAck, consumer); int timeout_ms = 30000; while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(timeout_ms); if (delivery == […]

由于SocketException,RabbitMQ新连接被拒绝

在尝试创建与另一台服务器上运行的rabbitmq的新连接时,我收到以下错误: java.io.IOException at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:106) at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:102) at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:124) at com.rabbitmq.client.impl.AMQConnection.start(AMQConnection.java:406) at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:516) at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:533) Caused by: com.rabbitmq.client.ShutdownSignalException: connection error; reason: java.net.SocketException: Connection reset at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:67) at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:33) at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:343) at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:216) at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:118) Caused by: java.net.SocketException: Connection reset at java.net.SocketInputStream.read(Unknown Source) at java.net.SocketInputStream.read(Unknown Source) at java.io.BufferedInputStream.fill(Unknown Source) at java.io.BufferedInputStream.read(Unknown Source) at java.io.DataInputStream.readUnsignedByte(Unknown Source) at com.rabbitmq.client.impl.Frame.readFrom(Frame.java:95) […]

RabbitMQ:消息仍为“未确认”

我的Java应用程序向RabbitMQ交换发送消息,然后交换重定向消息到绑定队列。 我在RabbitMQ中使用Springframework AMQP java插件。 问题:消息进入队列,但它保持“未确认”状态,它永远不会变为“就绪”。 可能是什么原因?

没有连接的AMQP / RabbitMQ通道何时死亡?

我有一个简单的RabbitMQ测试程序随机排队消息,另一个读取它们,都使用Spring-AMQP。 如果消费者死亡(例如,在没有机会关闭其连接或通道的情况下终止进程),则任何未确认的消息似乎永远不会被确认。 我已经看到了许多引用(例如这个问题 ),它表示当通道没有连接时它会死亡,并且将重新传送剩余的未包装的消息。 这不是我看到的行为 – 相反,我得到了一个越​​来越多的标记为IDLE的频道列表,以及越来越多的标记正在运行但没有活动的连接列表。 一旦进程被杀死,是否需要一些配置来注意连接已经死亡? 编辑:我在VirtualBox VM中运行rabbitmq服务器,显然无法通过NAT正确管理死入站连接。 这对于直接在物理主机上运行的mq服务器来说效果很好。

如何使用Protocol Buffers对Map 进行编码?

我正在尝试使用Protocol Buffers进行消息序列化。 我的消息格式应该包含Map 条目……但是如何编写.proto定义? 据我所知,Protocol Buffers没有内置Map类型。 我可以使用重复字段来模拟它。 但我遇到的最大问题是,您需要定义所有类型。 我希望我的信息灵活,所以我不能指定类型。 有任何想法吗?

RabbitMQ:快速的生产者和缓慢的消费者

我有一个应用程序使用RabbitMQ作为消息队列在两个组件之间发送/接收消息:发送方和接收方。 发件人以非常快的方式发送消息。 接收器接收消息然后执行一些非常耗时的任务(主要是针对非常大的数据大小的数据库写入)。 由于接收器需要很长时间才能完成任务,然后检索队列中的下一条消息,因此发送方将继续快速填满队列。 所以我的问题是:这会导致消息队列溢出吗? 消息使用者如下所示: public void onMessage() throws IOException, InterruptedException { channel.exchangeDeclare(EXCHANGE_NAME, “fanout”); String queueName = channel.queueDeclare(“allDataCase”, true, false, false, null).getQueue(); channel.queueBind(queueName, EXCHANGE_NAME, “”); QueueingConsumer consumer = new QueueingConsumer(channel); channel.basicConsume(queueName, true, consumer); while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); System.out.println(” [x] Received ‘” + message + “‘”); JSONObject […]

无法使用基于JMS的代码和amqp 1.0访问ActiveMQ

我正在尝试使用AMQP 1.0连接到ActiveMQ代理,但我想在我的应用程序代码中使用JMS。 我对使用JMS感兴趣主要是因为我希望开发人员能够使用他们已经熟悉的API。 我在localhost上运行ActiveMQ 5.14.0,代码如下: public static void main(String[] args) throws JMSException, InterruptedException { Connection connection = null; try { // Producer ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(“amqp://localhost:5672”); connection = connectionFactory.createConnection(); connection.start(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Topic topic = session.createTopic(“customerTopic”); // Publish MessageProducer producer = session.createProducer(topic); for ( int i = 0; i < 10; […]