这是一个非常典型的物联网应用场景。在嵌入式开发中,利用 MQTT 转发传感器数据,核心流程可以概括为:采集 (Acquire) -> 序列化 (Serialize) -> 发布 (Publish)。
我们将基于上一轮讨论的 C语言 (Paho Embedded C) 环境,为您展示如何编写代码将温度/湿度传感器的数据转发到 MQTT 服务器。
在代码实现之前,我们需要定义好数据的主题 (Topic) 和 载荷 (Payload) 格式。
site/location/device_id/data_typefactory/line1/sensor_01/environment{"temp": 24.5, "humid": 60.2, "ts": 1618033988}假设你已经完成了上一阶段的 Network 和 Client 初始化。以下是主循环中读取传感器并转发的完整逻辑。
虽然可以使用 sprintf 手动拼接字符串,但在生产环境中,推荐使用 cJSON 或 MbedJSON 等轻量级库来避免缓冲区溢出。这里为了演示清晰,我们使用安全的格式化函数。
C
#include <stdio.h>
#include <string.h>
#include "MQTTClient.h"
// 假设这是你的传感器驱动函数
typedef struct {
float temperature;
float humidity;
} SensorData;
void Read_SHT30(SensorData* data) {
// 模拟读取传感器硬件寄存器
data->temperature = 25.4 + ((rand() % 10) / 10.0);
data->humidity = 55.0 + ((rand() % 10) / 10.0);
}
// 主任务循环
void MQTT_Sensor_Task(MQTTClient* client) {
SensorData currentData;
char payload_buffer[128]; // 确保缓冲区足够大
MQTTMessage message;
int rc;
while (1) {
// 1. 读取传感器数据
Read_SHT30(¤tData);
// 2. 数据打包 (序列化为 JSON)
// 格式示例: {"dev":"sensor_01","t":25.4,"h":55.2}
memset(payload_buffer, 0, sizeof(payload_buffer));
snprintf(payload_buffer, sizeof(payload_buffer),
"{\"dev\":\"sensor_01\",\"t\":%.2f,\"h\":%.2f}",
currentData.temperature,
currentData.humidity);
// 3. 配置 MQTT 消息包
message.qos = QOS1; // QOS1 确保至少到达一次
message.retained = 0; // 传感器流数据通常不需要 Retained
message.payload = payload_buffer;
message.payloadlen = strlen(payload_buffer);
// 4. 发布消息 (Forwarding)
// Topic: factory/line1/environment
printf("Publishing: %s\n", payload_buffer);
rc = MQTTPublish(client, "factory/line1/environment", &message);
if (rc != SUCCESS) {
printf("Error publishing data: %d\n", rc);
// 这里可以添加重连逻辑
}
// 5. 核心:Yield 保持心跳
// 假设我们每 5 秒发送一次数据
// 我们需要拆分延时,频繁调用 Yield 以处理下行消息(如远程控制指令)
for (int i = 0; i < 50; i++) {
MQTTYield(client, 100); // 每次 Yield 100ms
}
}
}
有时候你的 MCU 不是直接产生数据,而是作为一个网关,通过 串口 (UART)、蓝牙 (BLE) 或 Zigbee 接收其他小传感器的数据,然后通过 WiFi/以太网 转发给 MQTT。
这种情况下,代码逻辑需要由“定时采集”变为“事件驱动”:
C
// 伪代码:串口中断接收 -> MQTT 转发
void HAL_UART_RxCpltCallback(UART_HandleTypeDef *huart) {
// 1. 从串口缓冲区获取数据 (例如 Modbus 协议数据)
uint8_t* raw_data = UartBuffer;
// 2. 解析数据
float temp = ParseModbusTemp(raw_data);
// 3. 组装 MQTT Payload
char mqtt_payload[64];
snprintf(mqtt_payload, 64, "{\"id\":\"slave_01\",\"val\":%.2f}", temp);
// 4. 设置标志位,通知主循环发送 (不要在中断里直接发送网络包!)
SetFlag_ReadyToPublish(mqtt_payload);
}
// 主循环中
if (GetFlag_ReadyToPublish()) {
MQTTPublish(&client, "gateway/rx_data", &msg);
}
{"temperature": 25.5} (长) -> {"t": 25.5} (短)。
- 或者使用二进制格式(如 Protocol Buffers),但这会增加解析复杂度。data (所有设备都往这里发)
- 正确: device/ID_001/status, device/ID_002/status
- 这样后端订阅时,可以使用通配符 device/+/status 统一处理,也可以单独监听某个设备。