Chapter 04

MQTT 消息协议

IoT 领域最流行的轻量级消息协议,实现设备与云端的可靠双向通信

MQTT 核心概念

MQTT(Message Queuing Telemetry Transport)是由 IBM 工程师 Andy Stanford-Clark 在 1999 年为监控石油管道传感器而设计的协议。它基于发布/订阅(Publish/Subscribe)模式,极其轻量(最小报文仅 2 字节),非常适合带宽有限、网络不稳定的 IoT 场景。

MQTT 发布/订阅模型: 传感器 ESP32 Broker(Mosquitto) 手机 App / 服务器 │ │ │ │── CONNECT ──────────►│ │ │── PUBLISH ──────────►│ topic: home/sensor/temp│ │ "25.6°C" │◄── SUBSCRIBE ───────────│ │ │ topic: home/sensor/+│ │ │─── PUBLISH ────────────►│ │ │ "25.6°C" │ │ │ │ 控制器 ESP32 │ │ │◄─── SUBSCRIBE ───────│ topic: home/light/cmd │ │ topic: home/light/cmd │ │ │◄── PUBLISH ─────────────│ │ │ "ON" │ │◄─── PUBLISH ─────────│ │ │ "ON" │ │ 关键:发布者和订阅者互不知晓对方存在,完全解耦!

核心术语词表

Broker(代理服务器)
所有消息的中转中心,负责接收发布、分发给订阅者。常用实现:Mosquitto(轻量开源)、EMQ X(企业级)、AWS IoT Core、阿里云IoT。
Topic(主题)
消息路由标识,类似文件路径层级,用 / 分隔。如 home/bedroom/temperature。订阅时支持通配符:+ 匹配单级(home/+/temperature),# 匹配多级(home/#)。
QoS(服务质量)
消息投递保证级别。QoS 0:最多一次(可能丢失);QoS 1:至少一次(可能重复);QoS 2:恰好一次(最可靠但开销最大)。IoT 中最常用 QoS 1。
Retain(保留消息)
Broker 保存该 Topic 最新的一条消息,新订阅者连接后立即收到,无需等待下次发布。适合设备状态、配置信息等场景。
LWT(遗嘱消息)
Last Will and Testament:客户端在 CONNECT 时预设的消息,当客户端异常断线时 Broker 自动发布该消息,通知其他订阅者设备已离线。
Client ID
每个 MQTT 客户端的唯一标识符,Broker 用它区分客户端。同一 Broker 下不得重复,重复连接会踢掉之前的连接。

QoS 机制对比

QoS 级别保证握手流程适用场景
QoS 0(At most once) 最多收到一次,可能丢失 PUBLISH(单向发送) 传感器数据频繁上报,偶发丢失可接受
QoS 1(At least once) 至少收到一次,可能重复 PUBLISH → PUBACK IoT 设备状态上报,最常用
QoS 2(Exactly once) 恰好收到一次 PUBLISH → PUBREC → PUBREL → PUBCOMP(四次握手) 支付指令、控制命令,不允许重复执行

Mosquitto Broker 部署

# Ubuntu / Debian 安装 Mosquitto
sudo apt-get install mosquitto mosquitto-clients

# 启动服务
sudo systemctl start mosquitto
sudo systemctl enable mosquitto

# 测试订阅(在终端1)
mosquitto_sub -h localhost -t "test/#" -v

# 测试发布(在终端2)
mosquitto_pub -h localhost -t "test/hello" -m "Hello ESP32!"

# 带认证的发布
mosquitto_pub -h localhost -p 1883 -u user -P pass \
  -t "home/sensor/temp" -m "25.6" -q 1 -r

# /etc/mosquitto/mosquitto.conf 关键配置
# listener 1883
# allow_anonymous false
# password_file /etc/mosquitto/passwd

esp-mqtt 组件使用

ESP-IDF 内置 esp-mqtt 组件,基于 MQTT v3.1.1/v5.0,支持 TLS/TCP/WebSocket 传输,事件驱动设计。

#include "mqtt_client.h"
#include "esp_log.h"

static const char *TAG = "MQTT";
static esp_mqtt_client_handle_t mqtt_client;

static void mqtt_event_handler(void *arg, esp_event_base_t base,
                                int32_t event_id, void *event_data)
{
    esp_mqtt_event_handle_t event = event_data;
    switch ((esp_mqtt_event_id_t)event_id) {
        case MQTT_EVENT_CONNECTED:
            ESP_LOGI(TAG, "MQTT 已连接");
            /* 订阅主题 */
            esp_mqtt_client_subscribe(mqtt_client, "home/light/cmd", 1);
            /* 发布上线消息 */
            esp_mqtt_client_publish(mqtt_client,
                "home/sensor/status", "online", 0, 1, 1); // retain=1
            break;

        case MQTT_EVENT_DATA:
            ESP_LOGI(TAG, "收到消息:");
            ESP_LOGI(TAG, "  Topic: %.*s", event->topic_len, event->topic);
            ESP_LOGI(TAG, "  Data:  %.*s", event->data_len, event->data);
            /* 解析命令 */
            if (strncmp(event->data, "ON", event->data_len) == 0) {
                gpio_set_level(GPIO_NUM_2, 1);
            }
            break;

        case MQTT_EVENT_DISCONNECTED:
            ESP_LOGW(TAG, "MQTT 断开,将自动重连");
            break;

        case MQTT_EVENT_ERROR:
            ESP_LOGE(TAG, "MQTT 错误");
            break;

        default: break;
    }
}

void mqtt_app_start(void)
{
    esp_mqtt_client_config_t mqtt_cfg = {
        .broker = {
            .address.uri = "mqtt://192.168.1.100:1883",
        },
        .credentials = {
            .client_id = "esp32_sensor_01",
            .username  = "mqttuser",
            .authentication.password = "mqttpass",
        },
        .session = {
            .last_will = {              // LWT 遗嘱消息
                .topic  = "home/sensor/status",
                .msg    = "offline",
                .qos    = 1,
                .retain = 1,
            },
            .keepalive = 60,           // 60 秒心跳
        },
    };
    mqtt_client = esp_mqtt_client_init(&mqtt_cfg);
    esp_mqtt_client_register_event(mqtt_client, ESP_EVENT_ANY_ID,
                                    mqtt_event_handler, NULL);
    esp_mqtt_client_start(mqtt_client);
}

/* 定期发布传感器数据 */
void publish_sensor_data(float temp, float hum)
{
    char payload[64];
    snprintf(payload, sizeof(payload),
             "{\"temperature\":%.1f,\"humidity\":%.1f}", temp, hum);
    esp_mqtt_client_publish(mqtt_client,
        "home/sensor/data", payload, 0, 1, 0);
}
MQTT Topic 设计规范

推荐的 Topic 层级格式:{组织}/{位置}/{设备类型}/{设备ID}/{数据类型}
例如:acme/building1/esp32/device001/temperature
避免使用 # 做过于宽泛的订阅,会产生大量无关消息;避免使用空格和特殊字符;设备 ID 建议使用 MAC 地址。

JSON 消息序列化

MQTT 消息是纯二进制/文本,通常使用 JSON 格式封装结构化数据。ESP-IDF 提供轻量级的 cJSON 库:

#include "cJSON.h"

void publish_json(float temp, float hum, int battery_mv)
{
    cJSON *root = cJSON_CreateObject();
    cJSON_AddNumberToObject(root, "temperature", (double)temp);
    cJSON_AddNumberToObject(root, "humidity", (double)hum);
    cJSON_AddNumberToObject(root, "battery_mv", battery_mv);
    cJSON_AddStringToObject(root, "device_id", "esp32_001");

    char *json_str = cJSON_PrintUnformatted(root);  // 不含缩进,节省字节
    esp_mqtt_client_publish(mqtt_client, "home/sensor",
                             json_str, strlen(json_str), 1, 0);

    cJSON_free(json_str);
    cJSON_Delete(root);
}

void parse_command(const char *payload, int len)
{
    cJSON *root = cJSON_ParseWithLength(payload, len);
    if (!root) return;

    cJSON *cmd = cJSON_GetObjectItem(root, "cmd");
    if (cJSON_IsString(cmd)) {
        ESP_LOGI("CMD", "收到命令: %s", cmd->valuestring);
    }
    cJSON_Delete(root);
}