|
@@ -1,37 +1,34 @@
|
|
-import pykafka
|
|
|
|
-import re
|
|
|
|
-
|
|
|
|
-
|
|
|
|
-class Topic():
|
|
|
|
- def __init__(self, topic_name, num_partitions=1, replication_factor=1, replica_assignment=[], config_entries=[]):
|
|
|
|
- """
|
|
|
|
- :param topic_name:
|
|
|
|
- :param num_partitions:
|
|
|
|
- :param replication_factor:
|
|
|
|
- :param replica_assignment: [(partition, replicas)]
|
|
|
|
- :param config_entries: [(config_name, config_value)]
|
|
|
|
- """
|
|
|
|
- self.topic_name = topic_name.encode()
|
|
|
|
- self.num_partitions = num_partitions
|
|
|
|
- self.replication_factor = replication_factor
|
|
|
|
- self.replica_assignment = replica_assignment
|
|
|
|
- self.config_entries = config_entries
|
|
|
|
-
|
|
|
|
-
|
|
|
|
-client = pykafka.KafkaClient(hosts="192.168.36.199:9092")
|
|
|
|
-print(client.topics)
|
|
|
|
-# for i in client.brokers.values():
|
|
|
|
-# try:
|
|
|
|
-# i.create_topics(topic_reqs=(Topic("tag_sink_topic"),))
|
|
|
|
-# except Exception as e:
|
|
|
|
-# if re.search("41", str(e)):
|
|
|
|
-# print("该broker 不是 leader,交由下一个broker创建")
|
|
|
|
-# elif re.search("7", str(e)):
|
|
|
|
-# print("创建完成")
|
|
|
|
-# break
|
|
|
|
-# elif re.search("36", str(e)):
|
|
|
|
-# print("topic 已存在")
|
|
|
|
-# break
|
|
|
|
-# else:
|
|
|
|
-# raise e
|
|
|
|
-# print(client.topics)
|
|
|
|
|
|
+from kafka import KafkaAdminClient
|
|
|
|
+from kafka.admin import NewTopic
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+# 获取topic列表
|
|
|
|
+def list_topics(bootstrap_servers):
|
|
|
|
+ admin_client = KafkaAdminClient(bootstrap_servers=bootstrap_servers)
|
|
|
|
+ topics = admin_client.list_topics()
|
|
|
|
+ return topics
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+# 创建topic
|
|
|
|
+def create_topic(topic_name, partitions=1, replication_factor=1, bootstrap_servers='localhost:9092'):
|
|
|
|
+ # Create an admin client
|
|
|
|
+ admin_client = KafkaAdminClient(bootstrap_servers=bootstrap_servers, client_id='create_topic')
|
|
|
|
+
|
|
|
|
+ # Check if the topic already exists
|
|
|
|
+ topic_list = list_topics(bootstrap_servers)
|
|
|
|
+ if topic_name in topic_list:
|
|
|
|
+ print(f"Topic '{topic_name}' already exists.")
|
|
|
|
+ return
|
|
|
|
+
|
|
|
|
+ # Create the new topic
|
|
|
|
+ topic = NewTopic(
|
|
|
|
+ name=topic_name,
|
|
|
|
+ num_partitions=partitions,
|
|
|
|
+ replication_factor=replication_factor
|
|
|
|
+ )
|
|
|
|
+ admin_client.create_topics([topic])
|
|
|
|
+ print(f"Topic '{topic_name}' created.")
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+# create_topic('zipkin', bootstrap_servers=['kafka-0.kafka-headless.aimpdev.svc.k5.bigtree.zone:9092', 'kafka-1.kafka-headless.aimpdev.svc.k5.bigtree.zone:9092', 'kafka-2.kafka-headless.aimpdev.svc.k5.bigtree.zone:9092'])
|
|
|
|
+print(list_topics(['kafka-0.kafka-headless.aimpdev.svc.k5.bigtree.zone:9092', 'kafka-1.kafka-headless.aimpdev.svc.k5.bigtree.zone:9092', 'kafka-2.kafka-headless.aimpdev.svc.k5.bigtree.zone:9092']))
|