MQTT(Message Queuing Telemetry Transport)是一种基于发布/订阅(Publish/Subscribe)模式的轻量级通讯协议,非常适合物联网(IoT)和嵌入式设备。
“移植”通常指的是将 MQTT 客户端库(Client Library)适配到特定的硬件平台(如 STM32, ESP32)或操作系统(如 FreeRTOS, Linux, 或裸机)上。
以下是关于 MQTT 开源软件(以最常用的 Eclipse Paho Embedded C 为例)的移植与使用指南。
在开始移植之前,需要选择适合你硬件资源的开源库:
| 库名称 | 适用场景 | 语言 | 特点 |
|---|---|---|---|
| Eclipse Paho Embedded C | 首选。适用于资源受限的 MCU(STM32, NXP 等)。 | C | 极简,同步模式为主,易于移植。 |
| Mosquitto | Linux 网关/服务器。 | C | 功能完整,但依赖较多,不适合小 RAM 单片机。 |
| coreMQTT (FreeRTOS) | 使用 FreeRTOS 的设备。 | C | 专为 AWS 和 FreeRTOS 优化,线程安全。 |
| Paho MQTT C++ | 高性能嵌入式 Linux。 | C++ | 封装更好,适合复杂逻辑。 |
本指南主要针对嵌入式开发中需求最大的 Paho Embedded C 进行讲解。
Paho Embedded C 的核心逻辑是平台无关的,你需要做的就是填充底层的 “网络接口” 和 “定时器接口”。
从 GitHub 下载源码,主要保留 MQTTPacket/src 下的文件。你需要为你的平台创建一个 transport.c(网络层)和 timer.c(时间层)。
MQTT 依赖 TCP。你需要将你的 TCP 栈(如 LwIP, AT 指令, 或 Linux Socket)封装成 Paho 需要的格式。
你需要定义一个 Network 结构体,并实现 read 和 write 函数:
C
typedef struct Network {
int my_socket;
int (*mqttread) (struct Network*, unsigned char*, int, int);
int (*mqttwrite) (struct Network*, unsigned char*, int, int);
} Network;
// 移植写函数:调用底层的 send (以 LwIP 为例)
int linux_read(Network* n, unsigned char* buffer, int len, int timeout_ms) {
// 设置 socket 超时
setsockopt(n->my_socket, SOL_SOCKET, SO_RCVTIMEO, ...);
int bytes = recv(n->my_socket, buffer, len, 0);
return bytes;
}
// 移植读函数:调用底层的 recv
int linux_write(Network* n, unsigned char* buffer, int len, int timeout_ms) {
int bytes = send(n->my_socket, buffer, len, 0);
return bytes;
}
MQTT 需要定时器来处理 Keep-alive(心跳保活) 和 超时重传。
C
typedef struct Timer {
unsigned long systick_period;
unsigned long end_time;
} Timer;
// 检查定时器是否超时
char TimerIsExpired(Timer* timer) {
long left = timer->end_time - HAL_GetTick(); // 使用系统的 Tick
return (left < 0);
}
// 倒计时设置
void TimerCountdownMS(Timer* timer, unsigned int timeout_ms) {
timer->end_time = HAL_GetTick() + timeout_ms;
}
移植完成后,标准的业务代码流程如下:
C
int main(void) {
// 1. 底层网络初始化 (连接 Wi-Fi 或以太网)
NetworkInit(&n);
ConnectNetwork(&n, "broker.emqx.io", 1883); // 连接到 Broker
// 2. MQTT 客户端初始化
MQTTClient(&client, &n, 1000, sendBuf, sizeof(sendBuf), readBuf, sizeof(readBuf));
// 3. 配置连接参数
MQTTPacket_connectData data = MQTTPacket_connectData_initializer;
data.MQTTVersion = 3;
data.clientID.cstring = "My_Embedded_Device_001";
data.keepAliveInterval = 60; // 60秒心跳
data.cleansession = 1;
// 4. 发送 CONNECT 包
if (MQTTConnect(&client, &data) != SUCCESS) {
printf("Connection failed\n");
}
}
订阅需要注册一个回调函数(Callback),当收到消息时自动触发。
C
// 消息回调函数
void messageArrived(MessageData* data) {
printf("Message arrived on topic %.*s: %.*s\n",
data->topicName->lenstring.len, data->topicName->lenstring.data,
data->message->payloadlen, (char*)data->message->payload);
}
// 订阅
MQTTSubscribe(&client, "device/control", QOS0, messageArrived);
C
MQTTMessage message;
char payload[30];
sprintf(payload, "Temp: %d", 25);
message.qos = QOS1;
message.retained = 0;
message.payload = payload;
message.payloadlen = strlen(payload);
MQTTPublish(&client, "device/status", &message);
这是最关键的一步。 你必须在主循环中不断调用 MQTTYield,否则客户端无法接收服务器的 ACK 包,也无法处理心跳,导致断连。
C
while(1) {
// 检查是否有数据到达,保持心跳,处理超时
MQTTYield(&client, 100);
// 执行其他任务...
}
readBuf 和 sendBuf 足够大(至少要大于你预期的最大包长)。MQTTYield 调用频率太低,或者单次循环由于其他阻塞任务耗时太长,导致超过了 KeepAlive 时间。
- 对策: 确保主循环非阻塞,或者使用 RTOS 将 MQTT 放在独立任务中。MQTTYield 的时间片。在开发过程中,不要只看代码,要使用工具观察 Broker 端的接收情况: