Tag: rabbitmq

如何将数据从服务器放到Kinesis Stream

我是Kinesis的新手。 读出我发现的文档,我可以创建Kinesis Stream来从Producer获取数据。 然后使用KCL将从Stream读取此数据以进一步处理。 我理解如何通过实现IRecordProcessor来编写KCL应用程序。 然而,关于如何将数据放在Kinesis流上的第一阶段仍然不清楚。 我们是否有一些确实需要实现的AWS API。 场景:我有一台服务器,可以从文件夹中的各种来源连续获取数据。 每个文件夹都包含文本文件,其行包含用于分析工作的必需属性。 我必须将所有这些数据推送到Kinesis Stream。 我需要一些代码,如下面的类putData方法将用于Kinesis流中 public class Put { AmazonKinesisClient kinesisClient; Put() { String accessKey = “My Access Key here” ; String secretKey = “My Secret Key here” ; AWSCredentials credentials = new BasicAWSCredentials(accessKey, secretKey); kinesisClient = new AmazonKinesisClient(credentials); kinesisClient.setEndpoint(“kinesis.us-east-1.amazonaws.com”, “kinesis”, “us-east-1”); System.out.println(“starting the Put Application”); } […]

从Java / Spring中检索RabbitMQ队列中未确认消息的数量

有没有办法返回未确认的消息数? 我使用此代码来获取队列中的消息数: DeclareOk declareOk = amqpAdmin.getRabbitTemplate().execute( new ChannelCallback() { public DeclareOk doInRabbit(Channel channel) throws Exception { return channel.queueDeclarePassive(name); } }); return declareOk.getMessageCount(); 但我想知道未确认消息的数量。 我已经看到RabbitMQ管理工具包含了这些信息(对于每个队列,它给出了Ready / Unacked和Total消息的数量),我想必须有一种从Java / Spring中检索它的方法。 谢谢 UPDATE Oks,似乎没有办法以编程方式完成,因为配置/队列的列表不是AMPQ的一部分。 可以启用管理插件并查询有关队列的REST Web服务(以及其他内容)。 更多信息: http://www.rabbitmq.com/management.html

使用RabbitMQ(Java客户端),有没有办法确定消费期间网络连接是否关闭?

我正在使用Java客户端在RHEL 5.3上使用RabbitMQ。 我有2个节点(机器)。 Node1使用Java帮助程序类QueueingConsumer消耗来自Node2上队列的消息。 QueueingConsumer consumer = new QueueingConsumer(channel); channel.basicConsume(“MyQueueOnNode2”, noAck, consumer); while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); … Process message – delivery.getBody() } 如果接口在Node1或Node2上关闭(例如ifconfig eth1 down),则客户端(上面)永远不会知道网络不再存在。 RabbitMQ是否在Java客户端上提供某种类型的配置,可用于确定连接是否已消失。 在Node2上关闭RabbitMQ服务器将触发ShutdownSignalException,可以捕获该应用程序并且应用程序可以进入重新连接循环。 但是,关闭接口不会导致任何类型的exception发生,因此代码将永远等待consumer.nextDelivery()。 我也尝试过使用此调用的超时版本。 例如 QueueingConsumer consumer = new QueueingConsumer(channel); channel.basicConsume(“MyQueueOnNode2”, noAck, consumer); int timeout_ms = 30000; while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(timeout_ms); if (delivery == […]

由于SocketException,RabbitMQ新连接被拒绝

在尝试创建与另一台服务器上运行的rabbitmq的新连接时,我收到以下错误: java.io.IOException at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:106) at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:102) at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:124) at com.rabbitmq.client.impl.AMQConnection.start(AMQConnection.java:406) at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:516) at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:533) Caused by: com.rabbitmq.client.ShutdownSignalException: connection error; reason: java.net.SocketException: Connection reset at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:67) at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:33) at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:343) at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:216) at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:118) Caused by: java.net.SocketException: Connection reset at java.net.SocketInputStream.read(Unknown Source) at java.net.SocketInputStream.read(Unknown Source) at java.io.BufferedInputStream.fill(Unknown Source) at java.io.BufferedInputStream.read(Unknown Source) at java.io.DataInputStream.readUnsignedByte(Unknown Source) at com.rabbitmq.client.impl.Frame.readFrom(Frame.java:95) […]

RabbitMQ:消息仍为“未确认”

我的Java应用程序向RabbitMQ交换发送消息,然后交换重定向消息到绑定队列。 我在RabbitMQ中使用Springframework AMQP java插件。 问题:消息进入队列,但它保持“未确认”状态,它永远不会变为“就绪”。 可能是什么原因?

spring amqp rabbitmq MessageListener无效

我想使用spring amqp使用rabbitmq,下面是我的配置。 这是一个简单的Message Listener类, import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessageListener; public class ImportMessageListener implements MessageListener { @Override public void onMessage(Message message) { System.out.println(“consumer output: ” + message); } } 这是生产者(spring批次的itemWriter), public class ImportItemWriter implements ItemWriter { private AmqpTemplate template; public AmqpTemplate getTemplate() { return template; } public void setTemplate(AmqpTemplate template) { this.template = template; } public void […]

为rabbitmq配置HAProxy

我想使用HAProxy作为负载均衡器。 我想把两个rabbitmq服务器放在haproxy后面。 rabbitmq服务器都在EC2的不同实例上。 我按照此参考配置了HAProxy服务器。 我的工作但问题是消息不以roundrobin模式发布。 消息仅在一台服务器上发布。 我的要求有不同的配置吗? 我在/etc/haproxy/haproxy.cfg中的配置 listen rabbitmq 0.0.0.0:5672 mode tcp stats enable balance roundrobin option tcplog no option clitcpka no option srvtcpka server rabbit01 46.XX.XX.XX:5672 check server rabbit02 176.XX.XX.XX:5672 check listen web-service *:80 mode http balance roundrobin option httpchk HEAD / HTTP/1.0 option httpclose option forwardfor option httpchk OPTIONS /health_check.html stats enable […]

没有连接的AMQP / RabbitMQ通道何时死亡?

我有一个简单的RabbitMQ测试程序随机排队消息,另一个读取它们,都使用Spring-AMQP。 如果消费者死亡(例如,在没有机会关闭其连接或通道的情况下终止进程),则任何未确认的消息似乎永远不会被确认。 我已经看到了许多引用(例如这个问题 ),它表示当通道没有连接时它会死亡,并且将重新传送剩余的未包装的消息。 这不是我看到的行为 – 相反,我得到了一个越​​来越多的标记为IDLE的频道列表,以及越来越多的标记正在运行但没有活动的连接列表。 一旦进程被杀死,是否需要一些配置来注意连接已经死亡? 编辑:我在VirtualBox VM中运行rabbitmq服务器,显然无法通过NAT正确管理死入站连接。 这对于直接在物理主机上运行的mq服务器来说效果很好。

在SpringBoot应用程序中测试@RabbitListener方法

码: RabbitMQListener: @Component public class ServerThroughRabbitMQ implements ServerThroughAMQPBroker { private static final AtomicLong ID_COUNTER=new AtomicLong(); private final long instanceId=ID_COUNTER.incrementAndGet(); @Autowired public ServerThroughRabbitMQ( UserService userService,LoginService loginService….){ …. } @Override @RabbitListener(queues = “#{registerQueue.name}”) public String registerUserAndLogin(String json) { ….. } SERVERCONFIG: @Configuration public class ServerConfig { @Value(“${amqp.broker.exchange-name}”) private String exchangeName; @Value(“${amqp.broker.host}”) private String ampqBrokerHost; @Value(“${amqp.broker.quidco.queue.postfix}”) private String […]

Websockets,SockJs,Stomp,Spring,RabbitMQ,自动删除用户特定的队列

我希望有人可以帮我解决这个问题:我使用Spring的Websocket支持。 使用SockJs和StompJs我订阅这样的队列: var socket = new SockJS(localhost + ‘websocket’); stompClient = Stomp.over(socket); stompClient.connect(”, ”, function(frame) { stompClient.subscribe(“/user/queue/gotMessage”, function(message) { gotMessage((JSON.parse(message.body))); }); }, function(error) { }); 这对Spring的SimpMessageSendingOperations非常有用。 但是有一个大问题。 队列名称如下所示: gotMessage-user3w4tstcj并未将其声明为自动删除队列,但这正是我想要的。 否则我有10k个未使用的队列。 在队列为无消费者的那一刻,队列应该被删除。 我怎么能这样呢?