MQTT 核心概念
MQTT(Message Queuing Telemetry Transport)是由 IBM 工程师 Andy Stanford-Clark 在 1999 年为监控石油管道传感器而设计的协议。它基于发布/订阅(Publish/Subscribe)模式,极其轻量(最小报文仅 2 字节),非常适合带宽有限、网络不稳定的 IoT 场景。
MQTT 发布/订阅模型:
传感器 ESP32 Broker(Mosquitto) 手机 App / 服务器
│ │ │
│── CONNECT ──────────►│ │
│── PUBLISH ──────────►│ topic: home/sensor/temp│
│ "25.6°C" │◄── SUBSCRIBE ───────────│
│ │ topic: home/sensor/+│
│ │─── PUBLISH ────────────►│
│ │ "25.6°C" │
│ │ │
控制器 ESP32 │ │
│◄─── SUBSCRIBE ───────│ topic: home/light/cmd │
│ topic: home/light/cmd │
│ │◄── PUBLISH ─────────────│
│ │ "ON" │
│◄─── PUBLISH ─────────│ │
│ "ON" │ │
关键:发布者和订阅者互不知晓对方存在,完全解耦!
核心术语词表
Broker(代理服务器)
所有消息的中转中心,负责接收发布、分发给订阅者。常用实现:Mosquitto(轻量开源)、EMQ X(企业级)、AWS IoT Core、阿里云IoT。
Topic(主题)
消息路由标识,类似文件路径层级,用 / 分隔。如 home/bedroom/temperature。订阅时支持通配符:+ 匹配单级(home/+/temperature),# 匹配多级(home/#)。
QoS(服务质量)
消息投递保证级别。QoS 0:最多一次(可能丢失);QoS 1:至少一次(可能重复);QoS 2:恰好一次(最可靠但开销最大)。IoT 中最常用 QoS 1。
Retain(保留消息)
Broker 保存该 Topic 最新的一条消息,新订阅者连接后立即收到,无需等待下次发布。适合设备状态、配置信息等场景。
LWT(遗嘱消息)
Last Will and Testament:客户端在 CONNECT 时预设的消息,当客户端异常断线时 Broker 自动发布该消息,通知其他订阅者设备已离线。
Client ID
每个 MQTT 客户端的唯一标识符,Broker 用它区分客户端。同一 Broker 下不得重复,重复连接会踢掉之前的连接。
QoS 机制对比
| QoS 级别 | 保证 | 握手流程 | 适用场景 |
|---|---|---|---|
| QoS 0(At most once) | 最多收到一次,可能丢失 | PUBLISH(单向发送) | 传感器数据频繁上报,偶发丢失可接受 |
| QoS 1(At least once) | 至少收到一次,可能重复 | PUBLISH → PUBACK | IoT 设备状态上报,最常用 |
| QoS 2(Exactly once) | 恰好收到一次 | PUBLISH → PUBREC → PUBREL → PUBCOMP(四次握手) | 支付指令、控制命令,不允许重复执行 |
Mosquitto Broker 部署
# Ubuntu / Debian 安装 Mosquitto
sudo apt-get install mosquitto mosquitto-clients
# 启动服务
sudo systemctl start mosquitto
sudo systemctl enable mosquitto
# 测试订阅(在终端1)
mosquitto_sub -h localhost -t "test/#" -v
# 测试发布(在终端2)
mosquitto_pub -h localhost -t "test/hello" -m "Hello ESP32!"
# 带认证的发布
mosquitto_pub -h localhost -p 1883 -u user -P pass \
-t "home/sensor/temp" -m "25.6" -q 1 -r
# /etc/mosquitto/mosquitto.conf 关键配置
# listener 1883
# allow_anonymous false
# password_file /etc/mosquitto/passwd
esp-mqtt 组件使用
ESP-IDF 内置 esp-mqtt 组件,基于 MQTT v3.1.1/v5.0,支持 TLS/TCP/WebSocket 传输,事件驱动设计。
#include "mqtt_client.h"
#include "esp_log.h"
static const char *TAG = "MQTT";
static esp_mqtt_client_handle_t mqtt_client;
static void mqtt_event_handler(void *arg, esp_event_base_t base,
int32_t event_id, void *event_data)
{
esp_mqtt_event_handle_t event = event_data;
switch ((esp_mqtt_event_id_t)event_id) {
case MQTT_EVENT_CONNECTED:
ESP_LOGI(TAG, "MQTT 已连接");
/* 订阅主题 */
esp_mqtt_client_subscribe(mqtt_client, "home/light/cmd", 1);
/* 发布上线消息 */
esp_mqtt_client_publish(mqtt_client,
"home/sensor/status", "online", 0, 1, 1); // retain=1
break;
case MQTT_EVENT_DATA:
ESP_LOGI(TAG, "收到消息:");
ESP_LOGI(TAG, " Topic: %.*s", event->topic_len, event->topic);
ESP_LOGI(TAG, " Data: %.*s", event->data_len, event->data);
/* 解析命令 */
if (strncmp(event->data, "ON", event->data_len) == 0) {
gpio_set_level(GPIO_NUM_2, 1);
}
break;
case MQTT_EVENT_DISCONNECTED:
ESP_LOGW(TAG, "MQTT 断开,将自动重连");
break;
case MQTT_EVENT_ERROR:
ESP_LOGE(TAG, "MQTT 错误");
break;
default: break;
}
}
void mqtt_app_start(void)
{
esp_mqtt_client_config_t mqtt_cfg = {
.broker = {
.address.uri = "mqtt://192.168.1.100:1883",
},
.credentials = {
.client_id = "esp32_sensor_01",
.username = "mqttuser",
.authentication.password = "mqttpass",
},
.session = {
.last_will = { // LWT 遗嘱消息
.topic = "home/sensor/status",
.msg = "offline",
.qos = 1,
.retain = 1,
},
.keepalive = 60, // 60 秒心跳
},
};
mqtt_client = esp_mqtt_client_init(&mqtt_cfg);
esp_mqtt_client_register_event(mqtt_client, ESP_EVENT_ANY_ID,
mqtt_event_handler, NULL);
esp_mqtt_client_start(mqtt_client);
}
/* 定期发布传感器数据 */
void publish_sensor_data(float temp, float hum)
{
char payload[64];
snprintf(payload, sizeof(payload),
"{\"temperature\":%.1f,\"humidity\":%.1f}", temp, hum);
esp_mqtt_client_publish(mqtt_client,
"home/sensor/data", payload, 0, 1, 0);
}
MQTT Topic 设计规范
推荐的 Topic 层级格式:{组织}/{位置}/{设备类型}/{设备ID}/{数据类型}
例如:acme/building1/esp32/device001/temperature
避免使用 # 做过于宽泛的订阅,会产生大量无关消息;避免使用空格和特殊字符;设备 ID 建议使用 MAC 地址。
JSON 消息序列化
MQTT 消息是纯二进制/文本,通常使用 JSON 格式封装结构化数据。ESP-IDF 提供轻量级的 cJSON 库:
#include "cJSON.h"
void publish_json(float temp, float hum, int battery_mv)
{
cJSON *root = cJSON_CreateObject();
cJSON_AddNumberToObject(root, "temperature", (double)temp);
cJSON_AddNumberToObject(root, "humidity", (double)hum);
cJSON_AddNumberToObject(root, "battery_mv", battery_mv);
cJSON_AddStringToObject(root, "device_id", "esp32_001");
char *json_str = cJSON_PrintUnformatted(root); // 不含缩进,节省字节
esp_mqtt_client_publish(mqtt_client, "home/sensor",
json_str, strlen(json_str), 1, 0);
cJSON_free(json_str);
cJSON_Delete(root);
}
void parse_command(const char *payload, int len)
{
cJSON *root = cJSON_ParseWithLength(payload, len);
if (!root) return;
cJSON *cmd = cJSON_GetObjectItem(root, "cmd");
if (cJSON_IsString(cmd)) {
ESP_LOGI("CMD", "收到命令: %s", cmd->valuestring);
}
cJSON_Delete(root);
}