# -*- coding:utf-8 -*-
# Configuration bootstrap.
# Mainly creates the database / message-queue clients used by the project
# and initialises a few simple helper classes.
import asyncio
import json
import logging
import os
from concurrent.futures import ThreadPoolExecutor
from configparser import ConfigParser
from configparser import Error as ConfigParserError
from logging.handlers import TimedRotatingFileHandler

import aredis
import redis
import requests
import uvloop
from aiokafka import AIOKafkaProducer
from elasticsearch import Elasticsearch
from requests import adapters

adapters.DEFAULT_POOLSIZE = 100000
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())


def _loads_lenient_json(raw):
    """Parse a JSON option that may have been written with single quotes.

    Every single quote is replaced by a double quote before parsing, so a
    value such as ``{'a': 1}`` is accepted as well as ``{"a": 1}``.

    :param raw: option text read from the config file
    :return: the decoded JSON value
    """
    return json.loads(raw.replace("'", '"'))


class CfgBaseInit(object):
    """
    Shared configuration loaded once at import time.

    config: parsed ``config/config.ini`` (located one directory above this file's directory)
    project_name: project name
    project_path: path up to and including the first occurrence of ``project_name``
    start_model: start-up mode, upper-cased; selects the ``*-<MODE>`` config sections
    """
    config = ConfigParser()
    config.read(os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), "config", "config.ini"))

    project_name = config.get("CONFIG", "project_name")
    project_path = os.path.join(os.path.abspath(os.path.dirname(__file__)).split(project_name)[0], project_name)
    start_model = config.get("CONFIG", "start_model").upper()

    __authentication = config.get("CONFIG", "authentication")
    authentication = _loads_lenient_json(__authentication)
    __headers = config.get("CONFIG", "headers")
    headers = _loads_lenient_json(__headers)

    bind = config.get(section=f"PROJECT-{start_model}", option="host")
    _executor = ThreadPoolExecutor(max_workers=50000)  # shared thread pool
    _async_kafka_session = None
    kafka_server = config.get(f"BEHAVE-{start_model}", "kafka_server")

    @staticmethod
    def get_my_event_loop():
        """
        Return the event loop object for the current context.

        :return: the loop returned by ``asyncio.get_event_loop()``
        """
        return asyncio.get_event_loop()

    @classmethod
    def executor_submit(cls, *args, **kwargs):
        """
        Wrapper around ``executor.submit`` that adds default exception
        capturing; call it exactly like ``executor.submit(fn, *args, **kwargs)``.

        * Coroutine functions are run to completion on a fresh event loop and
          their result is returned directly.
        * Plain callables are submitted to the shared thread pool; the
          returned future gets a ``name`` attribute and a done-callback that
          logs any exception.

        :param args: ``fn`` followed by its positional arguments
        :param kwargs: keyword arguments for ``fn``; the optional
            ``thread_name_prefix`` is consumed here and not forwarded
            (defaults to ``fn.__name__`` for plain callables)
        :return: the coroutine's result, or the ``Future`` of the submitted task
        """
        func = args[0]
        thread_name_prefix = kwargs.pop("thread_name_prefix", "")

        if asyncio.iscoroutinefunction(func):
            # Unpack the arguments just like executor.submit would; the old
            # code passed the whole tuple as one positional argument.
            loop = asyncio.new_event_loop()
            try:
                return loop.run_until_complete(func(*args[1:], **kwargs))
            finally:
                # Always release the temporary loop, even when the task fails.
                loop.close()

        if not thread_name_prefix:
            thread_name_prefix = func.__name__
        # Relies on a private ThreadPoolExecutor attribute so that worker
        # threads spawned from now on carry the task name.
        cls._executor._thread_name_prefix = thread_name_prefix
        executor_future = cls._executor.submit(*args, **kwargs)
        # Tag the future with the same name as the thread prefix.
        executor_future.name = thread_name_prefix
        executor_future.add_done_callback(cls._executor_callback)
        return executor_future

    @classmethod
    def _executor_callback(cls, worker):
        """
        Done-callback for pool tasks: log the task's exception and re-raise it.

        :param worker: the finished ``Future``
        :return: None
        """
        if worker.cancelled():
            # Future.exception() raises CancelledError for cancelled futures.
            return
        worker_exception = worker.exception()
        if worker_exception:
            # Pass the exception explicitly: we are not inside an ``except``
            # block, so the implicit exc_info would be empty.
            logger.exception("Worker return exception: {}".format(worker_exception), exc_info=worker_exception)
            raise worker_exception


class Logger(object):
    """
    Logging facade writing single-line records to a timed rotating file.

    Can also be used as a class decorator: calling the instance attaches the
    module-level ``logger`` to the wrapped class (if it has none) and then
    instantiates that class.
    """
    log_path = CfgBaseInit.config.get("LOG", "log_path")
    when = CfgBaseInit.config.get("LOG", "when")
    backupCount = int(CfgBaseInit.config.get("LOG", "backupCount"))

    formatter = logging.Formatter(
        '%(asctime)s %(name)s %(process)d %(thread)d %(threadName)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s')
    handler = TimedRotatingFileHandler(log_path, when=when, backupCount=backupCount)
    handler.setFormatter(formatter)

    logger = logging.getLogger(CfgBaseInit.project_name)
    logger.addHandler(handler)
    logger.setLevel(logging.DEBUG)

    # NOTE(review): because every call goes through this wrapper,
    # %(filename)s / %(lineno)d point at this module rather than the caller.

    def __init__(self, cls):
        self._cls = cls

    def __call__(self, *args, **kwargs):
        if not hasattr(self._cls, 'logger'):
            self._cls.logger = logger
        return self._cls(*args, **kwargs)

    @staticmethod
    def replace_blank(msg, *args, **kwargs):
        """
        Render ``msg % args`` and flatten it onto a single line.

        If %-formatting fails (for example the message contains a literal
        ``%`` or the argument count does not match) the raw message is kept,
        with any arguments appended, instead of raising from a logging call.

        :param msg: message or format string (converted with ``str``)
        :param args: %-format arguments
        :param kwargs: accepted for signature compatibility; not used here
        :return: the message with ``\\r`` and ``\\n`` replaced by spaces
        """
        text = str(msg)
        try:
            text = text % args
        except (TypeError, ValueError):
            if args:
                text = "{} {}".format(text, " ".join(str(arg) for arg in args))
        return text.replace("\r", " ").replace("\n", " ")

    # The level methods forward keyword arguments (exc_info, extra, ...) to
    # the underlying logger; the message itself is pre-formatted above.
    def debug(self, msg, *args, **kwargs):
        self.logger.debug(self.replace_blank(msg, *args), **kwargs)

    def info(self, msg, *args, **kwargs):
        self.logger.info(self.replace_blank(msg, *args), **kwargs)

    def warning(self, msg, *args, **kwargs):
        self.logger.warning(self.replace_blank(msg, *args), **kwargs)

    def error(self, msg, *args, **kwargs):
        self.logger.error(self.replace_blank(msg, *args), **kwargs)

    def exception(self, msg, *args, **kwargs):
        self.logger.exception(self.replace_blank(msg, *args), **kwargs)

    def critical(self, msg, *args, **kwargs):
        self.logger.critical(self.replace_blank(msg, *args), **kwargs)


logger = Logger(Logger)


class RedisInit(CfgBaseInit):
    """
    Synchronous Redis objects: primary pool, backup pool and a client bound
    to the primary pool.
    """
    section = "REDIS-{}".format(CfgBaseInit.start_model.upper())

    host = CfgBaseInit.config.get(section=section, option="host")
    port = CfgBaseInit.config.get(section=section, option="port")
    db = CfgBaseInit.config.get(section=section, option="db")

    host_bak = CfgBaseInit.config.get(section=section, option="host_bak")
    port_bak = CfgBaseInit.config.get(section=section, option="port_bak")
    db_bak = CfgBaseInit.config.get(section=section, option="db_bak")

    Pool = redis.ConnectionPool(host=host, port=port, db=db, max_connections=None, decode_responses=True,
                                socket_keepalive=False)
    Pool_bak = redis.ConnectionPool(host=host_bak, port=port_bak, db=db_bak, max_connections=None,
                                    decode_responses=True)
    conn = redis.Redis(connection_pool=Pool)


class AsyncRedisInit(CfgBaseInit):
    """
    Asynchronous (aredis) connection pools and client.

    pipeline_demo:
        async def pipeline_test():
            conn = aredis.StrictRedis(connection_pool=AsyncRedisInit.Pool)
            p = await conn.pipeline()
            await p.hgetall("<hash key>")
            a = await p.execute()
            return a

    client_demo:
        async def conn_test():
            conn = aredis.StrictRedis(connection_pool=AsyncRedisInit.Pool)
            a = await conn.hgetall("<hash key>")
            return a
    """
    section = "REDIS-{}".format(CfgBaseInit.start_model.upper())

    host = CfgBaseInit.config.get(section=section, option="host")
    port = CfgBaseInit.config.get(section=section, option="port")
    try:
        password = CfgBaseInit.config.get(section=section, option="password")
    except ConfigParserError:
        # The password option is optional; connect without one when it
        # cannot be read from the config.
        password = None
    db = CfgBaseInit.config.get(section=section, option="db")

    host_bak = CfgBaseInit.config.get(section=section, option="host_bak")
    port_bak = CfgBaseInit.config.get(section=section, option="port_bak")
    # The backup instance reuses the primary password.
    password_bak = password
    db_bak = CfgBaseInit.config.get(section=section, option="db_bak")

    # TODO: a connection pool should not be used here; the pool does not
    # release connections automatically.
    Pool = aredis.ConnectionPool(host=host, port=port, db=db, password=password, max_connections=None,
                                 decode_responses=True, socket_keepalive=False)
    Pool_bak = aredis.ConnectionPool(host=host_bak, port=port_bak, db=db_bak, password=password_bak,
                                     max_connections=None, decode_responses=True, socket_keepalive=False)
    conn = aredis.StrictRedis(connection_pool=Pool)


class ElasticSearchInit(CfgBaseInit):
    """
    ElasticSearch clients and index names.
    """
    section = "ES-{}".format(CfgBaseInit.start_model.upper())

    index = CfgBaseInit.config.get(section=section, option="index")
    fc_index = CfgBaseInit.config.get(section=section, option="fc_index")
    try:
        es_timeout = float(CfgBaseInit.config.get(section=section, option="timeout"))
    except (ConfigParserError, ValueError):
        # Missing or non-numeric timeout: fall back to 10.
        es_timeout = 10

    es_client = Elasticsearch(hosts=[CfgBaseInit.config.get(section=section, option="host")], timeout=es_timeout)
    # "fc_host" is a comma-separated list of hosts.
    fc_es_client = Elasticsearch(hosts=CfgBaseInit.config.get(section=section, option="fc_host").split(","),
                                 timeout=es_timeout)


class BehaviorAPI(CfgBaseInit):
    """
    Behavior-data API endpoint configuration and push helper.
    """
    section = "BEHAVIOR_API-{}".format(CfgBaseInit.start_model.upper())
    url = CfgBaseInit.config.get(section=section, option="url")

    @staticmethod
    def push_behavior_log(data, timeout=None):
        """
        POST ``data`` as JSON to the configured behavior API URL.

        :param data: JSON-serialisable payload
        :param timeout: optional ``requests`` timeout; ``None`` (default)
            keeps the previous behaviour of waiting without a limit
        :return: the ``requests`` response object
        """
        body = json.dumps(data)
        return requests.post(BehaviorAPI.url, data=body, headers=CfgBaseInit.headers, timeout=timeout)


class AsyncKafka(CfgBaseInit):
    """
    Asynchronous Kafka producer helper; the producer is created on first use
    and cached on the class.
    """
    _async_kafka_session = None

    @classmethod
    async def __set_async_session(cls):
        """
        Create and start the producer, then cache it on the class.

        :return: the started ``AIOKafkaProducer``
        """
        # NOTE(review): newer aiokafka releases may deprecate the explicit
        # ``loop`` argument - confirm against the pinned version.
        producer = AIOKafkaProducer(loop=cls.get_my_event_loop(),
                                    value_serializer=lambda v: json.dumps(v).encode('utf-8'),
                                    bootstrap_servers=CfgBaseInit.kafka_server)
        await producer.start()
        cls._async_kafka_session = producer
        return producer

    @classmethod
    async def async_push(cls, payload, topic="eip_rec_behave"):
        """
        Send ``payload`` to Kafka and wait for the send to complete.

        :param payload: JSON-serialisable message value
        :param topic: destination topic; defaults to the previously
            hard-coded ``eip_rec_behave``
        :return: None
        """
        request_session = cls._async_kafka_session or await cls.__set_async_session()
        logger.debug("kafka producer: %s", request_session)
        await request_session.send_and_wait(topic, payload)


if __name__ == '__main__':
    response = BehaviorAPI.push_behavior_log({"a": 1})
    print(response)
    print(response.content)