翼度科技»论坛 编程开发 python 查看内容

Python Kafka客户端confluent-kafka学习总结

4

主题

4

帖子

12

积分

新手上路

Rank: 1

积分
12
实践环境

Python 3.6.2
confluent-kafka 2.2.0
confluent-kafka简介

Confluent在GitHub上开发和维护的confluent-kafka-python,Apache Kafka®的一个python客户端,提供了一个与所有brokers>=v0.8的kafka 、Confluent Cloud和Confluent Platform兼容的高阶级生产者、消费者和AdminClient。
confluent-kafka安装
  1. pip install confluent-kafka
复制代码
代码实践

Kafka生产者
  1. from confluent_kafka import Producer
  2. import socket
  3. def acked(err, msg):
  4.     if err is not None:
  5.         print("Failed to deliver message: %s: %s" % msg.value(), str(err))
  6.     else:
  7.         print("Message produced: %s" % msg.value())
  8. if __name__ == '__main__':
  9.     topic_name = 'FREE_TOPIC_FOR_TEST'
  10.     ### 初始化Producer (针对本地运行的Kafka,即不在Confluent云平台上运行的Kafka)
  11.     conf = {'bootstrap.servers': '100.81.xxx.xx:9092,100.81.xxx.xx:9092',
  12.             'client.id': socket.gethostname()}
  13.     producer = Producer(conf)
  14.     ### 异步写 kafka
  15.     # 给kafka发送消息--异步请求
  16.     producer.produce(topic_name, key="key", value="value")
  17.     # 添加回调函数
  18.     producer.produce(topic_name, value="test value", callback=acked)
  19.     # 最多等待事件1秒钟。等待期间,如果消息被确认,即成功写入kafka中,将调用回调 callback指定方法 acked
  20.     producer.poll(1)
  21.     ### 同步写kafka
  22.     producer.produce(topic_name, key="key", value="new msg")
  23.     producer.flush()
复制代码
说明:
produce方法
  1. producer.produce(topic, key="key", value="value", callback=None) # 给kafka发送消息
复制代码

  • topic kafka主题,如果主题不存在,则将自动创建
  • key 可选
  • value  需要发送的消息,可以为None
  • callback 回调函数。
product调用为异步请求,所以调用后立即完成,且不会返回值。如果由于librdkafka的本地生产队列已满而导致消息无法入队,则会引发KafkaException。
如果要接收发送是否成功或失败的通知,可以传递callback参数,该参数值可以是任何可调用的,例如lambda、函数、绑定方法或可调用对象。尽管produce()方法会立即将消息加入队列以进行批处理、压缩并传输到代理,但在调用poll()之前,不会传播任何传递通知事件。
flush方法
flush()方法用于同步写kafka。这通常是个坏主意,因为它有效地将吞吐量限制在broker往返时间内,但在某些情况下可能是合理的。
通常,应该在关闭生产者之前调用flush(),以确保所有未完成的/排队的/in-flight的消息都被传递。
Kafka消费者
  1. import time
  2. from confluent_kafka import Consumer
  3. from confluent_kafka import KafkaException, KafkaError
  4. running = True
  5. def msg_process(msg):
  6.     value = msg.value()
  7.     if value:
  8.         value = value.decode('utf-8') # 假设消息可采用 utf-8解码
  9.         
  10.     return {
  11.         'topic': msg.topic(),
  12.         'partition': msg.partition(),
  13.         'offset': msg.offset(),
  14.         'value': value
  15.     }
  16.       
  17. def consume_loop(consumer, topics):
  18.     global running
  19.     try:
  20.         consumer.subscribe(topics) # 订阅主题
  21.         while running:
  22.             msg = consumer.poll(timeout=10.0)
  23.             if msg is None:
  24.                 time.sleep(0.1)
  25.                 continue
  26.             if msg.error():
  27.                 if msg.error().code() == KafkaError._PARTITION_EOF:
  28.                     # End of partition event
  29.                     print('%% %s [%d] reached end at offset %d\n' %
  30.                                      (msg.topic(), msg.partition(), msg.offset()))
  31.                 elif msg.error():
  32.                     raise KafkaException(msg.error())
  33.             else:
  34.                 res = msg_process(msg)
  35.                 try:
  36.                     result = '{' + '"topic": "{topic}", "partition": {partition}, "offset": {offset}, "value": {value}'.format(**res) + '}\n'
  37.                 except Exception:
  38.                     result = '{' + '"topic": "{topic}", "partition": {partition}, "offset": {offset}, "value": "{value}"'.format(**res) + '}\n'
  39.                 print(result)
  40.     finally:
  41.         # 关闭消费者以提交最后的偏移量
  42.         consumer.close()
  43. if __name__ == '__main__':
  44.     topic_name = 'FREE_TOPIC_FOR_TEST_1'
  45.     # 初始化消费者
  46.     conf = {'bootstrap.servers': '100.81.xxx.xx:9092,100.81.xxx.xx:9092',
  47.             'group.id': 'custom_consumer',
  48.             'enable.auto.commit': 'true',
  49.             'auto.offset.reset': 'smallest',
  50.             }
  51.     consumer = Consumer(conf)
  52.     consume_loop(consumer, [topic_name]) # 可以指定多个主题
复制代码
说明:
初始化消费者配置字典说明
  1. conf = {'bootstrap.servers': 'host1:9092,host2:9092',
  2.         'group.id': 'foo',
  3.         'enable.auto.commit': 'false',
  4.         'auto.offset.reset': 'smallest'}
复制代码
说明

  • group.id  属性是必需的,设置当前消费者归属的消费组,可以是事先不存在的消费组。
  • auto.offset.reset  属性指定针对当前消费组,在分区没有提交偏移量或提交偏移量无效(可能是由于日志截断)的情况下,消费者应该从哪个偏移量开始读取。可选值:
  • 'smallest'  如果针对当前消费组,分区未提交offset,则从头开始消费,否则从已提交的offset 开始消费(即读取上次提交offset之后生产的数据)。
  • 'largest'   如果针对当前消费组,分区未提交offset,则读取新生产的数据(在启动该消费者之后才生产的数据),不会读取之前的数据,否则从已提交的offset 开始消费,同smallest
  • 'earliest'   同 'smallest'
  • 'latest'   同 'largest'
​        kafka-0.10.1.X 版本之前:auto.offset.reset 的值为smallest和largest (offest保存在zk中)
​        kafka-0.10.1.X版本之后:auto.offset.reset 的值更改为 earliest, latest (offest保存在kafka的一个特殊的topic名为:__consumer_offsets里面)

  • enable.auto.commit  设置是否允许自动提交偏移量,默认为'true',即允许。
一个典型的Kafka消费者应用程序以循环消费为中心,该循环重复调用poll方法来逐条检索消费者在后台高效预取的记录。例中poll超时被硬编码为1秒。如果在此期间没有收到任何记录,则Consumer.poll()将返回一个空记录集。
注意,在使用完Consumer之后,应该始终调用Consumer.close(),以确保活动套接字处于关闭状态,并清理内部状态。此外,还将立即触发组再均衡(group rebalance),以确保消费者拥有的任何分区都被重新分配给组中的另一个成员。如果未正确关闭,broker将仅在会话超时到期后才触发再均衡。
同步提交

手动提交偏移量的最简单、最可靠的方法是为Consumer.commit()调用设置asynchronous参数,与此同时设置构建消费者对象参数配置'enable.auto.commit'为'false'。
  1. MIN_COMMIT_COUNT = 10
  2. def consume_loop(consumer, topics):
  3.     try:
  4.         consumer.subscribe(topics)
  5.         msg_count = 0
  6.         while running:
  7.             msg = consumer.poll(timeout=1.0)
  8.             if msg is None: continue
  9.             if msg.error():
  10.                 if msg.error().code() == KafkaError._PARTITION_EOF:
  11.                     # End of partition event
  12.                     print('%% %s [%d] reached end at offset %d\n' %
  13.                                      (msg.topic(), msg.partition(), msg.offset()))
  14.                 elif msg.error():
  15.                     raise KafkaException(msg.error())
  16.             else:
  17.                 msg_process(msg)
  18.                 msg_count += 1
  19.                 if msg_count % MIN_COMMIT_COUNT == 0:
  20.                     consumer.commit(asynchronous=False)
  21.     finally:
  22.         # 关闭消费者以提交最后的偏移量
  23.         consumer.close()
复制代码
在本例中,每消费MIN_COMMIT_COUNT 消息都会触发一次同步提交。asynchronous标志控制此调用是否为异步调用,默认为False,即同步 。您还可以在超时到期时触发提交,以确保定期更新提交的位置。
消息投递保证

在前面的示例中,由于提交在消息处理之后,所以获得了“至少一次(at least once)”投递。然而,通过更改提交偏移和处理消息的顺序,可获得“最多一次(at most once)”投递,但必须小心提交失败。
说明:

  • 最多一次(at most once):消息可能丢失也可能被处理,但最多只会处理一次。因为当提交offset后,处理消息过程中出错导致消息处理失败,或者消费者down掉,导致消息不被处理。
  • 至少一次(at least once):消息不会丢失,但可能被处理多次。先获取消息,然后处理消息,最后提交offset,提交offset时,可能会因为网络超时,消费者down掉等,导致提交偏移量失败的情况,所以,会导致重复消费消息的情况,进而导致多次处理消息。
  1. def consume_loop(consumer, topics):
  2.     try:
  3.         consumer.subscribe(topics)
  4.         while running:
  5.             msg = consumer.poll(timeout=1.0)
  6.             if msg is None: continue
  7.             if msg.error():
  8.                 if msg.error().code() == KafkaError._PARTITION_EOF:
  9.                     # End of partition event
  10.                     print('%% %s [%d] reached end at offset %d\n' %
  11.                                      (msg.topic(), msg.partition(), msg.offset()))
  12.                 elif msg.error():
  13.                     raise KafkaException(msg.error())
  14.             else:
  15.                 consumer.commit(asynchronous=False)
  16.                 msg_process(msg)
  17.     finally:
  18.         # 关闭消费者以提交最后的偏移量
  19.         consumer.close()
复制代码
简单起见,在本例中,在处理消息之前使用Consumer.commit()。在实践中,对每条消息都进行提交会产生大量开销。更好的方法是收集一批消息,执行同步提交,然后只有在提交成功的情况下才处理消息。
异步提交
  1. def consume_loop(consumer, topics):
  2.     try:
  3.         consumer.subscribe(topics)
  4.         msg_count = 0
  5.         while running:
  6.             msg = consumer.poll(timeout=1.0)
  7.             if msg is None: continue
  8.             if msg.error():
  9.                 if msg.error().code() == KafkaError._PARTITION_EOF:
  10.                     # End of partition event
  11.                     print('%% %s [%d] reached end at offset %d\n' %
  12.                                      (msg.topic(), msg.partition(), msg.offset()))
  13.                 elif msg.error():
  14.                     raise KafkaException(msg.error())
  15.             else:
  16.                 msg_process(msg)
  17.                 msg_count += 1
  18.                 if msg_count % MIN_COMMIT_COUNT == 0:
  19.                     consumer.commit(asynchronous=True)
  20.     finally:
  21.         consumer.close()
复制代码
本例在前面示例的基础上,将commit() 的asynchronous 参数改成True,消费者将使用异步提交发送请求并立即返回
API提供了一个callback,当提交成功或失败时会调用该callback。 commit callback回调可以是任何可调用的,并且可以作为配置参数传递给消费者构造函数。
  1. from confluent_kafka import Consumer
  2. def commit_completed(err, partitions):
  3.     if err:
  4.         print(str(err))
  5.     else:
  6.         print("Committed partition offsets: " + str(partitions))
  7. conf = {'bootstrap.servers': "host1:9092,host2:9092",
  8.         'group.id': "foo",
  9.         'auto.offset.reset': 'smallest',
  10.         'on_commit': commit_completed}
  11. consumer = Consumer(conf)
复制代码
参考连接

https://docs.confluent.io/kafka-clients/python/current/overview.html#initialization
https://docs.confluent.io/platform/current/clients/confluent-kafka-python/html/index.html

来源:https://www.cnblogs.com/shouke/p/17814004.html
免责声明:由于采集信息均来自互联网,如果侵犯了您的权益,请联系我们【E-Mail:cb@itdo.tech】 我们会及时删除侵权内容,谢谢合作!

举报 回复 使用道具