Internet of Things (Part 2)
Neil Haddley • March 6, 2021
Node-RED can publish messages to an MQTT broker and subscribe to MQTT topics.
Node-RED
Node-RED can be used to publish messages to an MQTT broker.
Node-RED is able to subscribe to MQTT broker messages.
$ docker run -it -p 1880:1880 nodered/node-red

node-red Docker image

Connecting Node-RED to the MQTT broker

"mqtt in" node subscribes to "outTopic" messages

"debug" node displays the incoming "outTopic" messages on debug panel

"mqtt out" node publishes "1" or "0" to the "inTopic" and ESP8266 LED turns on or off.
MQTT topics and payloads
In the example above "inTopic" is a command topic and "outTopic" is a topic for telemetry.
Command Topics
In the example above the ESP8266 microprocessor subscribes to "inTopic" messages and when it sees a message with the payload "1" the builtin LED is turned on.
In fact dozens or hundreds of IoT devices could subscribe to "inTopic" messages turning on or off together.
But what if a developer would like a single ESP8266 device's builtin LED to turn on? What if a developer would like to turn on a fan connected to a single ESP8266 device? What if a developer would like to turn off the fans connected to all ESP8266 devices on a given floor of a given building?
Well every device that connects to an MQTT broker has a unique "thing name" (and a unique MQTT client id) but how would the publisher of a control message specify that they wanted to turn off the fan connected to the ESP8266 with a given "thing name"?
Topic structure
If a device with thing name "thing1" is in conference room 2 on floor 2 of building 3 a developer can arrange for that thing's builtin LED to turn off whenever a message with payload "0" is published with this topic:
cmd/hvac/building3/floor2/confroom2/thing1/builtinled
A developer can arrange for a fan controlled by thing1 to be turned on whenever a message with payload "1" is published with this topic:
cmd/hvac/building3/floor2/confroom2/thing1/fan
A developer can arrange for fans controlled by all things on floor 2 to be turned off when a message with payload "0" is published with this topic:
cmd/hvac/building3/floor2/fan
Sample ESP8266 code is shown below.

publishing to "cmd/hvac/building3/floor2/confroom2/thing1/builtinled"

subscribing to "cmd/hvac/#"
Response
If the publisher of a Command Topic message wants to receive responses they might provide a JSON payload that includes a "session-id" and a "res-topic". The session-id would be passed to the ESP8266 device so that it can differentiate between commands that are repeated or re-sent over time. The res-topic would be used as the response message's topic (it is better not to hardcode the response message's topic).
Instead of passing the string "1" the Command Topic message's payload would be:
{
"session-id":"session-123456789012",
"res-topic":"cmd/hvac/building3/floor2/res",
"value":"1"
}
Topic for telemetry
Telemetry is read-only data that is transmitted by a device and aggregated elsewhere.
A temperature sensor reading might be published by the ESP8266 using this topic
dt/hvac/building3/floor2/confroom2/thing1/temperature
the temperature units might be published using this topic
dt/hvac/building3/floor2/confroom2/thing1/temperature/units
A humidity sensor reading might be published by the ESP8266 using this topic
dt/hvac/building3/floor2/confroom2/thing1/humidity
How often an ESP8266 should publish a value depends on the application. The device could publish an update every few minutes, every few seconds, or every time the value changes.
Data subscriptions
While telemetry messages are being published to the MQTT server, client applications can subscribe to individual topics or to multiple topics using wildcards.
The # wildcard matches multiple topic name levels.
The + wildcard matches a single topic name level.

Humidity sensor readings from the 2nd floor of building 3.
mqtt_esp8266_topic_structure.ino
TEXT
1#include <ESP8266WiFi.h> 2#include <PubSubClient.h> 3 4const char* THING_NAME = "thing1"; 5 6const char* ssid = "<WIFI NAME>"; 7const char* password = "<WIFI PASSWORD>" 8const char* mqtt_server = "<MQTT BROKER ADDRESS>"; 9const int CONNECTED_FAN = 4; // GOIP D2 10 11WiFiClient espClient; 12PubSubClient client(espClient); 13unsigned long lastMsg = 0; 14#define MSG_BUFFER_SIZE (50) 15char msg[MSG_BUFFER_SIZE]; 16int value = 0; 17 18void setup_wifi() { 19 20 delay(10); 21 Serial.println(); 22 Serial.print("Connecting to "); 23 Serial.println(ssid); 24 25 WiFi.mode(WIFI_STA); 26 WiFi.begin(ssid, password); 27 28 while (WiFi.status() != WL_CONNECTED) { 29 delay(500); 30 Serial.print("."); 31 } 32 33 Serial.println(""); 34 Serial.println("WiFi connected"); 35 Serial.println("IP address: "); 36 Serial.println(WiFi.localIP()); 37} 38 39void callback(char* topic, byte* payload, unsigned int length) { 40 Serial.print("Message arrived ["); 41 Serial.print(topic); 42 Serial.print("] "); 43 for (int i = 0; i < length; i++) { 44 Serial.print((char)payload[i]); 45 } 46 Serial.println(); 47 48 String strTopic(topic); 49 if (strTopic.endsWith("/builtinled")) { 50 if ((char)payload[0] == '1') { 51 digitalWrite(BUILTIN_LED, LOW); // Turn the LED on (Note that LOW is the voltage level 52 } else { 53 digitalWrite(BUILTIN_LED, HIGH); // Turn the LED off by making the voltage HIGH 54 } 55 } 56 if (strTopic.endsWith("/fan")) { 57 if ((char)payload[0] == '1') { 58 digitalWrite(CONNECTED_FAN, LOW); // Turn the FAN off 59 } else { 60 digitalWrite(CONNECTED_FAN, HIGH); // Turn the FAN on 61 } 62 } 63 64 65} 66 67void reconnect() { 68 // Loop until we're reconnected 69 while (!client.connected()) { 70 Serial.print("Attempting MQTT connection..."); 71 72 // Create a "thing name"/client ID 73 String strThingName(THING_NAME); 74 String fullpathThingName = "cmd/hvac/building3/floor2/confroom2/"+strThingName+"/+"; 75 76 // Attempt to connect 77 if (client.connect(THING_NAME)) { 78 
Serial.println("connected"); 79 // resubscribe 80 client.subscribe(fullpathThingName.c_str()); 81 client.subscribe("cmd/hvac/building3/floor2/confroom2/+"); 82 client.subscribe("cmd/hvac/building3/floor2/+"); 83 client.subscribe("cmd/hvac/building3/+"); 84 } else { 85 Serial.print("failed, rc="); 86 Serial.print(client.state()); 87 Serial.println(" try again in 5 seconds"); 88 // Wait 5 seconds before retrying 89 delay(5000); 90 } 91 } 92} 93 94void setup() { 95 pinMode(BUILTIN_LED, OUTPUT); // Initialize the BUILTIN_LED pin as an output 96 pinMode(CONNECTED_FAN, OUTPUT); // Initialize CONNECTED_FAN - GOIP D2 - as an output 97 Serial.begin(115200); 98 setup_wifi(); 99 client.setServer(mqtt_server, 1883); 100 client.setCallback(callback); 101} 102 103void loop() { 104 105 if (!client.connected()) { 106 reconnect(); 107 } 108 client.loop(); 109 110}
mqtt_esp8266_topic_structure_telemetry.ino
TEXT
1#include <ESP8266WiFi.h> 2#include <PubSubClient.h> 3#include "DHT.h" 4 5const char* THING_NAME = "thing1"; 6 7// Create a "thing name"/client ID 8static String strThingName(THING_NAME); 9 10const char* ssid = "<WIFI NAME>"; 11const char* password = "<WIFI PASSWORD>" 12const char* mqtt_server = "<MQTT BROKER ADDRESS>"; 13const int CONNECTED_FAN = 4; // GOIP D2 14 15#define DHTTYPE DHT11 // DHT 11 16// DHT Sensor 17const int DHTPin = 5; // GPIO D1 18// Initialize DHT sensor. 19DHT dht(DHTPin, DHTTYPE); 20 21// Temporary variables 22static char celsiusTemp[7]; 23static char fahrenheitTemp[7]; 24static char humidityTemp[7]; 25 26#define FLT_MIN 1.17549435E-38F 27 28static float h_old = FLT_MIN; 29static float t_old = FLT_MIN; 30 31WiFiClient espClient; 32PubSubClient client(espClient); 33unsigned long lastMsg = 0; 34#define MSG_BUFFER_SIZE (50) 35char msg[MSG_BUFFER_SIZE]; 36 37void setup_wifi() { 38 39 delay(10); 40 // We start by connecting to a WiFi network 41 Serial.println(); 42 Serial.print("Connecting to "); 43 Serial.println(ssid); 44 45 WiFi.mode(WIFI_STA); 46 WiFi.begin(ssid, password); 47 48 while (WiFi.status() != WL_CONNECTED) { 49 delay(500); 50 Serial.print("."); 51 } 52 53 Serial.println(""); 54 Serial.println("WiFi connected"); 55 Serial.println("IP address: "); 56 Serial.println(WiFi.localIP()); 57} 58 59void callback(char* topic, byte* payload, unsigned int length) { 60 Serial.print("Message arrived ["); 61 Serial.print(topic); 62 Serial.print("] "); 63 for (int i = 0; i < length; i++) { 64 Serial.print((char)payload[i]); 65 } 66 Serial.println(); 67 68 String strTopic(topic); 69 if (strTopic.endsWith("/builtinled")) { 70 if ((char)payload[0] == '1') { 71 digitalWrite(BUILTIN_LED, LOW); // Turn the LED on (Note that LOW is the voltage level 72 } else { 73 digitalWrite(BUILTIN_LED, HIGH); // Turn the LED off by making the voltage HIGH 74 } 75 } 76 if (strTopic.endsWith("/fan")) { 77 if ((char)payload[0] == '1') { 78 digitalWrite(CONNECTED_FAN, LOW); 
// Turn the FAN off 79 } else { 80 digitalWrite(CONNECTED_FAN, HIGH); // Turn the FAN on 81 } 82 } 83 84} 85 86void reconnect() { 87 // Loop until we're reconnected 88 while (!client.connected()) { 89 Serial.print("Attempting MQTT connection..."); 90 91 String fullpathThingName = "cmd/hvac/building3/floor2/confroom2/" + strThingName + "/+"; 92 93 // Attempt to connect 94 if (client.connect(THING_NAME)) { 95 Serial.println("connected"); 96 // resubscribe 97 client.subscribe(fullpathThingName.c_str()); 98 client.subscribe("cmd/hvac/building3/floor2/confroom2/+"); 99 client.subscribe("cmd/hvac/building3/floor2/+"); 100 client.subscribe("cmd/hvac/building3/+"); 101 } else { 102 Serial.print("failed, rc="); 103 Serial.print(client.state()); 104 Serial.println(" try again in 5 seconds"); 105 // Wait 5 seconds before retrying 106 delay(5000); 107 } 108 } 109} 110 111void setup() { 112 pinMode(BUILTIN_LED, OUTPUT); // Initialize the BUILTIN_LED pin as an output 113 pinMode(CONNECTED_FAN, OUTPUT); 114 Serial.begin(115200); 115 setup_wifi(); 116 client.setServer(mqtt_server, 1883); 117 client.setCallback(callback); 118} 119 120void loop() { 121 122 if (!client.connected()) { 123 reconnect(); 124 } 125 client.loop(); 126 127 unsigned long now = millis(); 128 if (now - lastMsg > 5000) { 129 130 // SENSOR READ START 131 132 // Sensor readings may also be up to 5 seconds 'old' (its a very slow sensor) 133 float h = dht.readHumidity(); 134 // Read temperature as Celsius (the default) 135 float t = dht.readTemperature(); 136 // Read temperature as Fahrenheit (isFahrenheit = true) 137 float f = dht.readTemperature(true); 138 // Check if any reads failed and exit early (to try again). 
139 if (isnan(h) || isnan(t) || isnan(f)) { 140 Serial.println("Failed to read from DHT sensor!"); 141 strcpy(celsiusTemp, "Failed"); 142 strcpy(fahrenheitTemp, "Failed"); 143 strcpy(humidityTemp, "Failed"); 144 } 145 else { 146 // Computes temperature values in Celsius + Fahrenheit and Humidity 147 float hic = dht.computeHeatIndex(t, h, false); 148 dtostrf(hic, 6, 2, celsiusTemp); 149 float hif = dht.computeHeatIndex(f, h); 150 dtostrf(hif, 6, 2, fahrenheitTemp); 151 dtostrf(h, 6, 2, humidityTemp); 152 // You can delete the following Serial.print's, it's just for debugging purposes 153 Serial.print("Humidity: "); 154 Serial.print(h); 155 Serial.print(" %\t Temperature: "); 156 Serial.print(t); 157 Serial.print(" *C "); 158 Serial.print(f); 159 Serial.print(" *F\t Heat index: "); 160 Serial.print(hic); 161 Serial.print(" *C "); 162 Serial.print(hif); 163 Serial.println(" *F"); 164 165 if (h != h_old) { 166 String fullpathThingNameHumidity = "dt/hvac/building3/floor2/confroom2/" + 167 strThingName + "/humidity"; 168 client.publish(fullpathThingNameHumidity.c_str(), humidityTemp); 169 h_old = h; 170 } 171 if (t != t_old) { 172 String fullpathThingNameTemperature = "dt/hvac/building3/floor2/confroom2/" 173 + strThingName + "/temperature"; 174 client.publish(fullpathThingNameTemperature.c_str(), celsiusTemp); 175 t_old=t; 176 } 177 } 178 179 // SENSOR READ END 180 181 lastMsg = now; 182 183 } 184}