# class AsyncKafka(CfgBaseInit): # """异步请求模块 # """ # _async_kafka_session = None # _instance_lock = threading.Lock() # # @classmethod # async def __set_async_session(cls): # if not cls._async_kafka_session: # with cls._instance_lock: # producer = AIOKafkaProducer(loop=cls.get_my_event_loop(), # value_serializer=lambda v: json.dumps(v).encode('utf-8'), # bootstrap_servers=CfgBaseInit.kafka_server) # await producer.start() # cls._async_kafka_session = producer # return cls._async_kafka_session # # @classmethod # async def async_push(cls, payload): # """异步请求接口 # """ # request_session = cls._async_kafka_session or await cls.__set_async_session() # print(request_session) # await request_session.send_and_wait(CfgBaseInit.topic, payload)