|
后端研发可以提供一个向kafka发消息的接口,用requests向接口post消息就行:- import requests
- import json
- import time
- now = int(time.time())
- n = 10
- while n > 0:
- tt = now - n * 60
- data = {
- "queue": "alarm-dog-alarm-dog-test",
- "payload": "{"test":80,"notice_time":%d}" % tt
- }
- header = {"Content-Type": "application/json"}
- res = requests.post(url="http://10.90.100.130:8088/v1/kafka/send", headers=header, data=json.dumps(data))
- print(res.status_code)
- print(res.content)
- n -= 1
复制代码 如果没有提供接口,可以借助python-kafka库连接kafka,模拟生产者向kafka发消息:
同步发送消息:- from kafka import KafkaProducer
- import json
- # 创建一个KafkaProducer实例,指定Kafka服务器地址
- producer = KafkaProducer(bootstrap_servers='http://10.90.100.130:8088')
- # 要发送的消息内容
- message = {'test': 80, 'notice_time': 5}
- # 将消息转换为JSON字符串格式(也可以是其他格式,如纯文本)
- message_json = json.dumps(message)
- # 发送消息到指定的Kafka主题,这里主题名称是'my_topic'
- producer.send('alarm-dog-alarm-dog-test', value=message_json.encode('utf - 8'))
- # 确保所有消息都已发送
- producer.flush()
- # 关闭生产者连接
- producer.close()
复制代码 异步发送消息- from kafka import KafkaProducer
- import json
- import time
- # 创建一个KafkaProducer实例,设置异步发送和回调函数
- producer = KafkaProducer(bootstrap_servers='http://10.90.100.130:8088',
- acks='all',
- retries=3,
- value_deliver_callback=lambda m: print(f"消息已发送到主题{m.topic()},分区{m.partition()}"))
- # 要发送的消息内容
- message = {'test': 80, 'notice_time': 6}
- message_json = json.dumps(message)
- # 异步发送消息到'my_topic'主题
- future = producer.send('alarm-dog-alarm-dog-test', value=message_json.encode('utf - 8'))
- try:
- record_metadata = future.get(timeout=10)
- print(f"消息已发送到主题{record_metadata.topic()},分区{record_metadata.partition()},偏移量{record_metadata.offset()}")
- except Exception as e:
- print(f"发送消息时出错: {e}")
- # 关闭生产者连接
- producer.close()
复制代码
来源:https://www.cnblogs.com/ailiailan/p/18523653
免责声明:由于采集信息均来自互联网,如果侵犯了您的权益,请联系我们【E-Mail:cb@itdo.tech】 我们会及时删除侵权内容,谢谢合作! |
|