from kafka import KafkaAdminClient from kafka.admin import NewTopic # 获取topic列表 def list_topics(bootstrap_servers): admin_client = KafkaAdminClient(bootstrap_servers=bootstrap_servers) topics = admin_client.list_topics() return topics # 创建topic def create_topic(topic_name, partitions=1, replication_factor=1, bootstrap_servers='localhost:9092'): # Create an admin client admin_client = KafkaAdminClient(bootstrap_servers=bootstrap_servers, client_id='create_topic') # Check if the topic already exists topic_list = list_topics(bootstrap_servers) if topic_name in topic_list: print(f"Topic '{topic_name}' already exists.") return # Create the new topic topic = NewTopic( name=topic_name, num_partitions=partitions, replication_factor=replication_factor ) admin_client.create_topics([topic]) print(f"Topic '{topic_name}' created.") # create_topic('zipkin', bootstrap_servers=['kafka-0.kafka-headless.aimpdev.svc.k5.bigtree.zone:9092', 'kafka-1.kafka-headless.aimpdev.svc.k5.bigtree.zone:9092', 'kafka-2.kafka-headless.aimpdev.svc.k5.bigtree.zone:9092']) print(list_topics(['kafka-0.kafka-headless.aimpdev.svc.k5.bigtree.zone:9092', 'kafka-1.kafka-headless.aimpdev.svc.k5.bigtree.zone:9092', 'kafka-2.kafka-headless.aimpdev.svc.k5.bigtree.zone:9092']))