123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113 |
- from sqlalchemy import create_engine, text
- from sqlalchemy.orm import sessionmaker
class SqlEngine:
    """Small convenience wrapper around a SQLAlchemy session factory for MySQL.

    Each public method opens a session from ``self.DBSession``, performs a
    single operation inside a ``with`` block and lets the session close when
    the block exits.
    """

    def __init__(self, mysql_dict):
        """Build the engine and the session factory.

        :param mysql_dict: mapping providing the keys ``user``, ``password``,
            ``host``, ``port`` and ``db``.
        """
        from urllib.parse import quote_plus

        # The password is URL-quoted so that characters such as "@" or "/"
        # do not break the connection URL.
        # NOTE(review): ``serverTimezone`` looks like a JDBC-style option;
        # confirm that the Python MySQL driver in use accepts it.
        url = (
            f"mysql://{mysql_dict['user']}:{quote_plus(mysql_dict['password'])}"
            f"@{mysql_dict['host']}:{mysql_dict['port']}/{mysql_dict['db']}"
            "?charset=utf8mb4&serverTimezone=Asia/Shanghai"
        )
        # pool_recycle=280: pooled connections are re-established once they
        # are older than 280 seconds.
        engine = create_engine(url, pool_size=2, max_overflow=0, pool_recycle=280)
        self.DBSession = sessionmaker(bind=engine)

    def insert(self, item):
        """Add ``item``, commit, and return ``item.id``.

        :param item: mapped object to persist; must expose an ``id`` attribute.
        :return: the value of ``item.id`` read after the commit.
        """
        with self.DBSession() as se:
            se.add(item)
            se.commit()
            # Read the id while the session is still open.
            return item.id

    def insert_all(self, items):
        """Persist ``items`` with ``bulk_save_objects`` and commit.

        :param items: iterable of mapped objects.
        """
        with self.DBSession() as se:
            se.bulk_save_objects(items)
            se.commit()

    def select_all(self, model, filters=None, order_bys=None, offset=None, limit=None):
        """Return every row of ``model`` matching the given criteria.

        :param model: mapped class to query.
        :param filters: iterable of filter expressions, for example
            ``User.name == 'James'``, ``User.name != 'James'``,
            ``User.name.like('%e%')`` or ``User.name.in_(['Kobe', 'James'])``.
        :param order_bys: iterable of ordering expressions, for example
            ``User.name.desc()``.
        :param offset: number of rows to skip; ignored when falsy.
        :param limit: maximum number of rows; ignored when falsy.
        :return: the result of ``query.all()``.
        """
        with self.DBSession() as se:
            query = self.get_query(filters, se, model, order_bys, offset, limit)
            return query.all()

    def select_page(self, model, filters, order_bys=None, offset=0, limit=10):
        """Return one page of rows together with the total match count.

        :param model: mapped class to query.
        :param filters: iterable of filter expressions (or ``None``).
        :param order_bys: iterable of ordering expressions (or ``None``).
        :param offset: number of rows to skip.
        :param limit: page size.
        :return: ``{"count": <total matching rows>, "list": <page rows>}``.
        """
        # The count ignores offset/limit so it reflects all matching rows.
        count = self.select_count(model, filters)
        res_list = self.select_all(model, filters, order_bys, offset, limit)
        return {
            "count": count,
            "list": res_list,
        }

    def select_count(self, model, filters=None):
        """Return ``query.count()`` for the rows of ``model`` matching ``filters``."""
        with self.DBSession() as se:
            return self.get_query(filters, se, model).count()

    def select_first(self, model, filters=None):
        """Return ``query.first()`` for the rows of ``model`` matching ``filters``."""
        with self.DBSession() as se:
            return self.get_query(filters, se, model).first()

    def update(self, model, filters, dict_new):
        """Apply ``dict_new`` to every row matching ``filters`` and commit.

        :param model: mapped class to update.
        :param filters: iterable of filter expressions (or ``None``).
        :param dict_new: values handed to ``query.update``.
        :return: the value returned by ``query.update``.
        """
        with self.DBSession() as se:
            res = self.get_query(filters, se, model).update(dict_new)
            se.commit()
            return res

    @staticmethod
    def get_query(filters, se, model, order_bys=None, offset=None, limit=None):
        """Build a query on ``model`` from the optional criteria.

        :param filters: iterable of filter expressions, or ``None`` for none.
        :param se: open session used to create the query.
        :param model: mapped class to query.
        :param order_bys: iterable of ordering expressions, or ``None``.
        :param offset: rows to skip; a falsy value (``None``/``0``) is skipped.
        :param limit: maximum rows; a falsy value (``None``/``0``) is skipped.
        :return: the assembled query object (not yet executed).
        """
        query = se.query(model)
        # ``is None`` rather than truthiness: only a missing argument is
        # replaced, anything else is iterated as given.
        for condition in ([] if filters is None else filters):
            query = query.filter(condition)
        if order_bys:
            query = query.order_by(*order_bys)
        if offset:
            query = query.offset(offset)
        if limit:
            query = query.limit(limit)
        return query

    def delete(self, model, filters):
        """Delete every row of ``model`` matching ``filters`` and commit.

        :return: the value returned by ``query.delete``.
        """
        with self.DBSession() as se:
            res = self.get_query(filters, se, model).delete()
            se.commit()
            return res

    @staticmethod
    def _build_statement(sql, params=None):
        """Wrap raw SQL in ``text()`` and bind ``params`` when provided."""
        stmt = text(sql)
        if params:
            stmt = stmt.bindparams(**params)
        return stmt

    def select_sql(self, sql, params=None):
        """Run a raw SQL query and return all rows.

        :param sql: SQL text with named placeholders, for example
            ``"SELECT * FROM users WHERE age > :age"``.
        :param params: placeholder values, for example ``{'age': 18}``.
        :return: the result of ``fetchall()``.
        """
        with self.DBSession() as se:
            # Always go through text(): a plain string is not an executable
            # statement for Session.execute in current SQLAlchemy releases.
            result = se.execute(self._build_statement(sql, params))
            return result.fetchall()

    def execute_sql(self, sql, params=None):
        """Run a raw SQL statement, commit, and return the affected row count.

        :param sql: SQL text with named placeholders.
        :param params: placeholder values (optional).
        :return: ``rowcount`` of the executed statement.
        """
        with self.DBSession() as se:
            result = se.execute(self._build_statement(sql, params))
            # Capture rowcount before committing, then commit so the change
            # is not discarded when the session closes.
            affected = result.rowcount
            se.commit()
            return affected
|