from urllib.parse import quote_plus

from sqlalchemy import create_engine, text
from sqlalchemy.orm import sessionmaker


class SqlEngine:
    """Thin convenience wrapper around a SQLAlchemy session factory for MySQL.

    Every public method opens a short-lived session from ``self.DBSession``,
    performs one operation and closes the session again.
    """

    def __init__(self, mysql_dict):
        """Create the engine and the session factory.

        :param mysql_dict: mapping with the keys ``user``, ``password``,
            ``host``, ``port`` and ``db``. The password is URL-quoted so that
            special characters do not break the connection URL.
        """
        url = (
            f"mysql://{mysql_dict['user']}:{quote_plus(mysql_dict['password'])}"
            f"@{mysql_dict['host']}:{mysql_dict['port']}/{mysql_dict['db']}"
            f"?charset=utf8mb4&serverTimezone=Asia/Shanghai"
        )
        # Recycle pooled connections after 280 seconds.
        engine = create_engine(url, pool_size=2, max_overflow=0, pool_recycle=280)
        self.DBSession = sessionmaker(bind=engine)

    def insert(self, item):
        """Add ``item``, commit, and return its ``id`` attribute."""
        with self.DBSession() as se:
            se.add(item)
            se.commit()
            # Read the id while the session is still open so the attribute
            # can be refreshed after the commit.
            return item.id

    def insert_all(self, items):
        """Bulk-save ``items`` in a single transaction."""
        with self.DBSession() as se:
            se.bulk_save_objects(items)
            se.commit()

    def select_all(self, model, filters=None, order_bys=None, offset=None, limit=None):
        """Return all rows of ``model`` matching the given criteria.

        :param model: mapped class to query.
        :param filters: iterable of filter expressions, e.g.
            ``User.name == 'James'``, ``User.name != 'James'``,
            ``User.name.like('%e%')``, ``User.name.in_(['Kobe', 'James'])``.
        :param order_bys: iterable of ordering expressions, e.g.
            ``User.name.desc()``.
        :param offset: number of rows to skip; ignored when falsy.
        :param limit: maximum number of rows; ignored when falsy.
        :return: list of results from ``Query.all()``.
        """
        with self.DBSession() as se:
            query = self.get_query(filters, se, model, order_bys, offset, limit)
            return query.all()

    def select_page(self, model, filters, order_bys=None, offset=0, limit=10):
        """Return one page of rows together with the total match count.

        :return: ``{"count": <total matching rows>, "list": <page rows>}``.
        """
        # The original called non-existent ``query_count`` / ``query_all``;
        # the class only defines ``select_count`` / ``select_all``.
        count = self.select_count(model, filters)
        res_list = self.select_all(model, filters, order_bys, offset, limit)
        return {
            "count": count,
            "list": res_list
        }

    def select_count(self, model, filters=None):
        """Return the number of rows of ``model`` matching ``filters``."""
        with self.DBSession() as se:
            return self.get_query(filters, se, model).count()

    def select_first(self, model, filters=None):
        """Return the first row of ``model`` matching ``filters`` (``Query.first()``)."""
        with self.DBSession() as se:
            return self.get_query(filters, se, model).first()

    def update(self, model, filters, dict_new):
        """Update rows matching ``filters`` with ``dict_new`` and commit.

        :return: the value returned by ``Query.update()``.
        """
        with self.DBSession() as se:
            res = self.get_query(filters, se, model).update(dict_new)
            se.commit()
            return res

    @staticmethod
    def get_query(filters, se, model, order_bys=None, offset=None, limit=None):
        """Build a ``Query`` on ``model`` bound to session ``se``.

        ``filters`` and ``order_bys`` may be ``None`` or empty. A falsy
        ``offset`` or ``limit`` (``None`` or ``0``) is not applied.
        """
        query = se.query(model)
        if filters:
            # Multiple criteria passed to filter() are joined with AND,
            # the same result as chaining one filter() call per criterion.
            query = query.filter(*filters)
        if order_bys:
            query = query.order_by(*order_bys)
        if offset:
            query = query.offset(offset)
        if limit:
            query = query.limit(limit)
        return query

    def delete(self, model, filters):
        """Delete rows matching ``filters`` and commit.

        :return: the value returned by ``Query.delete()``.
        """
        with self.DBSession() as se:
            res = self.get_query(filters, se, model).delete()
            se.commit()
            return res

    @staticmethod
    def _build_statement(sql, params=None):
        """Turn ``sql`` into an executable statement with ``params`` bound.

        Plain strings are wrapped in ``text()``; anything else is assumed to
        already be an executable statement and is passed through.
        """
        stmt = text(sql) if isinstance(sql, str) else sql
        if params:
            stmt = stmt.bindparams(**params)
        return stmt

    def select_sql(self, sql, params=None):
        """Run a raw SQL query and return all rows.

        :param sql: e.g. ``"SELECT * FROM users WHERE age > :age"``
        :param params: e.g. ``{'age': 18}``
        :return: list of rows from ``fetchall()``.
        """
        with self.DBSession() as se:
            result = se.execute(self._build_statement(sql, params))
            # Fetch before the session (and its connection) is released.
            return result.fetchall()

    def execute_sql(self, sql, params=None):
        """Run a raw SQL statement, commit, and return the affected row count.

        :param sql: statement text with ``:name`` style placeholders.
        :param params: mapping of placeholder names to values.
        :return: ``rowcount`` of the executed statement.
        """
        with self.DBSession() as se:
            result = se.execute(self._build_statement(sql, params))
            rowcount = result.rowcount
            # Without a commit the session rolls the statement back on close.
            se.commit()
            return rowcount