Tag: rabbitmq

使用RabbitMQ源的Spark结构化流式传输

我正在尝试编写一个Structured Streaming的自定义接收器,它将使用来自RabbitMQ消息。 Spark 最近发布了 DataSource V2 API,这似乎非常有前景。 由于它抽象了很多细节,我想在简单性和性能方面使用这个API。 但是,由于它很新,因此可用的资源不多。 我需要经验丰富的Spark人员做一些澄清,因为他们会更容易掌握关键点。 开始了: 我的出发点是博客文章系列,第一部分在这里 。 它显示了如何在没有流function的情况下实现数据源。 为了制作流媒体源,我略微改变了它们,因为我需要实现MicroBatchReadSupport而不是(或除了) DataSourceV2 。 为了提高效率,让多个spark执行器同时使用RabbitMQ是明智的,即从同一队列中消耗RabbitMQ 。 如果我没有感到困惑,输入的每个分区-in Spark的术语 – 对应于来自队列的消费者-in RabbitMQ术语。 因此,我们需要为输入流分配多个分区,对吧? 与本系列的第4部分类似,我实现了MicroBatchReader ,如下所示: @Override public List<DataReaderFactory> createDataReaderFactories() { int partition = options.getInt(RMQ.PARTITICN, 5); List<DataReaderFactory> factories = new LinkedList(); for (int i = 0; i < partition; i++) { factories.add(new RMQDataReaderFactory(options)); } […]

如何让Spring RabbitMQ创建一个新的Queue?

在使用rabbit-mq的(有限)经验中,如果为尚不存在的队列创建新的侦听器,则会自动创建队列。 我正在尝试使用带有rabbit-mq的Spring AMQP项目来设置一个监听器,而我正在收到错误。 这是我的xml配置: 我在RabbitMq日志中得到了这个: =ERROR REPORT==== 3-May-2013::23:17:24 === connection , channel 1 – soft error: {amqp_error,not_found,”no queue ‘test’ in vhost ‘/'”,’queue.declare’} 和AMQP的类似错误: 2013-05-03 23:17:24,059 ERROR [org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer] (SimpleAsyncTaskExecutor-1) – Consumer received fatal exception on startup org.springframework.amqp.rabbit.listener.FatalListenerStartupException: Cannot prepare queue for listener. Either the queue doesn’t exist or the broker will not allow us to use […]

自动重新连接RabbitMQ频道

我找到了这个gem: 如果连接失败,客户端将需要与代理建立新连接。 在先前连接上打开的任何频道都将自动关闭,这些频道也需要重新打开。 所以那不好。 我即将编写一大堆处理自动重新连接并重新创建通道,然后从我的所有代码中封装这种情况。 问题是,这应该已经完成​​了。 这可能在Java RMQ库中吗?

spring boot rabbitmq MappingJackson2MessageConverter自定义对象转换

我正在尝试使用spring boot创建一个简单的spring boot应用程序,它将“生成”消息发送到rabbitmq交换/队列,以及另一个“消耗”这些消息的示例spring boot应用程序。 所以我有两个应用程序(或微服务,如果你愿意)。 1)“生产者”微服务2)“消费者”微服务 “生产者”有2个域对象。 Foo和Bar应该转换为json并发送给rabbitmq。 “消费者”应该分别接收并将json消息转换为域Foo和Bar。 出于某种原因,我不能完成这个简单的任务。 关于这个的例子并不多。 对于消息转换器我想使用org.springframework.messaging.converter.MappingJackson2MessageConverter 这是我到目前为止: 生产者MICROSERVICE package demo.producer; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.Queue; import org.springframework.amqp.core.TopicExchange; import org.springframework.amqp.rabbit.core.RabbitMessagingTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.CommandLineRunner; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.context.annotation.Bean; import org.springframework.messaging.converter.MappingJackson2MessageConverter; import org.springframework.stereotype.Service; @SpringBootApplication public class ProducerApplication implements CommandLineRunner { public static void main(String[] args) { SpringApplication.run(ProducerApplication.class, args); […]

将curl调用转换为java urlconnection调用

我有curl命令: curl -i -u guest:guest -H “content-type:application/json” -XPUT \ http://localhost:15672/api/traces/%2f/my-trace \ -d'{“format”:”text”,”pattern”:”#”}’ 我想在Java API中创建HTTP请求,它将执行相同的操作。 这个curl命令可以在本自述文件中找到。 它用于开始在RabbitMQ上记录日志。 回应并不重要。 现在我创建了这样的东西(我删除了不太重要的行,即捕获exception等),但不幸的是它不起作用: url = new URL(“http://localhost:15672/api/traces/%2f/my-trace”); uc = url.openConnection(); uc.setRequestProperty(“Content-Type”, “application/json”); uc.setRequestProperty(“format”,”json”); uc.setRequestProperty(“pattern”,”#”) String userpass = “guest:guest”; String basicAuth = “Basic ” + javax.xml.bind.DatatypeConverter.printBase64Binary(userpass.getBytes()); uc.setRequestProperty (“Authorization”, basicAuth); 完整的代码

动态绑定多个队列到一个处理程序

是否可以将多个队列绑定到一个事件处理程序? 关键是这些队列将被动态添加(绑定),第一个,下一个,依此类推。 我想只有一个事件处理程序。 也许基于代码的创建队列由其他现有队列支持?

Spring集成 – Queue / Poller似乎在没有任何动作的情况下耗尽线程池

我有一个Spring集成应用程序,附加到AMQP代理。 我想从amqp队列接收消息,并更新db记录。 为了提高性能,我有一个工作池允许多个更新同时发生。 我有以下配置: 如果我开始运行,没有在AMQP频道上处理的入站消息,我很快就会看到thredpool耗尽,并开始拒绝。 这是日志: [Thu Apr 2013 23:41:51.153] DEBUG [] (org.springframework.amqp.rabbit.listener.BlockingQueueConsumer:185) – Retrieving delivery for Consumer: tag=[amq.ctag-w4qPp60jVEQOIEovR4cERv], channel=Cached Rabbit Channel: AMQChannel(amqp://guest@127.0.0.1:5672/,1), acknowledgeMode=AUTO local queue size=0 [Thu Apr 2013 23:41:51.160] DEBUG [] (org.springframework.amqp.rabbit.listener.BlockingQueueConsumer:185) – Retrieving delivery for Consumer: tag=[amq.ctag-Q3Lq4R9g9E8WBNVLYzaFmq], channel=Cached Rabbit Channel: AMQChannel(amqp://guest@127.0.0.1:5672/,2), acknowledgeMode=AUTO local queue size=0 [Thu Apr 2013 23:41:51.166] DEBUG [] (org.springframework.amqp.rabbit.listener.BlockingQueueConsumer:185) […]

Spring云流动态支持路由消息

我想创建一个公共项目(使用spring cloud stream)根据消息内容动态地将消息路由到不同的(消费者)项目。 (rabbitmq作为消息代理) 春云流支持吗? 如果没有,任何提议的方法来实现这一点? 谢谢

通过运行RabbitMQ使用者安全地结束Java应用程序的最佳方法是什么?

我们有一个独立的java应用程序在Debian机器上进行一些后台处理。 它必须处理的作业是通过RabbitMQ消息发送的。 当需要升级java应用程序时,我们需要停止它(杀死它)。 但我们必须确保没有消费者正在处理消息。 根据您的经验,实现这一目标的最佳方法是什么? 我们尝试向消费者发送“SHUTDOWN”消息,但我们似乎无法关闭队列或频道?! 它冻结了应用程序! 或者是否有另一种解决方案,我们可以自动关闭应用程序,而无需在linux中执行kill命令? 谢谢分享你的经验。 问候

为什么我的RabbitMQ频道会继续关闭?

我正在调试一些使用Apache POI从Microsoft Office文档中提取数据的Java代码。 有时,当内存不足时,它会遇到大文档和POI崩溃。 此时,它会尝试将错误发布到RabbitMQ,以便其他组件可以知道此步骤失败并采取适当的操作。 但是,当它尝试发布到队列时,它会获得com.rabbitmq.client.AlreadyClosedException (clean connection shutdown; reason: Attempt to use closed channel) 。 这是error handling程序代码: try { //Extraction and indexing code } catch(Throwable t) { // Something went wrong! We’ll publish the error and then move on with // our lives System.out.println(“Error received when indexing message: “); t.printStackTrace(); System.out.println(); String error = […]