翼度科技»论坛 编程开发 python 查看内容

Python向kafka发消息

8

主题

8

帖子

24

积分

新手上路

Rank: 1

积分
24
后端研发可以提供一个向kafka发消息的接口,用requests向接口post消息就行:
  1. import requests
  2. import json
  3. import time
  4. now = int(time.time())
  5. n = 10
  6. while n > 0:
  7.     tt = now - n * 60
  8.     data = {
  9.         "queue": "alarm-dog-alarm-dog-test",
  10.         "payload": "{"test":80,"notice_time":%d}" % tt
  11.     }
  12.     header = {"Content-Type": "application/json"}
  13.     res = requests.post(url="http://10.90.100.130:8088/v1/kafka/send", headers=header, data=json.dumps(data))
  14.     print(res.status_code)
  15.     print(res.content)
  16.     n -= 1
复制代码
如果没有提供接口,可以借助python-kafka库连接kafka,模拟生产者向kafka发消息:
同步发送消息:
  1. from kafka import KafkaProducer
  2. import json
  3. # 创建一个KafkaProducer实例,指定Kafka服务器地址
  4. producer = KafkaProducer(bootstrap_servers='http://10.90.100.130:8088')
  5. # 要发送的消息内容
  6. message = {'test': 80, 'notice_time': 5}
  7. # 将消息转换为JSON字符串格式(也可以是其他格式,如纯文本)
  8. message_json = json.dumps(message)
  9. # 发送消息到指定的Kafka主题,这里主题名称是'my_topic'
  10. producer.send('alarm-dog-alarm-dog-test', value=message_json.encode('utf - 8'))
  11. # 确保所有消息都已发送
  12. producer.flush()
  13. # 关闭生产者连接
  14. producer.close()
复制代码
异步发送消息
  1. from kafka import KafkaProducer
  2. import json
  3. import time
  4. # 创建一个KafkaProducer实例,设置异步发送和回调函数
  5. producer = KafkaProducer(bootstrap_servers='http://10.90.100.130:8088',
  6.           acks='all',
  7.           retries=3,
  8.           value_deliver_callback=lambda m: print(f"消息已发送到主题{m.topic()},分区{m.partition()}"))
  9. # 要发送的消息内容
  10. message = {'test': 80, 'notice_time': 6}
  11. message_json = json.dumps(message)
  12. # 异步发送消息到'my_topic'主题
  13. future = producer.send('alarm-dog-alarm-dog-test', value=message_json.encode('utf - 8'))
  14. try:
  15. record_metadata = future.get(timeout=10)
  16. print(f"消息已发送到主题{record_metadata.topic()},分区{record_metadata.partition()},偏移量{record_metadata.offset()}")
  17. except Exception as e:
  18. print(f"发送消息时出错: {e}")
  19. # 关闭生产者连接
  20. producer.close()
复制代码
 

来源:https://www.cnblogs.com/ailiailan/p/18523653
免责声明:由于采集信息均来自互联网,如果侵犯了您的权益,请联系我们【E-Mail:cb@itdo.tech】 我们会及时删除侵权内容,谢谢合作!

举报 回复 使用道具