使用单个连接实例实现Eclipse MQTT Android Client

我在我的应用程序中使用Eclipse Paho android mqtt服务。 我能够订阅并将消息发布到mqtt broker。 我在应用程序中有几个活动,当任何活动启动时,它使用mqttAndroidClient.connect(null, new IMqttActionListener() {...})连接到代理,并在mqttAndroidClient.setCallback(new MqttCallback() {...})中获取响应。

我的问题:

  1. 这是实现android mqtt服务的正确方法吗?
  2. 有没有办法在整个应用程序中使用相同的连接和回调实例?

“更好”的方法是创建连接/重新连接到MQTT Broker的服务。

我创建了自己的名为MqttConnectionManagerService的服务,该服务维护和管理与代理的连接。

该解决方案的主要特点:

  1. 只要它还活着,服务就会维护一个实例。
  2. 如果服务被终止,Android会重新启动它(因为onStartCommand()返回START_STICKY)。
  3. 设备启动时可以启动服务。
  4. 服务在后台运行,并始终连接以接收通知。
  5. 如果服务处于活动状态,再次调用startService(..)将触发其onStartCommand()方法(而不是onCreate() )。 在此方法中,我们只需检查此客户端是否已连接到代理,并在需要时连接/重新连接。

示例代码:

MqttConnectionManagerService

 public class MqttConnectionManagerService extends Service { private MqttAndroidClient client; private MqttConnectOptions options; @Override public void onCreate() { super.onCreate(); options = createMqttConnectOptions(); client = createMqttAndroidClient(); } @Override public int onStartCommand(Intent intent, int flags, int startId) { this.connect(client, options); return START_STICKY; } private MqttConnectOptions createMqttConnectOptions() { //create and return options } private MqttAndroidClient createMqttAndroidClient() { //create and return client } public void connect(final MqttAndroidClient client, MqttConnectOptions options) { try { if (!client.isConnected()) { IMqttToken token = client.connect(options); //on successful connection, publish or subscribe as usual token.setActionCallback(new IMqttActionListener() {..}); client.setCallback(new MqttCallback() {..}); } } catch (MqttException e) { //handle e } } } 

AndroidManifest.xml中

                  

MqttServiceStartReceiver

 /**
  * Broadcast receiver that starts {@link MqttConnectionManagerService} for every
  * broadcast delivered to it.
  */
 public class MqttServiceStartReceiver extends BroadcastReceiver {

     @Override
     public void onReceive(Context context, Intent intent) {
         // The incoming broadcast intent is not inspected; any delivery starts the service.
         final Intent serviceIntent = new Intent(context, MqttConnectionManagerService.class);
         context.startService(serviceIntent);
     }
 }

在你的Activity的onResume()

 startService(new Intent(this, MqttConnectionManagerService.class)); 

这是我的MQTT客户端的Singleton实现:

  public class MQTTConnection extends ServerConnectionImpl { private static String TAG = MQTTConnection.class.getSimpleName(); private static Context mContext; private static MqttAndroidClient mqttAndroidClient; private static String clientId; private static MQTTConnection sMqttConnection = null; private MQTTConnection() { } public static MQTTConnection getInstance(Context context) { if (null == sMqttConnection) { mContext = context; init(); } return sMqttConnection; } public static void reconnectToBroker() { try { if (sMqttConnection != null) { sMqttConnection.disconnect(); } init(); } catch (Exception e) { e.printStackTrace(); } } private static void init() { sMqttConnection = new MQTTConnection(); setClientId(); connectToBroker(); } private static void connectToBroker() { String ip = STBPreferences.getInstance(mContext).getString(Constants.KEY_MQTT_SERVER_IP, null); if (ip == null) { ip = Constants.MQTT_SERVER_IP; } final String uri = Constants.MQTT_URI_PREFIX + ip + ":" + Constants.MQTT_SERVER_PORT; mqttAndroidClient = new MqttAndroidClient(mContext.getApplicationContext(), uri, clientId); mqttAndroidClient.setCallback(new MqttCallbackExtended() { @Override public void connectComplete(boolean reconnect, String serverURI) { if (reconnect) { LogUtil.d(TAG, "Reconnected to : " + serverURI); // Because Clean Session is true, we need to re-subscribe subscribeToTopic(); } else { LogUtil.d(TAG, "Connected to: " + serverURI); } } @Override public void connectionLost(Throwable cause) { LogUtil.d(TAG, "The Connection was lost."); } @Override public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception { String messageReceived = new String(mqttMessage.getPayload()); LogUtil.d(TAG, "Incoming message: " + messageReceived); try { Gson gson = new Gson(); Message message = gson.fromJson(messageReceived, Message.class); // Here you can send message to listeners for processing } catch (JsonSyntaxException e) { // Something wrong with message format json 
e.printStackTrace(); } } @Override public void deliveryComplete(IMqttDeliveryToken token) { LogUtil.d(TAG, "Message delivered"); } }); MqttConnectOptions mqttConnectOptions = new MqttConnectOptions(); mqttConnectOptions.setAutomaticReconnect(true); mqttConnectOptions.setCleanSession(false); try { mqttAndroidClient.connect(mqttConnectOptions, null, new IMqttActionListener() { @Override public void onSuccess(IMqttToken asyncActionToken) { LogUtil.d(TAG, "connect onSuccess"); DisconnectedBufferOptions disconnectedBufferOptions = new DisconnectedBufferOptions(); disconnectedBufferOptions.setBufferEnabled(true); disconnectedBufferOptions.setBufferSize(100); disconnectedBufferOptions.setPersistBuffer(false); disconnectedBufferOptions.setDeleteOldestMessages(false); mqttAndroidClient.setBufferOpts(disconnectedBufferOptions); subscribeToTopic(); } @Override public void onFailure(IMqttToken asyncActionToken, Throwable exception) { LogUtil.d(TAG, "Failed to connect to: " + uri); } }); } catch (MqttException ex){ ex.printStackTrace(); } } public void publish(Message publishMessage) { try { Gson gson = new Gson(); String replyJson = gson.toJson(publishMessage); String publishTopic = clientId + Constants.MQTT_PUB_TOPIC_APPEND; MqttMessage message = new MqttMessage(); message.setPayload(replyJson.getBytes()); mqttAndroidClient.publish(publishTopic, message); LogUtil.d(TAG, "Message Published"); /*if(!mqttAndroidClient.isConnected()){ LogUtil.d(TAG, mqttAndroidClient.getBufferedMessageCount() + " messages in buffer."); }*/ } catch (MqttException e) { LogUtil.d(TAG, "Error Publishing: " + e.getMessage()); e.printStackTrace(); } catch (NullPointerException e) { e.printStackTrace(); if (mqttAndroidClient == null) { init(); } } } private static void subscribeToTopic() { try { String subscriptionTopic = clientId + Constants.MQTT_SUB_TOPIC_APPEND; mqttAndroidClient.subscribe(subscriptionTopic, 0, null, new IMqttActionListener() { @Override public void onSuccess(IMqttToken 
asyncActionToken) { LogUtil.d(TAG, "subscribe onSuccess"); } @Override public void onFailure(IMqttToken asyncActionToken, Throwable exception) { LogUtil.d(TAG, "Failed to subscribe"); } }); } catch (MqttException ex){ System.err.println("Exception whilst subscribing"); ex.printStackTrace(); } } public void unSubscribe() { LogUtil.d(TAG, "unSubscribe"); final String topic = "foo/bar"; try { IMqttToken unsubToken = mqttAndroidClient.unsubscribe(topic); unsubToken.setActionCallback(new IMqttActionListener() { @Override public void onSuccess(IMqttToken asyncActionToken) { // The subscription could successfully be removed from the client LogUtil.d(TAG, "unSubscribe onSuccess"); } @Override public void onFailure(IMqttToken asyncActionToken, Throwable exception) { LogUtil.d(TAG, "unSubscribe onFailure"); // some error occurred, this is very unlikely as even if the client // did not had a subscription to the topic the unsubscribe action // will be successfully } }); } catch (MqttException e) { e.printStackTrace(); } } public void disconnect() { LogUtil.d(TAG, "disconnect"); try { IMqttToken disconToken = mqttAndroidClient.disconnect(); disconToken.setActionCallback(new IMqttActionListener() { @Override public void onSuccess(IMqttToken asyncActionToken) { // we are now successfully disconnected LogUtil.d(TAG, "disconnect onSuccess"); } @Override public void onFailure(IMqttToken asyncActionToken, Throwable exception) { LogUtil.d(TAG, "disconnect onFailure"); // something went wrong, but probably we are disconnected anyway } }); } catch (MqttException e) { e.printStackTrace(); } } private static void setClientId() { String srNo = STBPreferences.getInstance(mContext).getString(Constants.STB_SERIAL_NO, null); clientId = srNo; } private String getClientId() { if (clientId == null) { setClientId(); } return clientId; } @Override public boolean isInternetEnabled() { return NetworkUtility.isNetworkAvailable(mContext); } @Override public void sendMessage(Message message) { 
publish(message); } @Override public void reconnect() { reconnectToBroker(); } } 

这是消息模型。 根据您的需要更改Model类。

 /**
  * Message model exchanged over MQTT as JSON (field names are fixed by the
  * {@code @SerializedName} annotations).
  */
 public class Message {

     /** Type of data */
     @SerializedName("type")
     private String type;

     /** Name of component */
     @SerializedName("name")
     private String name;

     /** Payload; declared as {@code Object}, so any Gson-serializable value fits. */
     @Expose
     @SerializedName("data")
     private Object data;

     /**
      * @param type type of data
      * @param name name of component
      * @param data payload, may be {@code null}
      */
     public Message(String type, String name, Object data) {
         this.type = type;
         this.name = name;
         this.data = data;
     }

     public String getType() {
         return type;
     }

     public void setType(String type) {
         this.type = type;
     }

     public String getName() {
         return name;
     }

     public void setName(String name) {
         this.name = name;
     }

     public Object getData() {
         return data;
     }

     public void setData(Object data) {
         this.data = data;
     }

     /**
      * Returns a debug representation of the three fields.
      *
      * <p>{@code data} is concatenated directly instead of via
      * {@code data.toString()}, so a {@code null} payload prints as "null" rather
      * than throwing a {@code NullPointerException}.
      */
     @Override
     public String toString() {
         return "Message{" + "type=" + type + "\n" + "name=" + name + "\n" + "data=" + data + '}';
     }
 }

在您的活动中获取MQTT实例

 // The singleton is exposed by MQTTConnection itself.
 MQTTConnection mqttConnection = MQTTConnection.getInstance(mContext);

发布消息

 mqttConnection.sendMessage(new Message( ... ));

编辑1:这是我的ServerConnectionImpl类供您参考。

 public class ServerConnectionImpl extends ConfigurationChangeListenerImpl implements ServerConnection { /** * Logging TAG */ private static final String TAG = ServerConnectionImpl.class.getSimpleName(); /** * List of all listener which are registered for messages received */ private static ArrayList sConfigurationChangeListeners = new ArrayList<>(); @Override public boolean isInternetEnabled() { return false; } @Override public ResponseData getSubscriptionDetails(String serialNumber) { return null; } @Override public void sendMessage(Message message, WebSocket webSocket) { } @Override public void sendMessage(Message message) { } @Override public void sendMessageToAll(Message message) { } //@Override public static void notifyListeners(int config, Message message, WebSocket wc) { switch (config) { case Configs.CAMERA: { for (ConfigurationChangeListenerImpl l : sConfigurationChangeListeners) { l.onCameraServerChanged(); } break; } case Configs.GESTURE: { for (ConfigurationChangeListenerImpl l : sConfigurationChangeListeners) { l.onGestureCommandServerChanged(); } break; } case Configs.MOTION_SENSOR: { for (ConfigurationChangeListenerImpl l : sConfigurationChangeListeners) { l.onMotionSensorServerChanged(); } break; } case Configs.MESSAGE: { for (ConfigurationChangeListenerImpl l : sConfigurationChangeListeners) { l.onMessageReceived(message, wc); } break; } } } /** * Adds listener to listen to messages. 
* * @param listener */ @Override public synchronized void addListener(ConfigurationChangeListenerImpl listener) { LogUtil.d(TAG, "addListener()"); if (listener == null) { throw new IllegalArgumentException("Invalid listener " + listener); } sConfigurationChangeListeners.add(listener); } /** * Removes the listener * * @param listener */ @Override public synchronized void removeListener(ConfigurationChangeListenerImpl listener) { LogUtil.d(TAG, "removeListener()"); if (listener == null) { throw new IllegalArgumentException("Invalid listener " + listener); } sConfigurationChangeListeners.remove(listener); } @Override public void updateState() { } @Override public void reconnect() { } 

}

您可以将自己的实现用于ServerConnectionImpl类。