首先,你需要安装 paho-mqtt 库。打开终端或命令行执行:
Bash
pip install paho-mqtt
注意:Paho MQTT 在 2.0.0 版本(2024年初发布)引入了一些 API 变更。本教程基于 2.x 版本 编写,以确保代码是最新的。
Python 的 MQTT 开发是基于事件驱动的。你不需要写死循环去轮询“有没有消息”,而是定义好“当这件事发生时做什么”。
on_connect: 当连接到 Broker 成功时触发(通常在这里进行 subscribe)。on_message: 当收到订阅的主题消息时触发(处理核心业务逻辑)。on_publish: 当消息成功发送出去后触发。订阅者负责监听消息。我们将连接到一个公共测试服务器 (broker.emqx.io)。
新建文件 mqtt_sub.py:
Python
import paho.mqtt.client as mqtt
from paho.mqtt.enums import CallbackAPIVersion
# 1. 定义连接回调:连接成功后,自动订阅 Topic
def on_connect(client, userdata, flags, rc, properties=None):
if rc == 0:
print("✅ 连接成功!")
# 订阅主题:test/python/demo
client.subscribe("test/python/demo")
else:
print(f"❌ 连接失败,返回码: {rc}")
# 2. 定义消息回调:收到消息后执行的操作
def on_message(client, userdata, msg):
print(f"📩 收到消息 | Topic: {msg.topic} | Payload: {msg.payload.decode('utf-8')}")
# 3. 创建客户端实例 (使用 V2 API)
client = mqtt.Client(callback_api_version=CallbackAPIVersion.VERSION2)
# 4. 绑定回调函数
client.on_connect = on_connect
client.on_message = on_message
# 5. 连接 Broker (公共测试服务器,无需密码)
broker_address = "broker.emqx.io"
port = 1883
print(f"正在连接到 {broker_address}...")
client.connect(broker_address, port, 60)
# 6. 开启阻塞循环 (保持程序一直运行,监听网络)
try:
client.loop_forever()
except KeyboardInterrupt:
print("程序已停止")
client.disconnect()
发布者负责发送数据。我们可以模拟一个传感器,每隔 2 秒发一次数据。
新建文件 mqtt_pub.py:
Python
import paho.mqtt.client as mqtt
from paho.mqtt.enums import CallbackAPIVersion
import time
import json
import random
# 创建客户端
client = mqtt.Client(callback_api_version=CallbackAPIVersion.VERSION2)
# 连接 Broker
broker_address = "broker.emqx.io"
client.connect(broker_address, 1883, 60)
# 开启网络循环(后台线程模式,非阻塞)
# 这一点很重要:如果不开启 loop,消息可能发不出去
client.loop_start()
topic = "test/python/demo"
try:
while True:
# 模拟生成一些数据 (通常建议使用 JSON 格式)
payload = {
"temperature": round(random.uniform(20.0, 30.0), 2),
"humidity": random.randint(40, 80),
"status": "active"
}
# 将字典转换为 JSON 字符串
payload_str = json.dumps(payload)
# 发布消息
msg_info = client.publish(topic, payload_str, qos=1)
# 等待发布完成(可选,用于确保可靠性)
msg_info.wait_for_publish()
print(f"🚀 已发送: {payload_str}")
time.sleep(2)
except KeyboardInterrupt:
print("停止发布")
client.loop_stop()
client.disconnect()
Bash
python mqtt_sub.py
你会看到:“正在连接...” -> “连接成功!”
Bash
python mqtt_pub.py
在 IoT 开发中,传输纯文本(String)的情况很少,通常都传输 JSON。
json.dumps(data) 将 Python 字典转为 JSON 字符串。on_message 中使用 json.loads(msg.payload) 将数据还原为 Python 字典方便处理。生产环境的 Broker 肯定不是公开的,需要账号密码。在 connect 之前添加:
Python
client.username_pw_set("your_username", "your_password")
client.connect("your_broker_ip", 1883)
loop) 的选择Paho 提供了两种处理网络流量的方式,新手容易混淆:
client.loop_forever():
- 阻塞式。代码执行到这里就会卡住,直到程序断开。
- 适用场景:专门的接收程序(Subscriber),除了收消息不做别的。client.loop_start():
- 非阻塞式。它会启动一个新的后台线程来处理网络。
- 适用场景:发布者(Publisher),或者带有 GUI(图形界面)、Web 服务的程序,需要主线程去处理其他逻辑。Paho 库内置了重连机制。如果网络波动导致断开,库会自动尝试重连。你可以在 on_disconnect 回调中添加日志来监控这种情况:
Python
def on_disconnect(client, userdata, flags, rc, properties=None):
if rc != 0:
print("⚠️ 意外断开,正在尝试重连...")