Android – Paho Mqtt客户端在网络连接更改后不会收到消息(移动数据已禁用并再次启用)

我正在使用Mosquitto Mqtt和paho API来接收Android设备上的推送消息。 但是一旦网络连接发生变化,就会停止接收消息。 以下是使用简单测试用例重现问题的步骤:

1)创建一个简单的活动。

2)On Activity StartUp通过paho API连接到mosquitto测试服务器(test.mosquitto.org:1883)。

3)订阅一些主题。

4)向主题发布一些消息。

结果: Mqtt Client收到发布到该主题的所有消息。 现在

5)禁用移动设备上的互联网连接(移动数据)

6)向主题发布一些消息。

7)重新连接互联网。

结果:客户端在禁用Internet连接后未收到任何已发布的消息。

由于KeepAliveInterval一直保持高值(30分钟),它应该在重新连接到互联网后收到所有消息。

相同的用例(相同的代码)适用于简单的java项目(非android),我在我的笔记本电脑上禁用互联网来运行用例。

知道为什么它不能在Android设备上工作??? 我错过了什么吗?

注意:

1)使用mqtt-client-0.4.1

2)Android目标API级别11

3)测试期间不要将设备置于睡眠模式。

4)在connectionLost回调中没有得到任何调用,并且mqtt回调的所有4个线程都在整个测试用例中运行,即mosquitto服务器的连接是完整的。

Java客户端库在一定程度上受底层网络API的支配。 调用publish时,它会将MQTT数据包写入套接字。 如果该写入失败,则将调用连接丢失,如果该写入有效,则客户端库将继续。 您看到的行为差异是因为网络库在这些情况下表现不同。

MQTT keepalive间隔旨在帮助解决这个问题。 在某些情况下,TCP连接可能看起来是实时的。 这在移动或卫星连接设备上尤其可行 – 您不能指望网络API在所有情况下都能完全相同。 Keepalive将ping数据包发送到服务器并期望响应 – 如果未收到响应,则假定会话已关闭。

如果将keepalive间隔设置为10秒,则应在15到20秒内将连接识别为已断开。

您可以将MqttCallback侦听器附加到MqttAsyncclient。 它有回调方法连接丢失,当连接丢失事件发生或paho断开时将调用它。

为了解决这个问题,每当互联网连接重新启动时,我都必须向代理进行显式ping操作(以及等待ping响应的计时器)。 如果ping失败或计时器熄灭,我强行终止现有连接(disconnectForcibly),然后显式调用connectionLost方法。 (然后仅从connectionLost方法重新连接)。

在您的服务中: –

//Receiver that notifies the Service when the phone gets data connection private NetworkConnectionIntentReceiver netConnReceiver; 

创建以下类: –

 /* * Called in response to a change in network connection - after losing a * connection to the server, this allows us to wait until we have a usable * data connection again */ class NetworkConnectionIntentReceiver extends BroadcastReceiver { private static String TAG ="NetworkConnectionIntentReceiver"; @Override public void onReceive(Context ctx, Intent intent) { // we protect against the phone switching off while we're doing this // by requesting a wake lock - we request the minimum possible wake // lock - just enough to keep the CPU running until we've finished PowerManager pm = (PowerManager) ctx.getSystemService(ctx.POWER_SERVICE); PowerManager.WakeLock wl = pm.newWakeLock(PowerManager.PARTIAL_WAKE_LOCK, "MQTT"); wl.acquire(); Connection c = Connections.getInstance(ctx).getConnection(clientHandle); final ActionListener callback = new ActionListener(ctx, ActionListener.Action.CONNECT, clientHandle,null); c.getClient().setCallback(new MqttCallbackHandler(ctx, clientHandle,messenger_where_incoming_messages_tobe_sent)); c.getClient().connect(c.getConnectionOptions(), null, callback); /* The Above Reconnect Logic can be put up in a Reconnect() function. * OR WRITE Any Other LOGIC TO RECONNECT TO MQTT */ // we're finished - if the phone is switched off, it's okay for the CPU // to sleep now wl.release(); } 

现在调用以下方法在OnResume()或onCreate中适当的地方注册BroadcastReceiver。

 synchronized void handleNetworkChange() { // changes to the phone's network - such as bouncing between WiFi // and mobile data networks - can break the MQTT connection // the MQTT connectionLost can be a bit slow to notice, so we use // Android's inbuilt notification system to be informed of // network changes - so we can reconnect immediately, without // haing to wait for the MQTT timeout if (netConnReceiver == null) { netConnReceiver = new NetworkConnectionIntentReceiver(); registerReceiver(netConnReceiver, new IntentFilter(ConnectivityManager.CONNECTIVITY_ACTION)); } } 

我修复了recconect bug如下(使用rxJava2,但不是必需的):

  public void reconnect() { Completable.create(emitter -> { while (!mqttClient.isConnected()) { mqttClient.connect(options, null, new IMqttActionListener() { @Override public void onSuccess(IMqttToken asyncActionToken) { emitter.onComplete(); } @Override public void onFailure(IMqttToken asyncActionToken, Throwable exception) { LogHelper.d(TAG,"try to connect failed"); } }); Thread.sleep(2000); } emitter.onComplete(); }) .subscribeOn(Schedulers.io()) .subscribe(); } 

和一个示例电话

 private BroadcastReceiver changeNetworkStateReceiver = new BroadcastReceiver() { @Override public void onReceive(Context context, Intent intent) { if (Objects.equals(intent.getAction(), NetworkStateReceiver.EVENT_CHANGE_NETWORK_STATE)) { if(Utils.isOnline(context)) { mqttClient.reconnect(); } } } };