IoT with Machine Learning

What is Machine Learning?

Machine learning is a subfield of computer science that evolved from the study of pattern recognition and computational learning theory in artificial intelligence. It is very useful in IoT because it can learn hidden relationships in the large volumes of data flowing through the system and make complex classifications in real time, which can then be used to trigger actions.

Machine Learning Example with RAPIFIRE

There are many machine learning packages, such as Apache Spark, Mahout, and Weka, each with its advantages and disadvantages. This blog shows how to use the Java Statistical Analysis Tool library (JSAT), which is powerful yet easy to use, in a courier parcel pickup web app integrated with RAPIFIRE. The example illustrates how a user can get the estimated waiting time for a parcel pickup based on the GPS positions of the trucks. The machine learning component classifies the waiting time (<15min, 15min-30min, >30min) from two inputs derived from each truck's sensor data: distance to the user and average speed.

For this example we will simulate a courier fleet of 3 trucks with telematics box sensors. For those interested in using a real box, check out the telematics boxes by Telliq, Partitur, Geotab GO7, and Airco VT1310.

Below are the high level steps of the solution. Some of the steps are simulated to simplify the coding of the solution. The reader is left to complete those steps with a real implementation as an exercise.

High Level Solution

  • The truck telematics sensor sends GPS coordinates, speed, and truck status (engine ON/OFF) to the truck things (simulated by a sensor class).
  • The Truck Fleet product's cloud code filters out the out-of-service trucks whose engine is OFF.
  • The cloud code forwards the truck sensor data to the external truck server webapp thing (the webapp is simulated by the Truck Server class).
  • The user sends an HTTP request to the webapp for the wait time, with GPS coordinates as parameters (simulated user request call).
  • The webapp converts the truck's GPS coordinates into a distance to the web user using a straight-line ("as the crow flies") distance formula.
  • On startup, the webapp trains the classifier with the training dataset file of distance (km), speed, and waiting time class.
  • The webapp runs the machine learning classifier on the selected truck's distance and speed to get the predicted waiting time.
  • The webapp sends the predicted time to the user in the HTTP response (simulated by printing to the console).
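
The distance step in the list above can be illustrated with a small, self-contained sketch. It uses the haversine formula, which is a swapped-in alternative and not the formula used in this post's Truck Server code (that code uses the spherical law of cosines). The class name GeoDistance, the method name haversineKm, and the mean Earth radius of 6371 km are assumptions made for illustration only.

```java
// Sketch: great-circle ("as the crow flies") distance between two GPS points.
// GeoDistance and haversineKm are hypothetical names, not part of RAPIFIRE or JSAT.
class GeoDistance {
    // Mean Earth radius in kilometers (an approximation).
    static final double EARTH_RADIUS_KM = 6371.0;

    // Haversine formula; inputs are decimal degrees, result is in kilometers.
    static double haversineKm(double lat1, double lon1, double lat2, double lon2) {
        double dLat = Math.toRadians(lat2 - lat1);
        double dLon = Math.toRadians(lon2 - lon1);
        double a = Math.sin(dLat / 2) * Math.sin(dLat / 2)
                + Math.cos(Math.toRadians(lat1)) * Math.cos(Math.toRadians(lat2))
                * Math.sin(dLon / 2) * Math.sin(dLon / 2);
        return 2 * EARTH_RADIUS_KM * Math.asin(Math.sqrt(a));
    }

    public static void main(String[] args) {
        // User position used later in this post.
        double userLat = 38.898556;
        double userLon = -77.037852;
        // Hypothetical truck 0.1 degrees of latitude north of the user.
        double truckLat = userLat + 0.10;
        System.out.printf("%.1f km%n", haversineKm(userLat, userLon, truckLat, userLon));
    }
}
```

A truck 0.1 degrees of latitude away comes out at roughly 11 km, which is the kind of distance value the classifier later receives as input.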

Before we start coding, we need to prepare a dataset file for training the JSAT machine learning classifier that will run in the Truck Server. The dataset file format is ARFF (Attribute-Relation File Format), which is an ASCII text file. Each row in the file has the truck's distance to the user, the truck's speed, and the classification of the waiting time to reach the user: less than 15 minutes, between 15 and 30 minutes, or more than 30 minutes. The data was generated from the formula waiting time = distance / truck speed. Other input attributes, such as the number of traffic lights on the way to the user or the traffic level at the truck's location, could be added to improve the training of the classifier.
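
The labeling rule described above (waiting time = distance / speed, bucketed into three classes) can be sketched as follows. The class name WaitTimeLabel and its method names are hypothetical, and the handling of the exact 15 and 30 minute boundaries is an assumption, since the post does not say which bucket they belong to.

```java
// Sketch: derive the waiting-time class for a training row from distance and speed.
// WaitTimeLabel, label and arffRow are hypothetical names used for illustration.
class WaitTimeLabel {
    // Returns the class label for a distance in km and a speed in km/hour.
    static String label(double distanceKm, double speedKmh) {
        double minutes = distanceKm / speedKmh * 60.0; // hours -> minutes
        if (minutes < 15.0) {
            return "<15min";
        }
        if (minutes <= 30.0) { // assumption: exactly 30 minutes counts as 15min-30min
            return "15min-30min";
        }
        return ">30min";
    }

    // Formats one ARFF data row: distance,speed,class
    static String arffRow(int distanceKm, int speedKmh) {
        return distanceKm + "," + speedKmh + "," + label(distanceKm, speedKmh);
    }

    public static void main(String[] args) {
        System.out.println(arffRow(10, 50)); // 12 minutes
        System.out.println(arffRow(30, 80)); // 22.5 minutes
        System.out.println(arffRow(10, 17)); // about 35 minutes
    }
}
```

The three rows printed by main correspond to rows that also appear in the trucktime.arff training file.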

Below is the training file dataset trucktime.arff that needs to be saved to the Truck Server's webapp disk:

% Dataset for Truck Parcel Pick Up Waiting Time
% 
%
% Mark Garzone (2016). Rapifire.
%
%
% Truck waiting time class (in minutes) based on the distance (km) and
% average speed (km/hour) attributes.

@RELATION trucktime
@ATTRIBUTE distance real
@ATTRIBUTE speed real
@ATTRIBUTE class     {<15min, 15min-30min, >30min}

@DATA
10,50,<15min  
5,80,<15min  
1,60,<15min  
8,40,<15min  
16,100,<15min  
30,80,15min-30min  
8,35,<15min  
10,17,>30min  
2,5,15min-30min  
4,50,<15min  
6,70,<15min  
17,80,<15min  
25,90,15min-30min  
18,45,15min-30min  
19,33,>30min  
5,12,15min-30min  
6,67,<15min  
19,90,<15min  
6,90,<15min  
7,12,>30min  
10,5,>30min  
11,120,<15min  
23,110,<15min  
9,85,<15min  

Now let's create a new product, Truck Fleet, in RAPIFIRE. Log in to the dashboard and add the new product Truck Fleet on the Products page.

[Screenshot: the Truck Fleet product on the Products page]

Next, go to the Things page and add the truck things Truck 1, Truck 2, and Truck 3 and the webapp thing Truck Server, each assigned to the Truck Fleet product.

[Screenshot: the Things page with Truck 1, Truck 2, Truck 3, and Truck Server]

Then open the Cloud code page, select the Truck Fleet product, and add the cloud code snippet below. This cloud code filters out sensor events from trucks whose engine is turned off and forwards the remaining sensor events from the data channels of the three truck things (Truck 1, 2, 3) to the Truck Server thing on its command channel.

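function process(ctx, thing, events) {
  // Read the engine status event sent by the truck sensor.
  var isEngineON = ctx.utils.get(events, "isEngineON");
  console.log("isEngineON is " + isEngineON.value);
  // The sensor publishes the status as a string, so compare against "true".
  if (isEngineON.value === "true") {
    // Forward the events to the Truck Server thing (identified by its ThingId).
    ctx.commands.send("jRONR01ix1hsH1qFAlPcO7", { "sensor": thing.name,  "events": events});
  } else {
    console.log("Truck Engine is OFF. Ignoring event.");
  }
}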

Hit the red triangle button to save the code snippet. The page should look like below:

[Screenshot: the Cloud code page with the Truck Fleet snippet]

Now we can code the Truck Sensor class, which simulates publishing randomized sensor data for GPS position, speed, and engine ON/OFF status to the Truck 1, 2, 3 things on their data channels, using the Paho Java client library for MQTT. Create a new Maven (POM) project in your favorite Java IDE and add the JSAT dependency below to it:

<dependencies>  
  <dependency>
    <groupId>com.edwardraff</groupId>
    <artifactId>JSAT</artifactId>
    <version>0.0.3</version>
  </dependency>
</dependencies>  

Download org.eclipse.paho.client.mqttv3-1.0.2.jar and add it to your project's build path. Finally, add the Truck Sensor code below.

import java.io.IOException;  
import java.util.ArrayList;  
import java.util.List;  
import java.util.Random;

import org.eclipse.paho.client.mqttv3.MqttClient;  
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;  
import org.eclipse.paho.client.mqttv3.MqttException;  
import org.eclipse.paho.client.mqttv3.MqttMessage;  
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

// Simulates a truck sensor that sends GPS latitude/longitude coordinates, speed, and engine status ON/OFF.

public class TruckSensor {

    String broker = "tcp://mqtt.rapifire.com:1883"; // RAPIFIRE broker URL and
                                                    // port
    String clientId;
    String userName; // ThingId
    char[] password; // Thing Token
    String topic; // Data Publish Channel

    MqttClient client;


    public TruckSensor(int truckNum) {
        super();
        clientId = "Truck " + truckNum;
        System.out.println("Initialize sensor " + clientId);

        if (truckNum == 1) {
            userName = "YqF8cxyTUi8tYZuZ2pPd58"; // ThingId
            password = "VALFMb9uiVhS".toCharArray(); // Thing Token
            topic = "/YqF8cxyTUi8tYZuZ2pPd58/data"; // Data Publish Channel
        } else if (truckNum == 2) {
            userName = "SooT3u90NY1k1JhcLYBzab"; // ThingId
            password = "U2q2QAbnOQyq".toCharArray(); // Thing Token
            topic = "/SooT3u90NY1k1JhcLYBzab/data"; // Data Publish Channel
        } else if (truckNum == 3) {
            userName = "WTnP19onZ3EC8nJKVkop88"; // ThingId
            password = "oxPfP9ve0RMR".toCharArray(); // Thing Token
            topic = "/WTnP19onZ3EC8nJKVkop88/data"; // Data Publish Channel
        } else {
            System.out.println("ERROR unknown sensor " + clientId);
        }

    }

    public boolean connect() {
        boolean ok = false;
        try {

            MemoryPersistence persistence = new MemoryPersistence();
            client = new MqttClient(broker, clientId, persistence);
            MqttConnectOptions connOpts = new MqttConnectOptions();
            connOpts.setUserName(userName);
            connOpts.setPassword(password);
            connOpts.setCleanSession(false);
            connOpts.setKeepAliveInterval(60 * 5);
            connOpts.setConnectionTimeout(60 * 10);
            System.out.println("Connecting to Broker: " + broker);
            System.out.println("with ClientId:" + clientId);
            System.out.println("     ThingId:" + userName);
            System.out.println("     Thing Token:" + new String(password));
            System.out.println("     Channel:" + topic);

            client.connect(connOpts);
            System.out.println("Connected");
            ok = true;
        } catch (MqttException me) {
            System.out.println("reason " + me.getReasonCode());
            System.out.println("msg " + me.getMessage());
            System.out.println("loc " + me.getLocalizedMessage());
            System.out.println("cause " + me.getCause());
            System.out.println("excep " + me);
            me.printStackTrace();
            ok = false;
        }
        return ok;

    }

    public boolean publish(double latGPS, double longGPS, int speed, boolean isEngineON) {

        boolean ok = false;

        try {

            String formatedText = "{\"e\":[" + "{\"n\":\"latGPS\", \"v\":" + latGPS + "},"
                    + "{\"n\":\"longGPS\", \"v\":" + longGPS + "}," + "{\"n\":\"speed\", \"v\":" + speed + "},"
                    + "{\"n\":\"isEngineON\", \"sv\":\"" + isEngineON + "\"}" + "]}";

            System.out.println("publish msg: " + formatedText);
            MqttMessage message = new MqttMessage(formatedText.getBytes());
            message.setQos(0);

            client.publish(topic, message);
            System.out.println("Sensor data published: " + formatedText);

            ok = true;

        } catch (MqttException me) {
            System.out.println("reason " + me.getReasonCode());
            System.out.println("msg " + me.getMessage());
            System.out.println("loc " + me.getLocalizedMessage());
            System.out.println("cause " + me.getCause());
            System.out.println("excep " + me);
            me.printStackTrace();
            ok = false;
        }

        return ok;

    }

    public boolean disconnet() {

        boolean ok = false;
        try {
            client.disconnect();
            System.out.println("Disconnected");
            ok = true;
        } catch (MqttException me) {
            System.out.println("reason " + me.getReasonCode());
            System.out.println("msg " + me.getMessage());
            System.out.println("loc " + me.getLocalizedMessage());
            System.out.println("cause " + me.getCause());
            System.out.println("excep " + me);
            me.printStackTrace();
            ok = false;
        }

        return ok;

    }

    public static int randInt(int min, int max) {

        // Usually this can be a field rather than a method variable
        Random rand = new Random();

        // nextInt is normally exclusive of the top value,
        // so add 1 to make it inclusive
        int randomNum = rand.nextInt((max - min) + 1) + min;

        return randomNum;
    }

    public static void main(String[] args) throws IOException {

        List<TruckSensor> trucks = new ArrayList<TruckSensor>();

        long sleepTime = 1000;
        int NUM_TRUCKS = 3;
        int NUM_DATA_POINTS = 1;

        boolean ok = false;
        for (int i = 0;i < NUM_TRUCKS; i++) {
            trucks.add(new TruckSensor(i + 1));

            ok = trucks.get(i).connect();
            if (!ok) {
                System.out.println("Failed to connect truck " + (i + 1) + ". Exiting!");
                System.exit(1); // non-zero exit code signals failure
            }
        }

        for (int i = 0; i < NUM_DATA_POINTS; i++) {

            for (int j = 0; j < NUM_TRUCKS; j++) {
                int speed = randInt(10, 100);
                double latGPS = 38.898556 + (randInt(-40,40) * 0.01);
                double longGPS = -77.037852 + (randInt(-40, 40) * 0.01);
                boolean isEngineON =  randInt(0, 1) == 1; // Randomize if the truck engine is on

                ok = trucks.get(j).publish(latGPS, longGPS, speed, isEngineON);
                if (!ok) {
                    System.out.println("Failed to publish truck " + (j + 1) + ". Exiting!");
                    System.exit(1); // non-zero exit code signals failure
                }
            }


            try {
                Thread.sleep(sleepTime);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            }

        }

        for (int i = 0; i < NUM_TRUCKS; i++) {
            ok = trucks.get(i).disconnet();
            if (!ok) {
                System.out.println("Failed to disconnect truck " + (i + 1) + ". Exiting!");
                System.exit(1); // non-zero exit code signals failure
            }
        }


    }
}
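
The JSON payload that publish assembles inline can also be built by a small pure function, which makes the message format easier to check without a broker connection. This is a minimal sketch that reproduces the same text layout as the code above; the class name SensorPayload and its method names are hypothetical.

```java
// Sketch: build the sensor JSON payload as a pure function (no MQTT needed).
// SensorPayload, entry and build are hypothetical names used for illustration.
class SensorPayload {
    // One entry of the "e" array: {"n":"<name>", "<key>":<jsonValue>}
    static String entry(String name, String key, String jsonValue) {
        return "{\"n\":\"" + name + "\", \"" + key + "\":" + jsonValue + "}";
    }

    // Numeric readings use the "v" key, the engine status is sent as a string under "sv".
    static String build(double latGPS, double longGPS, int speed, boolean isEngineON) {
        return "{\"e\":["
                + entry("latGPS", "v", Double.toString(latGPS)) + ","
                + entry("longGPS", "v", Double.toString(longGPS)) + ","
                + entry("speed", "v", Integer.toString(speed)) + ","
                + entry("isEngineON", "sv", "\"" + isEngineON + "\"")
                + "]}";
    }

    public static void main(String[] args) {
        System.out.println(build(38.5, -77.25, 50, true));
    }
}
```

Because the engine status travels as the string "true" or "false", the cloud code compares it against the string "true" and not against a boolean.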

Now add the Truck Server class below to your project. It subscribes to the command channel of the Truck Server thing. On receiving a truck's GPS data and speed, it calculates the distance between the user and the truck and calls the machine learning classifier to get the waiting time classification.

import java.io.File;  
import java.io.IOException;

import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;  
import org.eclipse.paho.client.mqttv3.MqttCallback;  
import org.eclipse.paho.client.mqttv3.MqttClient;  
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;  
import org.eclipse.paho.client.mqttv3.MqttException;  
import org.eclipse.paho.client.mqttv3.MqttMessage;  
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

import jsat.ARFFLoader;  
import jsat.DataSet;  
import jsat.classifiers.CategoricalResults;  
import jsat.classifiers.ClassificationDataSet;  
import jsat.classifiers.Classifier;  
import jsat.classifiers.DataPoint;  
import jsat.classifiers.bayesian.NaiveBayes;  
import jsat.linear.*;  

// Used for getting the truck sensor data and running the machine learning classifier
// to get the predicted waiting time for the best truck.

public class TruckServer implements MqttCallback {

    String broker = "tcp://mqtt.rapifire.com:1883"; // RAPIFIRE broker URL and
                                                    // port
    String clientId = "TruckServer";
    String userName = "jRONR01ix1hsH1qFAlPcO7"; // ThingId
    char[] password = "v47XeKsBMEvR".toCharArray(); // Thing Token
    String topic = "/jRONR01ix1hsH1qFAlPcO7/data"; // Data Publish Channel
    String commandTopic = "/jRONR01ix1hsH1qFAlPcO7/commands"; // Command
                                                                // Receiving
                                                                // Channel

    MqttClient client;

    double userLatGPS = 0;
    double userLongGPS = 0;

    Classifier classifier;

    public TruckServer() {
        super();

        // Adjust this path to the folder where you saved trucktime.arff.
        String nominalPath = "C:\\Users\\rziembicki\\Documents\\rapfire\\";
        File file = new File(nominalPath + "trucktime.arff");
        DataSet dataSet = ARFFLoader.loadArffFile(file);

        // Use categorical attribute 0 (the "class" attribute) as the target to predict.
        ClassificationDataSet cDataSet = new ClassificationDataSet(dataSet, 0);
        classifier = new NaiveBayes();
        classifier.trainC(cDataSet);
        System.out.println("Done training classifier");

    }

    public boolean connect() {
        boolean ok = false;
        try {

            MemoryPersistence persistence = new MemoryPersistence();
            client = new MqttClient(broker, clientId, persistence);
            MqttConnectOptions connOpts = new MqttConnectOptions();
            connOpts.setUserName(userName);
            connOpts.setPassword(password);
            connOpts.setCleanSession(false);
            connOpts.setKeepAliveInterval(60 * 5);
            connOpts.setConnectionTimeout(60 * 10);
            System.out.println("Connecting to Broker: " + broker);
            System.out.println("with ClientId:" + clientId);
            System.out.println("     ThingId:" + userName);
            System.out.println("     Thing Token:" + new String(password));
            System.out.println("     Data Channel:" + topic);
            System.out.println("     Command Channel:" + commandTopic);

            client.connect(connOpts);
            System.out.println("Connected");
            ok = true;
        } catch (MqttException me) {
            System.out.println("reason " + me.getReasonCode());
            System.out.println("msg " + me.getMessage());
            System.out.println("loc " + me.getLocalizedMessage());
            System.out.println("cause " + me.getCause());
            System.out.println("excep " + me);
            me.printStackTrace();
            ok = false;
        }
        return ok;

    }

    public void setUserLatGPS(double userLatGPS) {
        this.userLatGPS = userLatGPS;
    }

    public void setUserLongGPS(double userLongGPS) {
        this.userLongGPS = userLongGPS;
    }

    public boolean subscribe() {

        boolean ok = false;

        try {
            client.setCallback(this);
            client.subscribe(commandTopic, 0);
            System.out.println("Subscribed on " + commandTopic);
            ok = true;

        } catch (MqttException e) {
            e.printStackTrace();
        }

        return ok;
    }

    public boolean disconnect() {

        boolean ok = false;
        try {
            client.disconnect();
            System.out.println("Disconnected");
            ok = true;
        } catch (MqttException me) {
            System.out.println("reason " + me.getReasonCode());
            System.out.println("msg " + me.getMessage());
            System.out.println("loc " + me.getLocalizedMessage());
            System.out.println("cause " + me.getCause());
            System.out.println("excep " + me);
            me.printStackTrace();
            ok = false;
        }

        return ok;

    }

    public boolean unsubscribe() {

        boolean ok = false;
        try {
            client.unsubscribe(commandTopic);
            System.out.println("Unsubscribed");
            ok = true;
        } catch (MqttException me) {
            System.out.println("reason " + me.getReasonCode());
            System.out.println("msg " + me.getMessage());
            System.out.println("loc " + me.getLocalizedMessage());
            System.out.println("cause " + me.getCause());
            System.out.println("excep " + me);
            me.printStackTrace();
            ok = false;
        }

        return ok;

    }

    // @Override
    public void messageArrived(String topic, MqttMessage receivedMsg) throws Exception {
        String command = new String(receivedMsg.getPayload());
        System.out.println("Command msg arrived:'" + command + "'");


        String sensor = this.getSensorName(command);
        System.out.println("sensor is '" + sensor  + "'");

        String latGPS = this.getMsgValue("latGPS", command);
        System.out.println("latGPS value is '" + latGPS + "'");

        String longGPS = this.getMsgValue("longGPS", command);
        System.out.println("longGPS value is '" + longGPS + "'");

        double speed = Double.parseDouble(this.getMsgValue("speed", command));
        System.out.println("speed value is '" + speed + "'");

        String isEngineON = this.getMsgValue("isEngineON", command);
        System.out.println("isEngineON value is '" + isEngineON + "'");

        double dist = distance(userLatGPS, userLongGPS, Double.parseDouble(latGPS), Double.parseDouble(longGPS), "K");

        System.out.println(dist + " Kilometers to user's GPS coordinate.");
        // Extract the data and run the classifier.

        double[][] dataValue = new double[1][2];   
        dataValue[0][0] = dist;
        dataValue[0][1] = speed;
        Matrix matrix = new DenseMatrix(dataValue);
        DataPoint dataPoint = new DataPoint(matrix.getRow(0));
        System.out.println("dataPoint:" + dataPoint);

        // Categorical Results contains the probability estimates for each
        // possible target class value.
        CategoricalResults predictionResults = classifier.classify(dataPoint);
        int predicted = predictionResults.mostLikely();
        System.out.println("Exact time (min): " + (dist / speed) * 60.0);
        if (predicted == 0) {
            System.out.println(sensor + "'s waiting time: <15min classification, probability:" + predictionResults.getProb(predicted));
        } else if (predicted == 1) {
            System.out.println(sensor + "'s waiting time: 15min-30min classification, probability:" + predictionResults.getProb(predicted));
        } else if (predicted == 2) {
            System.out.println(sensor + "'s waiting time: >30min classification, probability:" + predictionResults.getProb(predicted));
        } else {
            System.out.println(sensor + "'s unknown classification, probability:" + predictionResults.getProb(predicted));
        }

        System.out.println("----------------------------------------------------------------------------");
    }

    private String getMsgValue(String name, String command) {
        int nameEnd = command.lastIndexOf(name);
        int valueStart = command.indexOf("value", nameEnd);
        int valueEnd = command.indexOf("}", valueStart);
        String value = command.substring(valueStart + 7, valueEnd);

        return value;
    }

    private String getSensorName(String command) {
        int nameEnd = command.indexOf("sensor");
        String value = command.substring(nameEnd + 9, nameEnd + 16);

        return value;
    }

    // @Override
    public void deliveryComplete(IMqttDeliveryToken token) {
        // TODO Auto-generated method stub

    }

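    // Great-circle distance between two GPS points (decimal degrees) using the
    // spherical law of cosines. unit: "K" = kilometers, "N" = nautical miles,
    // anything else = statute miles.
    private static double distance(double lat1, double lon1, double lat2, double lon2, String unit) {
        double theta = lon1 - lon2;
        double dist = Math.sin(deg2rad(lat1)) * Math.sin(deg2rad(lat2))
                + Math.cos(deg2rad(lat1)) * Math.cos(deg2rad(lat2)) * Math.cos(deg2rad(theta));
        dist = Math.acos(dist);
        dist = rad2deg(dist);
        dist = dist * 60 * 1.1515; // degrees of arc -> nautical miles -> statute miles
        // Compare strings with equals(); == only compares object references.
        if ("K".equals(unit)) {
            dist = dist * 1.609344;
        } else if ("N".equals(unit)) {
            dist = dist * 0.8684;
        }

        return (dist);
    }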

    // This function converts decimal degrees to radians
    private static double deg2rad(double deg) {
        return (deg * Math.PI / 180.0);
    }

    // This function converts radians to decimal degrees
    private static double rad2deg(double rad) {
        return (rad * 180 / Math.PI);
    }

    public static void main(String[] args) throws IOException {

        long sleepTime = 1000;
        boolean ok = false;

        TruckServer subscriber = new TruckServer();

        // User's GPS position
        subscriber.setUserLatGPS(38.898556);
        subscriber.setUserLongGPS(-77.037852);

        ok = subscriber.connect();
        if (!ok) {
            System.out.println("Failed to connect. Exiting!");
            System.exit(1); // non-zero exit code signals failure
        }

        ok = subscriber.subscribe();
        if (!ok) {
            System.out.println("Failed to subscribe. Exiting!");
            System.exit(1); // non-zero exit code signals failure
        }
        int sec = 0;
        while (true) {

            try {
                System.out.print("Going to sleep...");
                Thread.sleep(sleepTime);
                System.out.println("awoke at sec: " + sec);
                sec++;
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            }

        }

        ok = subscriber.unsubscribe();
        if (!ok) {
            System.out.println("Failed to unsubscribe. Attempting to disconnect.");
        }

        ok = subscriber.disconnect();
        if (!ok) {
            System.out.println("Failed to disconnect. Exiting!");
            System.exit(1); // non-zero exit code signals failure
        }

    }

    // @Override
    public void connectionLost(Throwable cause) {
        // TODO Auto-generated method stub

    }
}
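
The getMsgValue and getSensorName helpers above rely on fixed character offsets, so they can break if the message spacing changes. A slightly more tolerant variant can be sketched with regular expressions. The message layout used here (a "sensor" field plus events carrying "name" and "value" fields) is an assumption inferred from the offsets in the helpers, not a documented RAPIFIRE format, and the class name CommandFields is hypothetical. For production code a JSON library would be the safer choice.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Sketch: extract fields from the forwarded command message with regular expressions.
// The message layout is an assumption; CommandFields is a hypothetical helper class.
class CommandFields {
    // Returns the text of the "value" field that follows the given event name,
    // without surrounding quotes, or null if the name is not found.
    static String value(String name, String command) {
        Pattern p = Pattern.compile(
                "\"" + Pattern.quote(name) + "\"[^}]*?\"value\"\\s*:\\s*\"?([^\",}]+)\"?");
        Matcher m = p.matcher(command);
        return m.find() ? m.group(1) : null;
    }

    // Returns the sensor (thing) name, or null if the field is missing.
    static String sensorName(String command) {
        Matcher m = Pattern.compile("\"sensor\"\\s*:\\s*\"([^\"]*)\"").matcher(command);
        return m.find() ? m.group(1) : null;
    }

    public static void main(String[] args) {
        // Hypothetical sample message in the assumed layout.
        String command = "{\"sensor\":\"Truck 2\",\"events\":["
                + "{\"name\":\"latGPS\",\"value\":38.9},"
                + "{\"name\":\"speed\",\"value\":55},"
                + "{\"name\":\"isEngineON\",\"value\":\"true\"}]}";
        System.out.println(sensorName(command));
        System.out.println(value("latGPS", command));
        System.out.println(value("speed", command));
        System.out.println(value("isEngineON", command));
    }
}
```

Returning null for a missing field lets the caller decide whether to skip the message or report an error, instead of failing with an index exception.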

Next, run the Truck Server class and then the Truck Sensor class. The truck data that was sent appears in the Raw Data view of the things Truck 1, 2, 3, as shown below:

[Screenshots: Raw Data view of Truck 1, Truck 2, and Truck 3]

Finally, the IDE console of the Truck Server class shows that Truck 2's waiting time falls in the 15min-30min classification, while Truck 3's falls in the <15min classification. Both classifications match the exact time calculated with the formula time = distance / speed. Truck 1's data was filtered out by the cloud code because its isEngineON status was false.

[Screenshot: Truck Server console output]
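
For intuition about how a naive Bayes classifier can arrive at such labels, here is a minimal, self-contained Gaussian naive Bayes sketch over the two numeric inputs. It is not JSAT's implementation, and JSAT's NaiveBayes may treat numeric attributes differently. The class name TinyGaussianNB, its methods, and the choice of ten training rows (copied from trucktime.arff) are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Sketch: Gaussian naive Bayes over two numeric features (distance km, speed km/hour).
// TinyGaussianNB is a hypothetical class and is not how JSAT is implemented.
class TinyGaussianNB {
    private final Map<String, List<double[]>> rowsByClass = new LinkedHashMap<>();
    private int total = 0;

    void add(double distance, double speed, String label) {
        rowsByClass.computeIfAbsent(label, k -> new ArrayList<>())
                .add(new double[] {distance, speed});
        total++;
    }

    // Log of the normal density with the given mean and variance.
    private static double logGaussian(double x, double mean, double var) {
        return -0.5 * Math.log(2 * Math.PI * var) - (x - mean) * (x - mean) / (2 * var);
    }

    // Picks the class with the highest log prior plus per-feature log likelihoods.
    String predict(double distance, double speed) {
        double[] query = {distance, speed};
        String best = null;
        double bestScore = Double.NEGATIVE_INFINITY;
        for (Map.Entry<String, List<double[]>> e : rowsByClass.entrySet()) {
            List<double[]> rows = e.getValue();
            double score = Math.log((double) rows.size() / total); // log prior
            for (int f = 0; f < 2; f++) {
                double mean = 0;
                for (double[] r : rows) mean += r[f];
                mean /= rows.size();
                double var = 0;
                for (double[] r : rows) var += (r[f] - mean) * (r[f] - mean);
                var = var / rows.size() + 1e-9; // population variance with a tiny floor
                score += logGaussian(query[f], mean, var);
            }
            if (score > bestScore) {
                bestScore = score;
                best = e.getKey();
            }
        }
        return best;
    }

    // Trains on ten rows copied from trucktime.arff.
    static TinyGaussianNB trainedOnSample() {
        TinyGaussianNB nb = new TinyGaussianNB();
        nb.add(10, 50, "<15min");
        nb.add(5, 80, "<15min");
        nb.add(1, 60, "<15min");
        nb.add(16, 100, "<15min");
        nb.add(30, 80, "15min-30min");
        nb.add(2, 5, "15min-30min");
        nb.add(18, 45, "15min-30min");
        nb.add(10, 17, ">30min");
        nb.add(19, 33, ">30min");
        nb.add(10, 5, ">30min");
        return nb;
    }

    public static void main(String[] args) {
        TinyGaussianNB nb = trainedOnSample();
        System.out.println(nb.predict(5, 80));  // fast, nearby truck
        System.out.println(nb.predict(10, 5));  // slow truck
    }
}
```

The classifier never evaluates distance / speed directly. It only learns, per class, what typical distances and speeds look like, which is why extra attributes such as traffic level can be added without changing the code structure.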

That's it for machine learning. As you can see, using sophisticated AI in IoT is not so hard. Try it now yourself with RAPIFIRE.

About Mark Garzone