12345678910111213141516171819202122232425262728293031323334 |
- from kafka import KafkaAdminClient
- from kafka.admin import NewTopic
def list_topics(bootstrap_servers):
    """Fetch the list of topics from a Kafka cluster.

    Args:
        bootstrap_servers: Broker address(es), forwarded unchanged to
            ``KafkaAdminClient(bootstrap_servers=...)``.

    Returns:
        The value returned by ``KafkaAdminClient.list_topics()``.
    """
    admin_client = KafkaAdminClient(bootstrap_servers=bootstrap_servers)
    try:
        return admin_client.list_topics()
    finally:
        # Always release the broker connection, even if the lookup raises.
        admin_client.close()
def create_topic(topic_name, partitions=1, replication_factor=1, bootstrap_servers='localhost:9092'):
    """Create a Kafka topic unless a topic with that name already exists.

    Prints a message describing the outcome in either case.

    Args:
        topic_name: Name of the topic to create.
        partitions: Number of partitions for the new topic.
        replication_factor: Replication factor for the new topic.
        bootstrap_servers: Broker address(es), forwarded unchanged to
            ``KafkaAdminClient(bootstrap_servers=...)``.

    Returns:
        None.
    """
    admin_client = KafkaAdminClient(bootstrap_servers=bootstrap_servers, client_id='create_topic')
    try:
        # Reuse this client for the existence check rather than opening a
        # second admin connection just to list the topics.
        if topic_name in admin_client.list_topics():
            print(f"Topic '{topic_name}' already exists.")
            return

        topic = NewTopic(
            name=topic_name,
            num_partitions=partitions,
            replication_factor=replication_factor,
        )
        admin_client.create_topics([topic])
        print(f"Topic '{topic_name}' created.")
    finally:
        # Close the connection on every path: early return, success or error.
        admin_client.close()
# Example (disabled): create the 'zipkin' topic on the three-broker cluster.
# create_topic(
#     'zipkin',
#     bootstrap_servers=[
#         'kafka-0.kafka-headless.aimpdev.svc.k5.bigtree.zone:9092',
#         'kafka-1.kafka-headless.aimpdev.svc.k5.bigtree.zone:9092',
#         'kafka-2.kafka-headless.aimpdev.svc.k5.bigtree.zone:9092',
#     ],
# )

# Print the topics currently reported by the three-broker cluster.
print(
    list_topics(
        [
            'kafka-0.kafka-headless.aimpdev.svc.k5.bigtree.zone:9092',
            'kafka-1.kafka-headless.aimpdev.svc.k5.bigtree.zone:9092',
            'kafka-2.kafka-headless.aimpdev.svc.k5.bigtree.zone:9092',
        ]
    )
)
|