Python + Kafuka 消息队列

分类: 网络编程

Python+kafka学习

kafuka基本概念

Kafka 是一个分布式的消息队列系统’

以把它理解成——一个超强的“中转站”,专门负责在不同程序之间高速传递数据,而且还能扛得住非常大的流量

Kafka 就像一个缓冲仓库:

程序A → 把数据丢进 Kafka
程序B → 从 Kafka 里面慢慢取

即使 B 挂了,数据也不会丢,它还在仓库里等着。

kafka中的角色

🏢 Broker(代理)

​ 相当于物流分发中心里的一个个实体仓库。一个 Kafka 集群通常由多个 Broker 服务器组成,它们负责实际接收、存储和转发包裹(消息)。

🏷️ Topic(主题):

​ 相当于包裹的分类标签(例如“物流状态更新”、“用户注册信息”)。它是一个逻辑上的分类,发货人和收货人都通过认准这个标签来对接工作。

🛤️ Partition(分区):

​ 相当于同一个分类标签下的多条并行传送带。如果“物流状态更新”这个 Topic 的数据量极其庞大,一条传送带肯定会堵死。Kafka 就会把这个 Topic 拆分成多条独立的传送带(Partition)同时运作,分散压力。这就是它实现高并发的核心秘诀。

🏭/🛍️ Producer & Consumer(生产者 & 消费者):

Producer发货的商家,负责产生数据,并贴好标签(Topic)送到仓库。

Consumer收货的买家或处理中心,它们盯着自己需要的标签,从传送带上把数据取走处理。

👥 Consumer Group(消费者组): 相当于一个快递员团队

负载均衡: 同一个团队(Group)里的多个快递员(Consumer),会分工负责不同的传送带(Partition)。这样大家分摊工作,同一个包裹只会被团队里的一个人处理一次,不会重复。

消息广播: 如果有两支完全不同的快递团队(属于不同的 Consumer Group)都关注了同一个 Topic,那么 Kafka 会保证这两支团队都能收到一份完整的包裹清单,互不干扰地独立处理。

编程和kafka

在代码层面,我们通常不需要去编写 Broker(它是提前部署运行好的服务器程序),我们真正需要编写和控制的是 Producer(生产者)Consumer(消费者)。而 TopicPartitionConsumer Group 则是作为配置参数对象属性**融入在我们的代码中。

  1. ⚙️ 连接 Broker: 无论是写生产者还是消费者,代码的第一步都是设置连接参数(通常叫 bootstrap.servers),告诉你的程序 Kafka 集群的 IP 地址和端口,这就相当于给司机导航到物流中心。

  2. 🏭 编写 Producer (发货端):

  • 创建生产者实例: 初始化一个 Producer 对象。
  • 封装消息对象: 创建一个记录(Record),在这个对象里你必须明确指定目标 Topic(发往哪个分类),还可以选择性地指定 Key(Kafka 会根据 Key 的哈希值决定把消息放进哪个 Partition)以及具体的数据内容(Value)。
  • 执行发送: 调用 send() 方法将消息发往 Broker。
  1. 🛍️ 编写 Consumer (收货端):
  • 创建消费者实例: 初始化一个 Consumer 对象。
  • 声明团队归属: 在配置参数中必须写明 Consumer Group 的 ID(group.id),告诉 Kafka 你属于哪个快递团队。
  • 订阅分类: 调用 subscribe() 方法,传入你关注的 Topic 名称。
  • 持续拉取: 编写一个无限循环(比如 while(true)),不断调用 poll() 方法从分配给你的 Partition 上拉取新消息并执行你的业务逻辑。

生产者发送流程

1️⃣ 🔌 建立连接 (Initialization): 第一步是让程序知道物流中心(Broker)在哪里。在代码中,就是初始化一个生产者对象并传入地址,比如 KafkaProducer(bootstrap_servers='localhost:9092')

2️⃣ 📦 打包数据 (Serialization): Kafka 的底层传送带为了追求极致的速度,并不认识 Python 的字符串、列表或字典。它只接受一种最基础的数据格式:字节串 (Bytes)。所以发货前,必须把我们要发送的内容“打包”翻译成字节。

3️⃣ 🚀 执行发送 (Send): 指定这批货物的分类标签(Topic),然后调用发送指令,把打包好的字节数据推入队列。

4️⃣ 🧹 确认收尾 (Flush/Close): 生产者通常会在后台把多个小包裹凑成一车再发货(为了提高效率)。在程序结束前,我们需要执行一个强制发车指令,确保所有还在“发件箱”里的包裹都被真正送出,然后断开连接。

消费者接收流程

1️⃣ 🔌 建立连接与归属 (Initialization): 首先连接到物流中心(Broker)。除了提供地址,这里必须配置 group_id,告诉 Kafka 你属于哪支“快递团队”,这决定了你是要和别人分摊工作,还是独立接收所有消息。

2️⃣ 🏷️ 订阅分类 (Subscription): 明确告诉 Kafka 你要监听哪个或哪几个 Topic(包裹标签)。

3️⃣ 🔄 持续拉取 (Polling): 物流中心不会主动把包裹塞给你,你需要自己去拿。消费者通常会运行在一个无限循环中,不断地从传送带上把属于自己的新包裹拉取(Poll)下来。

4️⃣ 📦 拆包还原 (Deserialization): 把拉取到的底层字节串 (Bytes) 重新“翻译”回我们的程序可以处理的数据格式(比如字符串或字典)。

5️⃣ ✅ 签收确认 (Commit Offset): 这是非常关键的一步。处理完包裹里的数据后,需要告诉 Kafka “这个位置(Offset)的包裹我处理完了”。Kafka 会记录下这个进度,下次就不会再重复派发。

python的kafka

安装 Python 库

pip install kafka-python

生产者

最小实例

根据生产者发送流程发送数据

import json
import time
from kafka import KafkaProducer
from kafka.errors import KafkaError

def run_logistics_producer():
    # 设定发货的分类标签 (Topic)
    topic_name = 'logistics_center_topic'

    # ==========================================
    # 1️⃣ 🔌 建立连接 (Initialization) & 2️⃣ 📦 打包数据 (Serialization)
    # ==========================================
    print("🔌 正在呼叫物流中心 (连接 Broker)...")
    try:
        # 在建立连接的同时,我们雇佣了一个“自动打包机” (value_serializer)
        # 它会自动把 Python 的字典转化为 JSON 字符串,最后压缩成 Kafka 认识的 UTF-8 字节串 (Bytes)
        producer = KafkaProducer(
            bootstrap_servers=['localhost:9092'],
            value_serializer=lambda cargo: json.dumps(cargo).encode('utf-8'),
            retries=3 # 如果遇到网络波动,重试3次
        )
        print("✅ 成功连接到物流中心!")
    except KafkaError as e:
        print(f"❌ 连接失败,请检查物流中心 (Broker) 是否开启: {e}")
        return

    # 准备一批待发送的货物数据
    cargos = [
        {"tracking_number": "A001", "item": "机械键盘", "destination": "北京"},
        {"tracking_number": "A002", "item": "高清显示器", "destination": "上海"},
        {"tracking_number": "A003", "item": "人体工学椅", "destination": "广州"}
    ]

    # ==========================================
    # 3️⃣ 🚀 执行发送 (Send)
    # ==========================================
    print(f"\n🚀 开始向专属通道 '{topic_name}' 发送包裹...")
    for cargo in cargos:
        # send() 是一个异步指令。它把数据塞进内存的“发件箱”后就立刻返回,不会死等
        producer.send(topic_name, value=cargo)
        print(f"📦 已将包裹 [单号: {cargo['tracking_number']}] 推入传送带...")
        time.sleep(0.5)  # 稍微停顿一下,模拟真实业务中的数据生产间隔

    # ==========================================
    # 4️⃣ 🧹 确认收尾 (Flush/Close)
    # ==========================================
    print("\n🧹 准备下班,正在执行强制发车指令...")

    # flush() 是强制发车指令。它会阻塞程序,直到“发件箱”里的所有缓存包裹都确确实实交给了物流中心
    producer.flush() 
    print("🚚 所有缓存包裹已确认送达物流中心!")

    # close() 负责切断连接,释放系统资源
    producer.close()
    print("🏁 物流通道已安全关闭,打卡下班!")

if __name__ == '__main__':
    run_logistics_producer()

流程和参数

初始化与配置模版
producer = KafkaProducer(参数名=参数值, ...)
参数名 默认值 物流隐喻 / 用法说明 生产环境建议配置
bootstrap_servers localhost 物流中心地址。可以是一个或多个 Broker 节点地址的列表 (例如 ['ip1:9092', 'ip2:9092'])。 必填项,尽量填入多个地址以防单点故障。
value_serializer None 货物打包机。将传入的数据(字典、字符串等)转换为 Kafka 认识的字节串。 强烈建议配置。常用 lambda v: json.dumps(v).encode('utf-8')
key_serializer None 运单号打包机。如果发送数据时指定了 key,需要将 key 也序列化。 若使用分区路由机制,需配置。通常设为 lambda k: k.encode('utf-8')
acks 1 签收回执级别。 • 0: 发出去就不管 (最快,最不安全) • 1: Leader节点收到就回复成功 • 'all': 所有副本节点都同步完才算成功 (最慢,最安全) 涉及资金/核心业务选 'all',普通日志收集选 1
retries 0 重试次数。网络波动导致发送失败时,自动重新发送的次数。 建议设为 3 或以上。
batch_size 16384 货厢容量 (字节)。当攒够这么大的数据量时,直接打包发车。 若单条数据小且并发高,可适当调大(如 32768 提高吞吐量)。
linger_ms 0 发车最长等待时间 (毫秒)。即便货厢没满(batch_size 未达到),只要等够了这个时间,也会强制发车。 建议设为 550 毫秒。用微小的延迟换取极大的吞吐量提升。
执行发送模版
future = producer.send('topic_name', value=my_data, key=my_key)
参数名 类型 物流隐喻 / 用法说明
topic str 分类通道 (必填)。包裹要发往的 Topic 名称。
value any 包裹内容 (必填)。实际发送的数据,需要能被 value_serializer 处理。
key any 路由凭证 (可选)。具有相同 key 的包裹,会被保证送入同一个 Partition(分区),从而保证消费顺序。例如用“用户ID”做 Key,该用户的所有操作就会按顺序处理。
partition int 指定通道 (可选)。直接写死要发往哪个分区。通常不推荐,交由 key 自动路由更好。
确认与收尾模版
producer.flush(timeout=None)
producer.close(timeout=None)

flush(timeout): 强制将目前还在缓冲区内的所有数据发送出去。timeout 可以限制最长死等多久(秒)。

close(timeout): 关闭生产者,释放连接。它内部会自动调用一次 flush()

消费者

最小实例

根据生产者发送流程发送数据

import json
from kafka import KafkaConsumer

def run_minimal_consumer():
    topic_name = 'logistics_center_topic'

    # ==========================================
    # 1️⃣ 🔌 建立连接与归属 & 2️⃣ 🏷️ 订阅分类 & 4️⃣ 📦 拆包还原
    # ==========================================
    print("🔌 正在连接物流中心,注册快递团队...")
    consumer = KafkaConsumer(
        topic_name, # 直接在这里传入要订阅的 Topic
        bootstrap_servers=['localhost:9092'],
        group_id='delivery_team_A', # 注册快递团队编号
        # 自动拆包机:把接收到的 UTF-8 字节串重新还原为 Python 字典
        value_deserializer=lambda m: json.loads(m.decode('utf-8')),
        auto_offset_reset='earliest', # 如果是新团队,从传送带上最早的包裹开始拿
        enable_auto_commit=True # 允许系统在后台自动帮我们签收
    )
    print("✅ 注册成功,开始接单!")

    # ==========================================
    # 3️⃣ 🔄 持续拉取 & 5️⃣ ✅ 签收确认 (隐式)
    # ==========================================
    try:
        print("🔄 传送带运转中,等待包裹...")
        # for 循环本质上就是一个不断向 Kafka 发起 poll() 请求的死循环
        for message in consumer:
            cargo = message.value # 这里已经是拆包后的字典了
            print(f"📦 收到包裹! 单号: {cargo.get('tracking_number')}, 目的地: {cargo.get('destination')}")
            # 因为 enable_auto_commit=True,代码运行到这里,Kafka 在后台就自动记录进度了
    except KeyboardInterrupt:
        print("\n🛑 收到下班指令,准备退出...")
    finally:
        consumer.close()
        print("🏁 消费者已安全退出。")

if __name__ == '__main__':
    run_minimal_consumer()

流程和参数

初始化与拆包模版
consumer = KafkaConsumer(参数名=参数值, ...)
参数名 默认值 物流隐喻 / 用法说明 生产环境建议配置
group_id None 快递团队编号。Kafka 会把一个 Topic 的包裹均分给同一个 Group 里的消费者。如果你开两个终端运行同样的 group_id,它们会平摊任务;如果 group_id 不同,它们都会收到全量数据。 必填!这是实现高并发处理和集群扩展的核心。
value_deserializer None 拆包机。把拉取到的 Bytes 逆向转回代码对象。 强建议:lambda m: json.loads(m.decode('utf-8'))
enable_auto_commit True 自动签收。是否让后台定时自动汇报处理进度。 涉及重要数据务必设为 False,改为手动签收。
auto_offset_reset latest 新兵策略。当一个全新的消费组首次连接,或者以前的进度记录丢失时,从哪里开始读数据? • 'latest': 只接手最新的包裹 (忽略以前的) • 'earliest': 从头开始把历史积压的包裹全拿过来 通常设为 'earliest' 以免漏掉数据。
max_poll_records 500 手推车容量。每次去物流中心最多一次性抱回多少个包裹。 根据你处理每条数据的耗时来定,如果处理很慢,适当调小(如 50100)。
订阅模版
consumer.subscribe(topics=['topic_A', 'topic_B'])
# 甚至支持正则表达式订阅
# consumer.subscribe(pattern='^logistics_.*')
持续拉取与手动签收模版
按批次拉取 -> 处理数据 -> 确认没报错 -> 手动提交进度。
# timeout_ms=1000 表示去传送带看一眼,如果没有包裹,最多等1秒就回来,避免程序死锁
batch_records = consumer.poll(timeout_ms=1000)

for topic_partition, messages in batch_records.items():
    for msg in messages:
        # 处理 msg.value
        pass

# 一批处理完,统一提交一次 Offset
consumer.commit()