123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238 |
- # -*- coding:utf-8 -*-
- # 配置文件初始化
- # 主要是一些 db 的初始化创建
- # 简单 Class 类函数的 初始化
- import asyncio
- import json
- import logging
- import os
- from concurrent.futures import ThreadPoolExecutor
- from configparser import ConfigParser
- from logging.handlers import TimedRotatingFileHandler
- import aredis
- import redis
- import requests
- import uvloop
- from aiokafka import AIOKafkaProducer
- from elasticsearch import Elasticsearch
- from requests import adapters
- adapters.DEFAULT_POOLSIZE = 100000
- asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
- class CfgBaseInit(object):
- """
- config: 配置文件对象
- project_name: 项目名称
- start_mode: 项目启动模式
- """
- config = ConfigParser()
- config.read(os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), "config", "config.ini"))
- project_name = config.get("CONFIG", "project_name")
- project_path = os.path.join(os.path.abspath(os.path.dirname(__file__)).split(project_name)[0], project_name)
- start_model = config.get("CONFIG", "start_model").upper()
- __authentication = config.get("CONFIG", "authentication")
- authentication = json.loads(__authentication if "'" not in __authentication else __authentication.replace("'", '"'))
- __headers = config.get("CONFIG", "headers")
- headers = json.loads(__headers if "'" not in __headers else __headers.replace("'", '"'))
- bind = config.get(section=f"PROJECT-{start_model}", option="host")
- _executor = ThreadPoolExecutor(max_workers=50000) # 线程池
- _async_kafka_session = None
- kafka_server = config.get(f"BEHAVE-{start_model}", "kafka_server")
- @staticmethod
- def get_my_event_loop():
- """
- 获取当前 loop 循环事件管理对象
- :return:
- """
- event_loop = asyncio.get_event_loop()
- # asyncio.set_event_loop(loop=event_loop)
- return event_loop
- @classmethod
- def executor_submit(cls, *args, **kwargs):
- """
- 改写 executor.submit 方法,加入默认的异常捕获功能;
- 使用方式与 原来的 executor.submit 相同
- :param args:
- :param kwargs:
- :return:
- """
- if asyncio.iscoroutinefunction(args[0]):
- loop = asyncio.new_event_loop()
- if not loop.is_running():
- executor_future = loop.run_until_complete(args[0](args[1:])) # 处理 future 函数 or 对象
- return executor_future
- else:
- thread_name_prefix = kwargs.get("thread_name_prefix", "")
- if not thread_name_prefix:
- thread_name_prefix = args[0].__name__
- if "thread_name_prefix" in kwargs:
- del kwargs['thread_name_prefix']
- cls._executor._thread_name_prefix = thread_name_prefix
- executor_future = cls._executor.submit(*args, **kwargs) # 处理 普通 function;
- # 线程任务增加任务名称前缀,与线程名称相同.
- # 默认为 function.__name__
- executor_future.__setattr__("name", thread_name_prefix)
- executor_future.add_done_callback(cls._executor_callback)
- return executor_future
- @classmethod
- def _executor_callback(cls, worker):
- """
- 任务池中任务异常捕获回调函数
- 抛出异常
- :param worker: 任务
- :return:
- """
- worker_exception = worker.exception()
- if worker_exception:
- logger.exception("Worker return exception: {}".format(worker_exception))
- raise worker_exception
- class Logger(object):
- # 日志
- log_path = CfgBaseInit.config.get("LOG", "log_path")
- when = CfgBaseInit.config.get("LOG", "when")
- backupCount = int(CfgBaseInit.config.get("LOG", "backupCount"))
- logging.basicConfig(
- format='%(asctime)s %(name)s %(process)d %(thread)d %(threadName)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s')
- logger = logging.getLogger(CfgBaseInit.project_name)
- logger.setLevel(logging.DEBUG)
- # 输出到文件
- handler = TimedRotatingFileHandler(log_path, when=when, backupCount=backupCount)
- logger.addHandler(handler)
- logger = Logger.logger
- class RedisInit(CfgBaseInit):
- """
- Redis 对象初始化
- """
- section = "REDIS-{}".format(CfgBaseInit.start_model.upper())
- host = CfgBaseInit.config.get(section=section, option="host")
- port = CfgBaseInit.config.get(section=section, option="port")
- db = CfgBaseInit.config.get(section=section, option="db")
- host_bak = CfgBaseInit.config.get(section=section, option="host_bak")
- port_bak = CfgBaseInit.config.get(section=section, option="port_bak")
- db_bak = CfgBaseInit.config.get(section=section, option="db_bak")
- Pool = redis.ConnectionPool(host=host, port=port, db=db, max_connections=None, decode_responses=True,
- socket_keepalive=False)
- Pool_bak = redis.ConnectionPool(host=host_bak, port=port_bak, db=db_bak, max_connections=None,
- decode_responses=True)
- conn = redis.Redis(connection_pool=Pool)
- class AsyncRedisInit(CfgBaseInit):
- """
- redis 异步连接池初始化
- pipeline_demo:
- async def pipeline_test():
- conn = aredis.StrictRedis(connection_pool=AsyncRedisInit.Pool)
- p = await conn.pipeline()
- await p.hgetall("process_cache\0015f371f42f286b2eb461e75a437162c6e@cccxx@abrs_reco@d751713988987e9331980363e24189ce@d751713988987e9331980363e24189ce@d751713988987e9331980363e24189ce@0@0_0")
- a = await p.execute()
- return a
- client_demo:
- async def conn_test():
- conn = aredis.StrictRedis(connection_pool=AsyncRedisInit.Pool)
- a = await conn.hgetall("process_cache\0015f371f42f286b2eb461e75a437162c6e@cccxx@abrs_reco@d751713988987e9331980363e24189ce@d751713988987e9331980363e24189ce@d751713988987e9331980363e24189ce@0@0_0")
- return a
- """
- section = "REDIS-{}".format(CfgBaseInit.start_model.upper())
- host = CfgBaseInit.config.get(section=section, option="host")
- port = CfgBaseInit.config.get(section=section, option="port")
- try:
- password = CfgBaseInit.config.get(section=section, option="password")
- except Exception:
- password = None
- db = CfgBaseInit.config.get(section=section, option="db")
- host_bak = CfgBaseInit.config.get(section=section, option="host_bak")
- port_bak = CfgBaseInit.config.get(section=section, option="port_bak")
- try:
- password_bak = password
- except Exception:
- password_bak = None
- db_bak = CfgBaseInit.config.get(section=section, option="db_bak")
- # todo 不能使用链接池,链接池不能自动释放 conn
- Pool = aredis.ConnectionPool(host=host, port=port, db=db,
- password=password, max_connections=None, decode_responses=True,
- socket_keepalive=False)
- Pool_bak = aredis.ConnectionPool(host=host_bak, port=port_bak, db=db_bak, password=password_bak,
- max_connections=None, decode_responses=True,
- socket_keepalive=False)
- conn = aredis.StrictRedis(connection_pool=Pool)
- class ElasticSearchInit(CfgBaseInit):
- """
- ElasticSearch 对象初始化
- """
- section = "ES-{}".format(CfgBaseInit.start_model.upper())
- index = CfgBaseInit.config.get(section=section, option="index")
- fc_index = CfgBaseInit.config.get(section=section, option="fc_index")
- try:
- es_timeout = float(CfgBaseInit.config.get(section=section, option="timeout"))
- except Exception as e:
- es_timeout = 10
- es_client = Elasticsearch(hosts=[CfgBaseInit.config.get(section=section, option="host")], timeout=es_timeout)
- fc_es_client = Elasticsearch(hosts=CfgBaseInit.config.get(section=section, option="fc_host").split(","),
- timeout=es_timeout)
- class BehaviorAPI(CfgBaseInit):
- """
- 行为数据对接接口 Behavior Api 初始化
- """
- section = "BEHAVIOR_API-{}".format(CfgBaseInit.start_model.upper())
- url = CfgBaseInit.config.get(section=section, option="url")
- @staticmethod
- def push_behavior_log(data):
- data = json.dumps(data)
- r = requests.post(BehaviorAPI.url, data=data, headers=CfgBaseInit.headers)
- return r
- class AsyncKafka(CfgBaseInit):
- """异步请求模块
- """
- _async_kafka_session = None
- @classmethod
- async def __set_async_session(cls):
- producer = AIOKafkaProducer(loop=cls.get_my_event_loop(),
- value_serializer=lambda v: json.dumps(v).encode('utf-8'),
- bootstrap_servers=CfgBaseInit.kafka_server)
- await producer.start()
- cls._async_kafka_session = producer
- return producer
- @classmethod
- async def async_push(cls, payload):
- """异步请求接口
- """
- request_session = cls._async_kafka_session or await cls.__set_async_session()
- print(request_session)
- await request_session.send_and_wait("eip_rec_behave", payload)
- if __name__ == '__main__':
- a = BehaviorAPI.push_behavior_log({"a": 1})
- print(a)
- print(a.content)
|