Tag: spring amqp

我们可以在Spring批处理中使用AmqpItemReader和AmqpItermWriter作为请求/回复用例吗?

我见过AmqpJobSample http://docs.spring.io/spring-batch/spring-batch-samples/#AmqpJobFunctionalTests 它使用AmqpItemReader从rabbitmq消息队列中读取,使用消息处理程序处理它,然后使用AmqpItemWriter写回队列。 我的用例是,从我实现的listItemReader中读取项目,然后将项目发送到消息队列,消息队列将由运行在不同服务器(rabbitmq集群)上的消费者处理,并且这些消费者将回复是否成功处理了iterm每个物品。 然后我将从消息队列(在itemWriter中)读取它,如果检查项目处理的状态,如果成功则返回itemWriter的状态,否则将抛出exception(如果失败)。 我的应用程序是在PHP Symfony框架中,但我正在介绍批处理的弹簧批处理,目前是shell脚本。 我使用spring批处理来生成消息,但我的消费者使用的是PHP,它运行在不同的服务器上。 我只想将id作为消息传递,并希望使用不同服务器上的PHP使用者来处理id。 如何进行远程分块或分区? http://docs.spring.io/spring-batch/reference/html/scalability.html 是否有任何样本用于使用消息传递进行远程分块和分区,例如RabbiyMq? 我怎么能使用AmqpItemReader和AmqpItemWriter? 我需要使用请求/回复吗? 我可以设置回复队列吗? 什么是最好的方法呢?

通知通道关闭到实现org.springframework.amqp.rabbit.connection.ConnectionListener的类

我们使用Spring-AMQP提供的ConnectionListener接口在底层连接上保留一个选项卡。 通过自动恢复创建通道,心跳设置为10分钟(针对某些产品需求)。 我的观察是即使在底层的rabbitMQ死后,connectionListener.onClose()方法也不会被调用近10分钟。 我们还对API执行运行状况检查,并使用connectionListener.isOpen()方法确定连接的状态。 并且因为SimpleConnection类中的这个代码块 @Override public boolean isOpen() { return delegate != null && (delegate.isOpen() || this.delegate.getClass().getSimpleName().contains(“AutorecoveringConnection”)); } 当连接自动恢复时,始终返回true。 因为Health API在连接中断后10分钟内没有得知连接失败。 是否有任何建议的方法来通知ConnectionListener通道关闭,因为isOpen方法显然无法满足需求? 正在实现ShutDownlistener的方式吗? 由于我们没有访问connectionListerner中的通道而无法直接执行connection.addConnectionListerner(this)。 从connectionFactory shutdownCompleted方法是否可以调用onClose或connectionListener上的任何其他方法来通知它关闭? 还有其他想法吗?

Spring-AMQP重新排队消息计数基于JVM吗?

我正在寻找rabbitmq文档,似乎rabbitmq不处理消息重新传递计数。 如果我要手动ACK / NACK消息,我需要将重试计数保留在内存中(例如,通过使用correlationId作为映射中的唯一键),或者通过在消息中设置我自己的标头,并重新传送它(因此把它放在队列的末尾) 然而,这是弹簧处理的情况。 具体来说,我指的是RetryInterceptorBuilder.stateful()。maxAttempts(x)。 这个计数是特定于JVM的,还是以某种方式操纵消息? 例如,我有一个部署到2台服务器的Web应用程序,maxAttempts设置为5.总重新传输计数是否可能是5-9,具体取决于重新传递和重新处理的顺序。服务器?

Group在RabbitMQ中收到消息,最好使用Spring AMQP?

我正在接收来自服务(S)的消息,该服务将每个单独的属性更改作为单独的消息发布到实体。 一个人为的例子是这样的实体: Person { id: 123 name: “Something”, address: {…} } 如果在同一事务中更新了名称和地址,则(S)将发布两条消息, PersonNameCorrected和PersonMoved 。 问题出在接收方,我正在存储此Person实体的投影,每个属性更改都会导致写入数据库。 因此,在这个例子中,将有两次写入数据库,但如果我可以在短时间内批量处理消息并按id分组,那么我只需要对数据库进行一次写入。 如何在RabbitMQ中处理这个问题? Spring AMQP是否提供了更简单的抽象? 请注意,我已经简要介绍了预取,但我不确定这是否可行。 如果我理解正确的话,预取也是基于连接的。 我试图在每个队列的基础上实现这一点,因为如果批处理(因此增加了延迟)是要走的路,我不想将这种延迟添加到我的服务所消耗的所有队列中(但仅限于那些需要“group-by-id”function)。

从Java / Spring中检索RabbitMQ队列中未确认消息的数量

有没有办法返回未确认的消息数? 我使用此代码来获取队列中的消息数: DeclareOk declareOk = amqpAdmin.getRabbitTemplate().execute( new ChannelCallback() { public DeclareOk doInRabbit(Channel channel) throws Exception { return channel.queueDeclarePassive(name); } }); return declareOk.getMessageCount(); 但我想知道未确认消息的数量。 我已经看到RabbitMQ管理工具包含了这些信息(对于每个队列,它给出了Ready / Unacked和Total消息的数量),我想必须有一种从Java / Spring中检索它的方法。 谢谢 UPDATE Oks,似乎没有办法以编程方式完成,因为配置/队列的列表不是AMPQ的一部分。 可以启用管理插件并查询有关队列的REST Web服务(以及其他内容)。 更多信息: http://www.rabbitmq.com/management.html

spring amqp rabbitmq MessageListener无效

我想使用spring amqp使用rabbitmq,下面是我的配置。 这是一个简单的Message Listener类, import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessageListener; public class ImportMessageListener implements MessageListener { @Override public void onMessage(Message message) { System.out.println(“consumer output: ” + message); } } 这是生产者(spring批次的itemWriter), public class ImportItemWriter implements ItemWriter { private AmqpTemplate template; public AmqpTemplate getTemplate() { return template; } public void setTemplate(AmqpTemplate template) { this.template = template; } public void […]

没有连接的AMQP / RabbitMQ通道何时死亡?

我有一个简单的RabbitMQ测试程序随机排队消息,另一个读取它们,都使用Spring-AMQP。 如果消费者死亡(例如,在没有机会关闭其连接或通道的情况下终止进程),则任何未确认的消息似乎永远不会被确认。 我已经看到了许多引用(例如这个问题 ),它表示当通道没有连接时它会死亡,并且将重新传送剩余的未包装的消息。 这不是我看到的行为 – 相反,我得到了一个越​​来越多的标记为IDLE的频道列表,以及越来越多的标记正在运行但没有活动的连接列表。 一旦进程被杀死,是否需要一些配置来注意连接已经死亡? 编辑:我在VirtualBox VM中运行rabbitmq服务器,显然无法通过NAT正确管理死入站连接。 这对于直接在物理主机上运行的mq服务器来说效果很好。

spring amqp-outbound网关从不同的thead生成回复(如jms-outbound网关)

问题陈述: Spring amqp-outbound网关从不同的线程产生回复(类似于jms-outbound网关,具有不同的队列,使用相关键关联请求/响应)。 无法将此消息与此示例相关联。 Spring集成 配置 @Bean public RabbitTemplate template(ConnectionFactory rabbitConnectionFactory){ final RabbitTemplate template = new RabbitTemplate(rabbitConnectionFactory); template.setQueue(“reply_queue”); return template; } @Bean public Binding binding(){ return BindingBuilder.bind(this.queue()).to(this.exchange()).with(“request_exchange_queue”); } @Bean public DirectExchange exchange(){ return new DirectExchange(“request_exchange”); } @Bean public Queue queue(){ return new Queue(“request_queue”, true, false, true); } @Bean public Binding bindingReply(){ return BindingBuilder.bind(this.queue()).to(this.exchange()).with(“reply_exchange_queue”); } @Bean […]

使用Spring AMQP接收和发送Java对象

我想实现Spring AMQP示例,用于使用侦听器发送和接收Java对象。 我试过这个: 发送Java对象 ConnectionFactory connectionFactory = new CachingConnectionFactory(“localhost”); AmqpAdmin admin = new RabbitAdmin(connectionFactory); admin.declareBinding(BindingBuilder.bind(new Queue(QUEUE_PROCESSING_TRANSACTION, false)).to(new TopicExchange(EXCHANGE_PROCESSING)).with(ROUTING_KEY_PROCESSING_TRANSACTION)); AmqpTemplate template = new RabbitTemplate(connectionFactory); TransactionsBean obj = new TransactionsBean(); obj.setId(Long.valueOf(111222333)); 接收并发送回另一个Java对象: ConnectionFactory connectionFactory = new CachingConnectionFactory(“localhost”); AmqpAdmin admin = new RabbitAdmin(connectionFactory); admin.declareBinding(BindingBuilder.bind(new Queue(QUEUE_PROCESSING_TRANSACTION, false)) .to(new TopicExchange(EXCHANGE_PROCESSING)).with(ROUTING_KEY_PROCESSING_TRANSACTION)); AmqpTemplate template = new RabbitTemplate(connectionFactory); TransactionsBean obj = (TransactionsBean) template.receiveAndConvert(QUEUE_PROCESSING_TRANSACTION); […]

Spring Amqp在springframework RetryCallback类中有NoClassDefFoundError

我一直在研究分布式Web项目,我想在其中使用带有RabbitMq的Spring amqp。 我在我的项目中使用springFramework版本4.1.6。 为此,我将以下依赖项添加到文件pom.xml中。 com.rabbitmq amqp-client 3.5.7 org.springframework.amqp spring-amqp 1.5.3.RELEASE org.springframework.amqp spring-rabbit 1.5.3.RELEASE 另外,我将下面的RabbitMQ配置文件放在web.xml中 此外,我写了一些方法来发送 @Service public class PrmSpringRabbitMessageSender { @Autowired private AmqpTemplate amqpTemplate; public void sendMessage(String queueName, String message) { amqpTemplate.convertAndSend(queueName, message); } } 并接收以下消息: @Component public class PrmSpringRabbitMessageReceiver { @RabbitListener(queues = “myQueue”) public void handleMessage(String data) { System.out.println(“Received Message : ” + data); […]