使用PAHO订阅和读取MQTT消息

我正在使用paho来发送和接收mqtt消息。 到目前为止,发送消息没有问题,我通过使用mosquitto接收它们。

现在我想通过使用java客户端阅读消息,我注意到有关接收消息的文档较少。

我实现了MqttCallback接口,但我仍然无法弄清楚如何阅读我订阅的主题的消息。

这是我的源代码到目前为止,我可以使用mosquitto_sub阅读消息。

import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; import org.eclipse.paho.client.mqttv3.MqttCallback; import org.eclipse.paho.client.mqttv3.MqttClient; import org.eclipse.paho.client.mqttv3.MqttException; import org.eclipse.paho.client.mqttv3.MqttMessage; public class PahoDemo implements MqttCallback { MqttClient client; MqttClient subClient; public PahoDemo() { } public static void main(String[] args) { new PahoDemo().doDemo(); } public void doDemo() { try { client = new MqttClient("tcp://192.168.118.11:1883", "Sending"); subClient = new MqttClient("tcp://192.168.118.11:1883", "Subscribing"); client.connect(); subClient.connect(); subClient.subscribe("foo"); MqttMessage message = new MqttMessage(); message.setPayload("A single message from my computer fff" .getBytes()); client.publish("foo", message); client.disconnect(); client.close(); subClient.disconnect(); subClient.close(); } catch (MqttException e) { e.printStackTrace(); } } @Override public void connectionLost(Throwable cause) { // TODO Auto-generated method stub } @Override public void messageArrived(String topic, MqttMessage message) throws Exception { System.out.println(message); } @Override public void deliveryComplete(IMqttDeliveryToken token) { // TODO Auto-generated method stub } } 

在经纪人有时间发回消息之前,您正在关闭客户端。

此外,您不需要2个客户端实例,只需一个即可发送和接收。

我已经编辑了一点代码,它现在将继续运行并接收消息,直到你杀了它。

 import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; import org.eclipse.paho.client.mqttv3.MqttCallback; import org.eclipse.paho.client.mqttv3.MqttClient; import org.eclipse.paho.client.mqttv3.MqttException; import org.eclipse.paho.client.mqttv3.MqttMessage; public class PahoDemo implements MqttCallback { MqttClient client; public PahoDemo() { } public static void main(String[] args) { new PahoDemo().doDemo(); } public void doDemo() { try { client = new MqttClient("tcp://192.168.118.11:1883", "Sending"); client.connect(); client.setCallback(this); client.subscribe("foo"); MqttMessage message = new MqttMessage(); message.setPayload("A single message from my computer fff" .getBytes()); client.publish("foo", message); } catch (MqttException e) { e.printStackTrace(); } } @Override public void connectionLost(Throwable cause) { // TODO Auto-generated method stub } @Override public void messageArrived(String topic, MqttMessage message) throws Exception { System.out.println(message); } @Override public void deliveryComplete(IMqttDeliveryToken token) { // TODO Auto-generated method stub } } 

编辑:添加了缺少的client.setCallback(this)