kafka_topic.py 1.3 KB

12345678910111213141516171819202122232425262728293031323334
  1. from kafka import KafkaAdminClient
  2. from kafka.admin import NewTopic
  3. # 获取topic列表
  4. def list_topics(bootstrap_servers):
  5. admin_client = KafkaAdminClient(bootstrap_servers=bootstrap_servers)
  6. topics = admin_client.list_topics()
  7. return topics
  8. # 创建topic
  9. def create_topic(topic_name, partitions=1, replication_factor=1, bootstrap_servers='localhost:9092'):
  10. # Create an admin client
  11. admin_client = KafkaAdminClient(bootstrap_servers=bootstrap_servers, client_id='create_topic')
  12. # Check if the topic already exists
  13. topic_list = list_topics(bootstrap_servers)
  14. if topic_name in topic_list:
  15. print(f"Topic '{topic_name}' already exists.")
  16. return
  17. # Create the new topic
  18. topic = NewTopic(
  19. name=topic_name,
  20. num_partitions=partitions,
  21. replication_factor=replication_factor
  22. )
  23. admin_client.create_topics([topic])
  24. print(f"Topic '{topic_name}' created.")
  25. # create_topic('zipkin', bootstrap_servers=['kafka-0.kafka-headless.aimpdev.svc.k5.bigtree.zone:9092', 'kafka-1.kafka-headless.aimpdev.svc.k5.bigtree.zone:9092', 'kafka-2.kafka-headless.aimpdev.svc.k5.bigtree.zone:9092'])
  26. print(list_topics(['kafka-0.kafka-headless.aimpdev.svc.k5.bigtree.zone:9092', 'kafka-1.kafka-headless.aimpdev.svc.k5.bigtree.zone:9092', 'kafka-2.kafka-headless.aimpdev.svc.k5.bigtree.zone:9092']))