MQTT - Lightweight pub/sub message broker
MQTT is a standard messaging protocol for IoT devices. It implements the publish/subscribe model: in short, an asynchronous communication system in which every device subscribed to a topic receives any message published to that topic. It is widely used for IoT communication thanks to its high performance and low bandwidth usage. Most devices sold by different manufacturers can be configured to communicate via MQTT or an HTTP REST service, and at the same time they are hooked up to third-party IoT platforms such as Tuya (so that the device can be accessed from the Internet). Fortunately, you can change that and configure the device to run within your home network. All you need to do is flash different firmware, but be aware that this will void the warranty most of the time. If you are concerned about privacy, in this post I will show you how to configure and connect your devices to your own MQTT broker without third-party platforms like Tuya, and how to make them available even when you are outside your home network.
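To make the publish/subscribe idea concrete, here is a toy, in-process sketch of the model. It is not MQTT and not how Mosquitto works internally; the TinyBroker class and its method names are made up purely for illustration.

```python
from collections import defaultdict


class TinyBroker:
    """Toy in-process broker: maps each topic to its subscriber callbacks."""

    def __init__(self):
        self._subscribers = defaultdict(list)

    def subscribe(self, topic, callback):
        # Register a callback to be invoked for every message on `topic`.
        self._subscribers[topic].append(callback)

    def publish(self, topic, payload):
        # Deliver the payload to every subscriber of this exact topic.
        for callback in self._subscribers.get(topic, []):
            callback(topic, payload)


broker = TinyBroker()
received = []
broker.subscribe("tele/bedroom/thermometer", lambda t, p: received.append((t, p)))
broker.publish("tele/bedroom/thermometer", "21.5")  # delivered to the subscriber
broker.publish("tele/kitchen/thermometer", "19.0")  # nobody subscribed, dropped
print(received)
```

The publisher never needs to know who is listening; it only knows the topic. That decoupling is what a real broker such as Mosquitto provides over the network.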
Private Cloud for IoT made simple
For people who are not into the IT world, Tuya is a very convenient platform: you just buy the IoT product, install the app on your phone, and you can access your device from the Internet. But if you don’t trust the Chinese giant when it comes to privacy and data collection, then I would encourage you to build your own private network.
It’s all about firmware
The most important step towards full control over how your device is configured is to make sure you have firmware that lets you upload your own settings, such as the address of your MQTT broker. I’m using Tasmota for most of my devices and I can say it works flawlessly. It is an open source project that currently has >5M downloads and >15k stars on GitHub: https://github.com/arendst/Tasmota. It is highly customizable firmware and supports lots of different devices. The flashing process might be quite problematic if you are not familiar with soldering on a PCB. I have, however, seen ways to flash firmware without soldering; they are often called OTA (over-the-air) flashing. In this post I will not focus on how to do it; there are very good tutorials on the web and you can easily google them. Instead I will show you how you can talk directly to the MQTT broker.
Installing MQTT broker - Mosquitto
There are many different brokers. I’m using Mosquitto: it’s open source software that is widely used and easy to configure. There is an official Docker image for Eclipse Mosquitto, and we will be using it in our demo. First, let’s configure the Mosquitto service in Docker Compose:
version: "3.5"
services:
  mosquitto:
    image: eclipse-mosquitto
    container_name: mosquitto
    ports:
      - "1883:1883"
    networks:
      - home
    volumes:
      - ./mosquitto:/mosquitto/config
    restart: unless-stopped
    depends_on:
      - wireguard
networks:
  home:
    name: home
As you can see, I’m mounting a volume over the Mosquitto config directory. It holds the main configuration file (mosquitto.conf), so you might want to edit some of the settings, for instance to bind the IP address and port:
# listener port-number [ip address/host name/unix socket path]
listener 1883 0.0.0.0
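Once the container is running, a quick way to confirm that the listener is reachable from another machine is a plain TCP connection test. The sketch below only checks that something accepts connections on the port; it does not perform an MQTT handshake. The helper name is_port_open is my own.

```python
import socket


def is_port_open(host, port, timeout=2.0):
    """Return True if a TCP connection to host:port can be established."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts and unreachable hosts.
        return False


# Example call (192.168.0.123 stands in for the Docker host's LAN address):
# print(is_port_open("192.168.0.123", 1883))
```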
There is also another service that Mosquitto depends on: WireGuard
    depends_on:
      - wireguard
In order to have access to our MQTT broker and the devices running at home, we need some way to reach our home network from the Internet. We can do that using WireGuard, which is basically a VPN tunnel. On the WireGuard home page you will not find a Docker image, but the linuxserver.io team did a great job and created an unofficial image, which I’m going to use in our example. Below is WireGuard configured in docker-compose.yml:
  wireguard:
    image: ghcr.io/linuxserver/wireguard
    container_name: wireguard
    cap_add:
      - NET_ADMIN
      - SYS_MODULE
    environment:
      - PUID=1000
      - PGID=1000
      - TZ=Europe/Warsaw
      - SERVERURL=your-custom-dynamic-dns-entry.duckdns.org
      - SERVERPORT=51820
      - PEERS=1
      - PEERDNS=8.8.8.8
      - INTERNAL_SUBNET=10.13.13.0
    volumes:
      - ./wireguard/config:/config
      - /lib/modules:/lib/modules
    ports:
      - 51820:51820/udp
    sysctls:
      - net.ipv4.conf.all.src_valid_mark=1
    restart: unless-stopped
As you can see, WireGuard uses UDP port 51820. So there is one more step required in order to establish a VPN connection to your home network: we need to forward this high UDP port. Most home routers, even very simple ones, are able to forward ports. This is a screenshot of my router’s port forwarding configuration:
The IP address will probably be different in your case; you just want to set the IP address of the machine where the MQTT broker and WireGuard containers will be running. Another thing is your external IP: if it’s not static, then you need to configure dynamic DNS. In short, it’s a web service that makes a hostname always point to your public IP address, even if it has changed recently. I can highly recommend DuckDNS. It’s a web API hosted on AWS that provides an update URL endpoint which you need to call in order to update your public IP address. DuckDNS provides various scripts for different platforms. You just need to create an account and you will receive a unique URL along with a token; then use one of their scripts and you are ready to go.
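If you would rather call the update endpoint from Python than use one of the ready-made scripts, a minimal sketch could look like the one below. It assumes the update URL format that DuckDNS documents at the time of writing (domains, token and an optional ip parameter, where an empty ip lets the service detect your address); the function names are mine, and the domain and token values are placeholders.

```python
from urllib.parse import urlencode
from urllib.request import urlopen


def build_duckdns_update_url(domain, token, ip=""):
    """Build the DuckDNS update URL; an empty ip asks DuckDNS to detect it."""
    query = urlencode({"domains": domain, "token": token, "ip": ip})
    return f"https://www.duckdns.org/update?{query}"


def update_duckdns(domain, token):
    """Call the update endpoint and return the response body as text."""
    with urlopen(build_duckdns_update_url(domain, token), timeout=10) as resp:
        return resp.read().decode().strip()


# Placeholder values; replace them with your own subdomain and token.
print(build_duckdns_update_url("my-home", "abc"))
```

Running update_duckdns() periodically (from cron, for example) keeps the hostname used in SERVERURL pointing at your current public address.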
Python app
Let’s have a look at our Python app and the paho-mqtt library, which in my opinion is very straightforward and easy to use. Our app will be responsible for receiving messages from certain topics. This is the folder structure of our app:
mqtt_tele/
├── app
│ ├── tele_mqtt_handler.py
│ └── tele_mqtt_tasks.py
├── Dockerfile
└── requirements.txt
As you can see, it contains two Python files. The first one, tele_mqtt_handler.py, is our handler, which subscribes to certain topics and handles the messages. The second file, tele_mqtt_tasks.py, is where we keep our logic, i.e. what to do with specific messages. We are going to use a smart plug to measure power usage at the wall socket. I have a washing machine connected to that smart plug, and by measuring power usage I can determine whether the washing program has stopped running. It’s very useful if you don’t have a smart washing machine but want to be notified when the washing is finished. Below are the configuration files and Python scripts:
Dockerfile:
FROM python:3.9.0-alpine
WORKDIR /app
COPY requirements.txt .
RUN apk add tzdata && \
    cp /usr/share/zoneinfo/Europe/Warsaw /etc/localtime && \
    echo "Europe/Warsaw" > /etc/timezone && \
    apk del tzdata && \
    pip install -r requirements.txt
ENTRYPOINT [ "python", "tele_mqtt_handler.py" ]
tele_mqtt_handler.py:
from paho.mqtt import client as mqtt_client
import tele_mqtt_tasks
from datetime import datetime

broker = '192.168.0.123'
client_id = 'tele-python-docker-client'


def on_connect(client, userdata, flags, rc):
    if rc == 0:
        print("Connected to MQTT Broker!")
        # Subscribing here renews the subscription after every reconnect
        client.subscribe('tele/+/+')
    else:
        print(f"Failed to connect, return code {rc}")


def on_message(client, userdata, msg):
    received_msg = msg.payload.decode()
    try:
        _, device_name, func_name = msg.topic.split('/')
        # e.g. tele/gniazdko-4/SENSOR -> tele_mqtt_tasks.SENSOR['gniazdko-4']
        obj_attr = getattr(tele_mqtt_tasks, func_name)
        func = obj_attr.get(device_name, None)
        if func is not None:
            # print(f'{datetime.now()} received msg on: {msg.topic}')
            func(device_name, received_msg)
    except Exception as e:
        # Topics without a registered handler end up here; note that this
        # also hides errors raised inside the handler itself
        pass


def make_client():
    # paho-mqtt 1.x signature; 2.x additionally expects a
    # CallbackAPIVersion as the first argument
    client = mqtt_client.Client(client_id)
    client.on_message = on_message
    client.on_connect = on_connect
    client.connect(broker, port=1883)
    return client


if __name__ == "__main__":
    client = make_client()
    client.loop_forever()
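The lookup inside on_message relies on tele_mqtt_tasks exposing one dict per message type (named after the last topic segment) that maps a device name to a handler function. The sketch below isolates that dispatch step so it can be tried without a broker. The dispatch() helper and the fake_tasks stand-in are hypothetical, and unlike the handler above it returns False for unknown topics instead of catching every exception.

```python
from types import SimpleNamespace


def dispatch(tasks, topic, payload):
    """Route a message to tasks.<LAST_SEGMENT>[<device>] if such a handler exists."""
    parts = topic.split('/')
    if len(parts) != 3:
        return False  # not a prefix/device/type topic
    _, device_name, func_name = parts
    handlers = getattr(tasks, func_name, None)
    if not isinstance(handlers, dict):
        return False  # no registry for this message type
    func = handlers.get(device_name)
    if func is None:
        return False  # no handler registered for this device
    func(device_name, payload)
    return True


# Stand-in for the tele_mqtt_tasks module: one dict per message type.
calls = []
fake_tasks = SimpleNamespace(
    SENSOR={'plug-1': lambda dev, msg: calls.append((dev, msg))}
)

print(dispatch(fake_tasks, 'tele/plug-1/SENSOR', '{"ok": true}'))  # handled
print(dispatch(fake_tasks, 'tele/plug-1/STATE', '{}'))             # no STATE dict
print(dispatch(fake_tasks, 'tele/plug-2/SENSOR', '{}'))            # unknown device
```

Supporting a new device then only means adding an entry to the right dict; the handler itself does not change.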
As you can see, the handler subscribes to: tele/+/+
The “+” sign is a single-level wildcard, which means the Python app can receive messages from different topics, for example:
tele/bedroom/thermometer
tele/bathroom/thermometer
tele/deviceX/sensorA
tele/deviceY/sensorB
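To get a feel for which topics a filter such as tele/+/+ covers, here is a simplified matcher written from the MQTT wildcard rules: “+” matches exactly one level and “#” matches the remaining levels. It is only a sketch for experimenting. It does not validate the filter (for instance that “#” comes last) and ignores the special handling of topics starting with “$”. The function name topic_matches is my own.

```python
def topic_matches(subscription, topic):
    """Return True if `topic` matches an MQTT subscription filter."""
    sub_levels = subscription.split('/')
    topic_levels = topic.split('/')
    for i, level in enumerate(sub_levels):
        if level == '#':
            # Multi-level wildcard: matches the parent and everything below it.
            return True
        if i >= len(topic_levels):
            return False  # filter is longer than the topic
        if level != '+' and level != topic_levels[i]:
            return False  # literal level differs
    # Without '#', both must have the same number of levels.
    return len(sub_levels) == len(topic_levels)


for t in ['tele/bedroom/thermometer', 'tele/bedroom', 'cmnd/bedroom/POWER']:
    print(t, topic_matches('tele/+/+', t))
```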
By the way, there is a quite common naming convention for topics. For example, the tele prefix comes from telemetry and indicates that the topic only carries collected data. cmnd, on the other hand, means command, and such topics are used to run certain commands, for example to turn a device ON/OFF. It’s good practice to follow these conventions in order to keep things less complex and more readable.
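As an illustration of the cmnd side of the convention, the sketch below builds the topic and payload that would switch a plug on or off. It assumes Tasmota’s default topic layout (cmnd/<device>/POWER with an ON or OFF payload) and a client object with a paho-style publish(topic, payload) method. The helper names and the RecordingClient test double are made up for this example.

```python
def power_command(device, on):
    """Return the (topic, payload) pair that switches a device on or off."""
    return f'cmnd/{device}/POWER', 'ON' if on else 'OFF'


def switch(client, device, on):
    # `client` is anything with a paho-style publish(topic, payload) method.
    topic, payload = power_command(device, on)
    client.publish(topic, payload)


class RecordingClient:
    """Test double that records what would have been published."""

    def __init__(self):
        self.sent = []

    def publish(self, topic, payload):
        self.sent.append((topic, payload))


fake = RecordingClient()
switch(fake, 'gniazdko-4', True)
print(fake.sent)
```

With a real paho client connected to the broker, the same switch() call would send the command to the device.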
tele_mqtt_tasks.py:
import redis
import json
import os
import requests
from datetime import datetime

# Redis keeps the state between messages
r = redis.Redis(host='192.168.0.123')


def post_to_slack(slack_msg):
    api_url = 'https://slack.com/api/chat.postMessage'
    token = os.getenv('SLACK_TOKEN')
    headers = {
        'Authorization': f'Bearer {token}',
        'Content-type': 'application/json'
    }
    now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    data = {
        'channel': '#smart-home',
        'text': f'{now}\n{slack_msg}'
    }
    # Named `response` so it does not shadow the Redis client `r`
    response = requests.post(api_url, headers=headers, json=data)


def monitor_washing(device, msg):
    '''
    The washing machine is set up on this topic: tele/gniazdko-4/SENSOR
    '''
    washing_state = r.get('washing_state')
    last_value = r.get('last_value')
    counter = r.get('counter')
    json_data = json.loads(msg)
    current_value = json_data['ENERGY']['Today']
    if washing_state is None or last_value is None or counter is None:
        # First run: initialise the state
        r.set('last_value', current_value)
        r.set('washing_state', 'stopped')
        r.set('counter', 0)
    else:
        last_value = float(last_value)
        counter = int(counter)
        washing_state = washing_state.decode('utf-8')
        if washing_state == 'stopped':
            # Energy counter rising 3 times in a row -> washing has started
            if current_value > last_value:
                counter += 1
                r.set('counter', counter)
                if counter >= 3:
                    print('register: washing started')
                    r.set('washing_state', 'started')
                    r.set('counter', 0)
            else:
                r.set('counter', 0)
        elif washing_state == 'started':
            # Energy counter unchanged 2 times in a row -> washing has finished
            if current_value == last_value:
                counter += 1
                r.set('counter', counter)
                if counter >= 2:
                    print('register: washing is finished')
                    r.set('washing_state', 'stopped')
                    r.set('counter', 0)
                    post_to_slack('Take out the laundry')
            else:
                r.set('counter', 0)
        r.set('last_value', current_value)
        print(f'current counter: {counter} current value: {current_value}')


# topic full name: eg. tele/gniazdko-4/SENSOR
SENSOR = {
    'gniazdko-4': monitor_washing
}
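The counter logic in monitor_washing can be tried out without Redis, Slack or a broker. The sketch below is my own in-memory restatement of the same rules (three rising readings in a row mean “started”, two unchanged readings in a row mean “finished”). The WashingMonitor class and the sample readings are invented for illustration and are not part of the project.

```python
class WashingMonitor:
    """In-memory version of the start/stop detection, with no Redis or Slack."""

    def __init__(self, start_after=3, stop_after=2):
        self.state = None  # None until the first reading arrives
        self.last_value = None
        self.counter = 0
        self.start_after = start_after
        self.stop_after = stop_after

    def feed(self, current_value):
        """Process one 'Today' energy reading; return 'started', 'finished' or None."""
        event = None
        if self.state is None:
            self.state = 'stopped'  # first reading only initialises the state
        elif self.state == 'stopped':
            if current_value > self.last_value:
                self.counter += 1
                if self.counter >= self.start_after:
                    self.state, self.counter, event = 'started', 0, 'started'
            else:
                self.counter = 0
        elif self.state == 'started':
            if current_value == self.last_value:
                self.counter += 1
                if self.counter >= self.stop_after:
                    self.state, self.counter, event = 'stopped', 0, 'finished'
            else:
                self.counter = 0
        self.last_value = current_value
        return event


monitor = WashingMonitor()
readings = [0.10, 0.11, 0.12, 0.13, 0.14, 0.14, 0.14, 0.14]
events = [monitor.feed(value) for value in readings]
print(events)
# [None, None, None, 'started', None, None, 'finished', None]
```

Requiring several consecutive readings before changing state keeps a single odd measurement from triggering a false notification.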
tele_mqtt_tasks.py contains all the logic required to measure power usage and then notify the user when the device is no longer working. In order to save state we need to store information somewhere. It could be a simple file, but in this example I’m using an in-memory database, Redis, which is very handy for storing temporary data. I will write another post about Redis in the future because it’s an awesome database engine. tele_mqtt_tasks.py also has the post_to_slack() function, which posts a new message to the specified channel when the washing is completed. If you want to know more about the Bot API from Slack, check this post. As usual, I will store the entire configuration in my project repository, so feel free to clone it and play around. For more complex solutions I highly recommend getting familiar with the paho-mqtt project.
Have a nice day and happy coding!