Spring MqttPahoMessageDrivenChannelAdapter丢失连接:连接丢失; 重试

我们使用Spring message-driven-channel-adapter来订阅MQTT主题。 但是我们经常遇到错误。 我已经使用JavaScript客户端( mqttws31.js )测试了连接,它工作正常。 意味着连接没有问题。

错误: –

 org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter connectionLost SEVERE: Lost connection:Connection lost; retrying... 

MQTT消息: –

 [payload=6483D03E4C75BA943148F18D73,1.00,1E, headers={mqtt_retained=false, mqtt_qos=0, id=5fa41168-34c6-1e3d-a775-e3146842990a, mqtt_topic=TEST/GATEWAY2, mqtt_duplicate=false, timestamp=1499067757559}] 

配置 : –

            

pom.xml:

  org.eclipse.paho org.eclipse.paho.client.mqttv3 1.1.1   org.springframework.integration spring-integration-mqtt 4.2.2.RELEASE  

在调试org.eclipse.paho.client.mqttv3-1.1.1-sources.jar : –

CommsReceiver.Java

 public void run() { final String methodName = "run"; MqttToken token = null; while (running && (in != null)) { try { //@TRACE 852=network read message log.fine(CLASS_NAME,methodName,"852"); receiving = in.available() > 0; MqttWireMessage message = in.readMqttWireMessage(); receiving = false; // instanceof checks if message is null if (message instanceof MqttAck) { token = tokenStore.getToken(message); if (token!=null) { synchronized (token) { // Ensure the notify processing is done under a lock on the token // This ensures that the send processing can complete before the // receive processing starts! ( request and ack and ack processing // can occur before request processing is complete if not! clientState.notifyReceivedAck((MqttAck)message); } } else if(message instanceof MqttPubRec || message instanceof MqttPubComp || message instanceof MqttPubAck) { //This is an ack for a message we no longer have a ticket for. //This probably means we already received this message and it's being send again //because of timeouts, crashes, disconnects, restarts etc. //It should be safe to ignore these unexpected messages. log.fine(CLASS_NAME, methodName, "857"); } else { // It its an ack and there is no token then something is not right. // An ack should always have a token assoicated with it. throw new MqttException(MqttException.REASON_CODE_UNEXPECTED_ERROR); } } else { if (message != null) { // A new message has arrived clientState.notifyReceivedMsg(message); } } } catch (MqttException ex) { //@TRACE 856=Stopping, MQttException log.fine(CLASS_NAME,methodName,"856",null,ex); running = false; // Token maybe null but that is handled in shutdown clientComms.shutdownConnection(token, ex); } catch (IOException ioe) { //@TRACE 853=Stopping due to IOException log.fine(CLASS_NAME,methodName,"853"); running = false; // An EOFException could be raised if the broker processes the // DISCONNECT and ends the socket before we complete. As such, // only shutdown the connection if we're not already shutting down. if (!clientComms.isDisconnecting()) { clientComms.shutdownConnection(token, new MqttException(MqttException.REASON_CODE_CONNECTION_LOST, ioe)); } } finally { receiving = false; } } //@TRACE 854=< log.fine(CLASS_NAME,methodName,"854"); } 

在上面的方法中,有时in.readMqttWireMessage()抛出IOException 。 所以基于catch块,它使用clientComms.shutdownConnection(token, ...重新连接clientComms.shutdownConnection(token, ...

只是想分享它有帮助…我有相同的例外并通过确保生成一个唯一的客户端ID(使用MqttAsyncClient.generateClientId() )来修复它,如下所述: https : //github.com/eclipse/ paho.mqtt.java/issues/207#issuecomment-338246879

但你仍然没有真正描述一个问题。 您在上面显示了一条消息,因此它必须适合您。 Paho正在检测连接问题; 它通知将重新连接的Spring Integration。

您可以通过向ApplicationListener添加ApplicationListener来获取有关exception的完整信息。

 @Bean public ApplicationListener eventListener() { return new ApplicationListener() { @Override public void onApplicationEvent(MqttConnectionFailedEvent event) { event.getCause().printStackTrace(); } }; } 

结果:

 Connection lost (32109) - java.io.EOFException at org.eclipse.paho.client.mqttv3.internal.CommsReceiver.run(CommsReceiver.java:164) at java.lang.Thread.run(Thread.java:748) Caused by: java.io.EOFException at java.io.DataInputStream.readByte(DataInputStream.java:267) at org.eclipse.paho.client.mqttv3.internal.wire.MqttInputStream.readMqttWireMessage(MqttInputStream.java:92) at org.eclipse.paho.client.mqttv3.internal.CommsReceiver.run(CommsReceiver.java:116) ... 1 more 

(当我关闭经纪人时)。

如果您认为paho客户端存在问题,则应该为该项目提出问题。