在 MQTT Paho 中无法接收此前已发布到所订阅主题的消息

我正在使用 Paho 发送和接收 MQTT 消息。到目前为止,发送消息没有问题,但接收消息时遇到了问题。我的代码是:

package BenchMQTT;

import java.nio.charset.StandardCharsets;

import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.IMqttToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.MqttClient;

/**
 * Small MQTT demo: one client publishes ten messages to a topic while a
 * second client, subscribed to the same topic, prints every message it
 * receives.
 *
 * <p>The receiver stays connected while the sender publishes, so each
 * "Received: ..." line is printed as soon as the corresponding message is
 * delivered, interleaved with the publish counter.
 */
public class Test_A_2 implements MqttCallback {

    private static final String BROKER_URI = "tcp://mybroker:1883";
    private static final String TOPIC = "BenchMQTT";
    private static final int MESSAGE_COUNT = 10;

    /** Receiving client; its callbacks are handled by this object. */
    MqttClient clientR;

    /** Publishing client. */
    MqttClient clientS;

    public Test_A_2() {
    }

    /**
     * Runs the demo once.
     *
     * @param args ignored
     * @throws InterruptedException declared for compatibility with {@link #doDemo()}
     */
    public static void main(String[] args) throws InterruptedException {
        new Test_A_2().doDemo();
    }

    /**
     * Connects both clients, subscribes the receiver, publishes
     * {@value #MESSAGE_COUNT} messages and then shuts both clients down.
     *
     * <p>An {@link MqttException} is reported on standard output together
     * with its stack trace; the clients are released in every case.
     *
     * @throws InterruptedException kept in the signature for existing callers
     */
    public void doDemo() throws InterruptedException {
        try {
            clientS = new MqttClient(BROKER_URI, "Sender");
            clientR = new MqttClient(BROKER_URI, "Reiever");

            // Register the callback before connecting so that no incoming
            // message can arrive while no listener is installed.
            clientR.setCallback(this);
            clientR.connect();
            clientS.connect();
            clientR.subscribe(TOPIC);

            String messagePayload = "qwertyuiopasdfghjklzxcvbnmqwertyuiopasdfghjk"
                    + "lzxcvbnmqwertyuiopasdfghjklzxcvbnmqwertyuiopasdfghj"
                    + "klzxcvbnmqwertyuiopasdfghjklzxcvbnmqwertyuiopasdfgh"
                    + "jklzxcvbnmqwertyuiopasdfghjklzxcvbnmqwertyuiopasdfg"
                    + "hjklzxcvbnmqwertyuiopasdfghjklzxcvbnmqwertyuiopasd"
                    + "fghjklzxcvbnmqwertyuiopasdfghjklzxcvbnmqwertyuiopas"
                    + "dfghjklzxcvbnmqwertyuiopasdfghjklzxcvbnmqwertyuiopa"
                    + "sdfghjklzxcvbnmqwertyuiopasdfghjklzxcvbnmqwertyuiop"
                    + "asdfghjklzxcvbnmqwertyuiopasdfghjklzxcvbnmqwertyuio"
                    + "pasdfghjklzxcvbnmqwertyuiopasdfghjklzxcvbnmqwertyui"
                    + "opasdfghjklzxcvbnmqwertyuiopasdfghjklzxcvbnmqwertyu"
                    + "iopasdfghjklzxcvbnmqwertyuiopasdfghjklzxcvbnmqwerty"
                    + "uiopasdfghjklzxcvbnmqwertyuiopasdfghjklzxcvbnmqwert"
                    + "nmqwertyuiop";

            // The payload never changes, so build the message once instead
            // of re-encoding the string on every iteration.
            MqttMessage message = new MqttMessage();
            message.setPayload(messagePayload.getBytes(StandardCharsets.UTF_8));

            for (int i = 0; i < MESSAGE_COUNT; i++) {
                System.out.println(i);
                clientS.publish(TOPIC, message);
            }
        } catch (MqttException e) {
            System.out.println("ERROR");
            e.printStackTrace();
        } finally {
            // Release both clients even when connecting or publishing failed.
            shutdown(clientR);
            shutdown(clientS);
        }
    }

    /** Disconnects (if connected) and closes a client; a null client is ignored. */
    private static void shutdown(MqttClient client) {
        if (client == null) {
            return;
        }
        try {
            if (client.isConnected()) {
                client.disconnect();
            }
            client.close();
        } catch (MqttException e) {
            e.printStackTrace();
        }
    }

    /** Reports an unexpected loss of the broker connection. */
    @Override
    public void connectionLost(Throwable cause) {
        System.err.println("Connection lost: " + cause);
    }

    /** Prints every message delivered to the receiving client. */
    @Override
    public void messageArrived(String topic, MqttMessage message) {
        System.out.println("Received: " + message.toString());
    }

    /** Delivery confirmations are not used by this demo. */
    @Override
    public void deliveryComplete(IMqttDeliveryToken token) {
    }
}

这段代码可以发送并接收消息。

OUTPUT:

 0 Received: 0 1 Received: 1 2 Received: 2 3 Received: 3 4 Received: 4 5 Received: 5 6 Received: 6 7 Received: 7 8 Received: 8 9 Received: 9 

我想先把所有消息发送完,然后再接收它们。有人能帮忙吗?预期输出:

 0 1 2 3 4 5 6 7 8 9 Received: 0 Received: 1 Received: 2 Received: 3 Received: 4 Received: 5 Received: 6 Received: 7 Received: 8 Received: 9 

MQTT(以及任何发布/订阅消息系统)并不是这样工作的:只要接收方已连接到服务器,消息就会在发送时立即传递给它。

例外情况是:如果接收方已连接并以大于 0 的 QoS 订阅了某个主题,随后断开连接,之后再重新连接(且未设置 clean session 标志),那么在其离线期间以大于 0 的 QoS 发布的消息会在重新连接时传递给它。

另一种可能性是:如果消息发布时将保留(retained)标志设置为 true,那么在接收客户端订阅时,只会收到发布到该主题的最后一条消息。

以下代码可以实现您想要的效果,但它是在强迫 MQTT 以一种本不应有的方式运行。消息排队机制只是为了确保客户端即使断开连接一段时间,也能收到所有消息;在正常情况下,消息始终会尽快传递。

  import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; import org.eclipse.paho.client.mqttv3.IMqttToken; import org.eclipse.paho.client.mqttv3.MqttCallback; import org.eclipse.paho.client.mqttv3.MqttConnectOptions; import org.eclipse.paho.client.mqttv3.MqttException; import org.eclipse.paho.client.mqttv3.MqttMessage; import org.eclipse.paho.client.mqttv3.MqttClient; public class Test_A_2 implements MqttCallback { MqttClient clientR; MqttClient clientS; public Test_A_2() { } public static void main(String[] args) throws InterruptedException { long startTime = System.currentTimeMillis(); new Test_A_2().doDemo(); long endTime = System.currentTimeMillis(); } public void doDemo() throws InterruptedException { try { MqttConnectOptions options = new MqttConnectOptions(); options.setCleanSession(false); clientS = new MqttClient("tcp://localhost:1883", "Sender"); clientR = new MqttClient("tcp://localhost:1883", "Reiever"); clientR.connect(options); clientS.connect(); clientR.setCallback(this); clientR.subscribe("BenchMQTT",2); MqttMessage message = new MqttMessage(); String messagePayload = "qwertyuiopasdfghjklzxcvbnmqwertyuiopasdfghjk" + "lzxcvbnmqwertyuiopasdfghjklzxcvbnmqwertyuiopasdfghj" + "klzxcvbnmqwertyuiopasdfghjklzxcvbnmqwertyuiopasdfgh" + "jklzxcvbnmqwertyuiopasdfghjklzxcvbnmqwertyuiopasdfg" + "hjklzxcvbnmqwertyuiopasdfghjklzxcvbnmqwertyuiopasd" + "fghjklzxcvbnmqwertyuiopasdfghjklzxcvbnmqwertyuiopas" + "dfghjklzxcvbnmqwertyuiopasdfghjklzxcvbnmqwertyuiopa" + "sdfghjklzxcvbnmqwertyuiopasdfghjklzxcvbnmqwertyuiop" + "asdfghjklzxcvbnmqwertyuiopasdfghjklzxcvbnmqwertyuio" + "pasdfghjklzxcvbnmqwertyuiopasdfghjklzxcvbnmqwertyui" + "opasdfghjklzxcvbnmqwertyuiopasdfghjklzxcvbnmqwertyu" + "iopasdfghjklzxcvbnmqwertyuiopasdfghjklzxcvbnmqwerty" + "uiopasdfghjklzxcvbnmqwertyuiopasdfghjklzxcvbnmqwert" + "nmqwertyuiop"; clientR.disconnect(); for(int i=0;i<10;i++) { message.setPayload((messagePayload) .getBytes()); System.out.println(i); message.setQos(2); 
clientS.publish("BenchMQTT", message); } clientR.connect(options); clientR.setCallback(this); clientR.subscribe("BenchMQTT",2); clientR.disconnect(); clientS.disconnect(); clientR.close(); clientS.close(); } catch (MqttException e) { System.out.println("ERROR"); e.printStackTrace(); } } @Override public void connectionLost(Throwable cause) { // TODO Auto-generated method stub } @Override public void messageArrived(String topic, MqttMessage message) { System.out.println("Received: " + message.toString()); } @Override public void deliveryComplete(IMqttDeliveryToken token) { } }