12345678910111213141516171819202122232425262728293031323334353637 |
- import pykafka
- import re
- class Topic():
- def __init__(self, topic_name, num_partitions=1, replication_factor=1, replica_assignment=[], config_entries=[]):
- """
- :param topic_name:
- :param num_partitions:
- :param replication_factor:
- :param replica_assignment: [(partition, replicas)]
- :param config_entries: [(config_name, config_value)]
- """
- self.topic_name = topic_name.encode()
- self.num_partitions = num_partitions
- self.replication_factor = replication_factor
- self.replica_assignment = replica_assignment
- self.config_entries = config_entries
- client = pykafka.KafkaClient(hosts="192.168.36.199:9092")
- print(client.topics)
- # for i in client.brokers.values():
- # try:
- # i.create_topics(topic_reqs=(Topic("tag_sink_topic"),))
- # except Exception as e:
- # if re.search("41", str(e)):
- # print("该broker 不是 leader,交由下一个broker创建")
- # elif re.search("7", str(e)):
- # print("创建完成")
- # break
- # elif re.search("36", str(e)):
- # print("topic 已存在")
- # break
- # else:
- # raise e
- # print(client.topics)
|