Skip to content

MQTT API

This Library is a high-performance [MQTT] library for IoT publish/subscribe communications.

The library supports MQTT version 3.1.1 and has the following features:

  • Supports connect, publish, subscribe, ping and disconnect messages.
  • Message quality of service for reliable delivery.
  • Retained messages.
  • TLS encryption with ALPN over port 443.
  • High message throughput with exceptionally low overhead.
  • Wait for delivery or acknowledgement options.
  • Auto reconnect on network failures.
  • Parallelism via fiber coroutines.

Function Index

Mqtt *mqttAlloc(cchar *clientId, MqttEventProc proc)
 Allocate an MQTT client instance.
boolmqttCheckQueue(Mqtt *mq)
 Check if there are messages in the transmission queue.
intmqttConnect(Mqtt *mq, RSocket *sock, int flags, MqttWaitFlags waitFlags)
 Establish a session with the MQTT broker.
intmqttDisconnect(Mqtt *mq)
 Send a disconnection packet to the MQTT broker.
voidmqttFree(Mqtt *mq)
 Free an MQTT client instance.
cchar *mqttGetError(struct Mqtt *mq)
 Get the last error message for the MQTT instance.
TicksmqttGetLastActivity(Mqtt *mq)
 Return the time of last I/O activity.
boolmqttIsConnected(Mqtt *mq)
 Check if the MQTT instance is connected to a broker.
intmqttMsgsToSend(Mqtt *mq)
 Get the number of messages pending transmission.
intmqttPing(Mqtt *mq)
 Send a ping request to the broker.
intmqttPublish(Mqtt *mq, cvoid *msg, ssize size, int qos, MqttWaitFlags waitFlags, cchar *topic, ...)
 Publish an application message to the MQTT broker.
intmqttPublishRetained(Mqtt *mq, cvoid *msg, ssize size, int qos, MqttWaitFlags waitFlags, cchar *topic, ...)
 Publish a retained message to the MQTT broker.
intmqttSetCredentials(Mqtt *mq, cchar *username, cchar *password)
 Set authentication credentials for broker connection.
voidmqttSetKeepAlive(Mqtt *mq, Ticks keepAlive)
 Set the keep-alive timeout interval.
voidmqttSetMessageSize(Mqtt *mq, int size)
 Set the maximum message size for this MQTT instance.
voidmqttSetTimeout(Mqtt *mq, Ticks timeout)
 Set the idle connection timeout.
intmqttSetWill(Mqtt *mq, cchar *topic, cvoid *msg, ssize length)
 Set the last will and testament message.
intmqttSubscribe(Mqtt *mq, MqttCallback callback, int maxQos, MqttWaitFlags waitFlags, cchar *topic, ...)
 Subscribe to a topic pattern.
intmqttSubscribeMaster(Mqtt *mq, int maxQos, MqttWaitFlags waitFlags, cchar *topic, ...)
 Establish a master subscription for efficient topic management.
voidmqttThrottle(Mqtt *mq)
 Enable transmission throttling.
intmqttUnsubscribe(Mqtt *mq, cchar *topic, MqttWaitFlags wait)
 Unsubscribe from a topic.
intmqttUnsubscribeMaster(Mqtt *mq, cchar *topic, MqttWaitFlags wait)
 Unsubscribe from a master topic.

Typedef Index

MqttMQTT instance.
MqttCallbackMessage receipt callback.
MqttEventProcMQTt event callback.
MqttHdrFixed header of a packet.
MqttMsgMqtt message.
MqttRecvA struct used to deserialize/interpret an incoming packet from the broker.

Defines

#defineMQTT_BUF_SIZE   4096
 Receive buffer size.
#defineMQTT_EVENT_ATTACH   1
 Attach a socket.
#defineMQTT_EVENT_CONNECTED   2
 A new connection was established.
#defineMQTT_EVENT_DISCONNECT   3
 Connection closed.
#defineMQTT_EVENT_TIMEOUT   4
 The idle connection has timed out.
#defineMQTT_INLINE_BUF_SIZE   128
 Size of inline buffer.
#defineMQTT_KEEP_ALIVE   (20 * 60 * TPS)
 Default connection keep alive time.
#defineMQTT_MAX_MESSAGE_SIZE   256 * 1024 * 1024
 Max message size.
#defineMQTT_MAX_TOPIC_SIZE   128
 Max topic size.
#defineMQTT_MSG_TIMEOUT   (30 * TPS)
 Default message timeout.
#defineMQTT_PROTOCOL_LEVEL   0x04
 Protocol version 3.1.1.
#defineMQTT_TIMEOUT   (MAXINT)
 Default connection timeout in msec.
#defineMQTT_WAIT_ACK   0x2
 Wait for ack.
#defineMQTT_WAIT_FAST   0x4
 Fast callback.
#defineMQTT_WAIT_NONE   0x0
 Wait flags.
#defineMQTT_WAIT_SENT   0x1
 Wait for sent.

Typedefs

typedef void(* MqttCallback) (const struct MqttRecv *resp)

Message receipt callback.

Parameters:
respMessage received structure.
API Stability:
Evolving.

typedef void(* MqttEventProc) (struct Mqtt *mq, int event)

MQTt event callback.

Parameters:
mqMqtt object created via mqttAlloc.
eventEvent type, set to MQTT_EVENT_CONNECT, MQTT_EVENT_DISCONNECT or MQTT_EVENT_STOPPING.
API Stability:
Evolving.

Mqtt

MQTT instance.

API Stability:
Evolving.
Fields:
RBuf *buf I/O read buffer.
uintconnectedMqtt is currently connected flag.
uintdestroyedMqtt instance is destroyed - just for debugging.
interrorMqtt error flag.
char *errorMsgMqtt error message.
intfiberCount Number of fibers waiting for a message.
MqttMsghead Head of message queue.
char *id Client ID.
TickskeepAlive Server side keep alive duration in seconds.
REventkeepAliveEvent Keep alive event.
TickslastActivity Time of last I/O activity.
intmask R library wait event mask.
RList *masterTopics List of master subscription topics.
intmaxMessage Maximum message size.
intmsgTimeout Message timeout for retransmit.
intnextId Next message ID.
char *password Username for connect.
MqttEventProcproc Notification event callback.
uintprocessing ProcessMqtt is running.
RSocket *sock Underlying socket transport.
uintsubscribedApi Reserved.
Ticksthrottle Throttle delay in msec.
TicksthrottleLastPub Time of last publish or throttle.
TicksthrottleMark Throttle sending until Time.
Tickstimeout Inactivity timeout for on-demand connections.
RList *topics List of subscribed topics.
char *username Password for connect.
char *willMsg Will and testament message.
ssizewillMsgSize Size of will message.
char *willTopic Will and testament topic.

MqttHdr

Fixed header of a packet.

API Stability:
Evolving.
Fields:
intflags Packet control flags.
uintlength Size in of the variable portion after fixed header and packet length.

MqttMsg

Mqtt message.

API Stability:
Internal.
Fields:
uchar *buf External message text buffer for large messages.
uchar *end End of message.
uchar *endbuf End of message buffer.
RFiber *fiber Message fiber to process the message.
inthold Dont free message.
intid Message sequence ID.
ucharinlineBuf[MQTT_INLINE_BUF_SIZE] Inline message text buffer for small message efficiency.
struct MqttMsg *next Next message in the queue.
struct MqttMsg *prev Previous message in the queue.
intqos Message quality of service.
Tickssent Time the message was sent.
uchar *start Start of message.
MqttMsgStatestate Message send status.
MqttPacketTypetype Message packet type.
MqttWaitFlagswait Message wait flags.

MqttRecv

A struct used to deserialize/interpret an incoming packet from the broker.

API Stability:
Evolving.
Fields:
MqttConnCodecode Connection response code.
cuchar *codes Array of return codes for subscribed topics.
char *data Published message.
intdataSize Size of data.
uchardup Set to 0 on first attempt to send packet.
uinthasSession Connection using an existing session.
struct MqttHdrhdr MQTT message fixed header.
intid Message ID.
MqttTopic *matched Matched topic.
struct Mqtt *mq Message queue.
intnumCodes Size of codes.
ucharqos Quality of service.
ucharretain Message is retained.
cuchar *start Start of message.
char *topic Topic string.
inttopicSize Size of topic.

Functions

Mqtt * mqttAlloc (cchar *clientId, MqttEventProc proc)

Allocate an MQTT client instance.

Description:
Create a new MQTT client instance with the specified client ID and event callback. The client ID must be unique among all clients connecting to the same broker.
Parameters:
clientIdUnique client identifier string. Maximum length is MQTT_MAX_CLIENT_ID_SIZE.
procOptional event notification callback procedure for connection/disconnection events. Set to NULL if not required.
Returns:
Returns an allocated Mqtt instance or NULL on allocation failure.
API Stability:
Evolving.

bool mqttCheckQueue (Mqtt *mq)

Check if there are messages in the transmission queue.

Description:
Determine if there are pending messages waiting to be processed or transmitted to the broker.
Parameters:
mqThe MQTT object.
Returns:
True if messages are queued, false if queue is empty.
API Stability:
Internal.

int mqttConnect (Mqtt *mq, RSocket *sock, int flags, MqttWaitFlags waitFlags)

Establish a session with the MQTT broker.

Description:
This call establishes a new MQTT connection to the broker using the supplied socket. The MQTT object keeps a reference to the socket. If the socket is closed or freed by the caller, the caller must call mqttDisconnect first. This function must be called before any publish or subscribe operations.
Parameters:
mqThe Mqtt object allocated via mqttAlloc.
sockThe underlying socket to use for communications. This socket must be connected to an MQTT broker.
flagsAdditional MqttConnectFlags to use when establishing the connection. These flags control session behavior: MQTT_CONNECT_CLEAN_SESSION to start fresh, QOS levels for will messages, and MQTT_CONNECT_WILL_RETAIN for retained will messages.
waitFlagsWait flags. Set to MQTT_WAIT_NONE, MQTT_WAIT_SENT or MQTT_WAIT_ACK.
Returns:
Zero if successful, negative on error.
API Stability:
Evolving.

int mqttDisconnect (Mqtt *mq)

Send a disconnection packet to the MQTT broker.

Description:
This sends a DISCONNECT packet to gracefully terminate the MQTT session. This will not close the underlying socket. The broker, upon receiving the disconnection packet, will close the connection. Call this before freeing the socket or MQTT instance.
Parameters:
mqThe Mqtt object.
Returns:
Zero if successful, negative on error.
API Stability:
Evolving.

void mqttFree (Mqtt *mq)

Free an MQTT client instance.

Description:
Release all resources associated with the MQTT instance. This will automatically disconnect from the broker if still connected and free all subscriptions and queued messages.
Parameters:
mqMqtt instance allocated via mqttAlloc. Pass NULL to ignore.
API Stability:
Evolving.

cchar * mqttGetError (struct Mqtt *mq)

Get the last error message for the MQTT instance.

Description:
Returns a human-readable error message describing the last error that occurred on this MQTT instance. This is useful for debugging connection and protocol issues.
Parameters:
mqMqtt object.
Returns:
The associated error message string or empty string if no error.
API Stability:
Evolving.

Ticks mqttGetLastActivity (Mqtt *mq)

Return the time of last I/O activity.

Description:
Get the timestamp of the last network I/O activity on the MQTT connection. This is useful for implementing connection monitoring and keep-alive logic.
Parameters:
mqThe Mqtt object.
Returns:
The time in Ticks of the last I/O activity.
API Stability:
Evolving.

bool mqttIsConnected (Mqtt *mq)

Check if the MQTT instance is connected to a broker.

Description:
Test whether the MQTT client currently has an active connection to an MQTT broker and can send/receive messages.
Parameters:
mqThe MQTT object.
Returns:
True if connected to a broker, false otherwise.
API Stability:
Prototype.

int mqttMsgsToSend (Mqtt *mq)

Get the number of messages pending transmission.

Description:
Returns the count of messages in the outbound queue waiting to be sent or acknowledged by the broker. This is useful for flow control and monitoring.
Parameters:
mqThe MQTT object.
Returns:
The number of messages in the transmission queue.
API Stability:
Evolving.

int mqttPing (Mqtt *mq)

Send a ping request to the broker.

Description:
Send a PINGREQ packet to the broker to test connectivity and reset the keep-alive timer. The broker will respond with a PINGRESP packet.
Parameters:
mqThe MQTT object.
Returns:
Zero if successful, negative on error.
API Stability:
Evolving.

int mqttPublish (Mqtt *mq, cvoid *msg, ssize size, int qos, MqttWaitFlags waitFlags, cchar *topic, ...)

Publish an application message to the MQTT broker.

Description:
Publish a message to the specified topic. The MQTT instance must be connected to a broker via mqttConnect before calling this function.
Parameters:
mqThe Mqtt object.
msgThe data to be published. Can be binary data.
sizeThe size of the message in bytes. Maximum size is MQTT_MAX_MESSAGE_SIZE.
qosQuality of service level: 0 (at most once), 1 (at least once), or 2 (exactly once). If QOS 0 is used, MQTT_WAIT_ACK will be ignored.
waitFlagsWait flags. Set to MQTT_WAIT_NONE (async), MQTT_WAIT_SENT (wait for send), or MQTT_WAIT_ACK (wait for broker acknowledgment).
topicPrintf-style topic string. Maximum length is MQTT_MAX_TOPIC_SIZE.
...Topic formatting arguments.
Returns:
Zero if successful, negative on error.
API Stability:
Evolving.

int mqttPublishRetained (Mqtt *mq, cvoid *msg, ssize size, int qos, MqttWaitFlags waitFlags, cchar *topic, ...)

Publish a retained message to the MQTT broker.

Description:
Publish a message with the retain flag set. Retained messages are stored by the broker and delivered to new subscribers immediately upon subscription. The MQTT instance must be connected to a broker via mqttConnect.
Parameters:
mqThe Mqtt object.
msgThe data to be published. Can be binary data.
sizeThe size of the message in bytes. Maximum size is MQTT_MAX_MESSAGE_SIZE.
qosQuality of service level: 0 (at most once), 1 (at least once), or 2 (exactly once).
waitFlagsWait flags. Set to MQTT_WAIT_NONE (async), MQTT_WAIT_SENT (wait for send), or MQTT_WAIT_ACK (wait for broker acknowledgment).
topicPrintf-style topic string. Maximum length is MQTT_MAX_TOPIC_SIZE.
...Topic formatting arguments.
Returns:
Zero if successful, negative on error.
API Stability:
Evolving.

int mqttSetCredentials (Mqtt *mq, cchar *username, cchar *password)

Set authentication credentials for broker connection.

Description:
Define the username and password to use when connecting to the MQTT broker. These credentials must be set before calling mqttConnect if the broker requires authentication.
Parameters:
mqThe Mqtt object.
usernameThe username to use when establishing the session with the MQTT broker. Set to NULL if a username is not required. Maximum length is MQTT_MAX_USERNAME_SIZE.
passwordThe password to use when establishing the session with the MQTT broker. Set to NULL if a password is not required. Maximum length is MQTT_MAX_PASSWORD_SIZE.
Returns:
Zero if successful, negative on error.
API Stability:
Evolving.

void mqttSetKeepAlive (Mqtt *mq, Ticks keepAlive)

Set the keep-alive timeout interval.

Description:
Configure the maximum time interval between client messages to the broker. If no messages are sent within this period, a PINGREQ packet will be sent automatically to maintain the connection.
Parameters:
mqThe MQTT object.
keepAliveTime interval in milliseconds before sending a keep-alive message. Set to 0 to disable keep-alive.
API Stability:
Evolving.

void mqttSetMessageSize (Mqtt *mq, int size)

Set the maximum message size for this MQTT instance.

Description:
Configure the maximum allowed message size for publish operations. Some brokers like AWS IoT have smaller limits than the default. This setting helps prevent oversized messages from being rejected.
Parameters:
mqThe MQTT object.
sizeThe maximum message size in bytes. Must be positive and less than MQTT_MAX_MESSAGE_SIZE.
API Stability:
Evolving.

void mqttSetTimeout (Mqtt *mq, Ticks timeout)

Set the idle connection timeout.

Description:
Configure how long the connection can remain idle before being automatically closed. This is useful for on-demand connections that should disconnect when not in use.
Parameters:
mqThe MQTT object.
timeoutTime to wait in milliseconds before closing an idle connection. Set to MAXINT to disable automatic disconnection.
API Stability:
Evolving.

int mqttSetWill (Mqtt *mq, cchar *topic, cvoid *msg, ssize length)

Set the last will and testament message.

Description:
Configure a message that the broker will publish if this client disconnects unexpectedly. The will message is sent when the broker detects an abnormal disconnection (network failure, client crash, etc.).
Parameters:
mqThe MQTT object.
topicTopic where the will message will be published. Maximum length is MQTT_MAX_TOPIC_SIZE.
msgWill message data. Can be binary data.
lengthSize of the will message in bytes.
Returns:
Zero if successful, negative on error.
API Stability:
Evolving.

int mqttSubscribe (Mqtt *mq, MqttCallback callback, int maxQos, MqttWaitFlags waitFlags, cchar *topic, ...)

Subscribe to a topic pattern.

Description:
Subscribe to receive messages published to topics matching the specified pattern. The MQTT instance must be connected to a broker via mqttConnect. Topic patterns support MQTT wildcards: '+' for single level, '#' for multi-level.
Parameters:
mqMqtt object.
callbackFunction to invoke when messages are received on this topic. Set to NULL to use a default handler.
maxQosMaximum quality of service level to accept: 0, 1, or 2.
waitFlagsWait flags. Set to MQTT_WAIT_NONE (async), MQTT_WAIT_SENT (wait for send), or MQTT_WAIT_ACK (wait for broker acknowledgment).
topicPrintf-style topic pattern string supporting MQTT wildcards.
...Topic formatting arguments.
Returns:
Zero if successful, negative on error.
API Stability:
Evolving.

int mqttSubscribeMaster (Mqtt *mq, int maxQos, MqttWaitFlags waitFlags, cchar *topic, ...)

Establish a master subscription for efficient topic management.

Description:
To minimize the number of active MQTT protocol subscriptions, this call establishes a master subscription. Subsequent local subscriptions using this master topic as a prefix will not incur additional MQTT protocol subscriptions but will be processed locally off the master subscription. This is useful for applications with many related topic subscriptions.
Parameters:
mqMqtt object.
maxQosMaximum quality of service level to accept: 0, 1, or 2. This QOS level applies to all local subscriptions using this master.
waitFlagsWait flags. Set to MQTT_WAIT_NONE (async), MQTT_WAIT_SENT (wait for send), or MQTT_WAIT_ACK (wait for broker acknowledgment).
topicPrintf-style master topic pattern string.
...Topic formatting arguments.
Returns:
Zero if successful, negative on error.
API Stability:
Evolving.

void mqttThrottle (Mqtt *mq)

Enable transmission throttling.

Description:
Temporarily throttle message transmission to prevent overwhelming the broker or network. This is used internally for flow control.
Parameters:
mqThe MQTT object.
API Stability:
Internal.

int mqttUnsubscribe (Mqtt *mq, cchar *topic, MqttWaitFlags wait)

Unsubscribe from a topic.

Description:
Remove a subscription for the specified topic pattern. If the topic is a local subscription under a master topic, it will be removed locally without affecting the master subscription.
Parameters:
mqThe MQTT object.
topicThe topic pattern to unsubscribe from. Must match a previously subscribed topic.
waitWait flags. Set to MQTT_WAIT_NONE (async), MQTT_WAIT_SENT (wait for send), or MQTT_WAIT_ACK (wait for broker acknowledgment).
Returns:
Zero if successful, negative on error.
API Stability:
Evolving.

int mqttUnsubscribeMaster (Mqtt *mq, cchar *topic, MqttWaitFlags wait)

Unsubscribe from a master topic.

Description:
Remove a master subscription and all associated local subscriptions. This will send an UNSUBSCRIBE packet to the broker for the master topic.
Parameters:
mqThe MQTT object.
topicThe master topic pattern to unsubscribe from.
waitWait flags. Set to MQTT_WAIT_NONE (async), MQTT_WAIT_SENT (wait for send), or MQTT_WAIT_ACK (wait for broker acknowledgment).
Returns:
Zero if successful, negative on error.
API Stability:
Evolving.