123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182 |
- # -*- coding:utf-8 -*-
- # 配置文件初始化
- # 主要是一些 db 的初始化创建
- # 简单 Class 类函数的 初始化
- import asyncio
- import logging
- import os
- from configparser import ConfigParser
- from logging.handlers import TimedRotatingFileHandler
- from os.path import dirname
- # import redis as redis
- import uvloop
- from requests import adapters
- from sqlalchemy import create_engine, and_
- from sqlalchemy.orm import sessionmaker
- adapters.DEFAULT_POOLSIZE = 100000
- asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
- class CfgBaseInit(object):
- """
- config: 配置文件对象
- project_name: 项目名称
- start_mode: 项目启动模式
- """
- __REC_ENGINE = "REC_ENGINE" # 环境变量以REC_ENGINE开头
- config = ConfigParser()
- config.read(os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), "config", "config.ini"))
- # 遍历所有配置,保存为dict
- config_dict = {}
- for sc in config.sections(): # 循环每个Session
- items = config.items(sc) # 返回结果为元组
- config_dict[sc] = {}
- for it in items: # 循环遍历 每个属性
- k, v = it[0], it[1]
- config_value = os.getenv("_".join([__REC_ENGINE, sc, k])) # 环境变量中是否存在
- if config_value:
- print(f"获取到环境变量:{sc}_{k}")
- v = config_value
- config_dict[sc][k] = v
- project_name = config.get("CONFIG", "project_name")
- project_path = dirname(dirname(__file__))
- bind = config.get("CONFIG", "host")
- class Logger(object):
- # 日志
- log_path = CfgBaseInit.config.get("LOG", "log_path")
- when = CfgBaseInit.config.get("LOG", "when")
- backupCount = int(CfgBaseInit.config.get("LOG", "backupCount"))
- logging.basicConfig(
- format='%(asctime)s %(name)s %(process)d %(thread)d %(threadName)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s')
- logger = logging.getLogger(CfgBaseInit.project_name)
- logger.setLevel(logging.DEBUG)
- # 输出到文件
- handler = TimedRotatingFileHandler(log_path, when=when, backupCount=backupCount)
- logger.addHandler(handler)
- logger = Logger.logger
- class SqlEngine(CfgBaseInit):
- """ mysql 引擎
- """
- import pymysql
- from urllib.parse import quote_plus
- pymysql.install_as_MySQLdb()
- mysql_dict = CfgBaseInit.config_dict["MYSQL"]
- # 280秒重新连接一次
- DBSession = sessionmaker(bind=create_engine(
- f"mysql://{mysql_dict['user']}:{quote_plus(mysql_dict['password'])}@{mysql_dict['host']}:{mysql_dict['port']}/{mysql_dict['db']}",
- pool_size=2, max_overflow=0, pool_recycle=280))
- @classmethod
- def add(cls, item):
- with cls.DBSession() as se:
- se.add(item)
- se.commit()
- return item.id
- @classmethod
- def add_all(cls, items):
- with cls.DBSession() as se:
- se.bulk_save_objects(items)
- se.commit()
- @classmethod
- def query_all(cls, model, filters=None, order_bys=None, offset=None, limit=None):
- """
- :param limit:
- :param offset:
- :param order_bys:
- order_by(User.name.desc()
- :param model:
- :param filters:
- User.name=='James'
- User.name.like('%e%')
- User.name.in_(['Kobe', 'James'])
- :return:
- """
- with cls.DBSession() as se:
- query = cls.get_query(filters, se, model, order_bys, offset, limit)
- return query.all()
- @classmethod
- def query_page(cls, model, filters, order_bys=None, offset=0, limit=10):
- count = cls.query_count(model, filters)
- res_list = cls.query_all(model, filters, order_bys, offset, limit)
- return {
- "count": count,
- "list": res_list
- }
- @classmethod
- def query_count(cls, model, filters=None):
- with cls.DBSession() as se:
- query = cls.get_query(filters, se, model)
- return query.count()
- @classmethod
- def query_first(cls, model, filters=None):
- with cls.DBSession() as se:
- query = cls.get_query(filters, se, model)
- return query.first()
- @classmethod
- def update(cls, model, filters, dict_new):
- with cls.DBSession() as se:
- query = cls.get_query(filters, se, model)
- res = query.update(dict_new)
- se.commit()
- return res
- @staticmethod
- def get_query(filters, se, model, order_bys=None, offset=None, limit=None):
- if filters is None:
- filters = []
- if order_bys is None:
- order_bys = []
- query = se.query(model)
- for _filter in filters:
- query = query.filter(_filter)
- if len(order_bys) > 0:
- query = query.order_by(*order_bys)
- if offset:
- query = query.offset(offset)
- if limit:
- query = query.limit(limit)
- return query
- @classmethod
- def delete(cls, model, filters):
- with cls.DBSession() as se:
- query = cls.get_query(filters, se, model)
- res = query.delete()
- se.commit()
- return res
- class RedisInit(CfgBaseInit):
- """
- Redis 对象初始化
- """
- # __redis_dict = CfgBaseInit.config_dict["REDIS"]
- # __host = __redis_dict['host']
- # __port = __redis_dict['port']
- # __db = __redis_dict['db']
- # pool = redis.ConnectionPool(host=__host, port=__port, db=__db, max_connections=None, decode_responses=True,
- # socket_keepalive=False)
- # redis = redis.Redis(connection_pool=pool)
|