123456789101112131415161718192021222324 |
- # class AsyncKafka(CfgBaseInit):
- # """Asynchronous request module
- # """
- # _async_kafka_session = None
- # _instance_lock = threading.Lock()
- #
- # @classmethod
- # async def __set_async_session(cls):
- # if not cls._async_kafka_session:
- # with cls._instance_lock:
- # producer = AIOKafkaProducer(loop=cls.get_my_event_loop(),
- # value_serializer=lambda v: json.dumps(v).encode('utf-8'),
- # bootstrap_servers=CfgBaseInit.kafka_server)
- # await producer.start()
- # cls._async_kafka_session = producer
- # return cls._async_kafka_session
- #
- # @classmethod
- # async def async_push(cls, payload):
- # """Asynchronous request interface
- # """
- # request_session = cls._async_kafka_session or await cls.__set_async_session()
- # print(request_session)
- # await request_session.send_and_wait(CfgBaseInit.topic, payload)
|