@@ -0,0 +1,270 @@
+# -*- coding:utf-8 -*-
+# Configuration file initialization.
+# Mainly creates and initializes the db clients,
+# plus a few simple helper classes.
+
+import asyncio
+import json
+import logging
+import os
+from concurrent.futures import ThreadPoolExecutor
+from configparser import ConfigParser
+from logging.handlers import TimedRotatingFileHandler
+
+import aredis
+import redis
+import requests
+import uvloop
+from aiokafka import AIOKafkaProducer
+from elasticsearch import Elasticsearch
+from requests import adapters
+
+adapters.DEFAULT_POOLSIZE = 100000
+
+asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
+
+
+class CfgBaseInit(object):
+    """
+    config: parsed configuration file object
+    project_name: project name
+    start_model: project start mode; selects the "*-<MODE>" config sections
+    """
+    config = ConfigParser()
+    config.read(os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), "config", "config.ini"))
+    project_name = config.get("CONFIG", "project_name")
+    project_path = os.path.join(os.path.abspath(os.path.dirname(__file__)).split(project_name)[0], project_name)
+    start_model = config.get("CONFIG", "start_model").upper()
+    __authentication = config.get("CONFIG", "authentication")
+    authentication = json.loads(__authentication if "'" not in __authentication else __authentication.replace("'", '"'))
+    __headers = config.get("CONFIG", "headers")
+    headers = json.loads(__headers if "'" not in __headers else __headers.replace("'", '"'))
+    bind = config.get(section=f"PROJECT-{start_model}", option="host")
+    _executor = ThreadPoolExecutor(max_workers=50000)  # shared thread pool
+    _async_kafka_session = None
+
+    kafka_server = config.get(f"BEHAVE-{start_model}", "kafka_server")
+
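+    # Sketch of the config.ini sections this class reads (section suffixes come
+    # from start_model; all values below are illustrative assumptions only):
+    #
+    #     [CONFIG]
+    #     project_name = my_project
+    #     start_model = dev
+    #     authentication = {"user": "u", "token": "t"}
+    #     headers = {"Content-Type": "application/json"}
+    #
+    #     [PROJECT-DEV]
+    #     host = 0.0.0.0:8000
+    #
+    #     [BEHAVE-DEV]
+    #     kafka_server = localhost:9092
+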
+    @staticmethod
+    def get_my_event_loop():
+        """
+        Get the event loop for the current context.
+        :return: asyncio event loop
+        """
+        event_loop = asyncio.get_event_loop()
+        # asyncio.set_event_loop(loop=event_loop)
+        return event_loop
+
+    @classmethod
+    def executor_submit(cls, *args, **kwargs):
+        """
+        Wrapper around executor.submit that adds default exception capture;
+        called exactly like the original executor.submit.
+        :param args: the callable followed by its positional arguments
+        :param kwargs: keyword arguments for the callable
+        :return: the coroutine's result, or a Future for a plain function
+        """
+        if asyncio.iscoroutinefunction(args[0]):
+            loop = asyncio.new_event_loop()
+            if not loop.is_running():
+                executor_future = loop.run_until_complete(args[0](*args[1:]))  # run the coroutine to completion
+                return executor_future
+        else:
+            # Name the task after the thread-name prefix, defaulting to
+            # function.__name__.
+            thread_name_prefix = kwargs.pop("thread_name_prefix", "") or args[0].__name__
+            cls._executor._thread_name_prefix = thread_name_prefix
+            executor_future = cls._executor.submit(*args, **kwargs)  # plain function: hand off to the pool
+            executor_future.name = thread_name_prefix
+            executor_future.add_done_callback(cls._executor_callback)
+            return executor_future
+
+    @classmethod
+    def _executor_callback(cls, worker):
+        """
+        Done-callback that captures and re-raises exceptions from pool tasks.
+        :param worker: the finished Future
+        :return:
+        """
+        worker_exception = worker.exception()
+        if worker_exception:
+            logger.exception("Worker return exception: {}".format(worker_exception))
+            raise worker_exception
+
+
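+# A minimal usage sketch for executor_submit (`work` and `async_work` are
+# hypothetical names; thread-pool failures are reported via _executor_callback,
+# while the coroutine path runs inline and raises directly):
+#
+#     def work(x, y):
+#         return x + y
+#
+#     async def async_work(x):
+#         await asyncio.sleep(0.1)
+#         return x
+#
+#     future = CfgBaseInit.executor_submit(work, 1, 2)     # -> Future
+#     result = CfgBaseInit.executor_submit(async_work, 3)  # -> 3
+
+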
+class Logger(object):
+    # Logging setup; also usable as a class decorator (see the sketch below
+    # the module-level instance).
+    log_path = CfgBaseInit.config.get("LOG", "log_path")
+    when = CfgBaseInit.config.get("LOG", "when")
+    backupCount = int(CfgBaseInit.config.get("LOG", "backupCount"))
+
+    formatter = logging.Formatter(
+        '%(asctime)s %(name)s %(process)d %(thread)d %(threadName)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s')
+    handler = TimedRotatingFileHandler(log_path, when=when, backupCount=backupCount)
+    handler.setFormatter(formatter)
+    logger = logging.getLogger(CfgBaseInit.project_name)
+    logger.addHandler(handler)
+    logger.setLevel(logging.DEBUG)
+
+    def __init__(self, cls):
+        self._cls = cls
+
+    def __call__(self, *args, **kwargs):
+        if not hasattr(self._cls, 'logger'):
+            self._cls.logger = logger
+        return self._cls(*args, **kwargs)
+
+    @staticmethod
+    def replace_blank(msg, *args, **kwargs):
+        """Render msg % args and flatten CR/LF so each record stays on one line."""
+        msg = str(msg)
+        if args:
+            msg = msg % args
+        return msg.replace("\r", " ").replace("\n", " ")
+
+    def debug(self, msg, *args, **kwargs):
+        self.logger.debug(self.replace_blank(msg, *args, **kwargs))
+
+    def info(self, msg, *args, **kwargs):
+        self.logger.info(self.replace_blank(msg, *args, **kwargs))
+
+    def warning(self, msg, *args, **kwargs):
+        self.logger.warning(self.replace_blank(msg, *args, **kwargs))
+
+    def error(self, msg, *args, **kwargs):
+        self.logger.error(self.replace_blank(msg, *args, **kwargs))
+
+    def exception(self, msg, *args, **kwargs):
+        self.logger.exception(self.replace_blank(msg, *args, **kwargs))
+
+    def critical(self, msg, *args, **kwargs):
+        self.logger.critical(self.replace_blank(msg, *args, **kwargs))
+
+
+logger = Logger(Logger)  # module-level logger instance shared across the project
+
+
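+# Usage sketch for the Logger decorator (`Worker` is a hypothetical name;
+# decorated classes get the shared project logger as `self.logger`):
+#
+#     @Logger
+#     class Worker:
+#         def run(self):
+#             self.logger.info("line1\nline2")  # logged as "line1 line2"
+
+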
+class RedisInit(CfgBaseInit):
+    """
+    Redis client initialization.
+    """
+    section = "REDIS-{}".format(CfgBaseInit.start_model.upper())
+    host = CfgBaseInit.config.get(section=section, option="host")
+    port = CfgBaseInit.config.get(section=section, option="port")
+    db = CfgBaseInit.config.get(section=section, option="db")
+
+    host_bak = CfgBaseInit.config.get(section=section, option="host_bak")
+    port_bak = CfgBaseInit.config.get(section=section, option="port_bak")
+    db_bak = CfgBaseInit.config.get(section=section, option="db_bak")
+    Pool = redis.ConnectionPool(host=host, port=port, db=db, max_connections=None, decode_responses=True,
+                                socket_keepalive=False)
+    Pool_bak = redis.ConnectionPool(host=host_bak, port=port_bak, db=db_bak, max_connections=None,
+                                    decode_responses=True)
+    conn = redis.Redis(connection_pool=Pool)
+
+
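+# Usage sketch (assumes the REDIS-<MODE> section of config.ini is populated;
+# decode_responses=True means values come back as str):
+#
+#     RedisInit.conn.set("demo_key", "1")
+#     RedisInit.conn.get("demo_key")  # -> "1"
+
+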
+class AsyncRedisInit(CfgBaseInit):
+    """
+    Async redis connection-pool initialization.
+    pipeline_demo:
+        async def pipeline_test():
+            conn = aredis.StrictRedis(connection_pool=AsyncRedisInit.Pool)
+            p = await conn.pipeline()
+            await p.hgetall("process_cache\0015f371f42f286b2eb461e75a437162c6e@cccxx@abrs_reco@d751713988987e9331980363e24189ce@d751713988987e9331980363e24189ce@d751713988987e9331980363e24189ce@0@0_0")
+            a = await p.execute()
+            return a
+
+    client_demo:
+        async def conn_test():
+            conn = aredis.StrictRedis(connection_pool=AsyncRedisInit.Pool)
+            a = await conn.hgetall("process_cache\0015f371f42f286b2eb461e75a437162c6e@cccxx@abrs_reco@d751713988987e9331980363e24189ce@d751713988987e9331980363e24189ce@d751713988987e9331980363e24189ce@0@0_0")
+            return a
+    """
+    section = "REDIS-{}".format(CfgBaseInit.start_model.upper())
+    host = CfgBaseInit.config.get(section=section, option="host")
+    port = CfgBaseInit.config.get(section=section, option="port")
+    try:
+        password = CfgBaseInit.config.get(section=section, option="password")
+    except Exception:
+        password = None
+    db = CfgBaseInit.config.get(section=section, option="db")
+
+    host_bak = CfgBaseInit.config.get(section=section, option="host_bak")
+    port_bak = CfgBaseInit.config.get(section=section, option="port_bak")
+    password_bak = password  # the backup instance shares the primary password
+    db_bak = CfgBaseInit.config.get(section=section, option="db_bak")
+    # TODO: the pool cannot release connections automatically; avoid relying on it.
+    Pool = aredis.ConnectionPool(host=host, port=port, db=db,
+                                 password=password, max_connections=None, decode_responses=True,
+                                 socket_keepalive=False)
+    Pool_bak = aredis.ConnectionPool(host=host_bak, port=port_bak, db=db_bak, password=password_bak,
+                                     max_connections=None, decode_responses=True,
+                                     socket_keepalive=False)
+    conn = aredis.StrictRedis(connection_pool=Pool)
+
+
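+# The docstring demos above can be driven from synchronous code with, e.g.:
+#
+#     CfgBaseInit.get_my_event_loop().run_until_complete(conn_test())
+
+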
+class ElasticSearchInit(CfgBaseInit):
+    """
+    Elasticsearch client initialization.
+    """
+    section = "ES-{}".format(CfgBaseInit.start_model.upper())
+    index = CfgBaseInit.config.get(section=section, option="index")
+    fc_index = CfgBaseInit.config.get(section=section, option="fc_index")
+    try:
+        es_timeout = float(CfgBaseInit.config.get(section=section, option="timeout"))
+    except Exception:
+        es_timeout = 10  # fall back to 10s when no timeout is configured
+    es_client = Elasticsearch(hosts=[CfgBaseInit.config.get(section=section, option="host")], timeout=es_timeout)
+    fc_es_client = Elasticsearch(hosts=CfgBaseInit.config.get(section=section, option="fc_host").split(","),
+                                 timeout=es_timeout)
+
+
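+# Usage sketch (index names come from config.ini; the query body is an
+# illustrative assumption):
+#
+#     res = ElasticSearchInit.es_client.search(
+#         index=ElasticSearchInit.index,
+#         body={"query": {"match_all": {}}, "size": 1},
+#     )
+
+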
+class BehaviorAPI(CfgBaseInit):
+    """
+    Behavior API initialization: pushes behavior logs to the collection endpoint.
+    """
+    section = "BEHAVIOR_API-{}".format(CfgBaseInit.start_model.upper())
+    url = CfgBaseInit.config.get(section=section, option="url")
+
+    @staticmethod
+    def push_behavior_log(data):
+        data = json.dumps(data)
+        r = requests.post(BehaviorAPI.url, data=data, headers=CfgBaseInit.headers)
+        return r
+
+
+class AsyncKafka(CfgBaseInit):
+    """Async Kafka producer.
+    """
+    _async_kafka_session = None
+
+    @classmethod
+    async def __set_async_session(cls):
+        producer = AIOKafkaProducer(loop=cls.get_my_event_loop(),
+                                    value_serializer=lambda v: json.dumps(v).encode('utf-8'),
+                                    bootstrap_servers=CfgBaseInit.kafka_server)
+        await producer.start()
+        cls._async_kafka_session = producer
+        return producer
+
+    @classmethod
+    async def async_push(cls, payload):
+        """Asynchronously push one payload to the hard-coded eip_rec_behave topic.
+        """
+        request_session = cls._async_kafka_session or await cls.__set_async_session()
+        logger.debug("kafka session: %s", request_session)
+        await request_session.send_and_wait("eip_rec_behave", payload)
+
+
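+# Usage sketch for the async producer (reuses the shared event loop):
+#
+#     loop = CfgBaseInit.get_my_event_loop()
+#     loop.run_until_complete(AsyncKafka.async_push({"event": "click"}))
+
+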
+if __name__ == '__main__':
+    a = BehaviorAPI.push_behavior_log({"a": 1})
+    print(a)
+    print(a.content)