Install mosquitto (mqtt) server on CentOS to publish IoT data

Since a long to time I did not post about MQTT … The main reason was I uses MQTT as a protocol to publish data directly from a device but in a centralized environment like SigFox / LoraWan you can’t use it directly on the device. Actually I have some devices communicating with a backend and the question about how to provide these information to the customers of my service are raising. To match this pattern I have to ways : providing and API where the information are pulled by the customer and MQTT where the information are pushed to the customer.

One choice is not against the other one, I had the two kind of customers. For this reason I will describe how we can implement a MQTT server to push sigfox data device as json content.

Installing and configuring MQTT on CentOs7

1 – Install mosquitto (open source MQTT) server

# yum install mosquitto

2 – Configure mosquitto

Edit the /etc/mosquitto/mosquitto.conf

We will have persistent storage for client subscribing to the queues. It sounds good to activate client expiration to ensure we won’t have unlimited persistence.

persistent_client_expiration 15d

The messages are firstly stored in memory then saved on disk after the autosave_interval second. The default value is 30 minutes (1800s) and it create a risk of large loss. A reduction to 1 minute sound safer.

autosave_interval 60
persistence true
persistence_file mosquitto.db
persistence_location /var/lib/mosquitto/

In a first step we will not create authentication or ssl configuration. We will activate it later.

Create the directory for persistence DB

mkdir /var/lib/mosquitto/
chown mosquitto:mosquitto /var/lib/mosquitto/

3 – Start mosquitto

Add mosquitto to the list of service to be started on boot and start it.

# systemctl restart mosquitto
# systemctl enable mosquitto

Create your topic tree

Communication on MQTT is managed by topic, we have 4 level of topic working like a tree. Even if topics are dynamic it is good to decide what will be your organization. Here is the one I have chosen :

Group_of_device/API_Version/Device_ID/Type_of_Data

  • Group_of_device : allows me to have a root point by customer or type of devices
  • Device_ID : is corresponding to the device ID
  • API_Version : allows me to change the data format month after month but continuing to provide a compatible interface for the customer keeping the older versions
  • Type_of_Data : is corresponding to the data type like battery_level, gps_coordinate …

As an example I can have the following tree :

foxtrackr /
          - 1BBEFD /
                   - v1.0 /
                          - battery
                          - location
                          - movement

Customer can register to a list of devices/version and access the different messages.

Test mqtt server

We have two command to test the server : mosquitto_pub to publish message and mosquitto_sub to subscribe to a topic.

We first have to subscribe to a topic as an named client (-i myName) and avoid session cleaning ( -c ) to ensure the message will be stored once the mosquitto_sub client will be killed with a default QoS of 2 (-q 2) to ensure message persistence.

# mosquitto_sub -h localhost -t 'foxtrackr/1BBCE/v1.0/bat' -q 2 -c -i myName

This will create an channel in the mosquitto server for this client and the message publish on this topic will be stored on this channel waiting for being poped. The command will never end, waiting for message ; you can kill it with CTRL+C.

Now we can publish messages over this topic :

# mosquitto_pub -h localhost -t 'foxtrackr/1BBCE/v1.0/bat' -m "{ 'battery' : 10 }" -q 2
# mosquitto_pub -h localhost -t 'foxtrackr/1BBCE/v1.0/bat' -m "{ 'battery' : 11 }" -q 2
# mosquitto_pub -h localhost -t 'foxtrackr/1BBCE/v1.0/bat' -m "{ 'battery' : 12 }" -q 2

To finish we can start again the subscriber and get the messages from this channel :

# mosquitto_sub -h localhost -t 'foxtrackr/1BBCE/v1.0/bat' -q 2 -c -i myName
{ 'battery' : 10 }
{ 'battery' : 11 }
{ 'battery' : 12 }

Transform a message coming from SigFox to MQTT

A SigFox message is proceeded by a PHP script on my server so the first step is to install a php library to connect to MQTT.

The list of client libraries for MQTT are available on mqtt.org website. The library Mosquitto-PHP seems to be the most active one actually even if it still an alpha version it has 240+ commit.

For installation use pecl :

# yum install php-devel re2c mosquitto-devel
# pecl install Mosquitto-alpha

pecl will ask you for the libmosquitto path, just ENTER it will find it.

Then activate the extension in the php.ini file by adding the following line in the “dynamic extension” part of the file

;;;;;;;;;;;;;;;;;;;;;;
; Dynamic Extensions ;
;;;;;;;;;;;;;;;;;;;;;;
...
extension=mosquitto.so

Now you can restart php (php-fpm in my case)

# systemctl restart php-fpm

Try the php code working only with QoS = 2:

<?php
   // Once the client will be connected to the mosquitto server
   // we will fire a message with QoS 2 then we will exit
   $c = new Mosquitto\Client('myClient');
   $c->onConnect(function($rc,$message) use ($c) {
     if ( $rc == 0 ) {     // connection success        
        $c->publish('foxtrackr/1BBCE/v1.0/bat', "{ 'battery' : 11 }", 2,false);
        $c->exitLoop();        
     }
   });

   // We request to connect and we loop to have the library correctly manage the event
   $c->connect('localhost', 1883, 5);
   $c->loopForever();

   // After firing a message in QoS2 the library have to process the double ack requiered 
   // by this QoS level. As an impact we have to continue to run the background library
   // for a couple of cycles...
   for ($i = 0; $i < 50; $i++) {
     $c->loop(1);
   }
   // Now we can disconnect, the message may have been fully transfered.
   $c->disconnect();
?>

We can capture the messages with the previously used subscription command :

# mosquitto_sub -h localhost -t 'foxtrackr/1BBCE/v1.0/bat' -q 2 -c -i myName

This was for the basic approach, now we have to integrate it in a Sigfox callback with a potential mongodb storage in parallel of the messaging solution. We are going  to create a Message Queue for posting at end of sigfox processing :

    $with_mongo=false;
    $with_file=false;
    $with_mqtt=true;

    // -----------------------------------------------------
    // Mqtt message queuing management to prepare later post
    class Message {
        public $id;
        public $state = false;
        public $msg;
        public static function factory(Mosquitto\Message $msg, $state = false) {
            $message = new Message();
            $message->state = $state;
            $message->msg = $msg;
            $message->id = $msg->mid;
            return $message;
        }
    }   
    class MQ {
        public static $publish = array();
        public static function addPublish($topic, $payload) {
          $msg = Message::factory(new Mosquitto\Message());
          $msg->msg->topic = $topic;
          $msg->msg->payload = $payload;
          $msg->msg->qos = 2;
          self::$publish[] = $msg;
          // echo "added ".$topic." ".$payload;
        }
        public static function transmit() {
          $c = new Mosquitto\Client('myClient');
          $c->onConnect(function($rc,$message) use ($c) {
              if ( $rc == 0 ) {     
                 foreach (self::$publish as $msg) {  
                 //echo "publish";  
                    $c->publish( $msg->msg->topic,
                                 $msg->msg->payload,
                                 $msg->msg->qos,
                                 false );
                 }
                 $c->exitLoop();        
              }
          });
          $c->connect('localhost', 1883, 5);
          $c->loopForever();
          for ($i = 0; $i < 100; $i++) {
               $c->loop(1);
          }
          $c->disconnect();
        }
    }
    // -----------------------------------------------------

A problem with the previous script is on the client-id : as you can have concurrent request to push data, you have risk to re-open a connection to MQTT with the same id. In this case the MQTT kills the previously existing connection and you first page is getting disconnected. This is raising exception and potentially loose information. The solution is to use a dynamic clientId with a kind for random it generation like this :

  $uid = round(microtime(true) * 1000) % 10000; // "last digit second + ms"
  $pid = getmypid(); 
  $c = new Mosquitto\Client('id-client'.$uid.$pid);

Then decode message a queue the mqtt messages

   if ( $with_mongo == true ) {
       $m = new MongoClient();
       $db = $m->foxtrackr;

       $raw_collec   = $db->raw;
   }

   // Get Sigfox fields
   $_id = $_GET["id"];
   $_time = $_GET["time"];
   $_timeMs = $_time * 1000;
   $_heure = gmdate("Y-m-d H:i:s",$_time);
   $_signal = $_GET["signal"];
   $_station = $_GET["station"];
   $_slat = $_GET["lat"];
   $_slon = $_GET["lng"];
   $_rssi = $_GET["rssi"];
   $_data = $_GET["data"];
   $_ack = $_GET["ack"];
   $_avgSignal = $_GET["avgSignal"];
   $_duplicate = $_GET["duplicate"];
   $_sigSeq = $_GET["seq"];

   $base_topic="foxtrackr/" . $_id . "/v1.0/";

   $_raw = array(
        "topic" => "raw",
        "id" => $_id,
        "time" => $_timeMs,
        "data" => $_data,
        "sigSeq"  => $_sigSeq,
        "from" => $_station,
        "rssi" => $_rssi,
        "signal" => $_signal,
        "avgsignal" => $_avgSignal,
        "ack" => $_ack,
        "duplicate" => $_duplicate,
        "slat" => $_slat,
        "slng" => $_slon
   );
   if ( $with_mongo == true ) {
      $raw_collec->insert($_raw);
   }
   if ( $with_mqtt == true ) {
      MQ::addPublish($base_topic . "raw",json_encode($_raw));
   }

At end of the php file we are going to call the mqtt manager

// ---------------------------------------------------------------
// Lets push to Mqtt
MQ::transmit();

The json message is now saved in the mongoDB and also pushed to the mqtt channel for the subscribers.

Some administration point

Accepting subscribers to topic with retention also means a certain control on these subscription to correctly manage them. Mosquitto offers some topic you can register to get these metrics.

You can see the number of messages retained in the queues waiting for being send by subscribing to :

# mosquitto_sub -h localhost -t '$SYS/broker/retained messages/count'

You can also get the number of active subscription (not counting current subscription)

# mosquitto_sub -h localhost -t '$SYS/broker/subscriptions/count'

I do not exactly see how to clean the topic where client requests for retention. I assume we could manually do it with a subscription using ‘clean’ flag but for this we need the client name used. So I also not see how to get it. On this part the mosquitto broker does not have a lot of admin functions …

Add authentication

Now we can watch a little bit how to secure the mosquitto server by adding authentication… For this we can edit the mosquitto.conf file

clientid_prefixes id-                 // force a certain client ID name
allow_anonymous false                 // avoid anonymous connection
password_file /etc/mosquitto/passwd   // indicate the user:password file
acl_file /etc/mosquitto/aclfile       // indicate the acl file to use

Now we can create a user (every time you have to give a password)

# mosquitto_passwd  -c /etc/mosquitto/passwd admin    // create the first one
# mosquitto_passwd  /etc/mosquitto/passwd user        // create the subscriber one
# mosquitto_passwd  /etc/mosquitto/passwd nginx       // create the publisher one

Now we can edit the acl file

user admin                    // Admin can access $SYS and RW on foxtrackr
topic read $SYS/#
topic readwrite foxtrackr/#

user nginx                    // nginx can RW on foxtrackr and sub
topic readwrite foxtrackr/#

user user                     // user can readonly on foxtrackr and sub
topic read foxtrackr/#

Now we can test it after restarting the mosquitto service.

# mosquitto_sub -h localhost -t 'foxtrackr/1BBCE/v1.0/bat' -q 2 -c  \
    -i id-xxxx \            // the client id must start by id-
    -u user    \            // username
    -P 'user-password'      // password

Publish a message

# mosquitto_pub -h localhost -t 'foxtrackr/1BBCE/v1.0/bat' \ 
      -m "{ 'battery' : 12 }" -q 2 \ 
      -i id-nginx \
      -u nginx -P 'nginx-password'

The message can be read by restarting the mosquitto_sub command. We can also test that user is not allowed to publish on the topic. (you will see no error but no message will be stored in the topic.

No we can change the php script to user the authentication :

    public static function transmit() {
          $c = new Mosquitto\Client('id-user');
          $c->onConnect(function($rc,$message) use ($c) {
              if ( $rc == 0 ) {     
                 foreach (self::$publish as $msg) {  
                 //echo "publish";  
                    $c->publish( $msg->msg->topic,
                                 $msg->msg->payload,
                                 $msg->msg->qos,
                                 false );
                 }
                 $c->exitLoop();        
              }
          });
          $c->setCredentials('user','password');
          $c->connect('localhost', 1883, 5);
          $c->loopForever();
          for ($i = 0; $i < 100; $i++) {
               $c->loop(1);
          }
          $c->disconnect();
   }

We have modified the id to match the configuration file and add the setCredentials line for being connected.

Open the service to internet

Now we have to set the firewall to open the mqtt port for having our users able to connect to the server :

# firewall-cmd --permanent --add-port=1883/tcp
# firewall-cmd --reload

Now the mqtt service is accessible from internet

Add a web-socket listener

To activate a websocket listener on port 1884 you need to edit the /etc/mosquitto/mosquitto.conf file and change the following lines:

#port 1883          // comment this line
listener 1883       // add these 4 lines
listener 1884
protocol mqtt
protocol websocket

Then restart the mosquitto service

#kill -SIGHUP Pid_of_mosquitto

To finish, don’t forget to open the service on the firewall port 1884/tcp as described previously. You can test it with this webclient : https://hobbyquaker.github.io/mqtt-admin/ (this was the only one working for me)

This config is more for testing as the encryption is not activate at this point.

This entry was posted in IoT and tagged , , . Bookmark the permalink.

Leave a Reply

Your email address will not be published. Required fields are marked *