tmp4.py 3.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113
  1. from sqlalchemy import create_engine, text
  2. from sqlalchemy.orm import sessionmaker
  3. class SqlEngine:
  4. def __init__(self, mysql_dict):
  5. from urllib.parse import quote_plus
  6. # 280秒重新连接一次
  7. self.DBSession = sessionmaker(bind=create_engine(
  8. f"mysql://{mysql_dict['user']}:{quote_plus(mysql_dict['password'])}@{mysql_dict['host']}:{mysql_dict['port']}/{mysql_dict['db']}?charset=utf8mb4&serverTimezone=Asia/Shanghai",
  9. pool_size=2, max_overflow=0, pool_recycle=280))
  10. def insert(self, item):
  11. with self.DBSession() as se:
  12. se.add(item)
  13. se.commit()
  14. return item.id
  15. def insert_all(self, items):
  16. with self.DBSession() as se:
  17. se.bulk_save_objects(items)
  18. se.commit()
  19. def select_all(self, model, filters=None, order_bys=None, offset=None, limit=None):
  20. """
  21. :param limit:
  22. :param offset:
  23. :param order_bys:
  24. order_by(User.name.desc()
  25. :param model:
  26. :param filters:
  27. User.name == 'James'
  28. User.name != 'James'
  29. User.name.like('%e%')
  30. User.name.in_(['Kobe', 'James'])
  31. :return:
  32. """
  33. with self.DBSession() as se:
  34. query = self.get_query(filters, se, model, order_bys, offset, limit)
  35. return query.all()
  36. def select_page(self, model, filters, order_bys=None, offset=0, limit=10):
  37. count = self.query_count(model, filters)
  38. res_list = self.query_all(model, filters, order_bys, offset, limit)
  39. return {
  40. "count": count,
  41. "list": res_list
  42. }
  43. def select_count(self, model, filters=None):
  44. with self.DBSession() as se:
  45. query = self.get_query(filters, se, model)
  46. return query.count()
  47. def select_first(self, model, filters=None):
  48. with self.DBSession() as se:
  49. query = self.get_query(filters, se, model)
  50. return query.first()
  51. def update(self, model, filters, dict_new):
  52. with self.DBSession() as se:
  53. query = self.get_query(filters, se, model)
  54. res = query.update(dict_new)
  55. se.commit()
  56. return res
  57. @staticmethod
  58. def get_query(filters, se, model, order_bys=None, offset=None, limit=None):
  59. if filters is None:
  60. filters = []
  61. if order_bys is None:
  62. order_bys = []
  63. query = se.query(model)
  64. for _filter in filters:
  65. query = query.filter(_filter)
  66. if len(order_bys) > 0:
  67. query = query.order_by(*order_bys)
  68. if offset:
  69. query = query.offset(offset)
  70. if limit:
  71. query = query.limit(limit)
  72. return query
  73. def delete(self, model, filters):
  74. with self.DBSession() as se:
  75. query = self.get_query(filters, se, model)
  76. res = query.delete()
  77. se.commit()
  78. return res
  79. def select_sql(self, sql, params=None):
  80. """
  81. sql 查询
  82. :param sql: "SELECT * FROM users WHERE age > :age"
  83. :param params: {'age': 18}
  84. :return:
  85. """
  86. with self.DBSession() as se:
  87. if params:
  88. stmt = text(sql).bindparams(**params)
  89. result = se.execute(stmt)
  90. else:
  91. result = se.execute(sql)
  92. return result.fetchall()
  93. def execute_sql(self, sql, params=None):
  94. with self.DBSession() as se:
  95. if params:
  96. stmt = text(sql).bindparams(**params)
  97. result = se.execute(stmt)
  98. else:
  99. result = se.execute(sql)
  100. return result.rowcount