AKafkaPushTest.py 1001 B

123456789101112131415161718192021222324
  1. # class AsyncKafka(CfgBaseInit):
  2. # """Asynchronous request module
  3. # """
  4. # _async_kafka_session = None
  5. # _instance_lock = threading.Lock()
  6. #
  7. # @classmethod
  8. # async def __set_async_session(cls):
  9. # if not cls._async_kafka_session:
  10. # with cls._instance_lock:
  11. # producer = AIOKafkaProducer(loop=cls.get_my_event_loop(),
  12. # value_serializer=lambda v: json.dumps(v).encode('utf-8'),
  13. # bootstrap_servers=CfgBaseInit.kafka_server)
  14. # await producer.start()
  15. # cls._async_kafka_session = producer
  16. # return cls._async_kafka_session
  17. #
  18. # @classmethod
  19. # async def async_push(cls, payload):
  20. # """Asynchronous request interface
  21. # """
  22. # request_session = cls._async_kafka_session or await cls.__set_async_session()
  23. # print(request_session)
  24. # await request_session.send_and_wait(CfgBaseInit.topic, payload)