MQTT + Python

分类: MQTT

1. 环境准备

首先,你需要安装 paho-mqtt 库。打开终端或命令行执行:

Bash

pip install paho-mqtt

注意:Paho MQTT 在 2.0.0 版本(2024年初发布)引入了一些 API 变更。本教程基于 2.x 版本 编写,以确保代码是最新的。


2. 核心概念:回调函数 (Callbacks)

Python 的 MQTT 开发是基于事件驱动的。你不需要写死循环去轮询“有没有消息”,而是定义好“当这件事发生时做什么”。

  • on_connect: 当连接到 Broker 成功时触发(通常在这里进行 subscribe)。
  • on_message: 当收到订阅的主题消息时触发(处理核心业务逻辑)。
  • on_publish: 当消息成功发送出去后触发。

3. 实战代码:编写一个订阅者 (Subscriber)

订阅者负责监听消息。我们将连接到一个公共测试服务器 (broker.emqx.io)。

新建文件 mqtt_sub.py

Python

import paho.mqtt.client as mqtt
from paho.mqtt.enums import CallbackAPIVersion

# 1. 定义连接回调:连接成功后,自动订阅 Topic
def on_connect(client, userdata, flags, rc, properties=None):
    if rc == 0:
        print("✅ 连接成功!")
        # 订阅主题:test/python/demo
        client.subscribe("test/python/demo")
    else:
        print(f"❌ 连接失败,返回码: {rc}")

# 2. 定义消息回调:收到消息后执行的操作
def on_message(client, userdata, msg):
    print(f"📩 收到消息 | Topic: {msg.topic} | Payload: {msg.payload.decode('utf-8')}")

# 3. 创建客户端实例 (使用 V2 API)
client = mqtt.Client(callback_api_version=CallbackAPIVersion.VERSION2)

# 4. 绑定回调函数
client.on_connect = on_connect
client.on_message = on_message

# 5. 连接 Broker (公共测试服务器,无需密码)
broker_address = "broker.emqx.io"
port = 1883
print(f"正在连接到 {broker_address}...")
client.connect(broker_address, port, 60)

# 6. 开启阻塞循环 (保持程序一直运行,监听网络)
try:
    client.loop_forever()
except KeyboardInterrupt:
    print("程序已停止")
    client.disconnect()

4. 实战代码:编写一个发布者 (Publisher)

发布者负责发送数据。我们可以模拟一个传感器,每隔 2 秒发一次数据。

新建文件 mqtt_pub.py

Python

import paho.mqtt.client as mqtt
from paho.mqtt.enums import CallbackAPIVersion
import time
import json
import random

# 创建客户端
client = mqtt.Client(callback_api_version=CallbackAPIVersion.VERSION2)

# 连接 Broker
broker_address = "broker.emqx.io"
client.connect(broker_address, 1883, 60)

# 开启网络循环(后台线程模式,非阻塞)
# 这一点很重要:如果不开启 loop,消息可能发不出去
client.loop_start()

topic = "test/python/demo"

try:
    while True:
        # 模拟生成一些数据 (通常建议使用 JSON 格式)
        payload = {
            "temperature": round(random.uniform(20.0, 30.0), 2),
            "humidity": random.randint(40, 80),
            "status": "active"
        }

        # 将字典转换为 JSON 字符串
        payload_str = json.dumps(payload)

        # 发布消息
        msg_info = client.publish(topic, payload_str, qos=1)

        # 等待发布完成(可选,用于确保可靠性)
        msg_info.wait_for_publish()

        print(f"🚀 已发送: {payload_str}")

        time.sleep(2)

except KeyboardInterrupt:
    print("停止发布")
    client.loop_stop()
    client.disconnect()

5. 运行测试

  1. 打开第一个终端,运行 订阅者

Bash

python mqtt_sub.py

你会看到:“正在连接...” -> “连接成功!”

  1. 打开第二个终端,运行 发布者

Bash

python mqtt_pub.py

  1. 观察结果
  • 发布者终端会不断打印“已发送...”。
  • 订阅者终端会实时打印收到 JSON 数据。

6. 进阶开发指南 (Best Practices)

A. 处理 JSON 数据

在 IoT 开发中,传输纯文本(String)的情况很少,通常都传输 JSON

  • 发送端:使用 json.dumps(data) 将 Python 字典转为 JSON 字符串。
  • 接收端:在 on_message 中使用 json.loads(msg.payload) 将数据还原为 Python 字典方便处理。

B. 用户名和密码认证

生产环境的 Broker 肯定不是公开的,需要账号密码。在 connect 之前添加:

Python

client.username_pw_set("your_username", "your_password")
client.connect("your_broker_ip", 1883)

C. 网络循环 (loop) 的选择

Paho 提供了两种处理网络流量的方式,新手容易混淆:

  1. client.loop_forever(): - 阻塞式。代码执行到这里就会卡住,直到程序断开。 - 适用场景:专门的接收程序(Subscriber),除了收消息不做别的。
  2. client.loop_start(): - 非阻塞式。它会启动一个新的后台线程来处理网络。 - 适用场景:发布者(Publisher),或者带有 GUI(图形界面)、Web 服务的程序,需要主线程去处理其他逻辑。

D. 自动重连

Paho 库内置了重连机制。如果网络波动导致断开,库会自动尝试重连。你可以在 on_disconnect 回调中添加日志来监控这种情况:

Python

def on_disconnect(client, userdata, flags, rc, properties=None):
    if rc != 0:
        print("⚠️ 意外断开,正在尝试重连...")