Tag: activemq

重播通过ActiveMQ发送的消息

是否有一种简单的方法来创建通过队列发送的每条消息的副本,以便在需要时,用户可以浏览以前传输的消息列表并通过单击按钮多次重播它们? 我有程序X向队列发送消息,程序Y然后读取它。我希望能够重放以前发送的消息,而不必返回程序X并再次重新生成它。

Spring Batch – 并非所有记录都是从MQ检索中处理的

我是Spring和Spring Batch的新手,如果你有任何问题,请随时提出任何澄清问题。 我看到Spring Batch的问题,我无法在我们的测试或本地环境中重新创建。 我们有一个日常工作,通过JMS连接到Websphere MQ并检索一组记录。 此作业使用开箱即用的JMS ItemReader。 我们实现了自己的ItemProcessor,但它除了记录之外没有做任何特殊的事情。 没有应该影响传入记录的filter或处理。 问题是,在MQ上的每日10,000多条记录中,只有大约700个左右(确切的数字每次不同)通常会记录在ItemProcessor中。 所有记录都已成功从队列中删除。 记录的记录数每次都不同,似乎没有模式。 通过将日志文件与MQ中的记录列表进行比较,我们可以看到一个看似随机的记录子集正在被我们的工作“处理”。 可能会拾取第一条记录,然后跳过50条记录,然后连续5条等等。每次作业运行时,模式都不同。 也没有记录exception。 在localhost中运行相同的应用程序并使用相同的数据集进行测试时,ItemProcessor将成功检索并记录所有10,000多条记录。 该作业在生产中运行20到40秒(也不是常数),但在测试和本地,它需要几分钟才能完成(这显然是有意义的,因为它处理了更多的记录)。 因此,这是解决问题的难题之一,因为我们无法重新创建它。 一个想法是实现我们自己的ItemReader并添加额外的日志记录,以便我们可以看到记录是否在读者之前或读者之后丢失 – 我们现在知道的是ItemProcessor只处理了一部分记录。 但即使这样也无法解决我们的问题,并且考虑到它甚至不是一个解决方案,它将在某种程度上及时实施。 还有其他人看过像这样的问题吗? 任何可能的想法或疑难解答建议将不胜感激。 以下是我们用于参考的一些jar版本号。 spring – 3.0.5.RELEASE Spring Integration – 2.0.3.RELEASE Spring Batch – 2.1.7.RELEASE 活动MQ – 5.4.2 Websphere MQ – 7.0.1 提前感谢您的意见。 编辑:每个请求,处理器的代码: public SMSReminderRow process(Message message) throws Exception { SMSReminderRow retVal […]

无法让ActiveMQ重新发送我的消息

我有一个用Java编写的单线程ActiveMQ使用者。 我所要做的就是从队列中接收()一个消息,尝试将其发送到Web服务,如果成功则确认()它。 如果Web服务调用失败,我希望消息保留在队列中并在超时后重新发送。 它或多或少都在工作,除了重发部分:每次重新启动我的消费者时,它会为每个仍然在队列中的消息收到一条消息,但是在发送它们之后,消息永远不会被重新发送。 我的代码看起来像: public boolean init() throws JMSException, FileNotFoundException, IOException { ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user, password, url); RedeliveryPolicy policy = new RedeliveryPolicy(); policy.setInitialRedeliveryDelay(500); policy.setBackOffMultiplier(2); policy.setUseExponentialBackOff(true); connectionFactory.setRedeliveryPolicy(policy); connectionFactory.setUseRetroactiveConsumer(true); // ???? Connection connection = connectionFactory.createConnection(); connection.setExceptionListener(this); connection.start(); session = connection.createSession(transacted, ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE); destination = session.createQueue(subject); //??? consumer = session.createConsumer(destination); //consumer.setMessageListener(this); // message listener had same […]

可以以某种方式查看AMQ主题的内容吗?

我正在尝试使用activemq-admin查看主题的内容。 这就是我正在尝试的: ./activemq-admin browse –amqurl tcp://localhost:61616 my.topic 这是我得到的输出: Java Runtime: Sun Microsystems Inc. 1.6.0_24 /usr/lib/jvm/java-6-sun-1.6.0.24/jre Heap sizes: current=62848k free=62190k max=932096k JVM args: -Dactivemq.classpath=/home/pc/dev/apache-activemq-5.3.1/conf; -Dactivemq.home=/home/pc/dev/apache-activemq-5.3.1 -Dactivemq.base=/home/pc/dev/apache-activemq-5.3.1 ACTIVEMQ_HOME: /home/pc/dev/apache-activemq-5.3.1 ACTIVEMQ_BASE: /home/pc/dev/apache-activemq-5.3.1 这不是我想要的。 我希望以某种方式以原始forms看到队列中的消息。 那可能吗 ? 谢谢,

activemq优先级

我们目前正在使用JMS和activemq(5.5.1)开发应用程序。 我们希望为某些消息定义更高的优先级,这将使它们首先被消耗。 在设置生产者和使用者之后(通过spring(3.1)JMSTemplate),优先级不能完全发挥作用。 实际上,当我们“关闭”消费者并发送一些消息时,优先权得到尊重,但是当我们在消费者开启时添加消息时,消息的接收顺序与他们发送的顺序相同。 配置很简单: 在activemq配置文件中激活优先级: ” prioritizedMessages=”true”/> … 并且在生产者模板配置中启用了QoS: 要发送具有高优先级的消息,我们只需更改生产者端的模板优先级属性: template.setPriority(9); 任何想法? 这是正常的行为,还是有一些我们会忘记的配置?

使用ActiveMQ,Camel和Spring实现Request-Reply模式

我正在尝试实现以下function: 然后逐行读取CSV文件: 根据该行包含的值构建请求 将请求发送到消息队列 其他组件需要接收消息,处理请求并将响应发送到另一个消息队列(生产者已知,因此生产者可以获取响应)。 我相信请求 – 回复模式适合该法案。 我安装了ActiveMQ,下载了camel并尝试使用他们的jms项目。 配置组件,队列和测试连接(工作)后,我试图弄清楚实际上如何实现请求 – 回复? 我没有找到任何好的例子 我有一个RouteBuilder RouteBuilder public class MyRouteBuilder extends RouteBuilder { public static void main(String[] args) throws Exception { new Main().run(args); } public void configure() { from(“file:src/data?noop=true”) .to(“activemq:RequestQ”); from(“activemq:RequestQ?exchangePattern=InOut&timeToLive=5000”) .inOut(“activemq:RequestQ”, “bean:myBean?method=someMethod”); } } 骆驼的context.xml org.apache.camel.example.spring 问题: 如何读取文件逐行构造并根据行内容发布消息? 如何配置路由以及如何配置邮件头以便在获取响应后将被删除的临时队列中获取响应? 您可以推荐哪些快速入门指南? 编辑 我得到了下面的代码。 现在让我们说在处理器中我创建响应。 我该如何寄回? 我该如何使用响应? public […]

Java运行时环境检测到致命错误。 EXCEPTION_ACCESS_VIOLATION

我有java 1.6,maven 2,activeMQ 5.5和testngfunction测试。 当我在Idea中启动它然后确定,但是当我尝试从控制台使用maven启动它们时,然后在尝试通过activeMQ发送消息后进程暂停,并且在一段时间后崩溃并在日志中出现以下错误: # # A fatal error has been detected by the Java Runtime Environment: # # EXCEPTION_ACCESS_VIOLATION (0xc0000005) at pc=0x000000006d92f7a6, pid=5716, tid=7000 # # JRE version: 6.0_27-b07 # Java VM: Java HotSpot(TM) 64-Bit Server VM (20.2-b06 mixed mode windows-amd64 compressed oops) # Problematic frame: # V [jvm.dll+0x9f7a6] # # If you […]

ActiveMQ:没有经纪人的新手消费者

我正在编写一个从队列中消耗的JMS客户端。 如果重要的话,我的经纪人是activemq。 一个要求是即使经纪人关闭,客户也应该开始。 在这种情况下,它应该表现得好像队列中没有消息,并且一旦代理启动并且消息开始相应的行为。 问题是在我的代码中: connectionFactory = new ActiveMQConnectionFactory(url); Connection connection = connectionFactory.createConnection(); connection.start() 如果代理已关闭,那么它会卡在connection.start() 。 虽然我想要的是connection.start()以静默方式返回并继续尝试在后台连接并消耗消息,而不能消息。 我怎样才能做到这一点。

当两个应用程序都使用嵌入式activemq时,如何将Jms消息从一个spring-boot应用程序发送到另一个应用程序

我有两个spring-boot应用程序。 在receiver-application的Application.java中我有: @Bean public JmsListenerContainerFactory myFactory(ConnectionFactory connectionFactory, DefaultJmsListenerContainerFactoryConfigurer configurer) { DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory(); configurer.configure(factory, connectionFactory); return factory; } 在Receiver.java中…… @JmsListener(destination = “myQueue”, containerFactory = “myFactory”) public void receiveMessage(String tradeString) throws JSONException, IOException { tradeImpl = new ObjectMapper().readValue(tradeString, TradeImpl.class); } 在sender-application中我只使用: public void send(trade) { String queueName = “myQueue”; String tradeString = new ObjectMapper().writeValueAsString(trade); […]

如何使用Spring Boot配置嵌入式ActiveMQ Broker URL

我按照一个简单的例子来设置和运行带有Spring Boot的嵌入式ActiveMQ(版本1.4.X)。 这是示例https://spring.io/guides/gs/messaging-jms/的链接 我的课程结构如下: @SpringBootApplication @EnableJms public class Application { @Autowired ConfigurableApplicationContext context; @Bean JmsListenerContainerFactory myJmsContainerFactory(ConnectionFactory connectionFactory) { SimpleJmsListenerContainerFactory factory = new SimpleJmsListenerContainerFactory(); factory.setConnectionFactory(connectionFactory); return factory; } @JmsListener(destination = “mailbox-destination”, containerFactory = “myJmsContainerFactory”) public void receiveMessage(String message) { System.out.println(“Message received: ” + message); context.close(); } public static void main(String[] args) throws Exception { FileSystemUtils.deleteRecursively(new File(“active-data”)); […]