import threading


class AsyncKafka:
    """Asynchronous request module.

    Holds a single class-level session (intended to be a Kafka producer)
    that is created lazily the first time a payload is pushed.
    """

    # Shared session object; stays None until initialization is implemented.
    __session = None
    # Guards the lazy creation of the shared session.
    __lock = threading.Lock()

    @classmethod
    async def __init_session(cls):
        """Create the shared session if it does not exist yet and return it.

        The session is checked once without the lock and once more while
        holding it, so only one caller performs the initialization.

        Returns:
            The current value of the class-level session. Producer creation
            is still a placeholder, so this is ``None`` unless the session
            was assigned elsewhere.
        """
        if cls.__session:
            return cls.__session

        # NOTE(review): this is a blocking ``threading.Lock``. If the
        # initialization below ends up awaiting while holding it, another
        # coroutine on the same event loop reaching this ``with`` would block
        # the loop -- confirm the intended threading model (``asyncio.Lock``
        # may be the better fit).
        with cls.__lock:
            if not cls.__session:
                # TODO: producer setup is not implemented yet. Sketch:
                #   producer = AIOKafkaProducer(...)
                #   await producer.start()
                #   cls.__session = producer
                pass

        return cls.__session

    @classmethod
    async def async_push(cls, payload):
        """Make sure the shared session is initialized before pushing.

        Args:
            payload: The message to push. It is currently unused because
                sending is not implemented yet.

        Returns:
            None.
        """
        needs_init = not cls.__session
        if needs_init:
            await cls.__init_session()
        # TODO: sending ``payload`` through the session is not implemented.