RAPIFIRE

Blog

How to Send Commands to Your MQTT Client

This blog shows how to control your MQTT client by sending commands to it on the MQTT topic known as the commands channel in RAPIFIRE. This LogErrorsPublisherCmd class to allow you to remotely turn on/off publishing and to update the filter word from "Exceptions" to any new value such as "ERROR" using a MQTT tool such as mqtt-spy. Here are the steps on how to control your client:

  • Your MQTT client needs the Commands Receiving Channel value shown in the API ACCESS KEYS AND CHANNEL NAMES in order to subscribe to receive remote commands. Login and go your Server Log Exceptions Thing details page. Note the value of the commands channel for the next step.

  • Copy the code below and replace the value FILL_IN fields with the values from the API ACCESS KEYS AND CHANNEL NAMES values in the dashboard. The subscribes to the commands channel and sets the callback function to itself. When a command message arrives on this channel the callback method messageArrived() is called. The message is parsed and the command is executed to change app's state. The commands are cmdPublishON for turning on publishing to the data channel, cmdPublishOFF for disabling publishing and cmdFilterWord=newWord for updating the filter word to trigger publishing the next 4 lines.
import java.io.BufferedReader;  
import java.io.FileReader;  
import java.io.IOException;

import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;  
import org.eclipse.paho.client.mqttv3.MqttCallback;  
import org.eclipse.paho.client.mqttv3.MqttClient;  
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;  
import org.eclipse.paho.client.mqttv3.MqttException;  
import org.eclipse.paho.client.mqttv3.MqttMessage;  
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

// Used for monitoring the exceptions in a log file 
// which are published to a RAPIFIRE topic channel 
// and then emailed to a user with a cloud code script.
// 
// Command control of the publisher is
// by it subscribing on the command channel
// and interpreting the command messages received on that channel.

public class LogErrorsPublisherCmd implements MqttCallback {

    String broker = "tcp://mqtt.rapifire.com:1883"; // RAPIFIRE broker URL and
                                                    // port
    String clientId = "LogErrorsPublisherCmd";

    String userName = "FILL_IN_ThingId"; // ThingId
    char[] password = "FILL_IN_Thing_Token".toCharArray(); // Thing Token
    String topic = "FILL_IN_Data_Publish_Channel"; // Data Publish Channel
    String commandTopic = "FILL_IN_Command_Receiving_Channel"; // Command Receiving Channel


    boolean isPublishON = true;

    String filterWord = "Exception";

    MqttClient client;

    public LogErrorsPublisherCmd() {
        super();

    }

    public void connect() {

        try {

            MemoryPersistence persistence = new MemoryPersistence();
            client = new MqttClient(broker, clientId, persistence);
            MqttConnectOptions connOpts = new MqttConnectOptions();
            connOpts.setUserName(userName);
            connOpts.setPassword(password);
            connOpts.setCleanSession(false);
            connOpts.setKeepAliveInterval(60 * 5);
            connOpts.setConnectionTimeout(60 * 10);
            client.connect(connOpts);

        } catch (MqttException me) {
            me.printStackTrace();
            System.exit(0);
        }

    }

    public void publish(String content) {

        int maxSize = 8000; 
        String part = content;
        if (content.length() > maxSize) {
             part = content.substring(0, maxSize);
        }
        // If publishing is turned off do not publish it.
        if (!isPublishON) {
            return;
        }

        try {

            String formatedText = "{\"e\":[{\"n\":\"error\", \"sv\":\"" + part.replaceAll("", "") + "\"}]}";
            MqttMessage message = new MqttMessage(formatedText.getBytes());
            message.setQos(0);

            client.publish(topic, message);

        } catch (MqttException me) {
                me.printStackTrace();
                System.exit(0);
        }

    }

    public void subscribe() {

        try {
            client.setCallback(this);
            client.subscribe(commandTopic, 0);

        } catch (MqttException e) {
            e.printStackTrace();
            System.exit(0);
        }

    }

    public void disconnect() {

        try {
            client.disconnect();
        } catch (MqttException me) {
            me.printStackTrace();
            System.exit(0);
        }


    }

    public void unsubscribe() {

        try {
            client.unsubscribe(commandTopic);

        } catch (MqttException me) {
            me.printStackTrace();
            System.exit(0);
        }

    }

    @Override
    public void messageArrived(String topic, MqttMessage receivedMsg) throws Exception {
        String command = new String(receivedMsg.getPayload());
        if (command.contains("cmdPublishOFF")) {
            isPublishON = false;
        } else if (command.contains("cmdPublishON")) {
            isPublishON = true;
        } else if (command.contains("cmdUpdateFilterWord")) {
            int startIndex  = command.indexOf("=");
            if (startIndex == -1) {
                return;
            }
            String newFilterWord = command.substring(startIndex + 1);

            if (newFilterWord.isEmpty()) {
                return;
            }
            filterWord = newFilterWord;
        }

    }

    @Override
    public void deliveryComplete(IMqttDeliveryToken token) {
        // TODO Auto-generated method stub

    }

    public static void main(String[] args) throws IOException {

        long sleepTime = 1000;
        int numExtractedLine = 4;

        if (args.length > 0) {
            if (args.length > 1) {
                sleepTime = Long.parseLong(args[1]) * 1000;

            }
            String filename = args[0];
            BufferedReader input = new BufferedReader(new FileReader(filename));

            String currentLine = null;

            int linesAfterError = 0;
            String exception = "";
            boolean ok = false;

            LogErrorsPublisherCmd publisher = new LogErrorsPublisherCmd();

            publisher.connect();
            publisher.subscribe();


            boolean foundError = false;
            while (true) {

                if ((currentLine = input.readLine()) != null) {

                    if (!foundError && currentLine.contains(publisher.filterWord)) {
                        exception = currentLine;
                        linesAfterError++;
                        foundError = true;

                    } else if (foundError && (linesAfterError > 0) && (linesAfterError < numExtractedLine)) {
                        exception = exception + "\n" + currentLine;
                        linesAfterError++;

                    } else if (foundError && (linesAfterError == numExtractedLine)) {

                        publisher.publish(exception);

                        linesAfterError = 0;
                        exception = "";
                        foundError = false;
                    }
                    continue;
                }

                try {
                    Thread.sleep(sleepTime);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    break;
                }

            }
            input.close();

            publisher.unsubscribe();
            publisher.disconnect();

        }
    }

    @Override
    public void connectionLost(Throwable cause) {
        // TODO Auto-generated method stub

    }
}
  • Compile the class LogErrorPublisher.

  • Run the LogErrorPublisher from a java IDE or from the command line on the server log file as below.

java -cp org.eclipse.paho.client.mqttv3-1.0.2.jar:. LogErrorsblisher  /opt/myapp/serverapp.log  
  • Click on Users and note your Bob the User's user ID and user token password. Click on the red padlock to see the value of the user token password.

  • Start a standalone MQTT tool such mqtt-spy, or MQTT Inspector. Create a new connection using user ID, user token, and broker URL mqtt.sentaca.com and port 1883.

  • Publish the command onPublishOFF to the Commands Receiver channel from the API ACCESS KEYS AND TOPIC NAMES. This will disable publishing the exceptions and no email alerts will be sent when a new exception is written to the log file.

  • Publish the command onPublishON. This will enable publishing the exceptions, and email alerts will be sent every 10 minutes when a new exception is written to the log file.

  • Publish the command onFilterWord=error. This will change the filter word to error from Exception and the publishing will be triggered when this word is encountered while reading the log file.

That's it. You have now mastered remote device control using MQTT.

Author image
About Mark Garzone