# -*- coding:utf-8 -*- # 配置文件初始化 # 主要是一些 db 的初始化创建 # 简单 Class 类函数的 初始化 import asyncio import logging import os from configparser import ConfigParser from logging.handlers import TimedRotatingFileHandler from os.path import dirname # import redis as redis import uvloop from requests import adapters from sqlalchemy import create_engine, and_ from sqlalchemy.orm import sessionmaker adapters.DEFAULT_POOLSIZE = 100000 asyncio.set_event_loop_policy(uvloop.EventLoopPolicy()) class CfgBaseInit(object): """ config: 配置文件对象 project_name: 项目名称 start_mode: 项目启动模式 """ __REC_ENGINE = "REC_ENGINE" # 环境变量以REC_ENGINE开头 config = ConfigParser() config.read(os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), "config", "config.ini")) # 遍历所有配置,保存为dict config_dict = {} for sc in config.sections(): # 循环每个Session items = config.items(sc) # 返回结果为元组 config_dict[sc] = {} for it in items: # 循环遍历 每个属性 k, v = it[0], it[1] config_value = os.getenv("_".join([__REC_ENGINE, sc, k])) # 环境变量中是否存在 if config_value: print(f"获取到环境变量:{sc}_{k}") v = config_value config_dict[sc][k] = v project_name = config.get("CONFIG", "project_name") project_path = dirname(dirname(__file__)) bind = config.get("CONFIG", "host") class Logger(object): # 日志 log_path = CfgBaseInit.config.get("LOG", "log_path") when = CfgBaseInit.config.get("LOG", "when") backupCount = int(CfgBaseInit.config.get("LOG", "backupCount")) logging.basicConfig( format='%(asctime)s %(name)s %(process)d %(thread)d %(threadName)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s') logger = logging.getLogger(CfgBaseInit.project_name) logger.setLevel(logging.DEBUG) # 输出到文件 handler = TimedRotatingFileHandler(log_path, when=when, backupCount=backupCount) logger.addHandler(handler) logger = Logger.logger class SqlEngine(CfgBaseInit): """ mysql 引擎 """ import pymysql from urllib.parse import quote_plus pymysql.install_as_MySQLdb() mysql_dict = CfgBaseInit.config_dict["MYSQL"] # 280秒重新连接一次 DBSession = sessionmaker(bind=create_engine( f"mysql://{mysql_dict['user']}:{quote_plus(mysql_dict['password'])}@{mysql_dict['host']}:{mysql_dict['port']}/{mysql_dict['db']}", pool_size=2, max_overflow=0, pool_recycle=280)) @classmethod def add(cls, item): with cls.DBSession() as se: se.add(item) se.commit() return item.id @classmethod def add_all(cls, items): with cls.DBSession() as se: se.bulk_save_objects(items) se.commit() @classmethod def query_all(cls, model, filters=None, order_bys=None, offset=None, limit=None): """ :param limit: :param offset: :param order_bys: order_by(User.name.desc() :param model: :param filters: User.name == 'James' User.name != 'James' User.name.like('%e%') User.name.in_(['Kobe', 'James']) :return: """ with cls.DBSession() as se: query = cls.get_query(filters, se, model, order_bys, offset, limit) return query.all() @classmethod def query_page(cls, model, filters, order_bys=None, offset=0, limit=10): count = cls.query_count(model, filters) res_list = cls.query_all(model, filters, order_bys, offset, limit) return { "count": count, "list": res_list } @classmethod def query_count(cls, model, filters=None): with cls.DBSession() as se: query = cls.get_query(filters, se, model) return query.count() @classmethod def query_first(cls, model, filters=None): with cls.DBSession() as se: query = cls.get_query(filters, se, model) return query.first() @classmethod def update(cls, model, filters, dict_new): with cls.DBSession() as se: query = cls.get_query(filters, se, model) res = query.update(dict_new) se.commit() return res @staticmethod def get_query(filters, se, model, order_bys=None, offset=None, limit=None): if filters is None: filters = [] if order_bys is None: order_bys = [] query = se.query(model) for _filter in filters: query = query.filter(_filter) if len(order_bys) > 0: query = query.order_by(*order_bys) if offset: query = query.offset(offset) if limit: query = query.limit(limit) return query @classmethod def delete(cls, model, filters): with cls.DBSession() as se: query = cls.get_query(filters, se, model) res = query.delete() se.commit() return res @classmethod def exec_sql(cls, sql): with cls.DBSession() as se: return se.execute(sql).fetchall() class RedisInit(CfgBaseInit): """ Redis 对象初始化 """ # __redis_dict = CfgBaseInit.config_dict["REDIS"] # __host = __redis_dict['host'] # __port = __redis_dict['port'] # __db = __redis_dict['db'] # pool = redis.ConnectionPool(host=__host, port=__port, db=__db, max_connections=None, decode_responses=True, # socket_keepalive=False) # redis = redis.Redis(connection_pool=pool)