KafkaPushTest.py 544 B

1234567891011121314
  1. import json
  2. import time
  3. from kafka import KafkaProducer
  4. producer = KafkaProducer(value_serializer=lambda v: json.dumps(v).encode('utf-8'),
  5. bootstrap_servers='localhost:9092')
  6. index = 0
  7. while True:
  8. msg = "相信未来----------{}------------".format(index).encode('GBK') # 发送内容,必须是bytes类型,GBK可显示中文
  9. time.sleep(5) # 每个5秒推送一条数据
  10. index += 1
  11. print(msg.decode('GBK')) # 防止中文乱码
  12. producer.send('test', msg, partition=0) # 发送的topic为test