
MQTT API

This is a high-performance MQTT library for IoT publish/subscribe communications.

The library supports MQTT version 3.1.1 and has the following features:

  • Supports connect, publish, subscribe, ping and disconnect messages.
  • Message quality of service for reliable delivery.
  • Retained messages.
  • TLS encryption with ALPN over port 443.
  • High message throughput with exceptionally low overhead.
  • Wait for delivery or acknowledgement options.
  • Auto reconnect on network failures.
  • Parallelism via fiber coroutines.

Extensions

Mqtt MQTT Protocol.

Functions

Mqtt *mqttAlloc(RSocket *sock, cchar *clientId, MqttEventProc proc)
 Allocate an MQTT object.
int mqttConnect(Mqtt *mq, cchar *username, cchar *password, int flags, MqttWaitFlags waitFlags)
 Establish a session with the MQTT broker.
void mqttFree(Mqtt *mq)
 Free an Mqtt instance.
cchar *mqttGetError(struct Mqtt *mq)
 Return the error message for the Mqtt object's current error.
bool mqttIsConnected(Mqtt *mq)
 Return true if the MQTT instance is connected to a peer.
int mqttMsgsInQueue(Mqtt *mq)
 Get the number of messages to send in the send queue.
int mqttMsgsToSend(Mqtt *mq)
 Get the number of messages in the queue.
int mqttPing(Mqtt *mq)
 Ping the broker.
int mqttPublish(Mqtt *mq, cvoid *msg, ssize size, int qos, MqttWaitFlags waitFlags, cchar *topic, ...)
 Publish an application message to the MQTT broker.
int mqttPublishRetained(Mqtt *mq, cvoid *msg, ssize size, int qos, MqttWaitFlags waitFlags, cchar *topic, ...)
 Publish a retained message to the MQTT broker.
void mqttSetMessageSize(Mqtt *mq, int size)
 Set the maximum message size.
void mqttSetWill(Mqtt *mq, cchar *topic, cvoid *msg, ssize length)
 Set the will and testament message.
int mqttSubscribe(Mqtt *mq, MqttCallback callback, int maxQos, MqttWaitFlags waitFlags, cchar *topic, ...)
 Subscribe to a topic.
int mqttUnsubscribe(Mqtt *mq, cchar *topic, MqttWaitFlags wait)
 Unsubscribe from a topic.

Typedefs

Mqtt
 MQTT instance.
MqttCallback
 Message receipt callback.
MqttEventProc
 MQTT event callback.
MqttHdr
 Fixed header of a packet.
MqttMsg
 Mqtt message.
MqttRecv
 A struct used to deserialize/interpret an incoming packet from the broker.

Defines

#define MQTT_BUF_SIZE   4096
 Receive buffer size.
#define MQTT_EVENT_CONNECT   1
 A new connection was established.
#define MQTT_EVENT_DISCONNECT   2
 The connection was terminated.
#define MQTT_EVENT_STOPPING   3
 The application is stopping.
#define MQTT_INLINE_BUF_SIZE   128
 Size of inline buffer.
#define MQTT_KEEP_ALIVE   (20 * 60)
 Default connection keep alive time in seconds.
#define MQTT_MAX_MESSAGE_SIZE   256 * 1024 * 1024
 Max message size.
#define MQTT_MSG_TIMEOUT   30
 Default message timeout in seconds.
#define MQTT_PROTOCOL_LEVEL   0x04
 Protocol version 3.1.1.
#define MQTT_TOPIC_SIZE   128
 Max topic size.
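
The timing defaults are easier to read next to a concrete check. The following is a minimal sketch, not the library's implementation, of how a keep-alive interval and a message timeout expressed in seconds could be evaluated. The helper names pingIsDue and messageTimedOut are hypothetical, and the constants are local copies of the documented defaults so the block compiles on its own.

```c
#include <stdbool.h>
#include <time.h>

/* Local copies of the documented defaults so the sketch is self-contained. */
#ifndef MQTT_KEEP_ALIVE
#define MQTT_KEEP_ALIVE (20 * 60)   /* Seconds */
#endif
#ifndef MQTT_MSG_TIMEOUT
#define MQTT_MSG_TIMEOUT 30         /* Seconds */
#endif

/* Hypothetical helper: true when nothing has been sent for a full keep-alive interval. */
bool pingIsDue(time_t now, time_t lastSend, int keepAlive)
{
    return keepAlive > 0 && (now - lastSend) >= (time_t) keepAlive;
}

/* Hypothetical helper: true when a message sent at `sent` has waited past the timeout. */
bool messageTimedOut(time_t now, time_t sent, int msgTimeout)
{
    return (now - sent) >= (time_t) msgTimeout;
}
```

With the defaults, a client that last sent at time 800 would be due to ping at time 2000, since 20 * 60 seconds have elapsed.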

Mqtt

MQTT Protocol.

API Stability:
Evolving.
Fields:
RBuf *buf I/O read buffer.
uint connected Mqtt is currently connected flag.
int error Mqtt error flag.
char *errorMsg Mqtt error message.
MqttMsg head Head of message queue.
char *id Client ID.
int keepAlive Keep alive duration in seconds.
time_t lastSend Time of last send in seconds.
int mask R library wait event mask.
int maxMessage Maximum message size.
int msgTimeout Message timeout for retransmit.
int nextId Next message ID.
MqttEventProc proc Notification event callback.
RSocket *sock Underlying socket transport.
uint subscribedApi Reserved.
int throttle Throttle delay in msec.
Time throttleLastPub Time of last publish or throttle.
Time throttleMark Throttle sending until Time.
RList *topics List of subscribed topics.
char *willMsg Will and testament message.
ssize willMsgSize Size of will message.
char *willTopic Will and testament topic.

Mqtt *mqttAlloc (RSocket *sock, cchar *clientId, MqttEventProc proc)

Allocate an MQTT object.

Description:
This layers an MQTT transport over an existing connected socket.
Parameters:
sock A connected socket object created via rAllocSocket and rConnectSocket.
clientId Unique client identifier string.
proc Event notification callback procedure.
API Stability:
Evolving.

int mqttConnect (Mqtt *mq, cchar *username, cchar *password, int flags, MqttWaitFlags waitFlags)

Establish a session with the MQTT broker.

Parameters:
mq The Mqtt object.
username The username to use when establishing the session with the MQTT broker. Set to NULL if a username is not required.
password The password to use when establishing the session with the MQTT broker. Set to NULL if a password is not required.
flags Additional MqttConnectFlags to use when establishing the connection. These select a clean session (MQTT_CONNECT_CLEAN_SESSION), the QoS level used to publish the will and testament message, and whether the broker should retain the will message (MQTT_CONNECT_WILL_RETAIN).
waitFlags Wait flags. Set to MQTT_WAIT_NONE, MQTT_WAIT_SENT or MQTT_WAIT_ACK.
Returns:
Zero if successful.
API Stability:
Evolving.
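
The connection flags described above correspond to the connect flags byte of the MQTT 3.1.1 CONNECT packet. The following is a minimal sketch of that byte's layout as defined by the MQTT 3.1.1 specification. The SKETCH_ constants and the sketchConnectFlags helper are hypothetical names for illustration only; the numeric values of the library's MQTT_CONNECT_* flags are not shown in this reference and may differ.

```c
#include <stdbool.h>
#include <stdint.h>

/* Bit layout of the CONNECT flags byte in the MQTT 3.1.1 specification.
   The SKETCH_ names are illustrative; the library's MQTT_CONNECT_* values may differ. */
#define SKETCH_CLEAN_SESSION  0x02
#define SKETCH_WILL_FLAG      0x04
#define SKETCH_WILL_QOS_SHIFT 3
#define SKETCH_WILL_RETAIN    0x20
#define SKETCH_PASSWORD       0x40
#define SKETCH_USERNAME       0x80

/* Hypothetical helper: compose the connect flags byte from individual options. */
uint8_t sketchConnectFlags(bool cleanSession, bool hasWill, int willQos, bool willRetain,
                           bool hasUser, bool hasPassword)
{
    uint8_t flags = 0;

    if (cleanSession) {
        flags |= SKETCH_CLEAN_SESSION;
    }
    if (hasWill) {
        /* Will QoS and will retain are only meaningful when the will flag is set. */
        flags |= SKETCH_WILL_FLAG;
        flags |= (uint8_t) ((willQos & 0x3) << SKETCH_WILL_QOS_SHIFT);
        if (willRetain) {
            flags |= SKETCH_WILL_RETAIN;
        }
    }
    if (hasUser) {
        flags |= SKETCH_USERNAME;
    }
    if (hasPassword) {
        flags |= SKETCH_PASSWORD;
    }
    return flags;
}
```

In this sketch the will QoS and will retain bits are ignored unless a will is present, which mirrors the specification's requirement that they be zero when the will flag is clear.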

cchar *mqttGetError (struct Mqtt *mq)

Return the error message for the Mqtt object's current error.

Parameters:
mq The Mqtt object.
Returns:
The associated error message.
API Stability:
Evolving.

bool mqttIsConnected (Mqtt *mq)

Return true if the MQTT instance is connected to a peer.

Parameters:
mq The Mqtt object.
Returns:
True if connected.
API Stability:
Prototype.

int mqttMsgsInQueue (Mqtt *mq)

Get the number of messages to send in the send queue.

Parameters:
mq The Mqtt object.
Returns:
The number of messages in the queue to send.
API Stability:
Evolving.

int mqttMsgsToSend (Mqtt *mq)

Get the number of messages in the queue.

Parameters:
mq The Mqtt object.
Returns:
The number of messages in the queue.
API Stability:
Evolving.

int mqttPing (Mqtt *mq)

Ping the broker.

Parameters:
mq The Mqtt object.
Returns:
Zero if successful.
API Stability:
Evolving.

int mqttPublish (Mqtt *mq, cvoid *msg, ssize size, int qos, MqttWaitFlags waitFlags, cchar *topic, ...)

Publish an application message to the MQTT broker.

Parameters:
mq The Mqtt object.
msg The data to be published.
size The size of msg in bytes.
qos Quality of service. 0, 1, or 2.
waitFlags Wait flags.
topic Printf style topic string.
... Topic args.
Returns:
Zero if successful.

int mqttPublishRetained (Mqtt *mq, cvoid *msg, ssize size, int qos, MqttWaitFlags waitFlags, cchar *topic, ...)

Publish a retained message to the MQTT broker.

Parameters:
mq The Mqtt object.
msg The data to be published.
size The size of msg in bytes.
qos Quality of service. 0, 1, or 2.
waitFlags Wait flags.
topic Printf style topic string.
... Topic args.
Returns:
Zero if successful.

void mqttSetMessageSize (Mqtt *mq, int size)

Set the maximum message size.

Description:
AWS supports a smaller maximum message size.
Parameters:
mq The Mqtt object.
size The maximum message size.
API Stability:
Evolving.

void mqttSetWill (Mqtt *mq, cchar *topic, cvoid *msg, ssize length)

Set the will and testament message.

Parameters:
mq The Mqtt object.
topic Will message topic.
msg Message to send.
length Message size.
API Stability:
Evolving.

int mqttSubscribe (Mqtt *mq, MqttCallback callback, int maxQos, MqttWaitFlags waitFlags, cchar *topic, ...)

Subscribe to a topic.

Parameters:
mq The Mqtt object.
callback Function to invoke on receipt of messages.
maxQos Maximum quality of service message to receive.
waitFlags Wait flags.
topic Printf style topic string.
... Topic args.
Returns:
Zero if successful.
API Stability:
Evolving.

int mqttUnsubscribe (Mqtt *mq, cchar *topic, MqttWaitFlags wait)

Unsubscribe from a topic.

Parameters:
mq The Mqtt object.
topic The name of the topic to unsubscribe from.
wait Wait flags.
Returns:
Zero if successful.
API Stability:
Evolving.

Functions

void mqttFree (Mqtt *mq)

Free an Mqtt instance.

Parameters:
mq Mqtt instance allocated via mqttAlloc.

Typedefs

typedef void (*MqttCallback)(struct MqttRecv *resp)

Message receipt callback.

Parameters:
resp Message received structure.
API Stability:
Evolving.

typedef void (*MqttEventProc)(struct Mqtt *mq, int event)

MQTT event callback.

Parameters:
mq Mqtt object created via mqttAlloc.
event Event type, set to MQTT_EVENT_CONNECT, MQTT_EVENT_DISCONNECT or MQTT_EVENT_STOPPING.
API Stability:
Evolving.
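
An event callback typically switches on the event code. The following is a minimal sketch of a function with the MqttEventProc signature. The names onMqttEvent and sketchEventName are hypothetical, struct Mqtt is only forward declared here, and the event values are local copies of the documented defines so the block compiles without the library header.

```c
#include <stdio.h>

struct Mqtt;    /* Opaque here; the real definition comes from the library header. */

#ifndef MQTT_EVENT_CONNECT
#define MQTT_EVENT_CONNECT    1
#define MQTT_EVENT_DISCONNECT 2
#define MQTT_EVENT_STOPPING   3
#endif

/* Hypothetical helper: map an event code to a printable label. */
const char *sketchEventName(int event)
{
    switch (event) {
    case MQTT_EVENT_CONNECT:
        return "connect";
    case MQTT_EVENT_DISCONNECT:
        return "disconnect";
    case MQTT_EVENT_STOPPING:
        return "stopping";
    default:
        return "unknown";
    }
}

/* Has the same shape as MqttEventProc: void (*)(struct Mqtt *mq, int event). */
void onMqttEvent(struct Mqtt *mq, int event)
{
    (void) mq;  /* Unused in this sketch */
    printf("mqtt event: %s\n", sketchEventName(event));
}
```

A function of this shape is what would be passed as the proc argument to mqttAlloc.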

Mqtt

MQTT instance.

API Stability:
Evolving.

MqttHdr

Fixed header of a packet.

API Stability:
Evolving.
Fields:
int flags Packet control flags.
int length Size in bytes of the variable portion after fixed header and packet length.
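
On the wire, MQTT 3.1.1 stores the packet length in the fixed header as a variable-length "remaining length" of one to four bytes, seven bits per byte with the top bit as a continuation flag. The following is a minimal sketch of that encoding as described by the MQTT 3.1.1 specification. The helper names sketchEncodeLength and sketchDecodeLength are hypothetical, and how the library itself populates the length field is not shown in this reference.

```c
#include <stddef.h>
#include <stdint.h>

/* Hypothetical helper: encode len as an MQTT 3.1.1 remaining length.
   Returns the number of bytes written (1 to 4), or 0 if len is out of range. */
size_t sketchEncodeLength(uint32_t len, uint8_t out[4])
{
    size_t n = 0;

    if (len > 268435455u) {
        return 0;                       /* Largest value that fits in four bytes */
    }
    do {
        uint8_t byte = (uint8_t) (len % 128);
        len /= 128;
        if (len > 0) {
            byte |= 0x80;               /* More bytes follow */
        }
        out[n++] = byte;
    } while (len > 0);
    return n;
}

/* Hypothetical helper: decode a remaining length from at most `avail` bytes.
   Returns the number of bytes consumed, or 0 if the input is truncated or malformed. */
size_t sketchDecodeLength(const uint8_t *in, size_t avail, uint32_t *len)
{
    uint32_t value = 0;
    uint32_t multiplier = 1;
    size_t   n;

    for (n = 0; n < avail && n < 4; n++) {
        value += (uint32_t) (in[n] & 0x7F) * multiplier;
        if ((in[n] & 0x80) == 0) {
            *len = value;
            return n + 1;
        }
        multiplier *= 128;
    }
    return 0;
}
```

For example, a length of 321 encodes as the two bytes 0xC1 0x02, which is the worked example given in the specification.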

MqttMsg

Mqtt message.

API Stability:
Internal.
Fields:
uchar *buf External message text buffer for large messages.
uchar *end End of message.
uchar *endbuf End of message buffer.
RFiber *fiber Message fiber to process the message.
int id Message sequence ID.
uchar inlineBuf[MQTT_INLINE_BUF_SIZE] Inline message text buffer for small message efficiency.
struct MqttMsg *next Next message in the queue.
struct MqttMsg *prev Previous message in the queue.
int qos Message quality of service.
time_t sent Time in seconds the message was sent.
uchar *start Start of message.
MqttMsgState state Message send status.
MqttPacketType type Message packet type.
MqttWaitFlags wait Message wait for send flags.
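
The pairing of an inline buffer for small messages with an external buffer for large ones is a common small-buffer optimization. The following is a minimal sketch of that pattern under stated assumptions; it is not the library's code. SketchMsg is a trimmed, hypothetical stand-in for MqttMsg that keeps only the buffer fields, and sketchMsgInit and sketchMsgFree are hypothetical helpers.

```c
#include <stddef.h>
#include <stdlib.h>
#include <string.h>

#ifndef MQTT_INLINE_BUF_SIZE
#define MQTT_INLINE_BUF_SIZE 128    /* Local copy of the documented value */
#endif

/* Trimmed, hypothetical stand-in for MqttMsg with only the buffer fields. */
typedef struct SketchMsg {
    unsigned char *buf;                             /* Heap buffer for large messages, else NULL */
    unsigned char *start;                           /* Start of message */
    unsigned char *end;                             /* End of message */
    unsigned char *endbuf;                          /* End of the buffer in use */
    unsigned char inlineBuf[MQTT_INLINE_BUF_SIZE];  /* Storage for small messages */
} SketchMsg;

/* Copy data into the inline buffer when it fits, otherwise into a heap buffer.
   Returns 0 on success, or -1 if allocation fails. */
int sketchMsgInit(SketchMsg *msg, const void *data, size_t size)
{
    if (size <= sizeof(msg->inlineBuf)) {
        msg->buf = NULL;
        msg->start = msg->inlineBuf;
        msg->endbuf = msg->inlineBuf + sizeof(msg->inlineBuf);
    } else {
        msg->buf = malloc(size);
        if (msg->buf == NULL) {
            return -1;
        }
        msg->start = msg->buf;
        msg->endbuf = msg->buf + size;
    }
    if (size > 0) {
        memcpy(msg->start, data, size);
    }
    msg->end = msg->start + size;
    return 0;
}

/* Release the heap buffer, if any. Safe to call for inline messages. */
void sketchMsgFree(SketchMsg *msg)
{
    free(msg->buf);
    msg->buf = NULL;
}
```

The benefit of this layout is that short messages, which are the common case for telemetry, avoid a separate heap allocation.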

MqttRecv

A struct used to deserialize/interpret an incoming packet from the broker.

API Stability:
Evolving.
Fields:
MqttConnCode code Connection response code.
cuchar *codes Array of return codes for subscribed topics.
char *data Published message.
int dataSize Size of data.
uchar dup Set to 0 on first attempt to send packet.
uint hasSession Connection using an existing session.
struct MqttHdr hdr MQTT message fixed header.
int id Message ID.
MqttTopic *matched Matched topic.
struct Mqtt *mq Message queue.
int numCodes Size of codes.
uchar qos Quality of service.
uchar retain Message is retained.
char *topic Topic string.
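
A message receipt callback would typically read the topic, data and dataSize fields. The following is a minimal sketch of handling those fields safely. SketchRecv is a trimmed, hypothetical stand-in for MqttRecv containing only a few of the documented fields, and sketchDescribeMessage is a hypothetical helper. This reference does not state whether data is NUL terminated, so the sketch bounds the read with dataSize and assumes a text payload.

```c
#include <stddef.h>
#include <stdio.h>

/* Trimmed, hypothetical stand-in for MqttRecv with a subset of its fields. */
struct SketchRecv {
    char          *topic;       /* Topic string */
    char          *data;        /* Published message */
    int           dataSize;     /* Size of data */
    unsigned char qos;          /* Quality of service */
    unsigned char retain;       /* Message is retained */
};

/* Hypothetical helper: format a one-line summary of a received text message.
   Returns the number of characters written, or -1 if the summary does not fit. */
int sketchDescribeMessage(const struct SketchRecv *resp, char *out, size_t size)
{
    /* "%.*s" reads at most dataSize bytes, so data need not be NUL terminated. */
    int len = snprintf(out, size, "%s qos=%d retain=%d: %.*s",
                       resp->topic, resp->qos, resp->retain, resp->dataSize, resp->data);
    return (len < 0 || (size_t) len >= size) ? -1 : len;
}
```

Binary payloads would need different handling, such as copying dataSize bytes with memcpy rather than printing them as text.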