import pykafka import re class Topic(): def __init__(self, topic_name, num_partitions=1, replication_factor=1, replica_assignment=[], config_entries=[]): """ :param topic_name: :param num_partitions: :param replication_factor: :param replica_assignment: [(partition, replicas)] :param config_entries: [(config_name, config_value)] """ self.topic_name = topic_name.encode() self.num_partitions = num_partitions self.replication_factor = replication_factor self.replica_assignment = replica_assignment self.config_entries = config_entries client = pykafka.KafkaClient(hosts="192.168.36.199:9092") print(client.topics) # for i in client.brokers.values(): # try: # i.create_topics(topic_reqs=(Topic("tag_sink_topic"),)) # except Exception as e: # if re.search("41", str(e)): # print("该broker 不是 leader,交由下一个broker创建") # elif re.search("7", str(e)): # print("创建完成") # break # elif re.search("36", str(e)): # print("topic 已存在") # break # else: # raise e # print(client.topics)