|
@@ -10,13 +10,9 @@ import os
|
|
|
from concurrent.futures import ThreadPoolExecutor
|
|
|
from configparser import ConfigParser
|
|
|
from logging.handlers import TimedRotatingFileHandler
|
|
|
+from os.path import dirname
|
|
|
|
|
|
-import aredis
|
|
|
-import redis
|
|
|
-import requests
|
|
|
import uvloop
|
|
|
-from aiokafka import AIOKafkaProducer
|
|
|
-from elasticsearch import Elasticsearch
|
|
|
from requests import adapters
|
|
|
|
|
|
adapters.DEFAULT_POOLSIZE = 100000
|
|
@@ -31,71 +27,11 @@ class CfgBaseInit(object):
|
|
|
start_mode: 项目启动模式
|
|
|
"""
|
|
|
config = ConfigParser()
|
|
|
-
|
|
|
config.read(os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), "config", "config.ini"))
|
|
|
- project_name = config.get("CONFIG", "project_name")
|
|
|
- project_path = os.path.join(os.path.abspath(os.path.dirname(__file__)).split(project_name)[0], project_name)
|
|
|
- start_model = config.get("CONFIG", "start_model").upper()
|
|
|
- __authentication = config.get("CONFIG", "authentication")
|
|
|
- authentication = json.loads(__authentication if "'" not in __authentication else __authentication.replace("'", '"'))
|
|
|
- __headers = config.get("CONFIG", "headers")
|
|
|
- headers = json.loads(__headers if "'" not in __headers else __headers.replace("'", '"'))
|
|
|
- bind = config.get(section=f"PROJECT-{start_model}", option="host")
|
|
|
- _executor = ThreadPoolExecutor(max_workers=50000) # 线程池
|
|
|
- _async_kafka_session = None
|
|
|
-
|
|
|
- kafka_server = config.get(f"BEHAVE-{start_model}", "kafka_server")
|
|
|
-
|
|
|
- @staticmethod
|
|
|
- def get_my_event_loop():
|
|
|
- """
|
|
|
- 获取当前 loop 循环事件管理对象
|
|
|
- :return:
|
|
|
- """
|
|
|
- event_loop = asyncio.get_event_loop()
|
|
|
- # asyncio.set_event_loop(loop=event_loop)
|
|
|
- return event_loop
|
|
|
-
|
|
|
- @classmethod
|
|
|
- def executor_submit(cls, *args, **kwargs):
|
|
|
- """
|
|
|
- 改写 executor.submit 方法,加入默认的异常捕获功能;
|
|
|
- 使用方式与 原来的 executor.submit 相同
|
|
|
- :param args:
|
|
|
- :param kwargs:
|
|
|
- :return:
|
|
|
- """
|
|
|
- if asyncio.iscoroutinefunction(args[0]):
|
|
|
- loop = asyncio.new_event_loop()
|
|
|
- if not loop.is_running():
|
|
|
- executor_future = loop.run_until_complete(args[0](args[1:])) # 处理 future 函数 or 对象
|
|
|
- return executor_future
|
|
|
- else:
|
|
|
- thread_name_prefix = kwargs.get("thread_name_prefix", "")
|
|
|
- if not thread_name_prefix:
|
|
|
- thread_name_prefix = args[0].__name__
|
|
|
- if "thread_name_prefix" in kwargs:
|
|
|
- del kwargs['thread_name_prefix']
|
|
|
- cls._executor._thread_name_prefix = thread_name_prefix
|
|
|
- executor_future = cls._executor.submit(*args, **kwargs) # 处理 普通 function;
|
|
|
- # 线程任务增加任务名称前缀,与线程名称相同.
|
|
|
- # 默认为 function.__name__
|
|
|
- executor_future.__setattr__("name", thread_name_prefix)
|
|
|
- executor_future.add_done_callback(cls._executor_callback)
|
|
|
- return executor_future
|
|
|
|
|
|
- @classmethod
|
|
|
- def _executor_callback(cls, worker):
|
|
|
- """
|
|
|
- 任务池中任务异常捕获回调函数
|
|
|
- 抛出异常
|
|
|
- :param worker: 任务
|
|
|
- :return:
|
|
|
- """
|
|
|
- worker_exception = worker.exception()
|
|
|
- if worker_exception:
|
|
|
- logger.exception("Worker return exception: {}".format(worker_exception))
|
|
|
- raise worker_exception
|
|
|
+ project_name = config.get("CONFIG", "project_name")
|
|
|
+ project_path = dirname(dirname(__file__))
|
|
|
+ bind = config.get("CONFIG", "host")
|
|
|
|
|
|
|
|
|
class Logger(object):
|
|
@@ -115,124 +51,3 @@ class Logger(object):
|
|
|
|
|
|
|
|
|
logger = Logger.logger
|
|
|
-
|
|
|
-
|
|
|
-class RedisInit(CfgBaseInit):
|
|
|
- """
|
|
|
- Redis 对象初始化
|
|
|
- """
|
|
|
- section = "REDIS-{}".format(CfgBaseInit.start_model.upper())
|
|
|
- host = CfgBaseInit.config.get(section=section, option="host")
|
|
|
- port = CfgBaseInit.config.get(section=section, option="port")
|
|
|
- db = CfgBaseInit.config.get(section=section, option="db")
|
|
|
-
|
|
|
- host_bak = CfgBaseInit.config.get(section=section, option="host_bak")
|
|
|
- port_bak = CfgBaseInit.config.get(section=section, option="port_bak")
|
|
|
- db_bak = CfgBaseInit.config.get(section=section, option="db_bak")
|
|
|
- Pool = redis.ConnectionPool(host=host, port=port, db=db, max_connections=None, decode_responses=True,
|
|
|
- socket_keepalive=False)
|
|
|
- Pool_bak = redis.ConnectionPool(host=host_bak, port=port_bak, db=db_bak, max_connections=None,
|
|
|
- decode_responses=True)
|
|
|
- conn = redis.Redis(connection_pool=Pool)
|
|
|
-
|
|
|
-
|
|
|
-class AsyncRedisInit(CfgBaseInit):
|
|
|
- """
|
|
|
- redis 异步连接池初始化
|
|
|
- pipeline_demo:
|
|
|
- async def pipeline_test():
|
|
|
- conn = aredis.StrictRedis(connection_pool=AsyncRedisInit.Pool)
|
|
|
- p = await conn.pipeline()
|
|
|
- await p.hgetall("process_cache\0015f371f42f286b2eb461e75a437162c6e@cccxx@abrs_reco@d751713988987e9331980363e24189ce@d751713988987e9331980363e24189ce@d751713988987e9331980363e24189ce@0@0_0")
|
|
|
- a = await p.execute()
|
|
|
- return a
|
|
|
-
|
|
|
- client_demo:
|
|
|
- async def conn_test():
|
|
|
- conn = aredis.StrictRedis(connection_pool=AsyncRedisInit.Pool)
|
|
|
- a = await conn.hgetall("process_cache\0015f371f42f286b2eb461e75a437162c6e@cccxx@abrs_reco@d751713988987e9331980363e24189ce@d751713988987e9331980363e24189ce@d751713988987e9331980363e24189ce@0@0_0")
|
|
|
- return a
|
|
|
- """
|
|
|
- section = "REDIS-{}".format(CfgBaseInit.start_model.upper())
|
|
|
- host = CfgBaseInit.config.get(section=section, option="host")
|
|
|
- port = CfgBaseInit.config.get(section=section, option="port")
|
|
|
- try:
|
|
|
- password = CfgBaseInit.config.get(section=section, option="password")
|
|
|
- except Exception:
|
|
|
- password = None
|
|
|
- db = CfgBaseInit.config.get(section=section, option="db")
|
|
|
-
|
|
|
- host_bak = CfgBaseInit.config.get(section=section, option="host_bak")
|
|
|
- port_bak = CfgBaseInit.config.get(section=section, option="port_bak")
|
|
|
- try:
|
|
|
- password_bak = password
|
|
|
- except Exception:
|
|
|
- password_bak = None
|
|
|
- db_bak = CfgBaseInit.config.get(section=section, option="db_bak")
|
|
|
- # todo 不能使用链接池,链接池不能自动释放 conn
|
|
|
- Pool = aredis.ConnectionPool(host=host, port=port, db=db,
|
|
|
- password=password, max_connections=None, decode_responses=True,
|
|
|
- socket_keepalive=False)
|
|
|
- Pool_bak = aredis.ConnectionPool(host=host_bak, port=port_bak, db=db_bak, password=password_bak,
|
|
|
- max_connections=None, decode_responses=True,
|
|
|
- socket_keepalive=False)
|
|
|
- conn = aredis.StrictRedis(connection_pool=Pool)
|
|
|
-
|
|
|
-
|
|
|
-class ElasticSearchInit(CfgBaseInit):
|
|
|
- """
|
|
|
- ElasticSearch 对象初始化
|
|
|
- """
|
|
|
- section = "ES-{}".format(CfgBaseInit.start_model.upper())
|
|
|
- index = CfgBaseInit.config.get(section=section, option="index")
|
|
|
- fc_index = CfgBaseInit.config.get(section=section, option="fc_index")
|
|
|
- try:
|
|
|
- es_timeout = float(CfgBaseInit.config.get(section=section, option="timeout"))
|
|
|
- except Exception as e:
|
|
|
- es_timeout = 10
|
|
|
- es_client = Elasticsearch(hosts=[CfgBaseInit.config.get(section=section, option="host")], timeout=es_timeout)
|
|
|
- fc_es_client = Elasticsearch(hosts=CfgBaseInit.config.get(section=section, option="fc_host").split(","),
|
|
|
- timeout=es_timeout)
|
|
|
-
|
|
|
-
|
|
|
-class BehaviorAPI(CfgBaseInit):
|
|
|
- """
|
|
|
- 行为数据对接接口 Behavior Api 初始化
|
|
|
- """
|
|
|
- section = "BEHAVIOR_API-{}".format(CfgBaseInit.start_model.upper())
|
|
|
- url = CfgBaseInit.config.get(section=section, option="url")
|
|
|
-
|
|
|
- @staticmethod
|
|
|
- def push_behavior_log(data):
|
|
|
- data = json.dumps(data)
|
|
|
- r = requests.post(BehaviorAPI.url, data=data, headers=CfgBaseInit.headers)
|
|
|
- return r
|
|
|
-
|
|
|
-
|
|
|
-class AsyncKafka(CfgBaseInit):
|
|
|
- """异步请求模块
|
|
|
- """
|
|
|
- _async_kafka_session = None
|
|
|
-
|
|
|
- @classmethod
|
|
|
- async def __set_async_session(cls):
|
|
|
- producer = AIOKafkaProducer(loop=cls.get_my_event_loop(),
|
|
|
- value_serializer=lambda v: json.dumps(v).encode('utf-8'),
|
|
|
- bootstrap_servers=CfgBaseInit.kafka_server)
|
|
|
- await producer.start()
|
|
|
- cls._async_kafka_session = producer
|
|
|
- return producer
|
|
|
-
|
|
|
- @classmethod
|
|
|
- async def async_push(cls, payload):
|
|
|
- """异步请求接口
|
|
|
- """
|
|
|
- request_session = cls._async_kafka_session or await cls.__set_async_session()
|
|
|
- print(request_session)
|
|
|
- await request_session.send_and_wait("eip_rec_behave", payload)
|
|
|
-
|
|
|
-
|
|
|
-if __name__ == '__main__':
|
|
|
- a = BehaviorAPI.push_behavior_log({"a": 1})
|
|
|
- print(a)
|
|
|
- print(a.content)
|