import json import time from kafka import KafkaProducer producer = KafkaProducer(value_serializer=lambda v: json.dumps(v).encode('utf-8'), bootstrap_servers='localhost:9092') index = 0 while True: msg = "相信未来----------{}------------".format(index).encode('GBK') # 发送内容,必须是bytes类型,GBK可显示中文 time.sleep(5) # 每个5秒推送一条数据 index += 1 print(msg.decode('GBK')) # 防止中文乱码 producer.send('test', msg, partition=0) # 发送的topic为test