tianyunperfect 3 years ago
parent
commit
7251ea74df

+ 101 - 22
fastapi-demo/config/__init__.py

@@ -10,10 +10,10 @@ from configparser import ConfigParser
 from logging.handlers import TimedRotatingFileHandler
 from os.path import dirname
 
-import redis as redis
+# import redis as redis
 import uvloop
 from requests import adapters
-from sqlalchemy import create_engine
+from sqlalchemy import create_engine, and_
 from sqlalchemy.orm import sessionmaker
 
 adapters.DEFAULT_POOLSIZE = 100000
@@ -69,35 +69,114 @@ class Logger(object):
 logger = Logger.logger
 
 
-class MySqlEngine(CfgBaseInit):
+class SqlEngine(CfgBaseInit):
     """ mysql 引擎
     """
     import pymysql
-    from urllib.parse import quote_plus as urlquote
-    pymysql.install_as_MySQLdb()
+    from urllib.parse import quote_plus
 
+    pymysql.install_as_MySQLdb()
     mysql_dict = CfgBaseInit.config_dict["MYSQL"]
 
-    __user = mysql_dict['user']
-    __password = mysql_dict['password']
-    __host = mysql_dict['host']
-    __port = mysql_dict['port']
-    __db = mysql_dict['db']
-    __engine = create_engine(f"mysql://{__user}:{urlquote(__password)}@{__host}:{__port}/{__db}",
-                             pool_size=2,
-                             max_overflow=0,
-                             pool_recycle=280)  # reconnect every 280 seconds
-    DBSession = sessionmaker(bind=__engine)
+    # reconnect every 280 seconds
+    DBSession = sessionmaker(bind=create_engine(
+        f"mysql://{mysql_dict['user']}:{quote_plus(mysql_dict['password'])}@{mysql_dict['host']}:{mysql_dict['port']}/{mysql_dict['db']}",
+        pool_size=2, max_overflow=0, pool_recycle=280))
+
+    @classmethod
+    def add(cls, item):
+        with cls.DBSession() as se:
+            se.add(item)
+            se.commit()
+            return item.id
+
+    @classmethod
+    def add_all(cls, items):
+        with cls.DBSession() as se:
+            se.bulk_save_objects(items)
+            se.commit()
+
+    @classmethod
+    def query_all(cls, model, filters=None, order_bys=None, offset=None, limit=None):
+        """
+        :param limit:
+        :param offset:
+        :param order_bys:
+                order_by(User.name.desc()
+        :param model:
+        :param filters:
+            User.name=='James'
+            User.name.like('%e%')
+            User.name.in_(['Kobe', 'James'])
+        :return:
+        """
+        with cls.DBSession() as se:
+            query = cls.get_query(filters, se, model, order_bys, offset, limit)
+            return query.all()
+
+    @classmethod
+    def query_page(cls, model, filters, order_bys=None, offset=0, limit=10):
+        count = cls.query_count(model, filters)
+        res_list = cls.query_all(model, filters, order_bys, offset, limit)
+        return {
+            "count": count,
+            "list": res_list
+        }
+
+    @classmethod
+    def query_count(cls, model, filters=None):
+        with cls.DBSession() as se:
+            query = cls.get_query(filters, se, model)
+            return query.count()
+
+    @classmethod
+    def query_first(cls, model, filters=None):
+        with cls.DBSession() as se:
+            query = cls.get_query(filters, se, model)
+            return query.first()
+
+    @classmethod
+    def update(cls, model, filters, dict_new):
+        with cls.DBSession() as se:
+            query = cls.get_query(filters, se, model)
+            res = query.update(dict_new)
+            se.commit()
+            return res
+
+    @staticmethod
+    def get_query(filters, se, model, order_bys=None, offset=None, limit=None):
+        if filters is None:
+            filters = []
+        if order_bys is None:
+            order_bys = []
+        query = se.query(model)
+        for _filter in filters:
+            query = query.filter(_filter)
+        if len(order_bys) > 0:
+            query = query.order_by(*order_bys)
+        if offset:
+            query = query.offset(offset)
+        if limit:
+            query = query.limit(limit)
+        return query
+
+    @classmethod
+    def delete(cls, model, filters):
+        with cls.DBSession() as se:
+            query = cls.get_query(filters, se, model)
+            res = query.delete()
+            se.commit()
+            return res
 
 
 class RedisInit(CfgBaseInit):
     """
         Redis object initialization
     """
-    __redis_dict = CfgBaseInit.config_dict["REDIS"]
-    __host = __redis_dict['host']
-    __port = __redis_dict['port']
-    __db = __redis_dict['db']
-    pool = redis.ConnectionPool(host=__host, port=__port, db=__db, max_connections=None, decode_responses=True,
-                                socket_keepalive=False)
-    redis = redis.Redis(connection_pool=pool)
+    # __redis_dict = CfgBaseInit.config_dict["REDIS"]
+    # __host = __redis_dict['host']
+    # __port = __redis_dict['port']
+    # __db = __redis_dict['db']
+    # pool = redis.ConnectionPool(host=__host, port=__port, db=__db, max_connections=None, decode_responses=True,
+    #                             socket_keepalive=False)
+    # redis = redis.Redis(connection_pool=pool)
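
For reference, a minimal usage sketch of the new SqlEngine helpers (illustrative values; assumes the DbKafka model added in models.py below, and uses the newly imported and_ to combine filters):

    from sqlalchemy import and_

    from config import SqlEngine
    from config.models import DbKafka

    # insert one row and get its generated primary key back
    new_id = SqlEngine.add(DbKafka(name="demo", json_str="{}"))

    # filters is a list of expressions; and_() merges several conditions into one
    rows = SqlEngine.query_all(
        DbKafka,
        filters=[and_(DbKafka.name == "demo", DbKafka.id >= new_id)],
        order_bys=[DbKafka.id.desc()],
        limit=5,
    )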

+ 4 - 4
fastapi-demo/config/config.ini

@@ -11,11 +11,11 @@ when = d
 backupCount = 30
 
 [MYSQL]
-user = root
-password = mininglamp
-host = 192.168.86.26
+user = dev_test
+password = MyGRx8MBh43jmAsS
+host = 101.43.195.48
 port = 3306
-db = eip_kafka
+db = dev_test
 
 [REDIS]
 host = 192.168.86.23

+ 21 - 0
fastapi-demo/config/models.py

@@ -0,0 +1,21 @@
+# coding: utf-8
+from sqlalchemy import Column, String
+from sqlalchemy.dialects.mysql import INTEGER
+from sqlalchemy.ext.declarative import declarative_base
+
+Base = declarative_base()
+metadata = Base.metadata
+
+
+class DbKafka(Base):
+    __tablename__ = 'db_kafka'
+
+    id = Column(INTEGER(11), primary_key=True)
+    name = Column(String(200), nullable=False)
+    json_str = Column(String(5000), nullable=False)
+
+    def to_dict(self):
+        return {c.name: getattr(self, c.name, None) for c in self.__table__.columns}
+
+    def __repr__(self):
+        return str(self.to_dict())
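
Nothing in the commit creates the db_kafka table itself; a minimal bootstrap sketch from the model metadata, with a placeholder DSN standing in for the MYSQL values in config.ini:

    from sqlalchemy import create_engine

    from config.models import Base

    # placeholder DSN; substitute the real MYSQL settings from config.ini
    engine = create_engine("mysql://user:password@host:3306/dev_test")
    Base.metadata.create_all(engine)  # creates db_kafka only if it is missing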

+ 15 - 0
fastapi-demo/test_sql.py

@@ -0,0 +1,15 @@
+# -*- coding:utf-8 -*-
+
+from config import SqlEngine
+from config.models import DbKafka
+
+ka = DbKafka(name="hi", json_str="hello")
+# print(SqlEngine.add(ka))
+# print(SqlEngine.add_all([ka]))
+# print(SqlEngine.query_first(DbKafka))
+# print(SqlEngine.update(DbKafka, [DbKafka.name == 'hi'], {DbKafka.name: "hi2"}))
+
+
+# print(SqlEngine.query_all(DbKafka))
+print(SqlEngine.query_all(DbKafka, [], [DbKafka.id.asc()]))
+print(SqlEngine.query_all(DbKafka, [], [DbKafka.name.desc(), DbKafka.json_str.desc()]))
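
test_sql.py leaves query_page and delete unexercised; a short sketch of those calls against the same model (illustrative filter values):

    # first page of 10 rows together with the total row count
    page = SqlEngine.query_page(DbKafka, [DbKafka.name.like("hi%")], offset=0, limit=10)
    print(page["count"], page["list"])

    # delete() returns the number of rows removed
    print(SqlEngine.delete(DbKafka, [DbKafka.name == "hi2"]))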

+ 6 - 27
tmp/tmp.py

@@ -1,29 +1,8 @@
-a = 123
-i = 1234
-import aiohttp
-import asyncio
+def test(a, b=None, c=None, d=None):
+    print(a)
+    print(b)
+    print(c)
+    print(d)
 
 
-async def fetch(session, url):
-    print("发送请求:", url)
-    async with session.get(url, verify_ssl=False) as response:
-        content = await response.content.read()
-        file_name = url.rsplit('_')[-1]
-        with open(file_name, mode='wb') as file_object:
-            file_object.write(content)
-    return "OK"
-
-
-async def main():
-    async with aiohttp.ClientSession() as session:
-        url_list = [
-            'https://www3.autoimg.cn/newsdfs/g26/M02/35/A9/120x90_0_autohomecar__ChsEe12AXQ6AOOH_AAFocMs8nzU621.jpg',
-            'https://www2.autoimg.cn/newsdfs/g30/M01/3C/E2/120x90_0_autohomecar__ChcCSV2BBICAUntfAADjJFd6800429.jpg',
-            'https://www3.autoimg.cn/newsdfs/g26/M0B/3C/65/120x90_0_autohomecar__ChcCP12BFCmAIO83AAGq7vK0sGY193.jpg'
-        ]
-        res = await asyncio.gather(*[fetch(session, url) for url in url_list])
-        print(res)
-
-
-if __name__ == '__main__':
-    asyncio.run(main())
+test(*[1, 2, 3, 4])
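
The final call unpacks the list into positional arguments, so it is equivalent to test(1, 2, 3, 4) and prints 1, 2, 3 and 4 on separate lines.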