Tag: messaging

Spring-AMQP重新排队消息计数基于JVM吗?

我正在寻找rabbitmq文档,似乎rabbitmq不处理消息重新传递计数。 如果我要手动ACK / NACK消息,我需要将重试计数保留在内存中(例如,通过使用correlationId作为映射中的唯一键),或者通过在消息中设置我自己的标头,并重新传送它(因此把它放在队列的末尾) 然而,这是弹簧处理的情况。 具体来说,我指的是RetryInterceptorBuilder.stateful()。maxAttempts(x)。 这个计数是特定于JVM的,还是以某种方式操纵消息? 例如,我有一个部署到2台服务器的Web应用程序,maxAttempts设置为5.总重新传输计数是否可能是5-9,具体取决于重新传递和重新处理的顺序。服务器?

当两个应用程序都使用嵌入式activemq时,如何将Jms消息从一个spring-boot应用程序发送到另一个应用程序

我有两个spring-boot应用程序。 在receiver-application的Application.java中我有: @Bean public JmsListenerContainerFactory myFactory(ConnectionFactory connectionFactory, DefaultJmsListenerContainerFactoryConfigurer configurer) { DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory(); configurer.configure(factory, connectionFactory); return factory; } 在Receiver.java中…… @JmsListener(destination = “myQueue”, containerFactory = “myFactory”) public void receiveMessage(String tradeString) throws JSONException, IOException { tradeImpl = new ObjectMapper().readValue(tradeString, TradeImpl.class); } 在sender-application中我只使用: public void send(trade) { String queueName = “myQueue”; String tradeString = new ObjectMapper().writeValueAsString(trade); […]

JMS – 消息选择器如何与多个队列和主题使用者一起工作?

假设您有一个JMS队列,并且多个消费者正在查看队列中的消息。 您希望其中一个消费者获得所有特定类型的消息,因此您决定使用消息选择器。 例如,您可以在名为targetConsumer JMS消息头中定义一个属性。 您应用于称为A的消费者的消息选择器类似于WHERE targetConsumer = ‘CONSUMER_A’ 。 很明显,消费者A现在只是抓住具有属性集的消息,就像在示例中一样。 但是,其他消费者是否会意识到这一点? IOW,如果它在消费者A之前查看队列,那么另一个不受消息选择器约束的消费者会抓住CONSUMER_A消息吗? 我是否需要将消息选择器(例如, WHERE targetConsumer ‘CONSUMER_A’应用于其他人? 我现在正在RTFMing并收集经验数据,但希望有人可能知道他们的头脑。

Kafka Java使用者永远不会收到任何消息

我正在尝试设置一个基本的Java使用者来接收来自Kafka主题的消息。 我已经按照以下示例访问了 – https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example – 并且拥有以下代码: package org.example.kafka.client; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Properties; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import kafka.consumer.ConsumerConfig; import kafka.consumer.KafkaStream; import kafka.javaapi.consumer.ConsumerConnector; public class KafkaClientMain { private final ConsumerConnector consumer; private final String topic; private ExecutorService executor; public KafkaClientMain(String a_zookeeper, String a_groupId, String a_topic) { this.consumer = kafka.consumer.Consumer.createJavaConsumerConnector( createConsumerConfig(a_zookeeper, a_groupId)); this.topic […]

从JMS MessageListener发出回滚信号

我一直在使用JMS和ActiveMQ。 一切都在创造奇迹。 我不是用spring,也不是我。 接口javax.jms.MessageListener只有一个方法onMessage 。 在实现中,可能会抛出exception。 如果实际上抛出exception,那么我说消息没有正确处理,需要重新尝试。 所以,我需要ActiveMQ等待一段时间,然后重试。 即我需要抛出exception来回滚JMS事务。 我怎样才能完成这样的行为? 也许ActiveMQ中有一些我无法找到的配置。 或者……也许可以取消将MessageListener注册到消费者并自己使用消息,如下所示: while (true) { // … some administrative stuff like … session = connection.createSesstion(true, SESSION_TRANSACTED) try { Message m = receiver.receive(queue, 1000L); theMessageListener.onMessage(m); session.commit(); } catch (Exception e) { session.rollback(); Thread.sleep(someTimeDefinedSomewhereElse); } // … some more administrative stuff } 在几个线程中,而不是注册监听器。 或者……我可以以某种方式装饰/ AOP /字节操作MessageListener来自己做。 你会采取什么途径?为什么? […]

Kafka – 使用高级消费者实现延迟队列

想要使用高级消费者api实现延迟消费者 大意: 按密钥生成消息(每个消息包含创建时间戳)这可确保每个分区按生产时间排序消息。 auto.commit.enable = false(将在每个消息进程后显式提交) 消费一条消息 检查消息时间戳并检查是否已经过了足够的时间 进程消息(此操作永远不会失败) 提交1个偏移量 while (it.hasNext()) { val msg = it.next().message() //checks timestamp in msg to see delay period exceeded while (!delayedPeriodPassed(msg)) { waitSomeTime() //Thread.sleep or something…. } //certain that the msg was delayed and can now be handled Try { process(msg) } //the msg process will never fail […]

Websphere 7 MQueue:如何从Java访问队列深度?

我想编写一些代码来监视Websphere 7 MQ上的队列大小。 这是我提出的代码 MQEnvironment.hostname = “10.21.1.19”; MQEnvironment.port = 1414; MQEnvironment.channel = “SYSTEM.CDEF.SVRCONN”; MQEnvironment.properties.put(MQC.TRANSPORT_PROPERTY, MQC.TRANSPORT_MQSERIES); MQQueueManager qMgr = new MQQueueManager(“MYQMGR”); MQQueue destQueue = qMgr.accessQueue(“PUBLISH”, MQC.MQOO_INQUIRE); System.out.println(destQueue.getCurrentDepth()); destQueue.close(); qMgr.disconnect(); 我怎么知道“频道”是什么? 我如何知道传递给MQQueueManager的队列管理器名称是什么? 或者我应该看看另一个API? 我需要它与WRS 7 SIB和MQ一起工作。 谢谢Jeff Porter

高性能JMS消息传递

我阅读了今年UberConf的幻灯片,其中一位发言人提出Spring JMS为您的消息队列系统增加了性能开销的论点,但我没有看到任何证据支持幻灯片。 发言者还说明点对点比传统的“发布 – 订阅”方法更快,因为每个消息只发送一次而不是广播给每个消费者。 我想知道是否有经验丰富的Java消息传递专家可以在这里权衡并澄清一些技术细节: 使用Spring JMS而不仅仅是纯JMS实际上是否会产生性能开销? 如果是这样,它是如何以及在何处引入的? 它有什么办法吗? 有什么实际证据支持P2P比pub-sub模型更快,如果是这样的话,是否有任何情况下你想要通过P2P发布sub-sub(即为什么变慢?!? )?