Reliable message delivery with Mosquitto (MQTT)
Monday, February 20th, 2017
I was looking for a message queue that could reliably handle messages in such a way that I was guaranteed never to miss one, even if the consumer is offline or crashes. Mosquitto (MQTT) comes very close to that goal. However, it wasn't directly obvious how to configure it to be as reliable as possible So this post describes how to use Mosquitto to ensure the most reliable delivery it can handle.
TL;DR: You can't
If you want to do reliable message handling with Mosquitto, the short answer is: You can't. For the long answer, read the rest of the article. Or if you're lazy and stubborn, read the "Limitations" section further down. ;-)
Anyway, let's get on with the show and see how close Mosquitto can get.
Quick overview of Mosquitto
Here's a quick schematic of Mosquitto components:
+----------+ +--------+ +----------+ | producer |---->| broker |---->| consumer | +----------+ +--------+ +----------+
The producer sends messages to a topic on the broker. The broker maintains an internal state of topics and which consumers are interested in which topics. It also maintains a queue of messages which still need to be sent to each consumer. How the broker decided what / when to send to which consumer depends on settings such as the QoS (Quality of Service) and what kind of session the consumer is opening.
Producer and consumer settings
Here's a quick overview of settings that ensure the highest available quality of delivery of messages with Mosquitto. When creating a consumer or producer, ensure you set these settings properly:
- quality-of-service must be
- The consumer must send a client_id.
- clean_session on the consumer must be
These are the base requirements to ensure that each consumer will receive messages exactly once, even if they've been offline for a while. The quality-of-service setting of
2 ensures that the broker requires acknowledgement from the consumer that a message has been received properly. Only then does the broker update its internal state to advance the consumer to the next message in the queue. If the client crashes before acknowledging the message, it'll be resent the next time.
The client_id gives the broker a unique name under which to store session state information such as the last message the consumer has properly acknowledged. Without a client_id, the broker cannot do this.
The clean_session setting lets the consumer inform the broker about whether it wants its session state remembered. Without it, the broker assumes the broker assumes the consumer does not care about past messages and such. It will only receive any new messages that are produced after the consumer has connected to the broker.
Together these settings ensure that messages are reliably delivered from the producer to the broker and to the consumer, even if the consumer has been disconnected for a while or crashes while receiving the message.
The following settings are relevant configuration options on the broker. You can generally find these settings in
- The broker must have persistence set to
Truein the broker configuration.
- You may want to set max_inflight_messages to 1 in the broker configuration to ensure correct ordering of messages.
- Configure max_queued_messsages to the maximum number of messages to retain in a queue.
- Tweak autosave_interval to how often you want the broker to write the in-memory database to disk.
The persistence setting informs the broker that you'd like session state and message queues written to disk. If the broker for some reason, the messages will (mostly) still be there.
You can ensure that messages are sent to consumers in the same order as they were sent to the broker by the producers by setting the max_inflight_messages setting to
1. This will probably severely limit the throughput speed of messages.
The max_queued_messsages determines how many unconfirmed messages should maximally be retained in queues. This should basically be the product of the maximum number of messages per second and the maximum time a consumer might be offline. Say we're processing 1 message per second and we want the consumer to be able to be offline for 2 hours (= 7200 seconds), then the
max_queued_messsages setting should be
1 * 7200 = 7200.
The autosave_interval determines how often you want the broker to write the in-memory database to disk. I suspect that setting this to a very low level will cause severe Disk I/O activity.
Here's an example of a producer and consumer:
import paho.mqtt.client as paho import time client = paho.Client(protocol=paho.MQTTv31) client.connect("localhost", 1883) client.loop_start() client.publish("mytesttopic", str("foo"), qos=2) time.sleep(1) # Give the client loop time to proess the message
import paho.mqtt.client as paho def on_message(client, userdata, msg): print(msg.topic+" "+str(msg.qos)+" "+str(msg.payload)) client = paho.Client("testclient", clean_session=False, protocol=paho.MQTTv31) client.on_message = on_message client.connect("localhost", 1883) client.subscribe("mytesttopic", qos=2) client.loop_forever()
There are a few pitfalls I ran into when using Mosquitto:
- If the broker or one of the clients doesn't support the
MQTTv32protocol, things will fail silently. So I specify
clientloop needs some time to process the sending and receiving of messages. If you send a single message and exit your program right away, the loop doesn't have time to actually send the message.
subscribermust have already run once before the broker will start keeping messages for it. Otherwise, the broker has no idea that a consumer with QoS=2 is interested in messages (and would have to keep messages for ever). So register your consumer once by just running it, before the producer runs.
Although the settings above make exchanging messages with Mosquitto more reliable, there are still some downsides:
- Exchanging messages in this way is obviously slower than having no consistency checks in place.
- Since the Mosquitto broker only writes the in-memory database to disk every X (where X is configurable) seconds, you may lose data if the broker crashes.
- On the consumer side, it is the MQTT library that confirms the receipt of the message. However, as far as I can tell, there is no way to manually confirm the receipt of a message. So if your client crashes while handling a message, rather than while it is receiving a message, you may still lose the message. If you wish to handle this case, you can store the message on the client as soon as possible. This is, however, not much more reliable. The only other way is to implement some manual protocol via the exchange of messages where the original publisher retains a message and resends it unless its been acknowledged by the consumer.
In other words, as far as I can see, you cannot do reliable message handling with Mosquitto. If your broker crashes or your client crashes, Mosquitto will lose your messages. Other than that, if all you require is reliable delivery of messages to the client, you're good to go.
So what are the alternatives? At this point, I have to honest and say: I don't know yet. I'm personally looking for a lightweight solution, and it seems none of the lightweight Message Queues do reliable message handling (as opposed to reliable messagedelivery, which most do just fine).
When I find an answer, I'll let you know here.