# KafkaPushTest11.py
"""Push a fixed test message to a Kafka topic at a regular interval."""
import json
import time

from kafka import KafkaProducer

BOOTSTRAP_SERVERS = ['192.168.36.199:9092']
TOPIC = 'tag_atom_topic'
PARTITION = 0
SEND_INTERVAL_SECONDS = 2


def _serialize(value):
    """Return *value* encoded as UTF-8 JSON bytes (used as the message value)."""
    return json.dumps(value).encode('utf-8')


def main():
    """Send ``{"msg": "nihao"}`` to TOPIC every SEND_INTERVAL_SECONDS, forever.

    The loop only ends when the process is interrupted or an exception
    propagates out of the producer; in both cases the producer is flushed
    and closed before the exception continues upward.
    """
    producer = KafkaProducer(
        value_serializer=_serialize,
        bootstrap_servers=BOOTSTRAP_SERVERS,
    )
    try:
        while True:
            # Wait first, then send, so the first message goes out one
            # interval after start-up (same order as the original script).
            time.sleep(SEND_INTERVAL_SECONDS)
            # Pass the dict itself: value_serializer already applies
            # json.dumps, so pre-dumping here would put a JSON *string*
            # (double-encoded) on the wire instead of a JSON object.
            producer.send(TOPIC, {"msg": "nihao"}, partition=PARTITION)
            print("===")
    finally:
        # send() is asynchronous; flush so buffered records are not lost
        # when the script is stopped (e.g. with Ctrl-C), then release sockets.
        producer.flush()
        producer.close()


if __name__ == "__main__":
    main()