Internet of Things (Part 2)

Neil Haddley, March 6, 2021

Node-RED can publish messages to an MQTT broker and subscribe to MQTT topics.

Node-RED

Node-RED can be used to publish messages to an MQTT broker.

Node-RED is able to subscribe to MQTT broker messages.

$ docker run -it -p 1880:1880 nodered/node-red

node-red Docker image

Connecting Node-RED to the MQTT broker

"mqtt in" node subscribes to "outTopic" messages

"debug" node displays the incoming "outTopic" messages on debug panel

"mqtt out" node publishes "1" or "0" to the "inTopic" and ESP8266 LED turns on or off.

MQTT topics and payloads

In the example above, "inTopic" is a command topic and "outTopic" is a telemetry topic.

Command Topics

In the example above, the ESP8266 microcontroller subscribes to "inTopic" messages, and when it sees a message with the payload "1" it turns on the built-in LED.

In fact, dozens or hundreds of IoT devices could subscribe to "inTopic" messages, all turning on or off together.

But what if a developer would like a single ESP8266 device's built-in LED to turn on? What if a developer would like to turn on a fan connected to a single ESP8266 device? What if a developer would like to turn off the fans connected to all ESP8266 devices on a given floor of a given building?

Every device that connects to an MQTT broker has a unique "thing name" (and a unique MQTT client ID), but how would the publisher of a control message specify that it wants to turn off the fan connected to the ESP8266 with a given "thing name"?

Topic structure

If a device with thing name "thing1" is in conference room 2 on floor 2 of building 3, a developer can arrange for that thing's built-in LED to turn off whenever a message with payload "0" is published with this topic:

cmd/hvac/building3/floor2/confroom2/thing1/builtinled

A developer can arrange for a fan controlled by thing1 to be turned on whenever a message with payload "1" is published with this topic:

cmd/hvac/building3/floor2/confroom2/thing1/fan

A developer can arrange for fans controlled by all things on floor 2 to be turned off when a message with payload "0" is published with this topic:

cmd/hvac/building3/floor2/fan
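Topics like the three above can be assembled from their parts rather than typed by hand. The following is a minimal sketch in standard C++ (it does not depend on the Arduino core). The names buildCommandTopic and lastLevel are hypothetical helpers, not functions from PubSubClient, and the "cmd/hvac" prefix is simply the one used in this article.

```cpp
#include <string>
#include <vector>

// Hypothetical helper (not part of any MQTT library): joins the location
// levels and the actuator name into a command topic under "cmd/hvac".
std::string buildCommandTopic(const std::vector<std::string>& location,
                              const std::string& actuator) {
    std::string topic = "cmd/hvac";
    for (const std::string& level : location) {
        topic += "/" + level;
    }
    topic += "/" + actuator;
    return topic;
}

// Returns the last topic level, e.g. "fan" for ".../thing1/fan".
// A topic without any '/' is returned unchanged.
std::string lastLevel(const std::string& topic) {
    const std::size_t pos = topic.rfind('/');
    return pos == std::string::npos ? topic : topic.substr(pos + 1);
}
```

For example, buildCommandTopic({"building3", "floor2"}, "fan") gives the floor-wide fan topic shown above, and lastLevel of that topic is "fan", which is the kind of check a device can use to decide which output to switch.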

Sample ESP8266 code is shown below.

publishing to "cmd/hvac/building3/floor2/confroom2/thing1/builtinled"

subscribing to "cmd/hvac/#"

Response

If the publisher of a Command Topic message wants to receive responses they might provide a JSON payload that includes a "session-id" and a "res-topic". The session-id would be passed to the ESP8266 device so that it can differentiate between commands that are repeated or re-sent over time. The res-topic would be used as the response message's topic (it is better not to hardcode the response message's topic).

Instead of passing the string "1" the Command Topic message's payload would be:

{
  "session-id": "session-123456789012",
  "res-topic": "cmd/hvac/building3/floor2/res",
  "value": "1"
}
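A device receiving such a payload needs to pull out the three fields and recognise a session-id it has already handled. The sketch below is one possible approach in standard C++, under several assumptions: extractStringField is a deliberately naive, hypothetical parser that only copes with flat objects of string values like the one above, CommandTracker and buildResponse are hypothetical names, and the response format (session-id plus a "status" field) is invented for illustration since the article does not define one. On a real device a JSON library would be a safer choice than hand-written parsing.

```cpp
#include <set>
#include <string>

// Naive lookup of a string field in a flat JSON object such as
// {"session-id":"abc","value":"1"}. Illustration only: no escape handling,
// no nesting, no non-string values. Returns "" when the key is absent.
std::string extractStringField(const std::string& json, const std::string& key) {
    const std::string needle = "\"" + key + "\"";
    std::size_t pos = json.find(needle);
    if (pos == std::string::npos) return "";
    pos = json.find(':', pos + needle.size());
    if (pos == std::string::npos) return "";
    const std::size_t open = json.find('"', pos + 1);
    if (open == std::string::npos) return "";
    const std::size_t close = json.find('"', open + 1);
    if (close == std::string::npos) return "";
    return json.substr(open + 1, close - open - 1);
}

// Remembers the session-ids already handled so a re-sent command can be
// recognised as a repeat. The set grows without bound here; a device with
// limited memory would need to cap or expire entries.
class CommandTracker {
public:
    // Returns true the first time a session-id is seen, false for repeats.
    bool isNew(const std::string& sessionId) {
        return seen_.insert(sessionId).second;
    }

private:
    std::set<std::string> seen_;
};

// Builds a response payload that echoes the session-id.
// The "status" field is an assumption, not something the article specifies.
std::string buildResponse(const std::string& sessionId, const std::string& status) {
    return "{\"session-id\":\"" + sessionId + "\",\"status\":\"" + status + "\"}";
}
```

The device would publish the string returned by buildResponse to whatever topic it read from "res-topic", so the publisher can match the reply to its original command by session-id.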

Topic for telemetry

Telemetry is read-only data that is transmitted by a device and aggregated elsewhere.

A temperature sensor reading might be published by the ESP8266 using this topic:

dt/hvac/building3/floor2/confroom2/thing1/temperature

The temperature units might be published using this topic:

dt/hvac/building3/floor2/confroom2/thing1/temperature/units

A humidity sensor reading might be published by the ESP8266 using this topic:

dt/hvac/building3/floor2/confroom2/thing1/humidity

How often an ESP8266 should publish a value depends on the application. The device could publish an update every few minutes, every few seconds or every time the value changes.
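These options can be combined into a single rule. The following sketch, in standard C++, is one way to express it; PublishPolicy and shouldPublish are hypothetical names, and the thresholds in the usage note are illustrative rather than recommendations.

```cpp
#include <cmath>

// Hypothetical publish policy combining the three options described above.
struct PublishPolicy {
    unsigned long minIntervalMs;  // never publish more often than this
    unsigned long maxIntervalMs;  // always publish at least this often
    float minChange;              // in between, publish only if the value moved this much
};

// Decides whether a new reading should be published now.
// Timestamps are in milliseconds, as returned by millis() on the device;
// the unsigned subtraction stays correct when that counter wraps around.
bool shouldPublish(const PublishPolicy& policy,
                   unsigned long nowMs, unsigned long lastPublishMs,
                   float value, float lastValue) {
    const unsigned long elapsed = nowMs - lastPublishMs;
    if (elapsed < policy.minIntervalMs) return false;  // too soon
    if (elapsed >= policy.maxIntervalMs) return true;  // periodic update is due
    return std::fabs(value - lastValue) >= policy.minChange;
}
```

With a policy of {5000, 300000, 0.5f}, a reading is held back for the first 5 seconds, sent after that if it differs from the last published value by at least 0.5, and sent regardless once 5 minutes have passed.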

Data subscriptions

While telemetry messages are being published to the MQTT broker, client applications can subscribe to individual topics or to multiple topics using wildcards.

The # wildcard matches multiple topic name levels.

The + wildcard matches a single topic name level.

Humidity sensor readings from the 2nd floor of building 3.
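The broker does the wildcard matching itself, but it can help to see the rules spelled out. The sketch below implements them in standard C++ for illustration; splitLevels and topicMatches are hypothetical names, and the filter "dt/hvac/building3/floor2/+/+/humidity" used in the note afterwards is one filter that would select the humidity readings for that floor, assuming the room/thing layout used in this article.

```cpp
#include <string>
#include <vector>

// Splits a topic or topic filter into its '/'-separated levels.
std::vector<std::string> splitLevels(const std::string& s) {
    std::vector<std::string> levels;
    std::size_t start = 0;
    while (true) {
        const std::size_t slash = s.find('/', start);
        if (slash == std::string::npos) {
            levels.push_back(s.substr(start));
            return levels;
        }
        levels.push_back(s.substr(start, slash - start));
        start = slash + 1;
    }
}

// Returns true if `topic` is matched by the subscription `filter`.
// '+' matches exactly one level; '#' matches all remaining levels and is
// only accepted as the last level of the filter.
bool topicMatches(const std::string& filter, const std::string& topic) {
    const std::vector<std::string> f = splitLevels(filter);
    const std::vector<std::string> t = splitLevels(topic);
    for (std::size_t i = 0; i < f.size(); ++i) {
        if (f[i] == "#") return i == f.size() - 1;
        if (i >= t.size()) return false;                 // topic has too few levels
        if (f[i] != "+" && f[i] != t[i]) return false;   // literal level differs
    }
    return f.size() == t.size();                         // no unmatched topic levels left
}
```

Here "dt/hvac/building3/floor2/+/+/humidity" matches "dt/hvac/building3/floor2/confroom2/thing1/humidity" but not the corresponding temperature topic, while "cmd/hvac/#" matches every command topic under "cmd/hvac".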

mqtt_esp8266_topic_structure.ino

#include <ESP8266WiFi.h>
#include <PubSubClient.h>

const char* THING_NAME = "thing1";

const char* ssid = "<WIFI NAME>";
const char* password = "<WIFI PASSWORD>";
const char* mqtt_server = "<MQTT BROKER ADDRESS>";
const int CONNECTED_FAN = 4; // GPIO4 (D2)

WiFiClient espClient;
PubSubClient client(espClient);

void setup_wifi() {

  delay(10);
  // Start by connecting to the WiFi network
  Serial.println();
  Serial.print("Connecting to ");
  Serial.println(ssid);

  WiFi.mode(WIFI_STA);
  WiFi.begin(ssid, password);

  while (WiFi.status() != WL_CONNECTED) {
    delay(500);
    Serial.print(".");
  }

  Serial.println("");
  Serial.println("WiFi connected");
  Serial.println("IP address: ");
  Serial.println(WiFi.localIP());
}

void callback(char* topic, byte* payload, unsigned int length) {
  Serial.print("Message arrived [");
  Serial.print(topic);
  Serial.print("] ");
  for (unsigned int i = 0; i < length; i++) {
    Serial.print((char)payload[i]);
  }
  Serial.println();

  // Ignore empty payloads so payload[0] is never read out of bounds
  if (length == 0) {
    return;
  }

  // The last topic level selects the output to switch
  String strTopic(topic);
  if (strTopic.endsWith("/builtinled")) {
    if ((char)payload[0] == '1') {
      digitalWrite(BUILTIN_LED, LOW);   // Turn the LED on (the built-in LED is active low)
    } else {
      digitalWrite(BUILTIN_LED, HIGH);  // Turn the LED off by making the voltage HIGH
    }
  }
  if (strTopic.endsWith("/fan")) {
    // Payload "1" turns the fan on, as described in the text.
    // Swap HIGH and LOW if the fan is driven through an active-low relay.
    if ((char)payload[0] == '1') {
      digitalWrite(CONNECTED_FAN, HIGH);  // Turn the fan on
    } else {
      digitalWrite(CONNECTED_FAN, LOW);   // Turn the fan off
    }
  }
}

void reconnect() {
  // Loop until we're reconnected
  while (!client.connected()) {
    Serial.print("Attempting MQTT connection...");

    // Build the topic filter for commands addressed to this thing only
    String strThingName(THING_NAME);
    String fullpathThingName = "cmd/hvac/building3/floor2/confroom2/" + strThingName + "/+";

    // Attempt to connect, using the thing name as the MQTT client ID
    if (client.connect(THING_NAME)) {
      Serial.println("connected");
      // Resubscribe to thing, room, floor and building level commands
      client.subscribe(fullpathThingName.c_str());
      client.subscribe("cmd/hvac/building3/floor2/confroom2/+");
      client.subscribe("cmd/hvac/building3/floor2/+");
      client.subscribe("cmd/hvac/building3/+");
    } else {
      Serial.print("failed, rc=");
      Serial.print(client.state());
      Serial.println(" try again in 5 seconds");
      // Wait 5 seconds before retrying
      delay(5000);
    }
  }
}

void setup() {
  pinMode(BUILTIN_LED, OUTPUT);     // Initialize the BUILTIN_LED pin as an output
  pinMode(CONNECTED_FAN, OUTPUT);   // Initialize CONNECTED_FAN (GPIO4, D2) as an output
  Serial.begin(115200);
  setup_wifi();
  client.setServer(mqtt_server, 1883);
  client.setCallback(callback);
}

void loop() {

  if (!client.connected()) {
    reconnect();
  }
  client.loop();

}

mqtt_esp8266_topic_structure_telemetry.ino

#include <ESP8266WiFi.h>
#include <PubSubClient.h>
#include "DHT.h"

const char* THING_NAME = "thing1";

// Thing name as a String, used when building topic names
static String strThingName(THING_NAME);

const char* ssid = "<WIFI NAME>";
const char* password = "<WIFI PASSWORD>";
const char* mqtt_server = "<MQTT BROKER ADDRESS>";
const int CONNECTED_FAN = 4; // GPIO4 (D2)

#define DHTTYPE DHT11   // DHT 11
// DHT sensor
const int DHTPin = 5; // GPIO5 (D1)
// Initialize DHT sensor.
DHT dht(DHTPin, DHTTYPE);

// Text buffers for the formatted readings
static char celsiusTemp[7];
static char fahrenheitTemp[7];
static char humidityTemp[7];

// Sentinel for "nothing published yet"; guarded in case <float.h> already defines it
#ifndef FLT_MIN
#define FLT_MIN 1.17549435E-38F
#endif

static float h_old = FLT_MIN;
static float t_old = FLT_MIN;

WiFiClient espClient;
PubSubClient client(espClient);
unsigned long lastMsg = 0;

void setup_wifi() {

  delay(10);
  // We start by connecting to a WiFi network
  Serial.println();
  Serial.print("Connecting to ");
  Serial.println(ssid);

  WiFi.mode(WIFI_STA);
  WiFi.begin(ssid, password);

  while (WiFi.status() != WL_CONNECTED) {
    delay(500);
    Serial.print(".");
  }

  Serial.println("");
  Serial.println("WiFi connected");
  Serial.println("IP address: ");
  Serial.println(WiFi.localIP());
}

void callback(char* topic, byte* payload, unsigned int length) {
  Serial.print("Message arrived [");
  Serial.print(topic);
  Serial.print("] ");
  for (unsigned int i = 0; i < length; i++) {
    Serial.print((char)payload[i]);
  }
  Serial.println();

  // Ignore empty payloads so payload[0] is never read out of bounds
  if (length == 0) {
    return;
  }

  // The last topic level selects the output to switch
  String strTopic(topic);
  if (strTopic.endsWith("/builtinled")) {
    if ((char)payload[0] == '1') {
      digitalWrite(BUILTIN_LED, LOW);   // Turn the LED on (the built-in LED is active low)
    } else {
      digitalWrite(BUILTIN_LED, HIGH);  // Turn the LED off by making the voltage HIGH
    }
  }
  if (strTopic.endsWith("/fan")) {
    // Payload "1" turns the fan on, as described in the text.
    // Swap HIGH and LOW if the fan is driven through an active-low relay.
    if ((char)payload[0] == '1') {
      digitalWrite(CONNECTED_FAN, HIGH);  // Turn the fan on
    } else {
      digitalWrite(CONNECTED_FAN, LOW);   // Turn the fan off
    }
  }

}

void reconnect() {
  // Loop until we're reconnected
  while (!client.connected()) {
    Serial.print("Attempting MQTT connection...");

    // Build the topic filter for commands addressed to this thing only
    String fullpathThingName = "cmd/hvac/building3/floor2/confroom2/" + strThingName + "/+";

    // Attempt to connect, using the thing name as the MQTT client ID
    if (client.connect(THING_NAME)) {
      Serial.println("connected");
      // Resubscribe to thing, room, floor and building level commands
      client.subscribe(fullpathThingName.c_str());
      client.subscribe("cmd/hvac/building3/floor2/confroom2/+");
      client.subscribe("cmd/hvac/building3/floor2/+");
      client.subscribe("cmd/hvac/building3/+");
    } else {
      Serial.print("failed, rc=");
      Serial.print(client.state());
      Serial.println(" try again in 5 seconds");
      // Wait 5 seconds before retrying
      delay(5000);
    }
  }
}

void setup() {
  pinMode(BUILTIN_LED, OUTPUT);     // Initialize the BUILTIN_LED pin as an output
  pinMode(CONNECTED_FAN, OUTPUT);   // Initialize CONNECTED_FAN (GPIO4, D2) as an output
  Serial.begin(115200);
  dht.begin();                      // Start the DHT sensor before the first read
  setup_wifi();
  client.setServer(mqtt_server, 1883);
  client.setCallback(callback);
}

void loop() {

  if (!client.connected()) {
    reconnect();
  }
  client.loop();

  unsigned long now = millis();
  if (now - lastMsg > 5000) {

    // SENSOR READ START

    // Sensor readings may also be up to 5 seconds 'old' (it's a very slow sensor)
    float h = dht.readHumidity();
    // Read temperature as Celsius (the default)
    float t = dht.readTemperature();
    // Read temperature as Fahrenheit (isFahrenheit = true)
    float f = dht.readTemperature(true);
    // If any read failed, publish nothing and try again on the next interval
    if (isnan(h) || isnan(t) || isnan(f)) {
      Serial.println("Failed to read from DHT sensor!");
      strcpy(celsiusTemp, "Failed");
      strcpy(fahrenheitTemp, "Failed");
      strcpy(humidityTemp, "Failed");
    }
    else {
      // Heat index in Celsius and Fahrenheit (printed for debugging only)
      float hic = dht.computeHeatIndex(t, h, false);
      float hif = dht.computeHeatIndex(f, h);
      // Format the measured values, not the heat index, for publishing
      dtostrf(t, 6, 2, celsiusTemp);
      dtostrf(f, 6, 2, fahrenheitTemp);
      dtostrf(h, 6, 2, humidityTemp);
      // You can delete the following Serial.print calls; they are for debugging only
      Serial.print("Humidity: ");
      Serial.print(h);
      Serial.print(" %\t Temperature: ");
      Serial.print(t);
      Serial.print(" *C ");
      Serial.print(f);
      Serial.print(" *F\t Heat index: ");
      Serial.print(hic);
      Serial.print(" *C ");
      Serial.print(hif);
      Serial.println(" *F");

      // Publish only when a reading has changed since the last publish
      if (h != h_old) {
        String fullpathThingNameHumidity = "dt/hvac/building3/floor2/confroom2/" +
                                           strThingName + "/humidity";
        client.publish(fullpathThingNameHumidity.c_str(), humidityTemp);
        h_old = h;
      }
      if (t != t_old) {
        String fullpathThingNameTemperature = "dt/hvac/building3/floor2/confroom2/"
                                              + strThingName + "/temperature";
        client.publish(fullpathThingNameTemperature.c_str(), celsiusTemp);
        t_old = t;
      }
    }

    // SENSOR READ END

    lastMsg = now;

  }
}