MQTT source

Introduction

The MQTT protocol is a lightweight, publish-subscribe, machine to machine network protocol for message queue/message queuing service.

Basic examples

Prerequisites

In order to run all the examples successfully, you will need Mosquitto and CrateDB. The easiest way is to use a Docker Compose file we provide for that purpose.

wget https://github.com/daq-tools/lorrystream/raw/main/docker-compose.yml
docker compose up --detach cratedb mosquitto

Connectivity

The most basic example is to subscribe to a data source, and to relay it to STDOUT. This means the data will be either printed to your terminal, or redirected to a file or pipe. It is useful to verify if connecting to the data source works well, and to inspect data or metadata information.

# Subscribe to MQTT topic and relay data stream to terminal.
lorry relay "mqtt://localhost/testdrive/%23"

This command effectively connects to an MQTT broker on localhost, subscribes to the MQTT topic testdrive/#, and relays the data stream to your terminal, without applying any kind of decoding. Please note to use %23 for the topic wildcard character #.

You can verify everything works by publishing an example message to the MQTT topic, for example using mosquitto_pub.

# Submit message.
echo "hello world" | mosquitto_pub -t 'testdrive/foo' -l

Storage

For the next examples, let’s define a message payload in JSON format.

DATA='{"id": "device-42", "temperature": 42.42, "humidity": 84.84}'

An only slightly more advanced example would be to converge data from MQTT into an SQLite database.

# Start relay.
lorry relay \
    "mqtt://localhost/testdrive/%23?content-type=json" \
    "sqlite:///data.sqlite?table=testdrive"

# Submit data.
echo "${DATA}" | mosquitto_pub -t 'testdrive/foo' -l

# Verify data has been stored.
sqlite3 data.sqlite "SELECT * FROM testdrive;"

If you are aiming to store high volumes of data, consider using a database designed for that purpose.

lorry relay \
    "mqtt://localhost/testdrive/%23?content-type=json" \
    "crate://localhost/?table=testdrive"

LorryStream uses SQLAlchemy for connecting to the target database, which supports a wide range of databases. In order learn more details, please visit the documentation section about the Database sink.