[SOLVED] Arduino MQTT on Ubidots

Please allow me to report on my findings using “MQTT” compared to the “HTTP/GET” API with my Arduino MKR1000 connecting to Ubidots:

1.) On second look, MQTT is much easier to use than getValue() / add() or sendAll() using HTTP/GET

As a beginner with IoT, I (also) started with HTTP/GET, because an internet search brought up a lot of examples and copy & paste seemed easy.
In principle my application works with HTTP/GET, but the project grew, i.e.:

  • more (Ubidots control) buttons were to be integrated
  • more variables had to be transferred

Continuous polling (= “Arduino asking Ubidots whether a control changed”) and updating created a lot of traffic and ate resources.
By the way, I also ran into inconsistencies when buttons were changed on the hardware AND the dashboard, since the polling latency was too high.

2.) MQTT.subscribe() makes life so much easier when reading (Ubidots control) buttons

If you subscribe() to a variable, your Arduino receives an event and the event function is executed.
That’s it. No polling. No CheckButtons() every x seconds. No HTTP traffic.
Why didn’t I know this earlier … ARGH.
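To illustrate the event-driven style, here is a minimal sketch in plain C++ that can be compiled off-target. The handler name `onMessage`, the variable `ledOn` and the topic `/v1.6/devices/MQTT_Test/led/lv` are assumptions made for the illustration; only the parameter list is taken from the PubSubClient callback used in the test sketch further down, and the broker is simulated by calling the handler directly.

```cpp
#include <cassert>
#include <cstdlib>
#include <cstring>

typedef unsigned char byte;  // Arduino's byte type, redefined so this compiles off-target

static bool ledOn = false;   // hypothetical application state driven by a dashboard button

// Hypothetical handler with the same parameter list as the PubSubClient callback.
// The MQTT client would call it once per message on a subscribed topic;
// nothing runs while the dashboard is idle, so no polling loop is needed.
void onMessage(char* topic, byte* payload, unsigned int length) {
  assert(topic != nullptr && payload != nullptr);

  // The payload is not guaranteed to be NUL-terminated, so copy it first.
  char text[32];
  if (length >= sizeof(text)) length = sizeof(text) - 1;
  memcpy(text, payload, length);
  text[length] = '\0';

  // Only react to the (assumed) last-value topic of the "led" variable.
  if (strcmp(topic, "/v1.6/devices/MQTT_Test/led/lv") == 0) {
    ledOn = strtod(text, nullptr) != 0.0;
  }
}
```

On the device, a handler like this would be registered once with the MQTT client, and the main loop would only have to keep the client running.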

3.) Stability of HTTP/GET on Arduino

Summary: I cannot live with 0.6% ERRORs
I did quite systematic testing of round-trip times, timeouts and errors during server or client connect, and also evaluated corrupted reply[] strings. In brief:

  • 99% of the GetDot (:= getValue()) and 95% of the SendDots (:= sendAll()) calls are answered within one second. Great.
  • However, 0.2% of the GetDot and 0.6% of the SendDots calls take longer than 5 s, resulting in an ERROR. This amounts to 8 out of 4250 and 5 out of 880 accesses, respectively.
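The percentages follow directly from the raw counts. As a small sketch of the arithmetic (the function name `errorRatePercent` is made up for this illustration):

```cpp
#include <cassert>

// Percentage of accesses that ended in an ERROR.
double errorRatePercent(unsigned long errors, unsigned long accesses) {
  assert(accesses > 0);  // a rate is undefined without any accesses
  return 100.0 * static_cast<double>(errors) / static_cast<double>(accesses);
}
```

`errorRatePercent(8, 4250)` evaluates to roughly 0.19 and `errorRatePercent(5, 880)` to roughly 0.57, which round to the 0.2% and 0.6% quoted above.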

Unfortunately these ERRORs very often lead to system hangs, which sometimes could not be recovered by my “hardware watchdog” (:= digitalWrite LOW on a pin connected to the RESET pin).

4.) Stability of MQTT on Arduino

I used a test sketch, which comprises:

  1. write/publish a test variable to Ubidots every 10 sec and display it on the dashboard as a chart

  2. write/publish a state variable to Ubidots every 1 sec and have it connected to a control slider. This state variable is increased by 1 after every post. If the user changes the state slider on the dashboard, the Arduino shall receive the new value and continue counting from it.
    This is basically a combination of read/write = publish/subscribe. As you know, it can be done with HTTP/GET, but it is pretty tricky due to the latency and “collisions”. With MQTT there are still collisions, which could be sorted out using the “timestamp” in the reply string, but they are less critical due to the short latency.

  3. Have a control button on the dashboard send a value to the Arduino (“led” variable) to switch the built-in LED of the Arduino…
    If you managed #2, it’s a piece of cake and reacts pretty fast. NICE.
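One possible way to sort out such collisions with the timestamp is a “newest wins” rule. The following is only a sketch of that idea in plain C++; the test sketch below does not implement it, and the names `StateTracker` and `applyIfNewer` as well as the policy itself are assumptions made for the illustration.

```cpp
// Hypothetical bookkeeping for one variable such as the "state" slider.
struct StateTracker {
  float value;                    // value the device keeps counting with
  unsigned long long lastSeenMs;  // timestamp of the newest message applied so far
};

// Applies an incoming (value, timestamp) pair only if it is newer than
// anything already applied; returns true when the value was taken over.
bool applyIfNewer(StateTracker& s, float value, unsigned long long timestampMs) {
  if (timestampMs <= s.lastSeenMs) return false;  // stale echo or out-of-order message
  s.value = value;
  s.lastSeenMs = timestampMs;
  return true;
}
```

With this rule, an echo of the device's own older publish cannot overwrite a slider change that arrived in the meantime, provided the timestamps of both come from the same clock.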

Stability:
I tested the sketch producing more than 12000 publish cycles within 3.3 hours using the “PubSubClient”-library.

NO ERRORs. NO HANGs.

This is much, much better for my type of application, with less WiFi traffic and fewer resources (memory, system load etc.).
If only I had known earlier …


small print:
I observed different results with different MQTT libraries for Arduino. I hope to find time to post about this later.

Arduino MKR1000 MQTT Test Sketch

Please find the discussed test vehicle below:

// MQTT Client Benchmark for ARDUINO
// This example uses an Arduino/Genuino MKR1000
// Should also work with Arduino/Genuino Zero together with a WiFi101 Shield
//
#define UbidotsContextSize 128
#define UbidotsBufferSize  256
#define UbidotsTopicSize    81
#define MQTT_SERVER "things.ubidots.com"  // bare hostname (no "mqtt://" scheme), since it is resolved via DNS

// #define _PAHO

#ifdef _PAHO
// if you use #define _PAHO, then MQTT by Joel Gaehwiler is used
// the MQTT library for Arduino based on the Eclipse Paho projects
// tested version: 1.10.1 
// see http://www.eclipse.org/paho/clients/c/embedded/ for details
// Don't forget to add in your Library Manager: MQTT 
#else
// if you do NOT #define _PAHO, then PubSubClient by Nick O'Leary is used
// it is subtitled "a client library for MQTT messaging"
// tested version: 2.6.0 
// see http://pubsubclient.knolleary.net/ for details
// Don't forget to add in your Library Manager: PubSubClient 
#endif

// Used in MQTTClient.h and in this Sketch:
#define MQTT_BUFFER_SIZE 256

#include <SPI.h>
#include <WiFi101.h>
#ifdef _PAHO
  #include <MQTTClient.h>
  char _PahoTopic[UbidotsTopicSize];
#else
  // see PubSubClient.h::39ff
  // Arduino WiFi Shield - if you want to send packets > 90 bytes with this shield, enable the MQTT_MAX_TRANSFER_SIZE define in PubSubClient.h.
  #define MQTT_MAX_TRANSFER_SIZE 256
  #define MQTT_MAX_PACKET_SIZE 254
  #include <PubSubClient.h>
#endif

// This extra include provides dtostrf(), which is used to avoid String() in loops
#include <avr/dtostrf.h>

#define VERBOSE

char LABEL_DATA_SOURCE[]  = "/v1.6/devices/MQTT_Test";

#define DOT_STATE 0
#define DOT_TEST  1
#define DOT_LED   2
#define DOTARRAY_SIZE 3

typedef struct dot_t {
  char* label;
  char  id[25];
  float value;
  unsigned long count;
  char* context;
  unsigned long timestamp;  // This should be changed to long or to long long and Milliseconds
  bool  valid;
  char* topic;
  char buffer[UbidotsBufferSize];
} dot_t;

dot_t dot[] {
  {"state", "xxxxxxxxxxxxxxxxxxxxxxxx", 0.0, 0L, NULL, 0L, false, "/v1.6/devices/MQTT_Test/state", ""},
  {"test",  "xxxxxxxxxxxxxxxxxxxxxxxx", 0.0, 0L, NULL, 0L, false, "/v1.6/devices/MQTT_Test/test", ""},
  {"led",   "xxxxxxxxxxxxxxxxxxxxxxxx", 1.0, 0L, NULL, 0L, false, "/v1.6/devices/MQTT_Test/led", ""}
};

char ssid[] = "xxxxxxxx"; //  your network SSID (name)
char pass[] = "xxxxxxxxx";    // your network password (use for WPA, or use as key for WEP)


WiFiClient net;

#ifdef _PAHO
  MQTTClient client;
  MQTTMessage _msg; // Required to set retained-Flag, since there is no direct method available
#else
  PubSubClient  client(net);
#endif


void PrintDot(int index){
  Serial.println("Dot["+String(index,DEC)+"] = {");

  Serial.print("char* label     = \"");
  if(dot[index].label)  Serial.print(dot[index].label);
  else Serial.print(" (char *)NULL");
  Serial.println("\"");

  Serial.print("char id[25]     = \"");
  if(dot[index].id)  Serial.print(dot[index].id);
  Serial.println("\"");

  Serial.print("float value     = ");
  Serial.println(dot[index].value);

  Serial.print("ulong count     = ");
  Serial.println(dot[index].count);

  Serial.print("char* context   =");
  if(dot[index].context)  Serial.println(dot[index].context);
  else Serial.println(" (char *)NULL");

  Serial.print("ulong timestamp = ");
  Serial.println(dot[index].timestamp);

  Serial.print("bool valid      = ");
  if(dot[index].valid) Serial.println("true");
  else Serial.println("false");

  Serial.print("char* topic     = \"");
  if(dot[index].topic)  Serial.print(dot[index].topic);
  else Serial.print(" (char *)NULL");
  Serial.println("\"");

  Serial.print("char buffer["+String(UbidotsBufferSize,DEC)+"]= \"");
  if(dot[index].buffer)  Serial.print(dot[index].buffer);
  Serial.println("\"");

  Serial.println("}");
}

void setup() {
  pinMode(LED_BUILTIN, OUTPUT);
  digitalWrite(LED_BUILTIN, dot[DOT_LED].value*HIGH);
  
  Serial.begin(9600);
  for (int s = 0; s < 100; s++) {
    if (Serial) break; // wait for serial port to connect. Needed for native USB port only
    delay(100);
  }

  Serial.println("MQTT_BUFFER_SIZE := "+String(MQTT_BUFFER_SIZE,DEC));
  
  WiFi.begin(ssid, pass);
#ifdef _PAHO
  client.begin(MQTT_SERVER, net); // Port is 1883 by default
  Serial.println("Using PaHo-Library: <MQTTClient.h>");
#else
  client.setServer(MQTT_SERVER, 1883);
  client.setClient(net);
  client.setCallback(callback);
  Serial.println("Using PubSub-Library: <PubSubClient.h>");
#endif

  reconnect();
}

void reconnect() {
  Serial.print("Checking wifi...");
  while (WiFi.status() != WL_CONNECTED) {
    Serial.print(".");
    delay(1000);
  }
  Serial.println("OK");

  Serial.print("Connecting MQTT ...");
#ifdef _PAHO
  // boolean connect(const char * clientId, const char* username, const char* password);
  while (!client.connect("mqtt://things.ubidots.com", "PUT YOUR UBIDOT TOKEN HERE", "")) {
#else
  // boolean connect(const char* id, const char* user, const char* pass);
  while (!client.connect("mqtt://things.ubidots.com", "PUT YOUR UBIDOT TOKEN HERE", "")) {
#endif
    Serial.print(".");
    delay(1000);
  }

  Serial.println("connected!");

  int valid;

 // Generate Subscribe for "/v1.6/devices/MQTT_Test/test"
  Serial.print("Subscribe " + String(dot[DOT_TEST].topic) + "...");
#ifdef _PAHO
  valid = client.subscribe(dot[DOT_TEST].topic);
#else
  valid = client.subscribe(dot[DOT_TEST].topic);
#endif
  if (valid == 1) Serial.println(" OK");
  else Serial.println(" ERROR [" + String(valid, DEC) + "]");

  delay(1000);
  
  // Generate Subscribe for "/v1.6/devices/MQTT_Test/state"
  Serial.print("Subscribe " + String(dot[DOT_STATE].topic) + "...");
#ifdef _PAHO
  valid = client.subscribe(dot[DOT_STATE].topic);
#else
  valid = client.subscribe(dot[DOT_STATE].topic);
#endif
  if (valid == 1) Serial.println(" OK");
  else Serial.println(" ERROR [" + String(valid, DEC) + "]");

  delay(1000);
  
  // Generate Subscribe for "/v1.6/devices/MQTT_Test/led"
  Serial.print("Subscribe " + String(dot[DOT_LED].topic) + "...");
#ifdef _PAHO
  valid = client.subscribe(dot[DOT_LED].topic);
#else
  valid = client.subscribe(dot[DOT_LED].topic);
#endif
  if (valid == 1) Serial.println(" OK");
  else Serial.println(" ERROR [" + String(valid, DEC) + "]");

  delay(1000);
}

int lines = 0;
float value;

unsigned long lastMillis = 0;
unsigned long lastMillis_Test = 0;
unsigned long lastMillis_State = 0;
unsigned long Interval_Test = 10000;
unsigned long Interval_State = 1000;

void loop() {
char *pch;
  
  client.loop();

  if (!client.connected()) {
    reconnect();
  }

  // MQTT publish a message roughly every 10 seconds
  if (millis() - lastMillis_Test > Interval_Test) {
    lastMillis_Test = millis();
    pch=dot[DOT_TEST].buffer;
    strcpy(pch, "{\"");
    strcat(pch, dot[DOT_TEST].label);
    strcat(pch, "\": ");
    ultoa(lastMillis_Test, &pch[strlen(pch)], 10);  // millis() is an unsigned long, so use ultoa() instead of itoa()
    strcat(pch, "}");

    Serial.println(String(lines++, DEC) + " [" + String(lastMillis_Test, DEC) + "] : >" + LABEL_DATA_SOURCE + "<\t>>" + String(dot[DOT_TEST].buffer) + "<<" );

    client.publish(LABEL_DATA_SOURCE, dot[DOT_TEST].buffer);
  }

  
  // MQTT publish a message roughly every 1 seconds
  if (millis() - lastMillis_State > Interval_State) {
    lastMillis_State = millis();
    dot[DOT_STATE].value += 1.0;
    if (dot[DOT_STATE].value > 1000.0) dot[DOT_STATE].value = 0.0;
    pch=dot[DOT_STATE].buffer;
    strcpy(pch, "{\"");
    strcat(pch, dot[DOT_STATE].label);
    strcat(pch, "\": ");
    dtostrf(dot[DOT_STATE].value, 8, 5, &pch[strlen(pch)]);
    strcat(pch, "}");
 
    Serial.println(String(lines++, DEC) + " [" + String(lastMillis_State, DEC) + "] : >" + LABEL_DATA_SOURCE + "<\t>>" + String(dot[DOT_STATE].buffer) + "<<" );
#ifdef _PAHO
    _msg.topic = LABEL_DATA_SOURCE;
    _msg.retained = true;
    _msg.payload = dot[DOT_STATE].buffer;
    _msg.length = strlen(dot[DOT_STATE].buffer);
    client.publish(&_msg);
#else
  //    boolean publish(const char* topic, const char* payload, boolean retained);
    client.publish(LABEL_DATA_SOURCE, dot[DOT_STATE].buffer, true);
#endif
  }

//  PrintDot(0);
//  PrintDot(1);
//  PrintDot(2);
}


#ifdef _PAHO
// This is the default function of Paho-MQTT Client for callbacks
void messageReceived(String topic, String payload, char * bytes, unsigned int length) {
  Serial.print("incoming: ");
  Serial.print(topic);
  Serial.print(" [");
  Serial.print(String(length, DEC));
  Serial.print("] - ");
  Serial.print(payload);
  Serial.print(" - ");
  Serial.print(bytes);
  Serial.println();

  topic.toCharArray(_PahoTopic, UbidotsTopicSize);
  parseMessage(_PahoTopic, bytes, length);
}
#else
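// This is the PubSubClient-Callback
void callback(char* topic, byte* bytes, unsigned int length) {
  // PubSubClient does not NUL-terminate the payload, so copy it into a
  // terminated buffer before using print() or the str*() functions on it.
  static char payload[UbidotsBufferSize];
  if (length >= sizeof(payload)) length = sizeof(payload) - 1;
  memcpy(payload, bytes, length);
  payload[length] = '\0';

  Serial.print("incoming: ");
  Serial.print(topic);
  Serial.print(" [");
  Serial.print(String(length, DEC));
  Serial.print("] - ");
  Serial.print(payload);
  Serial.println();

  parseMessage(topic, payload, length);
}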
#endif

bool parseMessage(char * topic, char * bytes, unsigned int length) {
// Please note, that I prefer to NOT use String-type here due to potential heap fragmentation
// Compromises were made in setup() resp. loop() to increase readability, but not here
// Ref for background:  https://hackingmajenkoblog.wordpress.com/2016/02/04/the-evils-of-arduino-strings/
// Thanks to Majenko for his nice article
//

int index=-1;
int context_length;
char *pch;
char *rest;
char *end;

bool valid=true;
  
  pch = strrchr(topic,'/');
  if (pch==NULL){
      Serial.print("ERROR: Topic invalid: >");
      Serial.print(topic);
      Serial.println("<");
      goto CLEANUP_ON_ERROR;
  }
  topic = pch+1;  // continue with the label after the last '/' (strcpy() on overlapping buffers is undefined)
  Serial.print("Topic: >");
  Serial.print(topic);
  Serial.print("<\t");

  for (int i=0;i<DOTARRAY_SIZE;){
     if (strcasecmp(dot[i].label, topic)==0){
      index=i;
      break;
    }
    i++;
  }
  if (index<0){
    Serial.println("ERROR: Could NOT associate topic");    
    goto CLEANUP_ON_ERROR;
  }
  
  Serial.print(" Index: ");    
  Serial.print(index);    
  Serial.print("\t");    
  
// Parse payload
  Serial.print("Payload: >");
  Serial.print(bytes);
  Serial.print("<\t");

  end = bytes+length;
  pch = strstr(bytes, "value\":");
  if (pch==NULL || pch+8>end) goto CLEANUP_ON_ERROR; // check for NULL before doing pointer arithmetic
  value = strtof (pch+8,&rest);  
  if (*rest!=',' && *rest!='}'){
    value=0.0;
    valid=false;
  }
  dot[index].value = value;
  Serial.print("value: >");
  Serial.print(value);
  Serial.print("<\t");

  // Parse for count
  pch = strstr(bytes,"\"count\":");
  if (pch==NULL || pch+8>end) { // keyword "count" NOT found or truncated content
    dot[index].count = 0L;
  } else {
    dot[index].count = strtoul(pch+8,&rest,10);      
    if (*rest!=',' && *rest!='}') valid=false; // delimiter not correct invalidated reading
  }

  // Parse for timestamp
  pch = strstr(bytes,"\"timestamp\":");
  if (pch==NULL || pch+12>end){  // No keyword "timestamp" found
    dot[index].timestamp = 0L;
  } else {
    dot[index].timestamp = (unsigned long) (strtoll(pch+12,&rest,10)/1000); // Timestamp in seconds NOT milliseconds, sorry
    if (*rest!=',' && *rest!='}') valid=false;
  }

  // Parse for context
  pch = strstr(bytes,"\"context\":");
  if (pch==NULL || pch+11>end){  // No keyword "context" found
    goto ContextSkipped;
  }
  rest = strstr(pch,",");
  if (rest==NULL) goto ContextSkipped;  // no delimiter after the context
  context_length = rest-(pch+11);
  if (context_length > 0) {
    if (strncmp(pch+11,"{}",2)==0){
      goto ContextSkipped;
    } 
    char* tmp = (char *)realloc(dot[index].context,sizeof(char)*(context_length+1));  // Try to re-use the memory of THIS dot (dot->context would always be dot[0])
    if (tmp==NULL) {
      Serial.print(F("ERROR: No memory allocated for dot->context. "));
      Serial.print(context_length+1);
      Serial.println(F(" bytes required"));
      valid=false;
      goto ContextSkipped;
    }
    dot[index].context=tmp;

    strncpy(dot[index].context, pch+11, context_length);
    dot[index].context[context_length]='\0';
  }
  ContextSkipped: 


  // Parse for id
  pch = strstr(bytes,"\"id\":");
  if (pch==NULL || pch+6>end)  goto IdSkipped;
  pch = strstr(pch+6, "\"");    // Opening quote of the id value
  if (pch==NULL) goto IdSkipped;
  pch++;                        // Just after the left-\"
  rest = strstr(pch,"\"");      // The right-\"
  if (rest==NULL) goto IdSkipped;
  context_length = rest-pch;
  if (context_length == 24) {   // There have to be exactly 24 chars
    strncpy(dot[index].id, pch, context_length);
    dot[index].id[context_length]='\0';
  }
  IdSkipped:

  dot[index].valid = valid;

  switch (index){
    case DOT_LED:
        digitalWrite(LED_BUILTIN, dot[DOT_LED].value*HIGH);
      break;
    default:
      break;
    }

  Serial.println(" ... OK");
  return true;
  
CLEANUP_ON_ERROR:
  Serial.println("ERROR: Skipped parsing message");
  return false;
}
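
String functions such as strstr() expect NUL-terminated input, while an MQTT payload arrives as a pointer plus a length. As a hedged sketch of a length-aware variant of the field parsing in plain C++, the helper below works on a bounded, terminated copy. The name `extractNumber` and the 256-byte limit are assumptions for the illustration and not part of the sketch above or of any library.

```cpp
#include <cstddef>
#include <cstdlib>
#include <cstring>

// Hypothetical helper: finds "key": inside a payload of `length` bytes and
// parses the number that follows. Returns false if the key is missing, no
// number follows it, or the number is not followed by ',' or '}'.
bool extractNumber(const char* payload, size_t length, const char* key, double* out) {
  char text[256];
  if (length >= sizeof(text)) return false;        // refuse oversized payloads
  memcpy(text, payload, length);
  text[length] = '\0';                             // now safe for str*() functions

  char pattern[40];
  size_t keyLen = strlen(key);
  if (keyLen + 4 > sizeof(pattern)) return false;  // two quotes, colon and NUL
  pattern[0] = '"';
  memcpy(pattern + 1, key, keyLen);
  memcpy(pattern + 1 + keyLen, "\":", 3);          // copies '"', ':' and the NUL

  const char* hit = strstr(text, pattern);
  if (hit == NULL) return false;                   // check NULL before pointer arithmetic
  const char* start = hit + strlen(pattern);
  char* rest = NULL;
  double v = strtod(start, &rest);                 // strtod() skips leading whitespace itself
  if (rest == start) return false;                 // no digits found
  if (*rest != ',' && *rest != '}') return false;  // same delimiter check as in the sketch
  *out = v;
  return true;
}
```

For a payload such as {"value": 1.0, "timestamp": 1476000000000}, calling the helper with the key "value" yields 1.0, and a key that does not occur simply returns false instead of dereferencing a NULL pointer.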



with the Ubidots dashboard looking something like this:

Hi @JLeib thanks so much for your contributions

1 and 2) --> I know, I’ve been playing with MQTT as well and also ask myself why we didn’t publish this before! It’s so easy to use.

3 and 4) --> We are gradually adding MQTT support to our current firmware libraries, and this benchmark shows how much sense that makes. Thanks again for taking the time. We’re starting with Particle MQTT and will continue with Arduino MQTT following your contribution/suggestion.

–Agustín

ISSUE with publish() on ARDUINO

Hi Agustin,

can you please double-check the behavior of the API when contacted via QoS=0:

Background:
For the MQTT communication from my Arduino I use “PubSubClient” and “MQTT” (Paho).
Both work fine when using short, simple publish() calls, like:

  • topic:/v1.6/devices/MQTT_Test
    payload:{"wlan": 1.00}
  • topic:/v1.6/devices/MQTT_Test/wlan
    payload:{"value": 2.00}
  • topic:/v1.6/devices/MQTT_Test
    payload:{"wlan": {"value": 4.00}}

By the way, publishing:
topic:/v1.6/devices/MQTT_Test/wlan
payload:{3.00}
is NOT working. It would be a good counterpart to subscribe() on /v1.6/devices/MQTT_Test/wlan/lv, but it’s okay.

Issue using Arduino:
The problem I have is with longer payloads, like your example in the API documentation. Using publish() with
topic:/v1.6/devices/label_ds
payload:{"temperature": 10, "luminosity": {"value":10}, "wind_speed": [{"value": 11, "timestamp":10000}, {"value": 12, "timestamp":13000}]}
is NOT working when sending from the Arduino.

Countermeasures:

  • #define MQTT_BUFFER_SIZE 256
    as recommended for MQTTClient.h (Paho), I already enlarged MQTT_BUFFER_SIZE from 128 (default) to 256
  • #define MQTT_MAX_PACKET_SIZE 256 for “PubSubClient”
  • receiving long strings via subscribe() does not seem to be the problem

Any ideas?

P.S.:
Fine when using node.js
When I send the same string using node.js from the terminal, just to double-check myself, everything is fine!!!
Both for:

  • QoS:1 and QoS:0 as well as
  • 'retain':false or 'retain':true

P.P.S. (After more tests):

It seems that it is not a problem with the keywords “timestamp” or “context”, but with the length of the payload:

This works fine - strlen(payload)=95:

  • topic:/v1.6/devices/MQTT_Test
  • payload:{"wlan": [{"value": 4000}, {"value": 4001}, {"value": 4002}, {"value": 4003}, {"value": 4004}]}

but this does NOT work - strlen(payload)=129:

  • topic:/v1.6/devices/MQTT_Test
  • payload:{"wlan": [{"value": 4000}, {"value": 4001}, {"value": 4002}, {"value": 4003}, {"value": 4004}, {"value": 4005}, {"value": 4006}]}

As reported above, I have an issue posting longer MQTT messages on Arduino. As a summary as of now:

  • It looks like it’s NOT an Ubidots API issue
  • I do receive a “FALSE” as the return value of publish() when the payload is longer than about 100 chars
    (I’ll find the exact number later). That’s why I believe the problem is on the ARDUINO side.
  • #define MQTT_BUFFER_SIZE 256 prior to #include <MQTTClient.h> does not help
  • hard-setting MAX_MQTT_PACKET_SIZE = 256 in <\lib\MQTTClient.h> (which is included by MQTTClient.h) does not help either

Any suggestions?

@JLeib are you building on a particular library? Maybe it is a garbage-collection issue, or the MKR1000 has a problem with one of its memory blocks.

Last week I used the MQTT library by hirotakaster and sent a lot of messages in one payload.

Best regards,
Metavix

Hi Mateo,

thanks for the suggestion. The MQTT library by hirotakaster solved the issue of publishing longer MQTT payloads right away.

  1. The library is based on PubSubClient and allows intrinsically for packets of 255 bytes.
    // MQTT_MAX_PACKET_SIZE : Maximum packet size
    // this size is total of [MQTT Header(Max:5byte) + Topic Name Length + Topic Name + Message ID(QoS1|2) + Payload]
    #define MQTT_MAX_PACKET_SIZE 255

  2. The library allows for QoS1 as well. Nice!
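
The size breakdown quoted in that comment can be turned into a quick check of whether a publish fits. This is only a sketch of the arithmetic in plain C++; the function names are made up for the illustration, and the 2-byte sizes for the topic length field and the message ID are taken from the MQTT packet layout, not from either library's source.

```cpp
#include <cstddef>
#include <cstring>

// Worst-case packet size following the breakdown quoted above:
// header (max 5 bytes) + topic length field + topic + message ID (QoS 1|2 only) + payload.
size_t mqttPublishPacketSize(const char* topic, size_t payloadLength, int qos) {
  const size_t maxHeader = 5;
  const size_t topicLengthField = 2;
  const size_t messageId = (qos > 0) ? 2 : 0;
  return maxHeader + topicLengthField + strlen(topic) + messageId + payloadLength;
}

// True if a publish with this topic and payload length fits into maxPacketSize bytes.
bool fitsInPacket(const char* topic, size_t payloadLength, int qos, size_t maxPacketSize) {
  return mqttPublishPacketSize(topic, payloadLength, qos) <= maxPacketSize;
}
```

With the topic /v1.6/devices/MQTT_Test (23 characters) and QoS 0, the 95-character payload needs at most 125 bytes and the 129-character payload at most 159 bytes. A 128-byte limit would therefore accept the first and reject the second, while 255 bytes accommodates both. That is consistent with the observations in this thread, but it is not a confirmed diagnosis.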

Best regards
Juergen

P.S: I have no idea yet, why changing MQTT_MAX_PACKET_SIZE in the PubSubClient.h didn’t do a similar job …


@JLeib Yeah, sometimes Arduino code behaves very strangely, and when something you are doing doesn’t work, it is usually because of a memory allocation error.

Best regards,
Metavix
