MQTT 开源软件移植与使用

分类: MQTT

MQTT(Message Queuing Telemetry Transport)是一种基于发布/订阅(Publish/Subscribe)模式的轻量级通讯协议,非常适合物联网(IoT)和嵌入式设备。

“移植”通常指的是将 MQTT 客户端库(Client Library)适配到特定的硬件平台(如 STM32, ESP32)或操作系统(如 FreeRTOS, Linux, 或裸机)上。

以下是关于 MQTT 开源软件(以最常用的 Eclipse Paho Embedded C 为例)的移植与使用指南。


第一阶段:选型与准备

在开始移植之前,需要选择适合你硬件资源的开源库:

库名称 适用场景 语言 特点
Eclipse Paho Embedded C 首选。适用于资源受限的 MCU(STM32, NXP 等)。 C 极简,同步模式为主,易于移植。
Mosquitto Linux 网关/服务器。 C 功能完整,但依赖较多,不适合小 RAM 单片机。
coreMQTT (FreeRTOS) 使用 FreeRTOS 的设备。 C 专为 AWS 和 FreeRTOS 优化,线程安全。
Paho MQTT C++ 高性能嵌入式 Linux。 C++ 封装更好,适合复杂逻辑。

本指南主要针对嵌入式开发中需求最大的 Paho Embedded C 进行讲解。


第二阶段:移植核心步骤 (Porting)

Paho Embedded C 的核心逻辑是平台无关的,你需要做的就是填充底层的 “网络接口”“定时器接口”

1. 准备文件

从 GitHub 下载源码,主要保留 MQTTPacket/src 下的文件。你需要为你的平台创建一个 transport.c(网络层)和 timer.c(时间层)。

2. 实现 Network 接口 (TCP/IP 适配)

MQTT 依赖 TCP。你需要将你的 TCP 栈(如 LwIP, AT 指令, 或 Linux Socket)封装成 Paho 需要的格式。

你需要定义一个 Network 结构体,并实现 readwrite 函数:

C

typedef struct Network {
    int my_socket;
    int (*mqttread) (struct Network*, unsigned char*, int, int);
    int (*mqttwrite) (struct Network*, unsigned char*, int, int);
} Network;

// 移植写函数:调用底层的 send (以 LwIP 为例)
int linux_read(Network* n, unsigned char* buffer, int len, int timeout_ms) {
    // 设置 socket 超时
    setsockopt(n->my_socket, SOL_SOCKET, SO_RCVTIMEO, ...);
    int bytes = recv(n->my_socket, buffer, len, 0);
    return bytes;
}

// 移植读函数:调用底层的 recv
int linux_write(Network* n, unsigned char* buffer, int len, int timeout_ms) {
    int bytes = send(n->my_socket, buffer, len, 0);
    return bytes;
}

3. 实现 Timer 接口 (时间管理)

MQTT 需要定时器来处理 Keep-alive(心跳保活)超时重传

C

typedef struct Timer {
    unsigned long systick_period;
    unsigned long end_time;
} Timer;

// 检查定时器是否超时
char TimerIsExpired(Timer* timer) {
    long left = timer->end_time - HAL_GetTick(); // 使用系统的 Tick
    return (left < 0);
}

// 倒计时设置
void TimerCountdownMS(Timer* timer, unsigned int timeout_ms) {
    timer->end_time = HAL_GetTick() + timeout_ms;
}

第三阶段:具体使用流程 (Usage)

移植完成后,标准的业务代码流程如下:

1. 初始化与连接

C

int main(void) {
    // 1. 底层网络初始化 (连接 Wi-Fi 或以太网)
    NetworkInit(&n);
    ConnectNetwork(&n, "broker.emqx.io", 1883); // 连接到 Broker

    // 2. MQTT 客户端初始化
    MQTTClient(&client, &n, 1000, sendBuf, sizeof(sendBuf), readBuf, sizeof(readBuf));

    // 3. 配置连接参数
    MQTTPacket_connectData data = MQTTPacket_connectData_initializer;
    data.MQTTVersion = 3;
    data.clientID.cstring = "My_Embedded_Device_001";
    data.keepAliveInterval = 60; // 60秒心跳
    data.cleansession = 1;

    // 4. 发送 CONNECT 包
    if (MQTTConnect(&client, &data) != SUCCESS) {
        printf("Connection failed\n");
    }
}

2. 订阅消息 (Subscribe)

订阅需要注册一个回调函数(Callback),当收到消息时自动触发。

C

// 消息回调函数
void messageArrived(MessageData* data) {
    printf("Message arrived on topic %.*s: %.*s\n",
           data->topicName->lenstring.len, data->topicName->lenstring.data,
           data->message->payloadlen, (char*)data->message->payload);
}

// 订阅
MQTTSubscribe(&client, "device/control", QOS0, messageArrived);

3. 发布消息 (Publish)

C

MQTTMessage message;
char payload[30];
sprintf(payload, "Temp: %d", 25);

message.qos = QOS1;
message.retained = 0;
message.payload = payload;
message.payloadlen = strlen(payload);

MQTTPublish(&client, "device/status", &message);

4. 核心循环 (Yield)

这是最关键的一步。 你必须在主循环中不断调用 MQTTYield,否则客户端无法接收服务器的 ACK 包,也无法处理心跳,导致断连。

C

while(1) {
    // 检查是否有数据到达,保持心跳,处理超时
    MQTTYield(&client, 100); 

    // 执行其他任务...
}

第四阶段:常见问题与调试

  1. 内存泄漏/栈溢出 - 现象: 程序运行一段时间后死机。 - 对策: Paho Embedded C 是静态内存分配,确保 readBufsendBuf 足够大(至少要大于你预期的最大包长)。
  2. 频繁断线 (Keep-alive timeout) - 原因: MQTTYield 调用频率太低,或者单次循环由于其他阻塞任务耗时太长,导致超过了 KeepAlive 时间。 - 对策: 确保主循环非阻塞,或者使用 RTOS 将 MQTT 放在独立任务中。
  3. QoS 1/2 消息发送失败 - 原因: 发送了 PUBLISH 但没收到 PUBACK。 - 对策: 检查网络层是否丢包,增加 MQTTYield 的时间片。

推荐调试工具

在开发过程中,不要只看代码,要使用工具观察 Broker 端的接收情况:

  • MQTT Explorer: 最好用的桌面客户端,可以看到 Topic 层级结构。
  • Wireshark: 抓包分析 TCP 握手和 MQTT 报文细节。