# -*- coding: utf-8 -*-
# Configuration initialization module:
# - reads config.ini and exposes settings via CfgBaseInit
# - creates database/service clients (Redis, Elasticsearch, Kafka, behavior API)
# - initializes small helper classes (Logger)
  5. import asyncio
  6. import json
  7. import logging
  8. import os
  9. from concurrent.futures import ThreadPoolExecutor
  10. from configparser import ConfigParser
  11. from logging.handlers import TimedRotatingFileHandler
  12. import aredis
  13. import redis
  14. import requests
  15. import uvloop
  16. from aiokafka import AIOKafkaProducer
  17. from elasticsearch import Elasticsearch
  18. from requests import adapters
# Raise the default HTTP connection-pool size used by requests' adapters.
adapters.DEFAULT_POOLSIZE = 100000
# Install uvloop as the asyncio event-loop implementation for all new loops.
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
  21. class CfgBaseInit(object):
  22. """
  23. config: 配置文件对象
  24. project_name: 项目名称
  25. start_mode: 项目启动模式
  26. """
  27. config = ConfigParser()
  28. config.read(os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), "config", "config.ini"))
  29. project_name = config.get("CONFIG", "project_name")
  30. project_path = os.path.join(os.path.abspath(os.path.dirname(__file__)).split(project_name)[0], project_name)
  31. start_model = config.get("CONFIG", "start_model").upper()
  32. __authentication = config.get("CONFIG", "authentication")
  33. authentication = json.loads(__authentication if "'" not in __authentication else __authentication.replace("'", '"'))
  34. __headers = config.get("CONFIG", "headers")
  35. headers = json.loads(__headers if "'" not in __headers else __headers.replace("'", '"'))
  36. bind = config.get(section=f"PROJECT-{start_model}", option="host")
  37. _executor = ThreadPoolExecutor(max_workers=50000) # 线程池
  38. _async_kafka_session = None
  39. kafka_server = config.get(f"BEHAVE-{start_model}", "kafka_server")
  40. @staticmethod
  41. def get_my_event_loop():
  42. """
  43. 获取当前 loop 循环事件管理对象
  44. :return:
  45. """
  46. event_loop = asyncio.get_event_loop()
  47. # asyncio.set_event_loop(loop=event_loop)
  48. return event_loop
  49. @classmethod
  50. def executor_submit(cls, *args, **kwargs):
  51. """
  52. 改写 executor.submit 方法,加入默认的异常捕获功能;
  53. 使用方式与 原来的 executor.submit 相同
  54. :param args:
  55. :param kwargs:
  56. :return:
  57. """
  58. if asyncio.iscoroutinefunction(args[0]):
  59. loop = asyncio.new_event_loop()
  60. if not loop.is_running():
  61. executor_future = loop.run_until_complete(args[0](args[1:])) # 处理 future 函数 or 对象
  62. return executor_future
  63. else:
  64. thread_name_prefix = kwargs.get("thread_name_prefix", "")
  65. if not thread_name_prefix:
  66. thread_name_prefix = args[0].__name__
  67. if "thread_name_prefix" in kwargs:
  68. del kwargs['thread_name_prefix']
  69. cls._executor._thread_name_prefix = thread_name_prefix
  70. executor_future = cls._executor.submit(*args, **kwargs) # 处理 普通 function;
  71. # 线程任务增加任务名称前缀,与线程名称相同.
  72. # 默认为 function.__name__
  73. executor_future.__setattr__("name", thread_name_prefix)
  74. executor_future.add_done_callback(cls._executor_callback)
  75. return executor_future
  76. @classmethod
  77. def _executor_callback(cls, worker):
  78. """
  79. 任务池中任务异常捕获回调函数
  80. 抛出异常
  81. :param worker: 任务
  82. :return:
  83. """
  84. worker_exception = worker.exception()
  85. if worker_exception:
  86. logger.exception("Worker return exception: {}".format(worker_exception))
  87. raise worker_exception
  88. class Logger(object):
  89. # 日志
  90. log_path = CfgBaseInit.config.get("LOG", "log_path")
  91. when = CfgBaseInit.config.get("LOG", "when")
  92. backupCount = int(CfgBaseInit.config.get("LOG", "backupCount"))
  93. formatter = logging.Formatter(
  94. '%(asctime)s %(name)s %(process)d %(thread)d %(threadName)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s')
  95. handler = TimedRotatingFileHandler(log_path, when=when, backupCount=backupCount)
  96. handler.setFormatter(formatter)
  97. logger = logging.getLogger(CfgBaseInit.project_name)
  98. logger.addHandler(handler)
  99. logger.setLevel(logging.DEBUG)
  100. def __init__(self, cls):
  101. self._cls = cls
  102. def __call__(self, *args, **kwargs):
  103. if not hasattr(self._cls, 'logger'):
  104. self._cls.logger = logger
  105. return self._cls(*args, **kwargs)
  106. @staticmethod
  107. def replace_blank(msg, *args, **kwargs):
  108. msg = str(msg)
  109. return f"{msg % args}".replace("\r", " ").replace("\n", " ")
  110. def debug(self, msg, *args, **kwargs):
  111. self.logger.debug(self.replace_blank(msg, *args, **kwargs))
  112. def info(self, msg, *args, **kwargs):
  113. self.logger.info(self.replace_blank(msg, *args, **kwargs))
  114. def warning(self, msg, *args, **kwargs):
  115. self.logger.warning(self.replace_blank(msg, *args, **kwargs))
  116. def error(self, msg, *args, **kwargs):
  117. self.logger.error(self.replace_blank(msg, *args, **kwargs))
  118. def exception(self, msg, *args, **kwargs):
  119. self.logger.exception(self.replace_blank(msg, *args, **kwargs))
  120. def critical(self, msg, *args, **kwargs):
  121. self.logger.critical(self.replace_blank(msg, *args, **kwargs))
  122. logger = Logger(Logger)
  123. class RedisInit(CfgBaseInit):
  124. """
  125. Redis 对象初始化
  126. """
  127. section = "REDIS-{}".format(CfgBaseInit.start_model.upper())
  128. host = CfgBaseInit.config.get(section=section, option="host")
  129. port = CfgBaseInit.config.get(section=section, option="port")
  130. db = CfgBaseInit.config.get(section=section, option="db")
  131. host_bak = CfgBaseInit.config.get(section=section, option="host_bak")
  132. port_bak = CfgBaseInit.config.get(section=section, option="port_bak")
  133. db_bak = CfgBaseInit.config.get(section=section, option="db_bak")
  134. Pool = redis.ConnectionPool(host=host, port=port, db=db, max_connections=None, decode_responses=True,
  135. socket_keepalive=False)
  136. Pool_bak = redis.ConnectionPool(host=host_bak, port=port_bak, db=db_bak, max_connections=None,
  137. decode_responses=True)
  138. conn = redis.Redis(connection_pool=Pool)
  139. class AsyncRedisInit(CfgBaseInit):
  140. """
  141. redis 异步连接池初始化
  142. pipeline_demo:
  143. async def pipeline_test():
  144. conn = aredis.StrictRedis(connection_pool=AsyncRedisInit.Pool)
  145. p = await conn.pipeline()
  146. await p.hgetall("process_cache\0015f371f42f286b2eb461e75a437162c6e@cccxx@abrs_reco@d751713988987e9331980363e24189ce@d751713988987e9331980363e24189ce@d751713988987e9331980363e24189ce@0@0_0")
  147. a = await p.execute()
  148. return a
  149. client_demo:
  150. async def conn_test():
  151. conn = aredis.StrictRedis(connection_pool=AsyncRedisInit.Pool)
  152. a = await conn.hgetall("process_cache\0015f371f42f286b2eb461e75a437162c6e@cccxx@abrs_reco@d751713988987e9331980363e24189ce@d751713988987e9331980363e24189ce@d751713988987e9331980363e24189ce@0@0_0")
  153. return a
  154. """
  155. section = "REDIS-{}".format(CfgBaseInit.start_model.upper())
  156. host = CfgBaseInit.config.get(section=section, option="host")
  157. port = CfgBaseInit.config.get(section=section, option="port")
  158. try:
  159. password = CfgBaseInit.config.get(section=section, option="password")
  160. except Exception:
  161. password = None
  162. db = CfgBaseInit.config.get(section=section, option="db")
  163. host_bak = CfgBaseInit.config.get(section=section, option="host_bak")
  164. port_bak = CfgBaseInit.config.get(section=section, option="port_bak")
  165. try:
  166. password_bak = password
  167. except Exception:
  168. password_bak = None
  169. db_bak = CfgBaseInit.config.get(section=section, option="db_bak")
  170. # todo 不能使用链接池,链接池不能自动释放 conn
  171. Pool = aredis.ConnectionPool(host=host, port=port, db=db,
  172. password=password, max_connections=None, decode_responses=True,
  173. socket_keepalive=False)
  174. Pool_bak = aredis.ConnectionPool(host=host_bak, port=port_bak, db=db_bak, password=password_bak,
  175. max_connections=None, decode_responses=True,
  176. socket_keepalive=False)
  177. conn = aredis.StrictRedis(connection_pool=Pool)
  178. class ElasticSearchInit(CfgBaseInit):
  179. """
  180. ElasticSearch 对象初始化
  181. """
  182. section = "ES-{}".format(CfgBaseInit.start_model.upper())
  183. index = CfgBaseInit.config.get(section=section, option="index")
  184. fc_index = CfgBaseInit.config.get(section=section, option="fc_index")
  185. try:
  186. es_timeout = float(CfgBaseInit.config.get(section=section, option="timeout"))
  187. except Exception as e:
  188. es_timeout = 10
  189. es_client = Elasticsearch(hosts=[CfgBaseInit.config.get(section=section, option="host")], timeout=es_timeout)
  190. fc_es_client = Elasticsearch(hosts=CfgBaseInit.config.get(section=section, option="fc_host").split(","),
  191. timeout=es_timeout)
  192. class BehaviorAPI(CfgBaseInit):
  193. """
  194. 行为数据对接接口 Behavior Api 初始化
  195. """
  196. section = "BEHAVIOR_API-{}".format(CfgBaseInit.start_model.upper())
  197. url = CfgBaseInit.config.get(section=section, option="url")
  198. @staticmethod
  199. def push_behavior_log(data):
  200. data = json.dumps(data)
  201. r = requests.post(BehaviorAPI.url, data=data, headers=CfgBaseInit.headers)
  202. return r
  203. class AsyncKafka(CfgBaseInit):
  204. """异步请求模块
  205. """
  206. _async_kafka_session = None
  207. @classmethod
  208. async def __set_async_session(cls):
  209. producer = AIOKafkaProducer(loop=cls.get_my_event_loop(),
  210. value_serializer=lambda v: json.dumps(v).encode('utf-8'),
  211. bootstrap_servers=CfgBaseInit.kafka_server)
  212. await producer.start()
  213. cls._async_kafka_session = producer
  214. return producer
  215. @classmethod
  216. async def async_push(cls, payload):
  217. """异步请求接口
  218. """
  219. request_session = cls._async_kafka_session or await cls.__set_async_session()
  220. print(request_session)
  221. await request_session.send_and_wait("eip_rec_behave", payload)
  222. if __name__ == '__main__':
  223. a = BehaviorAPI.push_behavior_log({"a": 1})
  224. print(a)
  225. print(a.content)