Chapter 04

MQTT 消息协议

IoT 领域最流行的轻量级消息协议,实现设备与云端的可靠双向通信

MQTT 核心概念

MQTT(Message Queuing Telemetry Transport)是由 IBM 工程师 Andy Stanford-Clark 在 1999 年为监控石油管道传感器而设计的协议。它基于发布/订阅(Publish/Subscribe)模式,极其轻量(最小报文仅 2 字节),非常适合带宽有限、网络不稳定的 IoT 场景。

MQTT 发布/订阅模型: 传感器 ESP32 Broker(Mosquitto) 手机 App / 服务器 │ │ │ │── CONNECT ──────────►│ │ │── PUBLISH ──────────►│ topic: home/sensor/temp│ │ "25.6°C" │◄── SUBSCRIBE ───────────│ │ │ topic: home/sensor/+│ │ │─── PUBLISH ────────────►│ │ │ "25.6°C" │ │ │ │ 控制器 ESP32 │ │ │◄─── SUBSCRIBE ───────│ topic: home/light/cmd │ │ topic: home/light/cmd │ │ │◄── PUBLISH ─────────────│ │ │ "ON" │ │◄─── PUBLISH ─────────│ │ │ "ON" │ │ 关键:发布者和订阅者互不知晓对方存在,完全解耦!

核心术语词表

Broker(代理服务器)
所有消息的中转中心,负责接收发布、分发给订阅者。常用实现:Mosquitto(轻量开源)、EMQ X(企业级)、AWS IoT Core、阿里云IoT。
Topic(主题)
消息路由标识,类似文件路径层级,用 / 分隔。如 home/bedroom/temperature。订阅时支持通配符:+ 匹配单级(home/+/temperature),# 匹配多级(home/#)。
QoS(服务质量)
消息投递保证级别。QoS 0:最多一次(可能丢失);QoS 1:至少一次(可能重复);QoS 2:恰好一次(最可靠但开销最大)。IoT 中最常用 QoS 1。
Retain(保留消息)
Broker 保存该 Topic 最新的一条消息,新订阅者连接后立即收到,无需等待下次发布。适合设备状态、配置信息等场景。
LWT(遗嘱消息)
Last Will and Testament:客户端在 CONNECT 时预设的消息,当客户端异常断线时 Broker 自动发布该消息,通知其他订阅者设备已离线。
Client ID
每个 MQTT 客户端的唯一标识符,Broker 用它区分客户端。同一 Broker 下不得重复,重复连接会踢掉之前的连接。

QoS 机制对比

QoS 级别保证握手流程适用场景
QoS 0(At most once) 最多收到一次,可能丢失 PUBLISH(单向发送) 传感器数据频繁上报,偶发丢失可接受
QoS 1(At least once) 至少收到一次,可能重复 PUBLISH → PUBACK IoT 设备状态上报,最常用
QoS 2(Exactly once) 恰好收到一次 PUBLISH → PUBREC → PUBREL → PUBCOMP(四次握手) 支付指令、控制命令,不允许重复执行

Mosquitto Broker 部署

# Ubuntu / Debian 安装 Mosquitto
sudo apt-get install mosquitto mosquitto-clients

# 启动服务
sudo systemctl start mosquitto
sudo systemctl enable mosquitto

# 测试订阅(在终端1)
mosquitto_sub -h localhost -t "test/#" -v

# 测试发布(在终端2)
mosquitto_pub -h localhost -t "test/hello" -m "Hello ESP32!"

# 带认证的发布
mosquitto_pub -h localhost -p 1883 -u user -P pass \
  -t "home/sensor/temp" -m "25.6" -q 1 -r

# /etc/mosquitto/mosquitto.conf 关键配置
# listener 1883
# allow_anonymous false
# password_file /etc/mosquitto/passwd

esp-mqtt 组件使用

ESP-IDF 内置 esp-mqtt 组件,基于 MQTT v3.1.1/v5.0,支持 TLS/TCP/WebSocket 传输,事件驱动设计。

#include "mqtt_client.h"
#include "esp_log.h"

static const char *TAG = "MQTT";
static esp_mqtt_client_handle_t mqtt_client;

static void mqtt_event_handler(void *arg, esp_event_base_t base,
                                int32_t event_id, void *event_data)
{
    esp_mqtt_event_handle_t event = event_data;
    switch ((esp_mqtt_event_id_t)event_id) {
        case MQTT_EVENT_CONNECTED:
            ESP_LOGI(TAG, "MQTT 已连接");
            /* 订阅主题 */
            esp_mqtt_client_subscribe(mqtt_client, "home/light/cmd", 1);
            /* 发布上线消息 */
            esp_mqtt_client_publish(mqtt_client,
                "home/sensor/status", "online", 0, 1, 1); // retain=1
            break;

        case MQTT_EVENT_DATA:
            ESP_LOGI(TAG, "收到消息:");
            ESP_LOGI(TAG, "  Topic: %.*s", event->topic_len, event->topic);
            ESP_LOGI(TAG, "  Data:  %.*s", event->data_len, event->data);
            /* 解析命令 */
            if (strncmp(event->data, "ON", event->data_len) == 0) {
                gpio_set_level(GPIO_NUM_2, 1);
            }
            break;

        case MQTT_EVENT_DISCONNECTED:
            ESP_LOGW(TAG, "MQTT 断开,将自动重连");
            break;

        case MQTT_EVENT_ERROR:
            ESP_LOGE(TAG, "MQTT 错误");
            break;

        default: break;
    }
}

void mqtt_app_start(void)
{
    esp_mqtt_client_config_t mqtt_cfg = {
        .broker = {
            .address.uri = "mqtt://192.168.1.100:1883",
        },
        .credentials = {
            .client_id = "esp32_sensor_01",
            .username  = "mqttuser",
            .authentication.password = "mqttpass",
        },
        .session = {
            .last_will = {              // LWT 遗嘱消息
                .topic  = "home/sensor/status",
                .msg    = "offline",
                .qos    = 1,
                .retain = 1,
            },
            .keepalive = 60,           // 60 秒心跳
        },
    };
    mqtt_client = esp_mqtt_client_init(&mqtt_cfg);
    esp_mqtt_client_register_event(mqtt_client, ESP_EVENT_ANY_ID,
                                    mqtt_event_handler, NULL);
    esp_mqtt_client_start(mqtt_client);
}

/* 定期发布传感器数据 */
void publish_sensor_data(float temp, float hum)
{
    char payload[64];
    snprintf(payload, sizeof(payload),
             "{\"temperature\":%.1f,\"humidity\":%.1f}", temp, hum);
    esp_mqtt_client_publish(mqtt_client,
        "home/sensor/data", payload, 0, 1, 0);
}
MQTT Topic 设计规范

推荐的 Topic 层级格式:{组织}/{位置}/{设备类型}/{设备ID}/{数据类型}
例如:acme/building1/esp32/device001/temperature
避免使用 # 做过于宽泛的订阅,会产生大量无关消息;避免使用空格和特殊字符;设备 ID 建议使用 MAC 地址。

MQTT over TLS(加密通信)

生产环境中的 MQTT 必须启用 TLS 加密,防止数据被窃听和中间人攻击。ESP-IDF 的 esp-mqtt 组件内置 mbedTLS,配置简单:

#include "mqtt_client.h"

/* CA 证书(Mosquitto 服务器证书的根证书)
 * 可以用 openssl s_client -connect broker:8883 来获取 */
extern const uint8_t ca_cert_pem_start[] asm("_binary_ca_cert_pem_start");
extern const uint8_t ca_cert_pem_end[]   asm("_binary_ca_cert_pem_end");

void mqtt_tls_start(void)
{
    esp_mqtt_client_config_t cfg = {
        .broker = {
            /* mqtts:// 表示 MQTT over TLS,端口 8883 */
            .address.uri      = "mqtts://192.168.1.100:8883",
            .verification.certificate = (const char*)ca_cert_pem_start,
        },
        .credentials = {
            .client_id = "esp32_001",
            .username  = "user",
            .authentication.password = "pass",
        },
    };
    esp_mqtt_client_handle_t client = esp_mqtt_client_init(&cfg);
    esp_mqtt_client_start(client);
}

/* 双向 TLS(mTLS):设备也提供客户端证书
 * 适合高安全场景(工业控制、医疗等)*/
extern const uint8_t client_cert_start[] asm("_binary_client_cert_pem_start");
extern const uint8_t client_key_start[]  asm("_binary_client_key_pem_start");

esp_mqtt_client_config_t mtls_cfg = {
    .broker.address.uri = "mqtts://secure-broker:8883",
    .broker.verification.certificate = (const char*)ca_cert_pem_start,
    .credentials.authentication = {
        .certificate = (const char*)client_cert_start,  /* 设备证书 */
        .key         = (const char*)client_key_start,   /* 设备私钥 */
    },
};
证书文件嵌入方式

CA 证书等文件通过 CMake 的 target_add_binary_data 嵌入固件:
target_add_binary_data(app.elf "certs/ca.pem" TEXT)
嵌入后可通过 _binary_ca_pem_start / _binary_ca_pem_end 访问。注意证书必须是 PEM 格式(Base64 文本),不能是 DER 格式(二进制)。证书到期后需要重新刷固件,建议结合 OTA 做证书轮换。

JSON 消息序列化

MQTT 消息是纯二进制/文本,通常使用 JSON 格式封装结构化数据。ESP-IDF 提供轻量级的 cJSON 库:

#include "cJSON.h"

void publish_json(float temp, float hum, int battery_mv)
{
    cJSON *root = cJSON_CreateObject();
    cJSON_AddNumberToObject(root, "temperature", (double)temp);
    cJSON_AddNumberToObject(root, "humidity", (double)hum);
    cJSON_AddNumberToObject(root, "battery_mv", battery_mv);
    cJSON_AddStringToObject(root, "device_id", "esp32_001");

    char *json_str = cJSON_PrintUnformatted(root);  // 不含缩进,节省字节
    esp_mqtt_client_publish(mqtt_client, "home/sensor",
                             json_str, strlen(json_str), 1, 0);

    cJSON_free(json_str);
    cJSON_Delete(root);
}

void parse_command(const char *payload, int len)
{
    cJSON *root = cJSON_ParseWithLength(payload, len);
    if (!root) return;

    cJSON *cmd = cJSON_GetObjectItem(root, "cmd");
    if (cJSON_IsString(cmd)) {
        ESP_LOGI("CMD", "收到命令: %s", cmd->valuestring);
    }
    cJSON_Delete(root);
}

MQTT 断线重连与离线消息缓存

IoT 设备网络不稳定,需要处理断线重连和离线期间数据缓存:

/* esp-mqtt 内置自动重连,可通过配置控制重连间隔 */
esp_mqtt_client_config_t cfg = {
    .broker.address.uri = "mqtt://broker:1883",
    .network = {
        .reconnect_timeout_ms   = 5000,   /* 5秒后重连 */
        .timeout_ms             = 10000,  /* 连接超时 */
    },
    .session = {
        .keepalive = 30,    /* 30秒心跳(Broker 超时 = 1.5倍 keepalive)*/
        /* clean_session=false:断线期间 Broker 保留订阅和 QoS1/2 消息 */
        .disable_clean_session = 1,
    },
};

/* 离线数据缓存:断线期间保存到 NVS,重连后批量上报 */
#define MAX_OFFLINE_MSGS  50
typedef struct {
    float temp;
    float humi;
    int64_t timestamp;
} SensorRecord_t;

static SensorRecord_t offline_cache[MAX_OFFLINE_MSGS];
static int cache_count = 0;

void save_to_cache(float t, float h) {
    if (cache_count < MAX_OFFLINE_MSGS) {
        offline_cache[cache_count++] = (SensorRecord_t){t, h, time(NULL)};
    }
}

void flush_cache_on_reconnect(void) {
    for (int i = 0; i < cache_count; i++) {
        char buf[80];
        snprintf(buf, sizeof(buf),
            "{\"t\":%.1f,\"h\":%.1f,\"ts\":%lld}",
            offline_cache[i].temp,
            offline_cache[i].humi,
            offline_cache[i].timestamp);
        esp_mqtt_client_publish(mqtt_client, "sensor/batch", buf, 0, 1, 0);
        vTaskDelay(pdMS_TO_TICKS(100));  /* 避免发送过快 */
    }
    cache_count = 0;
    ESP_LOGI("MQTT", "离线缓存已上报 %d 条", cache_count);
}
本章小结

MQTT 是 IoT 通信的首选协议,核心机制是发布/订阅解耦模型。关键要点:QoS 0 适合高频传感器数据,QoS 1 适合状态上报,QoS 2 适合控制指令;Retain 让新订阅者立刻获得当前状态;LWT 自动通知其他方设备离线。ESP-IDF 的 esp-mqtt 组件支持自动重连。生产环境必须启用 TLS(mqtts://,端口 8883),通过 target_add_binary_data 将 CA 证书嵌入固件。网络不稳定场景建议实现离线数据缓存,重连后批量上报。