__init__.py 9.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238
  1. # -*- coding:utf-8 -*-
  2. # 配置文件初始化
  3. # 主要是一些 db 的初始化创建
  4. # 简单 Class 类函数的 初始化
  5. import asyncio
  6. import json
  7. import logging
  8. import os
  9. from concurrent.futures import ThreadPoolExecutor
  10. from configparser import ConfigParser
  11. from logging.handlers import TimedRotatingFileHandler
  12. import aredis
  13. import redis
  14. import requests
  15. import uvloop
  16. from aiokafka import AIOKafkaProducer
  17. from elasticsearch import Elasticsearch
  18. from requests import adapters
  19. adapters.DEFAULT_POOLSIZE = 100000
  20. asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
  21. class CfgBaseInit(object):
  22. """
  23. config: 配置文件对象
  24. project_name: 项目名称
  25. start_mode: 项目启动模式
  26. """
  27. config = ConfigParser()
  28. config.read(os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), "config", "config.ini"))
  29. project_name = config.get("CONFIG", "project_name")
  30. project_path = os.path.join(os.path.abspath(os.path.dirname(__file__)).split(project_name)[0], project_name)
  31. start_model = config.get("CONFIG", "start_model").upper()
  32. __authentication = config.get("CONFIG", "authentication")
  33. authentication = json.loads(__authentication if "'" not in __authentication else __authentication.replace("'", '"'))
  34. __headers = config.get("CONFIG", "headers")
  35. headers = json.loads(__headers if "'" not in __headers else __headers.replace("'", '"'))
  36. bind = config.get(section=f"PROJECT-{start_model}", option="host")
  37. _executor = ThreadPoolExecutor(max_workers=50000) # 线程池
  38. _async_kafka_session = None
  39. kafka_server = config.get(f"BEHAVE-{start_model}", "kafka_server")
  40. @staticmethod
  41. def get_my_event_loop():
  42. """
  43. 获取当前 loop 循环事件管理对象
  44. :return:
  45. """
  46. event_loop = asyncio.get_event_loop()
  47. # asyncio.set_event_loop(loop=event_loop)
  48. return event_loop
  49. @classmethod
  50. def executor_submit(cls, *args, **kwargs):
  51. """
  52. 改写 executor.submit 方法,加入默认的异常捕获功能;
  53. 使用方式与 原来的 executor.submit 相同
  54. :param args:
  55. :param kwargs:
  56. :return:
  57. """
  58. if asyncio.iscoroutinefunction(args[0]):
  59. loop = asyncio.new_event_loop()
  60. if not loop.is_running():
  61. executor_future = loop.run_until_complete(args[0](args[1:])) # 处理 future 函数 or 对象
  62. return executor_future
  63. else:
  64. thread_name_prefix = kwargs.get("thread_name_prefix", "")
  65. if not thread_name_prefix:
  66. thread_name_prefix = args[0].__name__
  67. if "thread_name_prefix" in kwargs:
  68. del kwargs['thread_name_prefix']
  69. cls._executor._thread_name_prefix = thread_name_prefix
  70. executor_future = cls._executor.submit(*args, **kwargs) # 处理 普通 function;
  71. # 线程任务增加任务名称前缀,与线程名称相同.
  72. # 默认为 function.__name__
  73. executor_future.__setattr__("name", thread_name_prefix)
  74. executor_future.add_done_callback(cls._executor_callback)
  75. return executor_future
  76. @classmethod
  77. def _executor_callback(cls, worker):
  78. """
  79. 任务池中任务异常捕获回调函数
  80. 抛出异常
  81. :param worker: 任务
  82. :return:
  83. """
  84. worker_exception = worker.exception()
  85. if worker_exception:
  86. logger.exception("Worker return exception: {}".format(worker_exception))
  87. raise worker_exception
  88. class Logger(object):
  89. # 日志
  90. log_path = CfgBaseInit.config.get("LOG", "log_path")
  91. when = CfgBaseInit.config.get("LOG", "when")
  92. backupCount = int(CfgBaseInit.config.get("LOG", "backupCount"))
  93. logging.basicConfig(
  94. format='%(asctime)s %(name)s %(process)d %(thread)d %(threadName)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s')
  95. logger = logging.getLogger(CfgBaseInit.project_name)
  96. logger.setLevel(logging.DEBUG)
  97. # 输出到文件
  98. handler = TimedRotatingFileHandler(log_path, when=when, backupCount=backupCount)
  99. logger.addHandler(handler)
  100. logger = Logger.logger
  101. class RedisInit(CfgBaseInit):
  102. """
  103. Redis 对象初始化
  104. """
  105. section = "REDIS-{}".format(CfgBaseInit.start_model.upper())
  106. host = CfgBaseInit.config.get(section=section, option="host")
  107. port = CfgBaseInit.config.get(section=section, option="port")
  108. db = CfgBaseInit.config.get(section=section, option="db")
  109. host_bak = CfgBaseInit.config.get(section=section, option="host_bak")
  110. port_bak = CfgBaseInit.config.get(section=section, option="port_bak")
  111. db_bak = CfgBaseInit.config.get(section=section, option="db_bak")
  112. Pool = redis.ConnectionPool(host=host, port=port, db=db, max_connections=None, decode_responses=True,
  113. socket_keepalive=False)
  114. Pool_bak = redis.ConnectionPool(host=host_bak, port=port_bak, db=db_bak, max_connections=None,
  115. decode_responses=True)
  116. conn = redis.Redis(connection_pool=Pool)
  117. class AsyncRedisInit(CfgBaseInit):
  118. """
  119. redis 异步连接池初始化
  120. pipeline_demo:
  121. async def pipeline_test():
  122. conn = aredis.StrictRedis(connection_pool=AsyncRedisInit.Pool)
  123. p = await conn.pipeline()
  124. await p.hgetall("process_cache\0015f371f42f286b2eb461e75a437162c6e@cccxx@abrs_reco@d751713988987e9331980363e24189ce@d751713988987e9331980363e24189ce@d751713988987e9331980363e24189ce@0@0_0")
  125. a = await p.execute()
  126. return a
  127. client_demo:
  128. async def conn_test():
  129. conn = aredis.StrictRedis(connection_pool=AsyncRedisInit.Pool)
  130. a = await conn.hgetall("process_cache\0015f371f42f286b2eb461e75a437162c6e@cccxx@abrs_reco@d751713988987e9331980363e24189ce@d751713988987e9331980363e24189ce@d751713988987e9331980363e24189ce@0@0_0")
  131. return a
  132. """
  133. section = "REDIS-{}".format(CfgBaseInit.start_model.upper())
  134. host = CfgBaseInit.config.get(section=section, option="host")
  135. port = CfgBaseInit.config.get(section=section, option="port")
  136. try:
  137. password = CfgBaseInit.config.get(section=section, option="password")
  138. except Exception:
  139. password = None
  140. db = CfgBaseInit.config.get(section=section, option="db")
  141. host_bak = CfgBaseInit.config.get(section=section, option="host_bak")
  142. port_bak = CfgBaseInit.config.get(section=section, option="port_bak")
  143. try:
  144. password_bak = password
  145. except Exception:
  146. password_bak = None
  147. db_bak = CfgBaseInit.config.get(section=section, option="db_bak")
  148. # todo 不能使用链接池,链接池不能自动释放 conn
  149. Pool = aredis.ConnectionPool(host=host, port=port, db=db,
  150. password=password, max_connections=None, decode_responses=True,
  151. socket_keepalive=False)
  152. Pool_bak = aredis.ConnectionPool(host=host_bak, port=port_bak, db=db_bak, password=password_bak,
  153. max_connections=None, decode_responses=True,
  154. socket_keepalive=False)
  155. conn = aredis.StrictRedis(connection_pool=Pool)
  156. class ElasticSearchInit(CfgBaseInit):
  157. """
  158. ElasticSearch 对象初始化
  159. """
  160. section = "ES-{}".format(CfgBaseInit.start_model.upper())
  161. index = CfgBaseInit.config.get(section=section, option="index")
  162. fc_index = CfgBaseInit.config.get(section=section, option="fc_index")
  163. try:
  164. es_timeout = float(CfgBaseInit.config.get(section=section, option="timeout"))
  165. except Exception as e:
  166. es_timeout = 10
  167. es_client = Elasticsearch(hosts=[CfgBaseInit.config.get(section=section, option="host")], timeout=es_timeout)
  168. fc_es_client = Elasticsearch(hosts=CfgBaseInit.config.get(section=section, option="fc_host").split(","),
  169. timeout=es_timeout)
  170. class BehaviorAPI(CfgBaseInit):
  171. """
  172. 行为数据对接接口 Behavior Api 初始化
  173. """
  174. section = "BEHAVIOR_API-{}".format(CfgBaseInit.start_model.upper())
  175. url = CfgBaseInit.config.get(section=section, option="url")
  176. @staticmethod
  177. def push_behavior_log(data):
  178. data = json.dumps(data)
  179. r = requests.post(BehaviorAPI.url, data=data, headers=CfgBaseInit.headers)
  180. return r
  181. class AsyncKafka(CfgBaseInit):
  182. """异步请求模块
  183. """
  184. _async_kafka_session = None
  185. @classmethod
  186. async def __set_async_session(cls):
  187. producer = AIOKafkaProducer(loop=cls.get_my_event_loop(),
  188. value_serializer=lambda v: json.dumps(v).encode('utf-8'),
  189. bootstrap_servers=CfgBaseInit.kafka_server)
  190. await producer.start()
  191. cls._async_kafka_session = producer
  192. return producer
  193. @classmethod
  194. async def async_push(cls, payload):
  195. """异步请求接口
  196. """
  197. request_session = cls._async_kafka_session or await cls.__set_async_session()
  198. print(request_session)
  199. await request_session.send_and_wait("eip_rec_behave", payload)
  200. if __name__ == '__main__':
  201. a = BehaviorAPI.push_behavior_log({"a": 1})
  202. print(a)
  203. print(a.content)