kafka_topic.py 1.2 KB

12345678910111213141516171819202122232425262728293031323334353637
  1. import pykafka
  2. import re
  3. class Topic():
  4. def __init__(self, topic_name, num_partitions=1, replication_factor=1, replica_assignment=[], config_entries=[]):
  5. """
  6. :param topic_name:
  7. :param num_partitions:
  8. :param replication_factor:
  9. :param replica_assignment: [(partition, replicas)]
  10. :param config_entries: [(config_name, config_value)]
  11. """
  12. self.topic_name = topic_name.encode()
  13. self.num_partitions = num_partitions
  14. self.replication_factor = replication_factor
  15. self.replica_assignment = replica_assignment
  16. self.config_entries = config_entries
  17. client = pykafka.KafkaClient(hosts="192.168.3.17:9092")
  18. print(client.topics)
  19. for i in client.brokers.values():
  20. try:
  21. i.create_topics(topic_reqs=(Topic("tag_sink_topic"),))
  22. except Exception as e:
  23. if re.search("41", str(e)):
  24. print("该broker 不是 leader,交由下一个broker创建")
  25. elif re.search("7", str(e)):
  26. print("创建完成")
  27. break
  28. elif re.search("36", str(e)):
  29. print("topic 已存在")
  30. break
  31. else:
  32. raise e
  33. print(client.topics)