MQTT API
This library is a high-performance MQTT client library for IoT publish/subscribe communications.
The library supports MQTT version 3.1.1 and has the following features:
- Supports connect, publish, subscribe, ping and disconnect messages.
- Message quality of service for reliable delivery.
- Retained messages.
- TLS encryption with ALPN over port 443.
- High message throughput with exceptionally low overhead.
- Wait for delivery or acknowledgement options.
- Auto reconnect on network failures.
- Parallelism via fiber coroutines.
Function Index
Mqtt * | mqttAlloc(cchar *clientId, MqttEventProc proc) |
Allocate an MQTT client instance. | |
bool | mqttCheckQueue(Mqtt *mq) |
Check if there are messages in the transmission queue. | |
int | mqttConnect(Mqtt *mq, RSocket *sock, int flags, MqttWaitFlags waitFlags) |
Establish a session with the MQTT broker. | |
int | mqttDisconnect(Mqtt *mq) |
Send a disconnection packet to the MQTT broker. | |
void | mqttFree(Mqtt *mq) |
Free an MQTT client instance. | |
cchar * | mqttGetError(struct Mqtt *mq) |
Get the last error message for the MQTT instance. | |
Ticks | mqttGetLastActivity(Mqtt *mq) |
Return the time of last I/O activity. | |
bool | mqttIsConnected(Mqtt *mq) |
Check if the MQTT instance is connected to a broker. | |
int | mqttMsgsToSend(Mqtt *mq) |
Get the number of messages pending transmission. | |
int | mqttPing(Mqtt *mq) |
Send a ping request to the broker. | |
int | mqttPublish(Mqtt *mq, cvoid *msg, ssize size, int qos, MqttWaitFlags waitFlags, cchar *topic, ...) |
Publish an application message to the MQTT broker. | |
int | mqttPublishRetained(Mqtt *mq, cvoid *msg, ssize size, int qos, MqttWaitFlags waitFlags, cchar *topic, ...) |
Publish a retained message to the MQTT broker. | |
int | mqttSetCredentials(Mqtt *mq, cchar *username, cchar *password) |
Set authentication credentials for broker connection. | |
void | mqttSetKeepAlive(Mqtt *mq, Ticks keepAlive) |
Set the keep-alive timeout interval. | |
void | mqttSetMessageSize(Mqtt *mq, int size) |
Set the maximum message size for this MQTT instance. | |
void | mqttSetTimeout(Mqtt *mq, Ticks timeout) |
Set the idle connection timeout. | |
int | mqttSetWill(Mqtt *mq, cchar *topic, cvoid *msg, ssize length) |
Set the last will and testament message. | |
int | mqttSubscribe(Mqtt *mq, MqttCallback callback, int maxQos, MqttWaitFlags waitFlags, cchar *topic, ...) |
Subscribe to a topic pattern. | |
int | mqttSubscribeMaster(Mqtt *mq, int maxQos, MqttWaitFlags waitFlags, cchar *topic, ...) |
Establish a master subscription for efficient topic management. | |
void | mqttThrottle(Mqtt *mq) |
Enable transmission throttling. | |
int | mqttUnsubscribe(Mqtt *mq, cchar *topic, MqttWaitFlags wait) |
Unsubscribe from a topic. | |
int | mqttUnsubscribeMaster(Mqtt *mq, cchar *topic, MqttWaitFlags wait) |
Unsubscribe from a master topic. |
Typedef Index
Mqtt | MQTT instance. |
MqttCallback | Message receipt callback. |
MqttEventProc | MQTT event callback. |
MqttHdr | Fixed header of a packet. |
MqttMsg | Mqtt message. |
MqttRecv | A struct used to deserialize/interpret an incoming packet from the broker. |
Defines
#define | MQTT_BUF_SIZE 4096 |
Receive buffer size. | |
#define | MQTT_EVENT_ATTACH 1 |
Attach a socket. | |
#define | MQTT_EVENT_CONNECTED 2 |
A new connection was established. | |
#define | MQTT_EVENT_DISCONNECT 3 |
Connection closed. | |
#define | MQTT_EVENT_TIMEOUT 4 |
The idle connection has timed out. | |
#define | MQTT_INLINE_BUF_SIZE 128 |
Size of inline buffer. | |
#define | MQTT_KEEP_ALIVE (20 * 60 * TPS) |
Default connection keep alive time. | |
#define | MQTT_MAX_MESSAGE_SIZE 256 * 1024 * 1024 |
Max message size. | |
#define | MQTT_MAX_TOPIC_SIZE 128 |
Max topic size. | |
#define | MQTT_MSG_TIMEOUT (30 * TPS) |
Default message timeout. | |
#define | MQTT_PROTOCOL_LEVEL 0x04 |
Protocol version 3.1.1. | |
#define | MQTT_TIMEOUT (MAXINT) |
Default connection timeout in msec. | |
#define | MQTT_WAIT_ACK 0x2 |
Wait for ack. | |
#define | MQTT_WAIT_FAST 0x4 |
Fast callback. | |
#define | MQTT_WAIT_NONE 0x0 |
Wait flags: do not wait (asynchronous). | |
#define | MQTT_WAIT_SENT 0x1 |
Wait for sent. |
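The wait flag values in the Defines table above are distinct bits, so they can be tested with bitwise operators. The following is a minimal sketch, not the library's implementation: `effectiveWait` and `willBlock` are hypothetical helpers, and the rule that QoS 0 ignores `MQTT_WAIT_ACK` is taken from the mqttPublish parameter notes.

```c
#include <stdbool.h>

/* Values copied from the Defines table */
#define MQTT_WAIT_NONE 0x0
#define MQTT_WAIT_SENT 0x1
#define MQTT_WAIT_ACK  0x2
#define MQTT_WAIT_FAST 0x4

/*
    Hypothetical helper: QoS 0 messages are never acknowledged,
    so drop the ack wait for them and keep the other flags.
 */
int effectiveWait(int qos, int waitFlags)
{
    if (qos == 0) {
        waitFlags &= ~MQTT_WAIT_ACK;
    }
    return waitFlags;
}

/* Hypothetical helper: true if the caller asked to wait for send or ack */
bool willBlock(int waitFlags)
{
    return (waitFlags & (MQTT_WAIT_SENT | MQTT_WAIT_ACK)) != 0;
}
```

Under these assumptions, a QoS 0 publish requested with `MQTT_WAIT_ACK` behaves like `MQTT_WAIT_NONE`, while a QoS 1 publish keeps the ack wait.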
Typedefs
Message receipt callback.
- Parameters:
resp Message received structure.
- API Stability:
- Evolving.
MQTT event callback.
- Parameters:
mq Mqtt object created via mqttAlloc.
event Event type, set to one of the MQTT_EVENT_ values listed under Defines: MQTT_EVENT_ATTACH, MQTT_EVENT_CONNECTED, MQTT_EVENT_DISCONNECT or MQTT_EVENT_TIMEOUT.
- API Stability:
- Evolving.
MQTT instance.
- API Stability:
- Evolving.
- Fields:
RBuf * buf I/O read buffer.
uint connected Mqtt is currently connected flag.
uint destroyed Mqtt instance is destroyed (for debugging only).
int error Mqtt error flag.
char * errorMsg Mqtt error message.
int fiberCount Number of fibers waiting for a message.
MqttMsg head Head of message queue.
char * id Client ID.
Ticks keepAlive Server side keep alive duration in seconds.
REvent keepAliveEvent Keep alive event.
Ticks lastActivity Time of last I/O activity.
int mask R library wait event mask.
RList * masterTopics List of master subscription topics.
int maxMessage Maximum message size.
int msgTimeout Message timeout for retransmit.
int nextId Next message ID.
char * password Password for connect.
MqttEventProc proc Notification event callback.
uint processing ProcessMqtt is running.
RSocket * sock Underlying socket transport.
uint subscribedApi Reserved.
Ticks throttle Throttle delay in msec.
Ticks throttleLastPub Time of last publish or throttle.
Ticks throttleMark Throttle sending until this time.
Ticks timeout Inactivity timeout for on-demand connections.
RList * topics List of subscribed topics.
char * username Username for connect.
char * willMsg Will and testament message.
ssize willMsgSize Size of will message.
char * willTopic Will and testament topic.
Fixed header of a packet.
- API Stability:
- Evolving.
- Fields:
int flags Packet control flags.
uint length Size in bytes of the variable portion after the fixed header and packet length.
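The length field corresponds to the MQTT 3.1.1 "remaining length", which is sent on the wire as one to four bytes holding seven bits each, with the top bit marking continuation. The sketch below shows that standard encoding. `encodeLength` is a hypothetical name used for illustration and is not a function of this library.

```c
#include <stddef.h>
#include <stdint.h>

/* Largest value that fits in four 7-bit groups: 128^4 - 1 */
#define MAX_REMAINING_LENGTH 268435455u

/*
    Hypothetical helper: encode len into buf, which must hold at least 4 bytes.
    Returns the number of bytes written, or 0 if len is too large to encode.
 */
size_t encodeLength(uint32_t len, uint8_t *buf)
{
    size_t n = 0;

    if (len > MAX_REMAINING_LENGTH) {
        return 0;
    }
    do {
        uint8_t byte = (uint8_t) (len % 128);
        len /= 128;
        if (len > 0) {
            /* More bytes follow: set the continuation bit */
            byte |= 0x80;
        }
        buf[n++] = byte;
    } while (len > 0);
    return n;
}
```

For example, a length of 321 encodes as the two bytes 0xC1 0x02, since 321 = 65 + 2 * 128.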
Mqtt message.
- API Stability:
- Internal.
- Fields:
uchar * buf External message text buffer for large messages.
uchar * end End of message.
uchar * endbuf End of message buffer.
RFiber * fiber Message fiber to process the message.
int hold Don't free message.
int id Message sequence ID.
uchar inlineBuf[MQTT_INLINE_BUF_SIZE] Inline message text buffer for small message efficiency.
struct MqttMsg * next Next message in the queue.
struct MqttMsg * prev Previous message in the queue.
int qos Message quality of service.
Ticks sent Time the message was sent.
uchar * start Start of message.
MqttMsgState state Message send status.
MqttPacketType type Message packet type.
MqttWaitFlags wait Message wait flags.
A struct used to deserialize/interpret an incoming packet from the broker.
- API Stability:
- Evolving.
- Fields:
MqttConnCode code Connection response code.
cuchar * codes Array of return codes for subscribed topics.
char * data Published message.
int dataSize Size of data.
uchar dup Set to 0 on first attempt to send packet.
uint hasSession Connection using an existing session.
struct MqttHdr hdr MQTT message fixed header.
int id Message ID.
MqttTopic * matched Matched topic.
struct Mqtt * mq Message queue.
int numCodes Size of codes.
uchar qos Quality of service.
uchar retain Message is retained.
cuchar * start Start of message.
char * topic Topic string.
int topicSize Size of topic.
Functions
Allocate an MQTT client instance.
- Description:
- Create a new MQTT client instance with the specified client ID and event callback. The client ID must be unique among all clients connecting to the same broker.
- Parameters:
clientId Unique client identifier string. Maximum length is MQTT_MAX_CLIENT_ID_SIZE.
proc Optional event notification callback procedure for connection/disconnection events. Set to NULL if not required.
- Returns:
- Returns an allocated Mqtt instance or NULL on allocation failure.
- API Stability:
- Evolving.
Check if there are messages in the transmission queue.
- Description:
- Determine if there are pending messages waiting to be processed or transmitted to the broker.
- Parameters:
mq The MQTT object.
- Returns:
- True if messages are queued, false if queue is empty.
- API Stability:
- Internal.
Establish a session with the MQTT broker.
- Description:
- This call establishes a new MQTT connection to the broker using the supplied socket. The MQTT object keeps a reference to the socket, so the caller must call mqttDisconnect before closing or freeing the socket. This function must be called before any publish or subscribe operations.
- Parameters:
mq The Mqtt object allocated via mqttAlloc.
sock The underlying socket to use for communications. This socket must be connected to an MQTT broker.
flags Additional MqttConnectFlags to use when establishing the connection. These flags control session behavior: MQTT_CONNECT_CLEAN_SESSION to start fresh, QOS levels for will messages, and MQTT_CONNECT_WILL_RETAIN for retained will messages.
waitFlags Wait flags. Set to MQTT_WAIT_NONE, MQTT_WAIT_SENT or MQTT_WAIT_ACK.
- Returns:
- Zero if successful, negative on error.
- API Stability:
- Evolving.
Send a disconnection packet to the MQTT broker.
- Description:
- This sends a DISCONNECT packet to gracefully terminate the MQTT session. This will not close the underlying socket. The broker, upon receiving the disconnection packet, will close the connection. Call this before freeing the socket or MQTT instance.
- Parameters:
mq The Mqtt object.
- Returns:
- Zero if successful, negative on error.
- API Stability:
- Evolving.
Free an MQTT client instance.
- Description:
- Release all resources associated with the MQTT instance. This will automatically disconnect from the broker if still connected and free all subscriptions and queued messages.
- Parameters:
mq Mqtt instance allocated via mqttAlloc. Pass NULL to ignore.
- API Stability:
- Evolving.
Get the last error message for the MQTT instance.
- Description:
- Returns a human-readable error message describing the last error that occurred on this MQTT instance. This is useful for debugging connection and protocol issues.
- Parameters:
mq Mqtt object.
- Returns:
- The associated error message string or empty string if no error.
- API Stability:
- Evolving.
Return the time of last I/O activity.
- Description:
- Get the timestamp of the last network I/O activity on the MQTT connection. This is useful for implementing connection monitoring and keep-alive logic.
- Parameters:
mq The Mqtt object.
- Returns:
- The time in Ticks of the last I/O activity.
- API Stability:
- Evolving.
Check if the MQTT instance is connected to a broker.
- Description:
- Test whether the MQTT client currently has an active connection to an MQTT broker and can send/receive messages.
- Parameters:
mq The MQTT object.
- Returns:
- True if connected to a broker, false otherwise.
- API Stability:
- Prototype.
Get the number of messages pending transmission.
- Description:
- Returns the count of messages in the outbound queue waiting to be sent or acknowledged by the broker. This is useful for flow control and monitoring.
- Parameters:
mq The MQTT object.
- Returns:
- The number of messages in the transmission queue.
- API Stability:
- Evolving.
Send a ping request to the broker.
- Description:
- Send a PINGREQ packet to the broker to test connectivity and reset the keep-alive timer. The broker will respond with a PINGRESP packet.
- Parameters:
mq The MQTT object.
- Returns:
- Zero if successful, negative on error.
- API Stability:
- Evolving.
Publish an application message to the MQTT broker.
- Description:
- Publish a message to the specified topic. The MQTT instance must be connected to a broker via mqttConnect before calling this function.
- Parameters:
mq The Mqtt object.
msg The data to be published. Can be binary data.
size The size of the message in bytes. Maximum size is MQTT_MAX_MESSAGE_SIZE.
qos Quality of service level: 0 (at most once), 1 (at least once), or 2 (exactly once). If QOS 0 is used, MQTT_WAIT_ACK will be ignored.
waitFlags Wait flags. Set to MQTT_WAIT_NONE (async), MQTT_WAIT_SENT (wait for send), or MQTT_WAIT_ACK (wait for broker acknowledgment).
topic Printf-style topic string. Maximum length is MQTT_MAX_TOPIC_SIZE.
... Topic formatting arguments.
- Returns:
- Zero if successful, negative on error.
- API Stability:
- Evolving.
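The topic argument is a printf-style format, so the final topic is only known after the formatting arguments are expanded. The sketch below shows one way such an expansion and length check could be done with the standard `vsnprintf`. `formatTopic` is a hypothetical helper, not part of the library, and it assumes the expanded topic plus its terminating NUL must fit within MQTT_MAX_TOPIC_SIZE bytes.

```c
#include <stdarg.h>
#include <stddef.h>
#include <stdio.h>

#define MQTT_MAX_TOPIC_SIZE 128   /* Value from the Defines table */

/*
    Hypothetical helper: expand a printf-style topic into buf.
    Returns the topic length, or -1 if the result does not fit in size bytes.
 */
int formatTopic(char *buf, size_t size, const char *fmt, ...)
{
    va_list ap;
    int     len;

    va_start(ap, fmt);
    len = vsnprintf(buf, size, fmt, ap);
    va_end(ap);

    /* vsnprintf reports the length it needed, so len >= size means truncation */
    if (len < 0 || (size_t) len >= size) {
        return -1;
    }
    return len;
}
```

With a buffer of MQTT_MAX_TOPIC_SIZE bytes, the format "devices/%s/temp/%d" with arguments "abc" and 3 expands to "devices/abc/temp/3", and an over-long expansion is reported as an error instead of being silently truncated.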
Publish a retained message to the MQTT broker.
- Description:
- Publish a message with the retain flag set. Retained messages are stored by the broker and delivered to new subscribers immediately upon subscription. The MQTT instance must be connected to a broker via mqttConnect.
- Parameters:
mq The Mqtt object.
msg The data to be published. Can be binary data.
size The size of the message in bytes. Maximum size is MQTT_MAX_MESSAGE_SIZE.
qos Quality of service level: 0 (at most once), 1 (at least once), or 2 (exactly once).
waitFlags Wait flags. Set to MQTT_WAIT_NONE (async), MQTT_WAIT_SENT (wait for send), or MQTT_WAIT_ACK (wait for broker acknowledgment).
topic Printf-style topic string. Maximum length is MQTT_MAX_TOPIC_SIZE.
... Topic formatting arguments.
- Returns:
- Zero if successful, negative on error.
- API Stability:
- Evolving.
Set authentication credentials for broker connection.
- Description:
- Define the username and password to use when connecting to the MQTT broker. These credentials must be set before calling mqttConnect if the broker requires authentication.
- Parameters:
mq The Mqtt object.
username The username to use when establishing the session with the MQTT broker. Set to NULL if a username is not required. Maximum length is MQTT_MAX_USERNAME_SIZE.
password The password to use when establishing the session with the MQTT broker. Set to NULL if a password is not required. Maximum length is MQTT_MAX_PASSWORD_SIZE.
- Returns:
- Zero if successful, negative on error.
- API Stability:
- Evolving.
Set the keep-alive timeout interval.
- Description:
- Configure the maximum time interval between client messages to the broker. If no messages are sent within this period, a PINGREQ packet will be sent automatically to maintain the connection.
- Parameters:
mq The MQTT object.
keepAlive Time interval in milliseconds before sending a keep-alive message. Set to 0 to disable keep-alive.
- API Stability:
- Evolving.
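The keep-alive rule described above can be illustrated with a small timing check. This is a sketch under stated assumptions, not the library's code: it assumes `Ticks` is a 64-bit millisecond count and `TPS` (ticks per second) is 1000, and `pingDue` is a hypothetical helper that uses the time of last activity as the reference point.

```c
#include <stdbool.h>
#include <stdint.h>

typedef int64_t Ticks;                    /* Assumption: milliseconds in 64 bits */
#define TPS 1000                          /* Assumption: ticks per second */
#define MQTT_KEEP_ALIVE (20 * 60 * TPS)   /* Default from the Defines table */

/*
    Hypothetical helper: return true when a PINGREQ is due.
    A keepAlive of 0 disables keep-alive, so nothing is ever due.
 */
bool pingDue(Ticks now, Ticks lastActivity, Ticks keepAlive)
{
    if (keepAlive <= 0) {
        return false;
    }
    return (now - lastActivity) >= keepAlive;
}
```

Under these assumptions the default MQTT_KEEP_ALIVE is 1,200,000 ms (20 minutes), so an idle connection would become due for a ping 20 minutes after its last activity.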
Set the maximum message size for this MQTT instance.
- Description:
- Configure the maximum allowed message size for publish operations. Some brokers like AWS IoT have smaller limits than the default. This setting helps prevent oversized messages from being rejected.
- Parameters:
mq The MQTT object.
size The maximum message size in bytes. Must be positive and less than MQTT_MAX_MESSAGE_SIZE.
- API Stability:
- Evolving.
Set the idle connection timeout.
- Description:
- Configure how long the connection can remain idle before being automatically closed. This is useful for on-demand connections that should disconnect when not in use.
- Parameters:
mq The MQTT object.
timeout Time to wait in milliseconds before closing an idle connection. Set to MAXINT to disable automatic disconnection.
- API Stability:
- Evolving.
Set the last will and testament message.
- Description:
- Configure a message that the broker will publish if this client disconnects unexpectedly. The will message is sent when the broker detects an abnormal disconnection (network failure, client crash, etc.).
- Parameters:
mq The MQTT object.
topic Topic where the will message will be published. Maximum length is MQTT_MAX_TOPIC_SIZE.
msg Will message data. Can be binary data.
length Size of the will message in bytes.
- Returns:
- Zero if successful, negative on error.
- API Stability:
- Evolving.
Subscribe to a topic pattern.
- Description:
- Subscribe to receive messages published to topics matching the specified pattern. The MQTT instance must be connected to a broker via mqttConnect. Topic patterns support MQTT wildcards: '+' for single level, '#' for multi-level.
- Parameters:
mq Mqtt object.
callback Function to invoke when messages are received on this topic. Set to NULL to use a default handler.
maxQos Maximum quality of service level to accept: 0, 1, or 2.
waitFlags Wait flags. Set to MQTT_WAIT_NONE (async), MQTT_WAIT_SENT (wait for send), or MQTT_WAIT_ACK (wait for broker acknowledgment).
topic Printf-style topic pattern string supporting MQTT wildcards.
... Topic formatting arguments.
- Returns:
- Zero if successful, negative on error.
- API Stability:
- Evolving.
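The wildcard rules mentioned in the description follow the MQTT 3.1.1 specification: '+' matches exactly one topic level and '#' matches all remaining levels, including none. The sketch below illustrates those semantics only. `topicMatches` is a hypothetical function, not the library's matcher, and it assumes the pattern is well formed (each wildcard occupies a whole level and '#' appears only at the end).

```c
#include <stdbool.h>

/*
    Hypothetical helper: return true if topic matches the subscription filter.
    Assumes a well-formed filter.
 */
bool topicMatches(const char *filter, const char *topic)
{
    while (*filter) {
        if (*filter == '#') {
            /* Multi-level wildcard: matches whatever remains */
            return true;
        }
        if (*filter == '+') {
            /* Single-level wildcard: skip to the end of this topic level */
            while (*topic && *topic != '/') {
                topic++;
            }
            filter++;
            continue;
        }
        if (*topic == '\0') {
            /* Topic exhausted: only a trailing "/#" can still match (parent level) */
            return filter[0] == '/' && filter[1] == '#' && filter[2] == '\0';
        }
        if (*filter != *topic) {
            return false;
        }
        filter++;
        topic++;
    }
    return *topic == '\0';
}
```

Under these rules the pattern "sensors/+/temp" matches "sensors/kitchen/temp" but not "sensors/kitchen/fridge/temp", while "sensors/#" matches both, as well as "sensors" itself.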
Establish a master subscription for efficient topic management.
- Description:
- To minimize the number of active MQTT protocol subscriptions, this call establishes a master subscription. Subsequent local subscriptions using this master topic as a prefix will not incur additional MQTT protocol subscriptions but will be processed locally off the master subscription. This is useful for applications with many related topic subscriptions.
- Parameters:
mq Mqtt object.
maxQos Maximum quality of service level to accept: 0, 1, or 2. This QOS level applies to all local subscriptions using this master.
waitFlags Wait flags. Set to MQTT_WAIT_NONE (async), MQTT_WAIT_SENT (wait for send), or MQTT_WAIT_ACK (wait for broker acknowledgment).
topic Printf-style master topic pattern string.
... Topic formatting arguments.
- Returns:
- Zero if successful, negative on error.
- API Stability:
- Evolving.
Enable transmission throttling.
- Description:
- Temporarily throttle message transmission to prevent overwhelming the broker or network. This is used internally for flow control.
- Parameters:
mq The MQTT object.
- API Stability:
- Internal.
Unsubscribe from a topic.
- Description:
- Remove a subscription for the specified topic pattern. If the topic is a local subscription under a master topic, it will be removed locally without affecting the master subscription.
- Parameters:
mq The MQTT object.
topic The topic pattern to unsubscribe from. Must match a previously subscribed topic.
wait Wait flags. Set to MQTT_WAIT_NONE (async), MQTT_WAIT_SENT (wait for send), or MQTT_WAIT_ACK (wait for broker acknowledgment).
- Returns:
- Zero if successful, negative on error.
- API Stability:
- Evolving.
Unsubscribe from a master topic.
- Description:
- Remove a master subscription and all associated local subscriptions. This will send an UNSUBSCRIBE packet to the broker for the master topic.
- Parameters:
mq The MQTT object.
topic The master topic pattern to unsubscribe from.
wait Wait flags. Set to MQTT_WAIT_NONE (async), MQTT_WAIT_SENT (wait for send), or MQTT_WAIT_ACK (wait for broker acknowledgment).
- Returns:
- Zero if successful, negative on error.
- API Stability:
- Evolving.