MQTT Skill

MQTT 3.1.1 client for publish/subscribe messaging with cloud-side MQTT brokers.

Invoke with: /mqtt

Key APIs

c
/* Lifecycle */
Mqtt *mqttAlloc(cchar *clientId, MqttEventProc proc);
int mqttConnect(Mqtt *mq, RSocket *sock, int flags, MqttWaitFlags waitFlags);
int mqttDisconnect(Mqtt *mq);
void mqttFree(Mqtt *mq);

/* Publish */
int mqttPublish(Mqtt *mq, cvoid *msg, size_t size, int qos,
    MqttWaitFlags waitFlags, cchar *topic, ...);

/* Subscribe */
int mqttSubscribe(Mqtt *mq, MqttCallback callback, int maxQos,
    MqttWaitFlags waitFlags, cchar *topic, ...);
int mqttUnsubscribe(Mqtt *mq, cchar *topic, MqttWaitFlags wait);

Wait Flags

  • MQTT_WAIT_NONE — Return immediately (fire and forget)
  • MQTT_WAIT_SENT — Wait until the packet is written to the socket
  • MQTT_WAIT_ACK — Wait for broker acknowledgment

Publishing Telemetry

c
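PUBLIC int ioStart(void)
{
    /* sfmt allocates the payload, so free it once it has been handed to mqttPublish */
    char *msg = sfmt("{\"temp\": 23.5, \"humidity\": 65}");
    int rc = mqttPublish(ioto->mqtt, msg, 0, 1, MQTT_WAIT_NONE,
        "/devices/%s/telemetry", ioto->product);
    rFree(msg);
    if (rc < 0) {
        rError("mqtt", "Cannot publish telemetry");
    }
    return rc;
}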
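The payload above is a fixed string. A minimal sketch of building it from live readings follows, using only the C standard library. `formatTelemetry` is a hypothetical helper, not part of the MQTT API, and the field names simply mirror the example above.

```c
#include <stdio.h>

/*
    Hypothetical helper: format a telemetry reading as JSON into buf.
    Returns the payload length (excluding the trailing null), or -1 if
    the buffer is too small to hold the whole payload.
 */
static int formatTelemetry(char *buf, size_t size, double temp, int humidity)
{
    int len = snprintf(buf, size, "{\"temp\": %.1f, \"humidity\": %d}", temp, humidity);
    if (len < 0 || (size_t) len >= size) {
        return -1;
    }
    return len;
}
```

The resulting buffer could then be passed as the `msg` argument of `mqttPublish`, with the returned length as a plausible value for `size`.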

Subscribing to Commands

c
static void incoming(MqttRecv *rp)
{
    char *msg = snclone(rp->data, rp->dataSize);
    rTrace("mqtt", "Received from topic %s: %s", rp->topic, msg);
    rFree(msg);
}

PUBLIC int ioStart(void)
{
    mqttSubscribe(ioto->mqtt, (MqttCallback) incoming, 2, MQTT_WAIT_ACK,
        "/devices/%s/commands", ioto->product);
    return 0;
}

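PUBLIC void ioStop(void)
{
    /* mqttUnsubscribe takes a literal topic and a wait flag, so format the topic first */
    char *topic = sfmt("/devices/%s/commands", ioto->product);
    mqttUnsubscribe(ioto->mqtt, topic, MQTT_WAIT_NONE);
    rFree(topic);
}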

Master Subscriptions

Subscribe once at the protocol level, then route locally to multiple handlers:

c
mqttSubscribeMaster(ioto->mqtt, 1, MQTT_WAIT_ACK,
    "devices/%s/#", ioto->product);

mqttSubscribe(ioto->mqtt, onTelemetry, 1, MQTT_WAIT_NONE,
    "devices/%s/telemetry", ioto->product);
mqttSubscribe(ioto->mqtt, onCommands, 1, MQTT_WAIT_NONE,
    "devices/%s/commands", ioto->product);
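To illustrate what routing locally can look like, here is a self-contained sketch of topic-filter matching and dispatch in plain C. It is not the library's implementation. `topicMatches`, `Route`, `dispatch` and the counting handlers are hypothetical names introduced for this example. The matcher follows the MQTT wildcard rules: `+` matches exactly one level and `#` matches all remaining levels. It does not handle the special case of topics that begin with `$`.

```c
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical helper: return true if topic matches the MQTT topic filter */
static bool topicMatches(const char *filter, const char *topic)
{
    while (*filter) {
        if (*filter == '#') {
            return true;                    /* Multi-level wildcard matches the rest */
        }
        if (*filter == '+') {
            /* Single-level wildcard: consume one topic level */
            while (*topic && *topic != '/') {
                topic++;
            }
            filter++;
        } else if (*filter == *topic) {
            filter++;
            topic++;
        } else {
            /* "a/#" also matches the parent topic "a" */
            return *topic == '\0' && filter[0] == '/' && filter[1] == '#' && filter[2] == '\0';
        }
    }
    return *topic == '\0';
}

typedef void (*TopicHandler)(const char *topic, const char *payload);

typedef struct Route {
    const char *filter;         /* Topic filter registered by a local subscriber */
    TopicHandler handler;       /* Callback to invoke on a match */
} Route;

/* Deliver a message to every matching route and return the number of deliveries */
static int dispatch(const Route *routes, size_t count, const char *topic, const char *payload)
{
    int delivered = 0;
    for (size_t i = 0; i < count; i++) {
        if (topicMatches(routes[i].filter, topic)) {
            routes[i].handler(topic, payload);
            delivered++;
        }
    }
    return delivered;
}

/* Stand-in handlers that only count invocations */
static int telemetryCount = 0;
static int commandCount = 0;

static void countTelemetry(const char *topic, const char *payload)
{
    (void) topic; (void) payload;
    telemetryCount++;
}

static void countCommands(const char *topic, const char *payload)
{
    (void) topic; (void) payload;
    commandCount++;
}
```

In this sketch a single broker-level subscription such as `devices/abc/#` would feed every incoming message into `dispatch`, which then fans it out to whichever local routes match.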

Configuration

json5
{
    services: { mqtt: true },
    mqtt: {
        authority: '@certs/aws.crt',
        schedule: '* * * * *',
        delay: 15,
    },
}

Important Notes

  • When running inside the Ioto agent, the MQTT client instance is available as ioto->mqtt.
  • All MQTT calls are fiber-aware and yield while waiting for I/O.
  • Always use TLS in production.
  • Functions that return int return 0 on success and a negative value on error.