Kafka 是一个分布式的消息队列系统’
以把它理解成——一个超强的“中转站”,专门负责在不同程序之间高速传递数据,而且还能扛得住非常大的流量
Kafka 就像一个缓冲仓库:
程序A → 把数据丢进 Kafka
程序B → 从 Kafka 里面慢慢取
即使 B 挂了,数据也不会丢,它还在仓库里等着。
🏢 Broker(代理)
相当于物流分发中心里的一个个实体仓库。一个 Kafka 集群通常由多个 Broker 服务器组成,它们负责实际接收、存储和转发包裹(消息)。
🏷️ Topic(主题):
相当于包裹的分类标签(例如“物流状态更新”、“用户注册信息”)。它是一个逻辑上的分类,发货人和收货人都通过认准这个标签来对接工作。
🛤️ Partition(分区):
相当于同一个分类标签下的多条并行传送带。如果“物流状态更新”这个 Topic 的数据量极其庞大,一条传送带肯定会堵死。Kafka 就会把这个 Topic 拆分成多条独立的传送带(Partition)同时运作,分散压力。这就是它实现高并发的核心秘诀。
🏭/🛍️ Producer & Consumer(生产者 & 消费者):
Producer 是发货的商家,负责产生数据,并贴好标签(Topic)送到仓库。
Consumer 是收货的买家或处理中心,它们盯着自己需要的标签,从传送带上把数据取走处理。
👥 Consumer Group(消费者组): 相当于一个快递员团队。
负载均衡: 同一个团队(Group)里的多个快递员(Consumer),会分工负责不同的传送带(Partition)。这样大家分摊工作,同一个包裹只会被团队里的一个人处理一次,不会重复。
消息广播: 如果有两支完全不同的快递团队(属于不同的 Consumer Group)都关注了同一个 Topic,那么 Kafka 会保证这两支团队都能收到一份完整的包裹清单,互不干扰地独立处理。
在代码层面,我们通常不需要去编写 Broker(它是提前部署运行好的服务器程序),我们真正需要编写和控制的是 Producer(生产者) 和 Consumer(消费者)。而 Topic、Partition 和 Consumer Group 则是作为配置参数或对象属性**融入在我们的代码中。
⚙️ 连接 Broker: 无论是写生产者还是消费者,代码的第一步都是设置连接参数(通常叫 bootstrap.servers),告诉你的程序 Kafka 集群的 IP 地址和端口,这就相当于给司机导航到物流中心。
🏭 编写 Producer (发货端):
send() 方法将消息发往 Broker。group.id),告诉 Kafka 你属于哪个快递团队。subscribe() 方法,传入你关注的 Topic 名称。while(true)),不断调用 poll() 方法从分配给你的 Partition 上拉取新消息并执行你的业务逻辑。1️⃣ 🔌 建立连接 (Initialization): 第一步是让程序知道物流中心(Broker)在哪里。在代码中,就是初始化一个生产者对象并传入地址,比如 KafkaProducer(bootstrap_servers='localhost:9092')。
2️⃣ 📦 打包数据 (Serialization): Kafka 的底层传送带为了追求极致的速度,并不认识 Python 的字符串、列表或字典。它只接受一种最基础的数据格式:字节串 (Bytes)。所以发货前,必须把我们要发送的内容“打包”翻译成字节。
3️⃣ 🚀 执行发送 (Send): 指定这批货物的分类标签(Topic),然后调用发送指令,把打包好的字节数据推入队列。
4️⃣ 🧹 确认收尾 (Flush/Close): 生产者通常会在后台把多个小包裹凑成一车再发货(为了提高效率)。在程序结束前,我们需要执行一个强制发车指令,确保所有还在“发件箱”里的包裹都被真正送出,然后断开连接。
1️⃣ 🔌 建立连接与归属 (Initialization): 首先连接到物流中心(Broker)。除了提供地址,这里必须配置 group_id,告诉 Kafka 你属于哪支“快递团队”,这决定了你是要和别人分摊工作,还是独立接收所有消息。
2️⃣ 🏷️ 订阅分类 (Subscription): 明确告诉 Kafka 你要监听哪个或哪几个 Topic(包裹标签)。
3️⃣ 🔄 持续拉取 (Polling): 物流中心不会主动把包裹塞给你,你需要自己去拿。消费者通常会运行在一个无限循环中,不断地从传送带上把属于自己的新包裹拉取(Poll)下来。
4️⃣ 📦 拆包还原 (Deserialization): 把拉取到的底层字节串 (Bytes) 重新“翻译”回我们的程序可以处理的数据格式(比如字符串或字典)。
5️⃣ ✅ 签收确认 (Commit Offset): 这是非常关键的一步。处理完包裹里的数据后,需要告诉 Kafka “这个位置(Offset)的包裹我处理完了”。Kafka 会记录下这个进度,下次就不会再重复派发。
pip install kafka-python
根据生产者发送流程发送数据
import json
import time
from kafka import KafkaProducer
from kafka.errors import KafkaError
def run_logistics_producer():
# 设定发货的分类标签 (Topic)
topic_name = 'logistics_center_topic'
# ==========================================
# 1️⃣ 🔌 建立连接 (Initialization) & 2️⃣ 📦 打包数据 (Serialization)
# ==========================================
print("🔌 正在呼叫物流中心 (连接 Broker)...")
try:
# 在建立连接的同时,我们雇佣了一个“自动打包机” (value_serializer)
# 它会自动把 Python 的字典转化为 JSON 字符串,最后压缩成 Kafka 认识的 UTF-8 字节串 (Bytes)
producer = KafkaProducer(
bootstrap_servers=['localhost:9092'],
value_serializer=lambda cargo: json.dumps(cargo).encode('utf-8'),
retries=3 # 如果遇到网络波动,重试3次
)
print("✅ 成功连接到物流中心!")
except KafkaError as e:
print(f"❌ 连接失败,请检查物流中心 (Broker) 是否开启: {e}")
return
# 准备一批待发送的货物数据
cargos = [
{"tracking_number": "A001", "item": "机械键盘", "destination": "北京"},
{"tracking_number": "A002", "item": "高清显示器", "destination": "上海"},
{"tracking_number": "A003", "item": "人体工学椅", "destination": "广州"}
]
# ==========================================
# 3️⃣ 🚀 执行发送 (Send)
# ==========================================
print(f"\n🚀 开始向专属通道 '{topic_name}' 发送包裹...")
for cargo in cargos:
# send() 是一个异步指令。它把数据塞进内存的“发件箱”后就立刻返回,不会死等
producer.send(topic_name, value=cargo)
print(f"📦 已将包裹 [单号: {cargo['tracking_number']}] 推入传送带...")
time.sleep(0.5) # 稍微停顿一下,模拟真实业务中的数据生产间隔
# ==========================================
# 4️⃣ 🧹 确认收尾 (Flush/Close)
# ==========================================
print("\n🧹 准备下班,正在执行强制发车指令...")
# flush() 是强制发车指令。它会阻塞程序,直到“发件箱”里的所有缓存包裹都确确实实交给了物流中心
producer.flush()
print("🚚 所有缓存包裹已确认送达物流中心!")
# close() 负责切断连接,释放系统资源
producer.close()
print("🏁 物流通道已安全关闭,打卡下班!")
if __name__ == '__main__':
run_logistics_producer()
producer = KafkaProducer(参数名=参数值, ...)
| 参数名 | 默认值 | 物流隐喻 / 用法说明 | 生产环境建议配置 |
|---|---|---|---|
bootstrap_servers |
localhost |
物流中心地址。可以是一个或多个 Broker 节点地址的列表 (例如 ['ip1:9092', 'ip2:9092'])。 |
必填项,尽量填入多个地址以防单点故障。 |
value_serializer |
None |
货物打包机。将传入的数据(字典、字符串等)转换为 Kafka 认识的字节串。 | 强烈建议配置。常用 lambda v: json.dumps(v).encode('utf-8')。 |
key_serializer |
None |
运单号打包机。如果发送数据时指定了 key,需要将 key 也序列化。 |
若使用分区路由机制,需配置。通常设为 lambda k: k.encode('utf-8')。 |
acks |
1 |
签收回执级别。 • 0: 发出去就不管 (最快,最不安全) • 1: Leader节点收到就回复成功 • 'all': 所有副本节点都同步完才算成功 (最慢,最安全) |
涉及资金/核心业务选 'all',普通日志收集选 1。 |
retries |
0 |
重试次数。网络波动导致发送失败时,自动重新发送的次数。 | 建议设为 3 或以上。 |
batch_size |
16384 |
货厢容量 (字节)。当攒够这么大的数据量时,直接打包发车。 | 若单条数据小且并发高,可适当调大(如 32768 提高吞吐量)。 |
linger_ms |
0 |
发车最长等待时间 (毫秒)。即便货厢没满(batch_size 未达到),只要等够了这个时间,也会强制发车。 |
建议设为 5 到 50 毫秒。用微小的延迟换取极大的吞吐量提升。 |
future = producer.send('topic_name', value=my_data, key=my_key)
| 参数名 | 类型 | 物流隐喻 / 用法说明 |
|---|---|---|
topic |
str |
分类通道 (必填)。包裹要发往的 Topic 名称。 |
value |
any |
包裹内容 (必填)。实际发送的数据,需要能被 value_serializer 处理。 |
key |
any |
路由凭证 (可选)。具有相同 key 的包裹,会被保证送入同一个 Partition(分区),从而保证消费顺序。例如用“用户ID”做 Key,该用户的所有操作就会按顺序处理。 |
partition |
int |
指定通道 (可选)。直接写死要发往哪个分区。通常不推荐,交由 key 自动路由更好。 |
producer.flush(timeout=None)
producer.close(timeout=None)
flush(timeout): 强制将目前还在缓冲区内的所有数据发送出去。timeout 可以限制最长死等多久(秒)。
close(timeout): 关闭生产者,释放连接。它内部会自动调用一次 flush()。
根据生产者发送流程发送数据
import json
from kafka import KafkaConsumer
def run_minimal_consumer():
topic_name = 'logistics_center_topic'
# ==========================================
# 1️⃣ 🔌 建立连接与归属 & 2️⃣ 🏷️ 订阅分类 & 4️⃣ 📦 拆包还原
# ==========================================
print("🔌 正在连接物流中心,注册快递团队...")
consumer = KafkaConsumer(
topic_name, # 直接在这里传入要订阅的 Topic
bootstrap_servers=['localhost:9092'],
group_id='delivery_team_A', # 注册快递团队编号
# 自动拆包机:把接收到的 UTF-8 字节串重新还原为 Python 字典
value_deserializer=lambda m: json.loads(m.decode('utf-8')),
auto_offset_reset='earliest', # 如果是新团队,从传送带上最早的包裹开始拿
enable_auto_commit=True # 允许系统在后台自动帮我们签收
)
print("✅ 注册成功,开始接单!")
# ==========================================
# 3️⃣ 🔄 持续拉取 & 5️⃣ ✅ 签收确认 (隐式)
# ==========================================
try:
print("🔄 传送带运转中,等待包裹...")
# for 循环本质上就是一个不断向 Kafka 发起 poll() 请求的死循环
for message in consumer:
cargo = message.value # 这里已经是拆包后的字典了
print(f"📦 收到包裹! 单号: {cargo.get('tracking_number')}, 目的地: {cargo.get('destination')}")
# 因为 enable_auto_commit=True,代码运行到这里,Kafka 在后台就自动记录进度了
except KeyboardInterrupt:
print("\n🛑 收到下班指令,准备退出...")
finally:
consumer.close()
print("🏁 消费者已安全退出。")
if __name__ == '__main__':
run_minimal_consumer()
consumer = KafkaConsumer(参数名=参数值, ...)
| 参数名 | 默认值 | 物流隐喻 / 用法说明 | 生产环境建议配置 |
|---|---|---|---|
group_id |
None |
快递团队编号。Kafka 会把一个 Topic 的包裹均分给同一个 Group 里的消费者。如果你开两个终端运行同样的 group_id,它们会平摊任务;如果 group_id 不同,它们都会收到全量数据。 |
必填!这是实现高并发处理和集群扩展的核心。 |
value_deserializer |
None |
拆包机。把拉取到的 Bytes 逆向转回代码对象。 | 强建议:lambda m: json.loads(m.decode('utf-8')) |
enable_auto_commit |
True |
自动签收。是否让后台定时自动汇报处理进度。 | 涉及重要数据务必设为 False,改为手动签收。 |
auto_offset_reset |
latest |
新兵策略。当一个全新的消费组首次连接,或者以前的进度记录丢失时,从哪里开始读数据? • 'latest': 只接手最新的包裹 (忽略以前的) • 'earliest': 从头开始把历史积压的包裹全拿过来 |
通常设为 'earliest' 以免漏掉数据。 |
max_poll_records |
500 |
手推车容量。每次去物流中心最多一次性抱回多少个包裹。 | 根据你处理每条数据的耗时来定,如果处理很慢,适当调小(如 50 或 100)。 |
consumer.subscribe(topics=['topic_A', 'topic_B'])
# 甚至支持正则表达式订阅
# consumer.subscribe(pattern='^logistics_.*')
按批次拉取 -> 处理数据 -> 确认没报错 -> 手动提交进度。
# timeout_ms=1000 表示去传送带看一眼,如果没有包裹,最多等1秒就回来,避免程序死锁
batch_records = consumer.poll(timeout_ms=1000)
for topic_partition, messages in batch_records.items():
for msg in messages:
# 处理 msg.value
pass
# 一批处理完,统一提交一次 Offset
consumer.commit()