MQTT Skill
MQTT 3.1.1 client for publish/subscribe messaging with cloud-side MQTT brokers.
Invoke with: /mqtt
Key APIs
c
/* Lifecycle. As with the other int-returning calls listed here: 0 on success, < 0 on error. */
Mqtt *mqttAlloc(cchar *clientId, MqttEventProc proc);
int mqttConnect(Mqtt *mq, RSocket *sock, int flags, MqttWaitFlags waitFlags);
int mqttDisconnect(Mqtt *mq);
void mqttFree(Mqtt *mq);
/* Publish: topic is a printf-style format string followed by its arguments */
int mqttPublish(Mqtt *mq, cvoid *msg, size_t size, int qos,
MqttWaitFlags waitFlags, cchar *topic, ...);
/* Subscribe: topic is a printf-style format string followed by its arguments */
int mqttSubscribe(Mqtt *mq, MqttCallback callback, int maxQos,
MqttWaitFlags waitFlags, cchar *topic, ...);
/* Unsubscribe: topic is a plain string; this prototype takes no format arguments */
int mqttUnsubscribe(Mqtt *mq, cchar *topic, MqttWaitFlags wait);

Wait Flags
- `MQTT_WAIT_NONE` — Return immediately (fire and forget)
- `MQTT_WAIT_SENT` — Wait until the packet is written to the socket
- `MQTT_WAIT_ACK` — Wait for broker acknowledgment
Publishing Telemetry
c
/*
    Publish one JSON telemetry sample at QoS 1 using MQTT_WAIT_NONE (return immediately).
    The topic is formatted from ioto->product. The result of mqttPublish is not examined
    and this function always returns 0.
    NOTE(review): a size of 0 is passed; presumably the library then measures the
    null-terminated message itself -- confirm against the mqttPublish API reference.
 */
PUBLIC int ioStart(void)
{
    char *payload;

    payload = sfmt("{\"temp\": 23.5, \"humidity\": 65}");
    mqttPublish(ioto->mqtt, payload, 0, 1, MQTT_WAIT_NONE, "/devices/%s/telemetry", ioto->product);
    rFree(payload);
    return 0;
}

Subscribing to Commands
c
/*
    Subscription callback: trace the topic and payload of a received message.
    The payload is copied with snclone, bounded by rp->dataSize, so it can be traced
    as a string; the copy is released before returning.
 */
static void incoming(MqttRecv *rp)
{
    char *text;

    text = snclone(rp->data, rp->dataSize);
    rTrace("mqtt", "Received from topic %s: %s", rp->topic, text);
    rFree(text);
}
PUBLIC int ioStart(void)
{
mqttSubscribe(ioto->mqtt, (MqttCallback) incoming, 2, MQTT_WAIT_ACK,
"/devices/%s/commands", ioto->product);
return 0;
}
PUBLIC void ioStop(void)
{
mqttUnsubscribe(ioto->mqtt, "/devices/%s/commands", ioto->product);
}Master Subscriptions
Subscribe once at the protocol level, then route locally to multiple handlers:
c
/* One protocol-level subscription; '#' is the MQTT multi-level wildcard */
mqttSubscribeMaster(ioto->mqtt, 1, MQTT_WAIT_ACK,
"devices/%s/#", ioto->product);
/* Handlers for individual topics under the master subscription, routed locally */
mqttSubscribe(ioto->mqtt, onTelemetry, 1, MQTT_WAIT_NONE,
"devices/%s/telemetry", ioto->product);
mqttSubscribe(ioto->mqtt, onCommands, 1, MQTT_WAIT_NONE,
"devices/%s/commands", ioto->product);

Configuration
json5
{
services: { mqtt: true },
mqtt: {
authority: '@certs/aws.crt',
schedule: '* * * * *',
delay: 15,
},
}

Important Notes
- MQTT uses `ioto->mqtt` when running inside the Ioto agent.
- All MQTT calls are fiber-aware and yield while waiting for I/O.
- Always use TLS in production.
- Functions return `0` on success, `< 0` on error.
