使用MQTT转发传感器数据

分类: MQTT

这是一个非常典型的物联网应用场景。在嵌入式开发中,利用 MQTT 转发传感器数据,核心流程可以概括为:采集 (Acquire) -> 序列化 (Serialize) -> 发布 (Publish)

我们将基于上一轮讨论的 C语言 (Paho Embedded C) 环境,为您展示如何编写代码将温度/湿度传感器的数据转发到 MQTT 服务器。


1. 核心流程架构

在代码实现之前,我们需要定义好数据的主题 (Topic)载荷 (Payload) 格式。

  • Topic (主题): 就像文件路径,用于归类数据。
  • 推荐格式:site/location/device_id/data_type
  • 示例:factory/line1/sensor_01/environment
  • Payload (载荷): 实际传输的数据。
  • 强烈推荐使用 JSON 格式,而不是原始二进制。JSON 通用性强,后端处理方便。
  • 示例:{"temp": 24.5, "humid": 60.2, "ts": 1618033988}

2. 代码实现 (基于 C 语言)

假设你已经完成了上一阶段的 NetworkClient 初始化。以下是主循环中读取传感器并转发的完整逻辑。

A. 准备工作:JSON 序列化

虽然可以使用 sprintf 手动拼接字符串,但在生产环境中,推荐使用 cJSONMbedJSON 等轻量级库来避免缓冲区溢出。这里为了演示清晰,我们使用安全的格式化函数。

B. 完整代码示例

C

#include <stdio.h>
#include <string.h>
#include "MQTTClient.h"

// 假设这是你的传感器驱动函数
typedef struct {
    float temperature;
    float humidity;
} SensorData;

void Read_SHT30(SensorData* data) {
    // 模拟读取传感器硬件寄存器
    data->temperature = 25.4 + ((rand() % 10) / 10.0); 
    data->humidity = 55.0 + ((rand() % 10) / 10.0);
}

// 主任务循环
void MQTT_Sensor_Task(MQTTClient* client) {
    SensorData currentData;
    char payload_buffer[128]; // 确保缓冲区足够大
    MQTTMessage message;
    int rc;

    while (1) {
        // 1. 读取传感器数据
        Read_SHT30(&currentData);

        // 2. 数据打包 (序列化为 JSON)
        // 格式示例: {"dev":"sensor_01","t":25.4,"h":55.2}
        memset(payload_buffer, 0, sizeof(payload_buffer));
        snprintf(payload_buffer, sizeof(payload_buffer), 
                 "{\"dev\":\"sensor_01\",\"t\":%.2f,\"h\":%.2f}", 
                 currentData.temperature, 
                 currentData.humidity);

        // 3. 配置 MQTT 消息包
        message.qos = QOS1;           // QOS1 确保至少到达一次
        message.retained = 0;         // 传感器流数据通常不需要 Retained
        message.payload = payload_buffer;
        message.payloadlen = strlen(payload_buffer);

        // 4. 发布消息 (Forwarding)
        // Topic: factory/line1/environment
        printf("Publishing: %s\n", payload_buffer);
        rc = MQTTPublish(client, "factory/line1/environment", &message);

        if (rc != SUCCESS) {
            printf("Error publishing data: %d\n", rc);
            // 这里可以添加重连逻辑
        }

        // 5. 核心:Yield 保持心跳
        // 假设我们每 5 秒发送一次数据
        // 我们需要拆分延时,频繁调用 Yield 以处理下行消息(如远程控制指令)
        for (int i = 0; i < 50; i++) {
            MQTTYield(client, 100); // 每次 Yield 100ms
        }
    }
}

3. 进阶场景:网关转发模式 (Gateway Mode)

有时候你的 MCU 不是直接产生数据,而是作为一个网关,通过 串口 (UART)、蓝牙 (BLE) 或 Zigbee 接收其他小传感器的数据,然后通过 WiFi/以太网 转发给 MQTT。

这种情况下,代码逻辑需要由“定时采集”变为“事件驱动”:

C

// 伪代码:串口中断接收 -> MQTT 转发
void HAL_UART_RxCpltCallback(UART_HandleTypeDef *huart) {
    // 1. 从串口缓冲区获取数据 (例如 Modbus 协议数据)
    uint8_t* raw_data = UartBuffer;

    // 2. 解析数据
    float temp = ParseModbusTemp(raw_data);

    // 3. 组装 MQTT Payload
    char mqtt_payload[64];
    snprintf(mqtt_payload, 64, "{\"id\":\"slave_01\",\"val\":%.2f}", temp);

    // 4. 设置标志位,通知主循环发送 (不要在中断里直接发送网络包!)
    SetFlag_ReadyToPublish(mqtt_payload);
}

// 主循环中
if (GetFlag_ReadyToPublish()) {
    MQTTPublish(&client, "gateway/rx_data", &msg);
}

4. 关键注意事项 (Best Practices)

  1. QoS 选择 (服务质量): - 传感器流数据 (如温湿度): 使用 QoS 0。偶尔丢一个包没关系,重要的是实时性和低延迟。 - 告警数据 (如火灾报警): 必须使用 QoS 1QoS 2,确保服务器收到了信息。
  2. Payload 优化: - 虽然 JSON 好用,但如果是在带宽极低的环境(如 NB-IoT),可以使用简写。 - {"temperature": 25.5} (长) -> {"t": 25.5} (短)。 - 或者使用二进制格式(如 Protocol Buffers),但这会增加解析复杂度。
  3. Topic 设计: - 不要把所有数据发到同一个 Topic。 - 错误: data (所有设备都往这里发) - 正确: device/ID_001/status, device/ID_002/status - 这样后端订阅时,可以使用通配符 device/+/status 统一处理,也可以单独监听某个设备。
  4. Retained 消息 (保留消息): - 对于传感器数据,通常不使用 Retained。 - 但在设备状态 (在线/离线) 上报时,必须使用 Retained。这样当 Dashboard (控制面板) 刚连上时,就能立即知道设备是死是活,而不用等设备下一次发包。