MQTT

MQTT is a messaging protocol for the Internet of Things (IoT). It is designed as a lightweight publish/subscribe messaging transport that is ideal for connecting remote devices with a small code footprint and minimal network bandwidth.

Ioto includes an efficient MQTT client that is well suited to connecting devices to the cloud.

MQTT Features

  • MQTT 3.1.1 support
  • Connect, publish, subscribe, ping and disconnect messages.
  • TLS encryption with ALPN over port 443 to get through firewalls.
  • Message quality of service for reliable delivery.
  • Retained messages.
  • High message throughput with exceptionally low overhead.
  • Wait for delivery or acknowledgement options.
  • Resilient reconnect on network interruptions.
  • Automatic configuration after cloud provisioning.
  • Parallelism via fiber coroutines. No ugly callbacks or complex threads.
  • Tiny footprint of 8K code.

MQTT Configuration

The Ioto MQTT connection is opened automatically based on the config/ioto.json5 configuration settings:

mqtt: {
    authority: 'certs/root.crt',
    client: 'my-device-client-id',
    endpoint: 'https://example.com',
    alpn: 'x-amzn-mqtt-ca',
    port: 443,
    timeout: '30secs',
},
tls: {
    certificate: 'certs/mqtt.crt',
    key: 'certs/mqtt.key',
}

These properties describe the MQTT endpoint, port, certificates and keys to use to establish communications with an MQTT broker.

When using a device cloud, your certificate, key and endpoint are automatically provisioned from the Builder. These are saved in config/provision.json5 and override the settings in the mqtt property collection.

API Quick Tour

When using cloud-based management, Ioto will typically connect to the cloud once the device is claimed. The MQTT endpoint will be made available via:

ioto->mqtt

This means you do not have to explicitly connect to the AWS IoT Core MQTT broker.

However, you can connect to any MQTT endpoint at any time via mqttConnect if you wish.

Mqtt *mq = mqttAlloc("my-client-id", onEvents);

/* Signature of mqttConnect */
mqttConnect(Mqtt *mq, RSocket *sock, int flags, MqttWaitFlags waitFlags);

In this case, you create and connect the socket "sock" using the Safe runtime socket APIs before connecting with MQTT.

On-Demand Connections

Ioto will transparently connect and disconnect from the device cloud as required to minimize idle MQTT connections. The mqtt.timeout property defines the maximum idle time before Ioto will disconnect the MQTT connection.

If you require a permanent MQTT connection, set the timeout to "forever". It is best practice to avoid permanent connections wherever possible.
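The idle-timeout rule can be pictured with a small standalone sketch. This is not Ioto's implementation: DEMO_FOREVER and demoShouldDisconnect are hypothetical names used only for illustration, and the timeout is assumed to have already been parsed into seconds.

```c
#include <stdbool.h>

/* Hypothetical sentinel standing in for a timeout of "forever" */
#define DEMO_FOREVER (-1L)

/*
    Return true when an idle connection should be closed.
    lastActivity: time in seconds of the last MQTT send or receive
    now:          current time in seconds
    timeout:      maximum idle seconds, or DEMO_FOREVER to never disconnect
 */
bool demoShouldDisconnect(long lastActivity, long now, long timeout)
{
    if (timeout == DEMO_FOREVER) {
        return false;
    }
    return (now - lastActivity) >= timeout;
}
```

With a 30 second timeout, a connection last used at time 100 would be closed once the clock reaches 130, while a timeout of DEMO_FOREVER never triggers a disconnect.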

Publishing Messages

Ioto creates an MQTT connection to the device cloud on startup. This connection is referenced via ioto->mqtt.

Messages are published via the mqttPublish API:

mqttPublish(ioto->mqtt, "Initialized", 0, MQTT_QOS_1, MQTT_WAIT_NONE, "myDevice/init");

This will publish an "Initialized" message with quality of service level 1, which means "send at least once". The message will be published on the "myDevice/init" topic, and the API call will not block waiting for the message to be sent or acknowledged. Sending happens in the background, with any required re-transmissions.
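For reference, the quality of service level travels in the first byte of the MQTT PUBLISH packet. The sketch below follows the MQTT 3.1.1 fixed header layout. The helper demoPublishHeader is hypothetical and is not part of the Ioto API, which builds this byte for you.

```c
#include <stdint.h>

/*
    Build the first byte of an MQTT 3.1.1 PUBLISH packet.
    Bits 7-4: packet type (3 = PUBLISH)
    Bit  3:   DUP flag, set on re-transmissions
    Bits 2-1: QoS level (0, 1 or 2)
    Bit  0:   RETAIN flag
 */
uint8_t demoPublishHeader(int qos, int retain, int dup)
{
    return (uint8_t) ((3 << 4) | ((dup ? 1 : 0) << 3) | ((qos & 0x3) << 1) | (retain ? 1 : 0));
}
```

A first transmission at QoS 1 without the retain flag gives 0x32, and a re-transmission of the same message gives 0x3A because the DUP bit is set.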

Subscribing for Messages

You can subscribe to receive incoming messages on a topic via the mqttSubscribe API:

mqttSubscribe(ioto->mqtt, incoming, MQTT_QOS_1, MQTT_WAIT_NONE, "myDevice/change");

When messages are received on the "myDevice/change" topic, the function incoming will be invoked with the message.

The incoming callback will be passed the message response packet.

static void incoming(MqttResp *rp)
{
    printf("Received from topic %s: %s\n", rp->topic, rp->data);
}

The message topic will be a null terminated string in rp->topic.

The message data will be passed in rp->data and the size of the data will be defined in rp->dataSize. The data will always be null terminated which is useful when passing message strings.
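Because rp->dataSize carries the payload length, a handler can copy the payload without relying on the terminator, which matters if the payload contains embedded null bytes. A minimal sketch follows. DemoResp is a stand-in that mirrors only the three fields named above so the example compiles outside Ioto (the field types are assumptions), and demoCopyPayload is a hypothetical helper.

```c
#include <stddef.h>
#include <string.h>

/* Stand-in for MqttResp with only the fields described above (field types are assumptions) */
typedef struct DemoResp {
    const char *topic;      /* Null terminated topic */
    const char *data;       /* Message payload */
    size_t dataSize;        /* Payload length in bytes */
} DemoResp;

/*
    Copy at most bufSize - 1 payload bytes into buf and null terminate it.
    Returns the number of payload bytes copied.
 */
size_t demoCopyPayload(const DemoResp *rp, char *buf, size_t bufSize)
{
    size_t len;

    if (bufSize == 0) {
        return 0;
    }
    len = rp->dataSize < bufSize - 1 ? rp->dataSize : bufSize - 1;
    memcpy(buf, rp->data, len);
    buf[len] = '\0';
    return len;
}
```

Copying into a bounded buffer this way truncates oversized payloads rather than overrunning the buffer.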

Unsubscribing

To unsubscribe from a topic, use the mqttUnsubscribe API:

mqttUnsubscribe(ioto->mqtt, "myDevice/unsub", MQTT_WAIT_NONE);

Waiting for Completion

The MQTT APIs take an MQTT_WAIT flags argument that indicates whether the API call should wait for completion.

With the MQTT_WAIT_NONE flag, the API call will not wait and transmission will happen in the background. The MQTT_WAIT_SENT flag causes the API to wait until the message has been fully sent from the client, i.e. it has been fully transmitted over the network. The MQTT_WAIT_ACK flag causes the API to wait until an acknowledgement message has been received from the peer.

Note

Don't confuse this with MQTT quality of service levels which define whether a message is reliably delivered or not. In contrast, the WAIT flag determines how the API itself should block and wait.

Parallelism

The MQTT API uses Ioto's underlying fiber coroutine support to implement parallelism. When an MQTT call needs to wait for an acknowledgement or for network I/O, it will transparently resume other fibers in the application. Ioto is single threaded, but can run any number of fibers concurrently without needing any locks. See Fiber Coroutines for more details.

Responding to Events

When Ioto connects to AWS IoT Core, it will issue a "mqtt:connect" event, and when disconnected it will issue a "mqtt:disconnect" event.

These events can be monitored by calling the rWatch API. For example:

static void disconnected(cvoid *data, cvoid *arg)
{
    printf("MQTT disconnected\n");
}

rWatch("mqtt:disconnect", disconnected, NULL);

The data argument is the value provided as the last parameter to rWatch. The arg argument is the argument provided when rSignal was called by Ioto, which in this case will be set to NULL.
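The roles of the two callback arguments can be illustrated with a small standalone model. This is not the Ioto implementation: demoWatch, demoSignal and demoDisconnected are hypothetical stand-ins that only mimic the argument passing described above, and the model keeps a single watcher for brevity.

```c
#include <stddef.h>
#include <string.h>

typedef void (*DemoWatchProc)(const void *data, const void *arg);

/* A single registered watcher. A real implementation would keep a list. */
static const char *watchName;
static DemoWatchProc watchProc;
static const void *watchData;

/* Register proc to run when name is signaled. data is remembered for the callback. */
void demoWatch(const char *name, DemoWatchProc proc, const void *data)
{
    watchName = name;
    watchProc = proc;
    watchData = data;
}

/* Signal an event. arg comes from the signaler and is passed as the second callback argument. */
void demoSignal(const char *name, const void *arg)
{
    if (watchProc && watchName && strcmp(watchName, name) == 0) {
        watchProc(watchData, arg);
    }
}

/* Example watcher: counts events in the int that was passed as data when watching */
void demoDisconnected(const void *data, const void *arg)
{
    (void) arg;             /* Unused here, NULL in this model */
    (*(int*) data)++;       /* Cast is safe because the caller supplied a non-const int */
}
```

Calling demoWatch("mqtt:disconnect", demoDisconnected, &count) and then demoSignal("mqtt:disconnect", NULL) increments count once. Signaling any other event name leaves it unchanged.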

Ioto Key/Value Store

Many devices maintain simple device settings that can be sent to the cloud and stored in a key/value store.

An Ioto device cloud provides such a key/value store in the device database called the Store. Values can be written to the Store using the ioSet APIs.

ioSet("model", "Acme Rocket");
ioSetNum("cpu", 55);

These values can be displayed in the Device Manager UI and can be retrieved in the device via the ioGet APIs.

char *model = ioGet("model");
double cpu = ioGetNum("cpu");

These APIs are simple wrappers over the mqttPublish and mqttRequest APIs.

Creating Metrics

When you publish data to the Store, you can create a metric for the published data. This metric can then be displayed using graphical widgets in the Device Manager.

To enable metrics for your data, define your metrics under the Store processing definition in the Device schema and upload that schema to your manager.

Note: this is not available when using the Eval cloud.

For example:

{
    Store: {
        enable: 'both',
        sync: 'up',
        notify: 'default',
        metrics: [
            {
                namespace: 'Embedthis/Device',
                fields: [{CPU: 'value'}],
                where: 'key == "cpu"',
                dimensions: [{Device: 'deviceId'}],
                buffer: {count: 5, elapsed: 10},
            },
        ],
    },
}

See Database Metrics for details.

See the MQTT API for more details.

Other Useful Routines

Use mqttIsConnected to test if the MQTT handle is currently connected to the MQTT broker.

References

  • MQTT Configuration Properties
  • MQTT 3.1.1 spec