Tag: rabbitmq

从应用程序的其他层发送STOMP消息

我正在使用带有RabbitMQ代理的集群tomcat环境中使用Spring Websockets构建应用程序。 我有一个API模块需要注册端点来监听。 我按照正常的例子提出了这个配置: @Configuration @EnableWebSocketMessageBroker public class WebSocketConfig extends AbstractWebSocketMessageBrokerConfigurer { @Override public void configureMessageBroker(final MessageBrokerRegistry config) { config.enableStompBrokerRelay(“/topic/”) .setRelayHost(“localhost”) .setRelayPort(61613) .setClientLogin(“guest”) .setClientPasscode(“guest”); } @Override public void registerStompEndpoints(final StompEndpointRegistry registry) { registry.addEndpoint(“/updates”) .setAllowedOrigins(“*”) .withSockJS(); } } 虽然这有效,但它并没有解决我的问题,因为看起来WebSocket和中继配置都捆绑在API模块中,因此其他层无法重用代理。 我需要在服务层发生stomp消息代理中继配置,以便我们的应用程序的其他模块可以将消息推送到RabbitMQ中的主题,然后转向并通知API模块更新所有打开的websockets。 下面是我们的应用程序中相关层的示例图以及我要完成的任务。 我需要允许模块“Cron Message Sender”通过我们的其他API模块将消息推送给订阅消息主题的每个人。

RabbitMQ AMQP Java客户端关闭处理程序的用途是什么?

RabbitMQ文档介绍了如何添加关闭侦听器以及何时调用侦听器但我无法看到处理程序的用途。 似乎所有Java示例(包括https://github.com/rabbitmq/rabbitmq-tutorials )都忽略了关闭处理程序。 在哪种情况下,我更喜欢关闭监听器而不是简单地捕获ShutdownSignalException (和IOException )exception? 另一个令人费解的问题是控制流程在处理程序完成时的位置。 可能相关的相关SO问题: 通过运行RabbitMQ使用者安全地结束Java应用程序的最佳方法是什么? 使用RabbitMQ(Java客户端),有没有办法确定消费期间网络连接是否关闭?

Spring-AMQP重新排队消息计数基于JVM吗?

我正在寻找rabbitmq文档,似乎rabbitmq不处理消息重新传递计数。 如果我要手动ACK / NACK消息,我需要将重试计数保留在内存中(例如,通过使用correlationId作为映射中的唯一键),或者通过在消息中设置我自己的标头,并重新传送它(因此把它放在队列的末尾) 然而,这是弹簧处理的情况。 具体来说,我指的是RetryInterceptorBuilder.stateful()。maxAttempts(x)。 这个计数是特定于JVM的,还是以某种方式操纵消息? 例如,我有一个部署到2台服务器的Web应用程序,maxAttempts设置为5.总重新传输计数是否可能是5-9,具体取决于重新传递和重新处理的顺序。服务器?

Group在RabbitMQ中收到消息,最好使用Spring AMQP?

我正在接收来自服务(S)的消息,该服务将每个单独的属性更改作为单独的消息发布到实体。 一个人为的例子是这样的实体: Person { id: 123 name: “Something”, address: {…} } 如果在同一事务中更新了名称和地址,则(S)将发布两条消息, PersonNameCorrected和PersonMoved 。 问题出在接收方,我正在存储此Person实体的投影,每个属性更改都会导致写入数据库。 因此,在这个例子中,将有两次写入数据库,但如果我可以在短时间内批量处理消息并按id分组,那么我只需要对数据库进行一次写入。 如何在RabbitMQ中处理这个问题? Spring AMQP是否提供了更简单的抽象? 请注意,我已经简要介绍了预取,但我不确定这是否可行。 如果我理解正确的话,预取也是基于连接的。 我试图在每个队列的基础上实现这一点,因为如果批处理(因此增加了延迟)是要走的路,我不想将这种延迟添加到我的服务所消耗的所有队列中(但仅限于那些需要“group-by-id”function)。

使用RabbitMQ发送对象

我理解这个问题重复了使用rabbitmq发送消息而不是字符串而不是struct的问题 如果用第一种方式做到这一点 第一种方式 我有以下痕迹: java.io.EOFException at java.io.ObjectInputStream$PeekInputStream.readFully(ObjectInputStream.java:2304) at java.io.ObjectInputStream$BlockDataInputStream.readShort(ObjectInputStream.java:2773) at java.io.ObjectInputStream.readStreamHeader(ObjectInputStream.java:798) at java.io.ObjectInputStream.(ObjectInputStream.java:298) at com.mdnaRabbit.worker.data.Data.fromBytes(Data.java:78) at com.mdnaRabbit.worker.App.main(App.java:41) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:601) at com.intellij.rt.execution.application.AppMain.main(AppMain.java:120) 我检查并确保在发件人类中将消息转换为字节,但是消费者无法接收它。 这是我的制作人类: package com.mdnaRabbit.newt; import java.io.IOException; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Connection; import com.rabbitmq.client.Channel; import com.rabbitmq.client.MessageProperties; import org.apache.commons.lang.SerializationUtils; import com.mdnaRabbit.worker.data.Data; public class App { private static final String TASK_QUEUE_NAME […]

我怎样才能在rabbitmq中汇集频道?

我一直在尝试在线程之间共享连接,并且只在创建线程时才打开通道,但在研究了一点之后,我想我也想尝试connection pooling 。 我怎么能在rabbitmq上做到这一点? 或者这是一般性的想法我可以普遍适用吗? 我的目标是生成X线程,然后让它们不必打开新的通道(这需要在客户端和服务器之间建立循环)。 由于线程是它们自己的类,我不确定是否需要将池放入生成线程的类本身或它们去哪里?我还有多种类型的线程我想要在它们之间共享这些连接(不是只是一个)。 那可能吗? 为了给你一个大致的想法,这里是如何在rabbitmq中建立连接/渠道: ConnectionFactory factory = new ConnectionFactory(); factory.setHost(“localhost”); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); //I want to share several of these between threads

如何使用Java在RabbitMQ中实现Headers Exchange?

我是一个尝试在java客户端实现Headers交换的新手。 我知道这就是“x-match”绑定参数的用途。 当“x-match”参数设置为“any”时,只需一个匹配的标头值就足够了。 或者,将“x-match”设置为“all”,强制所有值必须匹配。 但任何人都可以为我提供一个骨架代码,以便更好地理

在执行器服务RabbitMQ中只有一个线程同时运行

我已经创建了一个具有20个核心的指定线程池的连接。 ConnectionFactory factory = new ConnectionFactory(); …. //specified es ExecutorService consumerExecutor = Executors.newFixedThreadPool(threadNum, threadFactory); con = factory.newConnection(consumerExecutor, addresses); 然后从此连接创建一个频道: final Channel channel = connection.createChannel(); 并使用它来创建DefaultConsumer。 虽然我发现尽管线程可以用来消费消息,但总是只有一个线程消耗消息,即使消息在服务器中大量累积。 我查看源代码并找到: private final class WorkPoolRunnable implements Runnable { @Override public void run() { int size = MAX_RUNNABLE_BLOCK_SIZE; List block = new ArrayList(size); try { Channel key = ConsumerWorkService.this.workPool.nextWorkBlock(block, size); […]

Play Framework:为Spring RabbitMQ监听器手动打开JPA上下文

我正在使用Spring-AMQP来监视Play应用程序中的RabbitMQ消息队列。 问题是我无法从侦听器代码访问我的数据库,因为JPA上下文未在此范围内打开。 我理解Play Framework管理JPA上下文,以便在处理HTTP请求时打开它,但有没有办法可以从外部Play控制器/作业中使用JPA?

风暴拓扑不提交

我配置了我的机器zookeeper,nimbus,supervisor正常运行,我的拓扑在LocalCluster中工作 LocalCluster cluster = new LocalCluster(); cluster.submitTopology(“SendPost”, conf, builder.createTopology()); Utils.sleep(10000000000l); cluster.killTopology(“SendPost”); cluster.shutdown(); 现在我想尝试提交我的拓扑结构但它不起作用 /usr/local/storm/bin$ ./storm jar /home/winoria/Desktop/Storm/storm-starter/target/storm-starter-0.0.1-SNAPSHOT-jar-with-dependencies.jar com.winoria.post.PostTopology Post 我得到了以下错误 SLF4J: Class path contains multiple SLF4J bindings. SLF4J: Found binding in [jar:file:/usr/local/storm/lib/logback-classic-1.0.6.jar!/org/slf4j/impl/StaticLoggerBinder.class] SLF4J: Found binding in [jar:file:/home/winoria/Desktop/Storm/storm-starter/target/storm-starter-0.0.1-SNAPSHOT-jar-with-dependencies.jar!/org/slf4j/impl/StaticLoggerBinder.class] SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation. Running: java -client -Dstorm.options= -Dstorm.home=/usr/local/storm -Djava.library.path=/usr/local/lib:/opt/local/lib:/usr/lib -Dstorm.conf.file= -cp /usr/local /storm/storm-netty-0.9.0.1.jar:/usr/local/storm/storm-console-logging-0.9.0.1.jar:/usr/local/storm/storm-core-0.9.0.1.jar:/usr/local/storm/lib/httpcore-4.1.jar:/usr/local/storm/lib/carbonite-1.5.0.jar:/usr/local/storm/lib/mockito-all-1.9.5.jar:/usr/local/storm/lib/commons-io-1.4.jar:/usr/local/storm/lib/commons-fileupload-1.2.1.jar:/usr/local/storm/lib/jgrapht-0.8.3.jar:/usr/local/storm/lib/ring-jetty-adapter-0.3.11.jar:/usr/local/storm/lib/jzmq-2.1.0.jar:/usr/local/storm/lib/asm-4.0.jar:/usr/local/storm/lib/logback-core-1.0.6.jar:/usr/local/storm/lib/tools.nrepl-0.2.3.jar:/usr/local/storm/lib/compojure-1.1.3.jar:/usr/local/storm/lib/json-simple-1.1.jar:/usr/local/storm/lib/ring-devel-0.3.11.jar:/usr/local/storm/lib/commons-logging-1.1.1.jar:/usr/local/storm/lib/httpclient-4.1.1.jar:/usr/local/storm/lib/reflectasm-1.07-shaded.jar:/usr/local/storm/lib/commons-exec-1.1.jar:/usr/local/storm/lib/guava-13.0.jar:/usr/local/storm/lib/clout-1.0.1.jar:/usr/local/storm/lib/objenesis-1.2.jar:/usr/local/storm/lib/slf4j-api-1.6.5.jar:/usr/local/storm/lib/clojure-1.4.0.jar:/usr/local/storm/lib/jetty-6.1.26.jar:/usr/local/storm/lib/hiccup-0.3.6.jar:/usr/local/storm/lib/clj-stacktrace-0.2.2.jar:/usr/local/storm/lib/log4j-over-slf4j-1.6.6.jar:/usr/local/storm/lib/tools.logging-0.2.3.jar:/usr/local/storm/lib/ring-core-1.1.5.jar:/usr/local/storm/lib/zookeeper-3.3.3.jar:/usr/local/storm/lib/math.numeric-tower-0.0.1.jar:/usr/local/storm/lib/disruptor-2.10.1.jar:/usr/local/storm/lib/minlog-1.2.jar:/usr/local/storm/lib/core.incubator-0.1.0.jar:/usr/local/storm/lib/servlet-api-2.5-20081211.jar:/usr/local/storm/lib/netty-3.6.3.Final.jar:/usr/local/storm/lib/ring-servlet-0.3.11.jar:/usr/local/storm/lib/clj-time-0.4.1.jar:/usr/local/storm/lib/snakeyaml-1.11.jar:/usr/local/storm/lib/commons-codec-1.4.jar:/usr/local/storm/lib/tools.cli-0.2.2.jar:/usr/local/storm/lib/logback-classic-1.0.6.jar:/usr/local/storm/lib/servlet-api-2.5.jar:/usr/local/storm/lib/kryo-2.17.jar:/usr/local/storm/lib/joda-time-2.0.jar:/usr/local/storm/lib/curator-client-1.0.1.jar:/usr/local/storm/lib/libthrift7-0.7.0-2.jar:/usr/local/storm/lib/tools.macro-0.1.0.jar:/usr/local/storm/lib/jline-0.9.94.jar:/usr/local/storm/lib/clojure-complete-0.2.3.jar:/usr/local/storm/lib/curator-framework-1.0.1.jar:/usr/local/storm/lib/commons-lang-2.5.jar:/usr/local/storm/lib/junit-3.8.1.jar:/usr/local/storm/lib/jetty-util-6.1.26.jar:/home/winoria/Desktop/Storm/storm-starter/target/storm-starter-0.0.1-SNAPSHOT-jar-with-dependencies.jar:/usr/local/storm/conf:/usr/local/storm/bin -Dstorm.jar=/home/winoria/Desktop/Storm/storm-starter/target/storm-starter-0.0.1-SNAPSHOT-jar-with-dependencies.jar com.winoria.post.PostTopology […]