MQTT 核心概念
MQTT(Message Queuing Telemetry Transport)是由 IBM 工程师 Andy Stanford-Clark 在 1999 年为监控石油管道传感器而设计的协议。它基于发布/订阅(Publish/Subscribe)模式,极其轻量(最小报文仅 2 字节),非常适合带宽有限、网络不稳定的 IoT 场景。
核心术语词表
QoS 机制对比
| QoS 级别 | 保证 | 握手流程 | 适用场景 |
|---|---|---|---|
| QoS 0(At most once) | 最多收到一次,可能丢失 | PUBLISH(单向发送) | 传感器数据频繁上报,偶发丢失可接受 |
| QoS 1(At least once) | 至少收到一次,可能重复 | PUBLISH → PUBACK | IoT 设备状态上报,最常用 |
| QoS 2(Exactly once) | 恰好收到一次 | PUBLISH → PUBREC → PUBREL → PUBCOMP(四次握手) | 支付指令、控制命令,不允许重复执行 |
Mosquitto Broker 部署
# Ubuntu / Debian 安装 Mosquitto
sudo apt-get install mosquitto mosquitto-clients
# 启动服务
sudo systemctl start mosquitto
sudo systemctl enable mosquitto
# 测试订阅(在终端1)
mosquitto_sub -h localhost -t "test/#" -v
# 测试发布(在终端2)
mosquitto_pub -h localhost -t "test/hello" -m "Hello ESP32!"
# 带认证的发布
mosquitto_pub -h localhost -p 1883 -u user -P pass \
-t "home/sensor/temp" -m "25.6" -q 1 -r
# /etc/mosquitto/mosquitto.conf 关键配置
# listener 1883
# allow_anonymous false
# password_file /etc/mosquitto/passwd
esp-mqtt 组件使用
ESP-IDF 内置 esp-mqtt 组件,基于 MQTT v3.1.1/v5.0,支持 TLS/TCP/WebSocket 传输,事件驱动设计。
#include "mqtt_client.h"
#include "esp_log.h"
static const char *TAG = "MQTT";
static esp_mqtt_client_handle_t mqtt_client;
static void mqtt_event_handler(void *arg, esp_event_base_t base,
int32_t event_id, void *event_data)
{
esp_mqtt_event_handle_t event = event_data;
switch ((esp_mqtt_event_id_t)event_id) {
case MQTT_EVENT_CONNECTED:
ESP_LOGI(TAG, "MQTT 已连接");
/* 订阅主题 */
esp_mqtt_client_subscribe(mqtt_client, "home/light/cmd", 1);
/* 发布上线消息 */
esp_mqtt_client_publish(mqtt_client,
"home/sensor/status", "online", 0, 1, 1); // retain=1
break;
case MQTT_EVENT_DATA:
ESP_LOGI(TAG, "收到消息:");
ESP_LOGI(TAG, " Topic: %.*s", event->topic_len, event->topic);
ESP_LOGI(TAG, " Data: %.*s", event->data_len, event->data);
/* 解析命令 */
if (strncmp(event->data, "ON", event->data_len) == 0) {
gpio_set_level(GPIO_NUM_2, 1);
}
break;
case MQTT_EVENT_DISCONNECTED:
ESP_LOGW(TAG, "MQTT 断开,将自动重连");
break;
case MQTT_EVENT_ERROR:
ESP_LOGE(TAG, "MQTT 错误");
break;
default: break;
}
}
void mqtt_app_start(void)
{
esp_mqtt_client_config_t mqtt_cfg = {
.broker = {
.address.uri = "mqtt://192.168.1.100:1883",
},
.credentials = {
.client_id = "esp32_sensor_01",
.username = "mqttuser",
.authentication.password = "mqttpass",
},
.session = {
.last_will = { // LWT 遗嘱消息
.topic = "home/sensor/status",
.msg = "offline",
.qos = 1,
.retain = 1,
},
.keepalive = 60, // 60 秒心跳
},
};
mqtt_client = esp_mqtt_client_init(&mqtt_cfg);
esp_mqtt_client_register_event(mqtt_client, ESP_EVENT_ANY_ID,
mqtt_event_handler, NULL);
esp_mqtt_client_start(mqtt_client);
}
/* 定期发布传感器数据 */
void publish_sensor_data(float temp, float hum)
{
char payload[64];
snprintf(payload, sizeof(payload),
"{\"temperature\":%.1f,\"humidity\":%.1f}", temp, hum);
esp_mqtt_client_publish(mqtt_client,
"home/sensor/data", payload, 0, 1, 0);
}
推荐的 Topic 层级格式:{组织}/{位置}/{设备类型}/{设备ID}/{数据类型}
例如:acme/building1/esp32/device001/temperature
避免使用 # 做过于宽泛的订阅,会产生大量无关消息;避免使用空格和特殊字符;设备 ID 建议使用 MAC 地址。
MQTT over TLS(加密通信)
生产环境中的 MQTT 必须启用 TLS 加密,防止数据被窃听和中间人攻击。ESP-IDF 的 esp-mqtt 组件内置 mbedTLS,配置简单:
#include "mqtt_client.h"
/* CA 证书(Mosquitto 服务器证书的根证书)
* 可以用 openssl s_client -connect broker:8883 来获取 */
extern const uint8_t ca_cert_pem_start[] asm("_binary_ca_cert_pem_start");
extern const uint8_t ca_cert_pem_end[] asm("_binary_ca_cert_pem_end");
void mqtt_tls_start(void)
{
esp_mqtt_client_config_t cfg = {
.broker = {
/* mqtts:// 表示 MQTT over TLS,端口 8883 */
.address.uri = "mqtts://192.168.1.100:8883",
.verification.certificate = (const char*)ca_cert_pem_start,
},
.credentials = {
.client_id = "esp32_001",
.username = "user",
.authentication.password = "pass",
},
};
esp_mqtt_client_handle_t client = esp_mqtt_client_init(&cfg);
esp_mqtt_client_start(client);
}
/* 双向 TLS(mTLS):设备也提供客户端证书
* 适合高安全场景(工业控制、医疗等)*/
extern const uint8_t client_cert_start[] asm("_binary_client_cert_pem_start");
extern const uint8_t client_key_start[] asm("_binary_client_key_pem_start");
esp_mqtt_client_config_t mtls_cfg = {
.broker.address.uri = "mqtts://secure-broker:8883",
.broker.verification.certificate = (const char*)ca_cert_pem_start,
.credentials.authentication = {
.certificate = (const char*)client_cert_start, /* 设备证书 */
.key = (const char*)client_key_start, /* 设备私钥 */
},
};
CA 证书等文件通过 CMake 的 target_add_binary_data 嵌入固件:
target_add_binary_data(app.elf "certs/ca.pem" TEXT)
嵌入后可通过 _binary_ca_pem_start / _binary_ca_pem_end 访问。注意证书必须是 PEM 格式(Base64 文本),不能是 DER 格式(二进制)。证书到期后需要重新刷固件,建议结合 OTA 做证书轮换。
JSON 消息序列化
MQTT 消息是纯二进制/文本,通常使用 JSON 格式封装结构化数据。ESP-IDF 提供轻量级的 cJSON 库:
#include "cJSON.h"
void publish_json(float temp, float hum, int battery_mv)
{
cJSON *root = cJSON_CreateObject();
cJSON_AddNumberToObject(root, "temperature", (double)temp);
cJSON_AddNumberToObject(root, "humidity", (double)hum);
cJSON_AddNumberToObject(root, "battery_mv", battery_mv);
cJSON_AddStringToObject(root, "device_id", "esp32_001");
char *json_str = cJSON_PrintUnformatted(root); // 不含缩进,节省字节
esp_mqtt_client_publish(mqtt_client, "home/sensor",
json_str, strlen(json_str), 1, 0);
cJSON_free(json_str);
cJSON_Delete(root);
}
void parse_command(const char *payload, int len)
{
cJSON *root = cJSON_ParseWithLength(payload, len);
if (!root) return;
cJSON *cmd = cJSON_GetObjectItem(root, "cmd");
if (cJSON_IsString(cmd)) {
ESP_LOGI("CMD", "收到命令: %s", cmd->valuestring);
}
cJSON_Delete(root);
}
MQTT 断线重连与离线消息缓存
IoT 设备网络不稳定,需要处理断线重连和离线期间数据缓存:
/* esp-mqtt 内置自动重连,可通过配置控制重连间隔 */
esp_mqtt_client_config_t cfg = {
.broker.address.uri = "mqtt://broker:1883",
.network = {
.reconnect_timeout_ms = 5000, /* 5秒后重连 */
.timeout_ms = 10000, /* 连接超时 */
},
.session = {
.keepalive = 30, /* 30秒心跳(Broker 超时 = 1.5倍 keepalive)*/
/* clean_session=false:断线期间 Broker 保留订阅和 QoS1/2 消息 */
.disable_clean_session = 1,
},
};
/* 离线数据缓存:断线期间保存到 NVS,重连后批量上报 */
#define MAX_OFFLINE_MSGS 50
typedef struct {
float temp;
float humi;
int64_t timestamp;
} SensorRecord_t;
static SensorRecord_t offline_cache[MAX_OFFLINE_MSGS];
static int cache_count = 0;
void save_to_cache(float t, float h) {
if (cache_count < MAX_OFFLINE_MSGS) {
offline_cache[cache_count++] = (SensorRecord_t){t, h, time(NULL)};
}
}
void flush_cache_on_reconnect(void) {
for (int i = 0; i < cache_count; i++) {
char buf[80];
snprintf(buf, sizeof(buf),
"{\"t\":%.1f,\"h\":%.1f,\"ts\":%lld}",
offline_cache[i].temp,
offline_cache[i].humi,
offline_cache[i].timestamp);
esp_mqtt_client_publish(mqtt_client, "sensor/batch", buf, 0, 1, 0);
vTaskDelay(pdMS_TO_TICKS(100)); /* 避免发送过快 */
}
cache_count = 0;
ESP_LOGI("MQTT", "离线缓存已上报 %d 条", cache_count);
}
MQTT 是 IoT 通信的首选协议,核心机制是发布/订阅解耦模型。关键要点:QoS 0 适合高频传感器数据,QoS 1 适合状态上报,QoS 2 适合控制指令;Retain 让新订阅者立刻获得当前状态;LWT 自动通知其他方设备离线。ESP-IDF 的 esp-mqtt 组件支持自动重连。生产环境必须启用 TLS(mqtts://,端口 8883),通过 target_add_binary_data 将 CA 证书嵌入固件。网络不稳定场景建议实现离线数据缓存,重连后批量上报。