tianyunperfect 2 jaren geleden
bovenliggende
commit
ed2612040a
4 gewijzigde bestanden met toevoegingen van 277 en 0 verwijderingen
  1. 100 0
      project/call_sh_app.py
  2. 154 0
      project/push_user_app.py
  3. 14 0
      tmp/kafka/KafkaPushTest11.py
  4. 9 0
      tmp/kafka/kafkaReceiveTest11.py

+ 100 - 0
project/call_sh_app.py

@@ -0,0 +1,100 @@
+import os
+import sys
+import time
+from pathlib import Path
+
+import uvicorn
+import subprocess
+
+from fastapi import FastAPI, UploadFile, File, Form
+from fastapi import Request
+from fastapi.middleware.cors import CORSMiddleware
+from starlette.middleware.sessions import SessionMiddleware
+from starlette.responses import JSONResponse, RedirectResponse, FileResponse
+
+# =============== 基础配置 ===============
+
+app = FastAPI(title="project name ", description="通用系统 ", version="v 0.0.0")
+
+# 添加 session 中间键,使项目中可以使用session
+app.add_middleware(SessionMiddleware, secret_key='123456hhh')
+app.add_middleware(
+    CORSMiddleware,
+    allow_origins=["*", ],
+    allow_credentials=True,
+    allow_methods=["*"],
+    allow_headers=["*"],
+)
+
+
+@app.exception_handler(Exception)
+async def validation_exception_handler(request, exc):
+    """请求校验异常捕获; application/json """
+    return JSONResponse({'message': "服务器内部错误", 'status_code': 500})
+
+
+@app.middleware("http")
+async def add_process_time_header(request: Request, call_next):
+    """接口响应中间键; 当前只支持 http 请求"""
+    start_time = time.time()
+    response = await call_next(request)
+    process_time = time.time() - start_time
+    response.headers["API-Process-Time"] = f"{process_time} seconds"
+    return response
+
+
+# =============== 代码 ===============
+
+
+@app.get("/call_sh")
+def call_sh(path: str):
+    """
+    echo "$(curl 127.0.0.1:9999/call_sh?path=/Users/mlamp/IdeaProjects/python-base/tmp.sh)"
+    :param path:
+    :return:
+    """
+    # return os.popen(f"sh {path}").read()
+    os.system(f"sh {path} &")
+    return "请等待结果"
+
+
+def run_shell(shell):
+    cmd = subprocess.Popen(shell, stdin=subprocess.PIPE, stderr=sys.stderr, close_fds=True,
+                           stdout=sys.stdout, universal_newlines=True, shell=True, bufsize=1)
+    cmd.communicate()
+    return cmd.returncode
+
+
+@app.post("/upload")
+def upload(file: UploadFile = File(...), path: str = Form(...)):
+    """
+    curl 127.0.0.1:9999/upload -F "file=@稻香.flac" -F 'path=/Users/mlamp/tmp'
+    :param file:
+    :param path:
+    :return:
+    """
+    upload_dir = "/tmp/upload"
+    if not Path(upload_dir).exists():
+        Path(upload_dir).mkdir()
+
+    with open("/tmp/upload/" + file.filename, 'wb') as f:
+        f.write(file.file.read())
+    return "OK"
+
+
+@app.get("/download")
+def download(path: str):
+    """
+    wget 127.0.0.1:9999/download?path=/Users/mlamp/tmp/稻香.flac
+    :param path:
+    :return:
+    """
+    return FileResponse(
+        path,
+        filename=path.split("/")[-1]
+    )
+
+
+if __name__ == '__main__':
+    # uvicorn call_sh:app --reload --port 19999
+    uvicorn.run(app='call_sh_app:app', host="0.0.0.0", port=9999, reload=True, debug=True)

+ 154 - 0
project/push_user_app.py

@@ -0,0 +1,154 @@
+import json
+import random
+import threading
+import time
+
+import requests
+import uvicorn
+from fastapi import FastAPI
+from fastapi import Request
+from fastapi.exceptions import RequestValidationError
+from fastapi.middleware.cors import CORSMiddleware
+from fastapi.responses import PlainTextResponse
+from starlette.exceptions import HTTPException as StarletteHTTPException
+from starlette.middleware.sessions import SessionMiddleware
+from starlette.responses import JSONResponse
+
+app = FastAPI(title="push_user_app ", description="通用系统 ", version="v 0.0.0")
+
+# 添加 session 中间键,使项目中可以使用session
+app.add_middleware(SessionMiddleware, secret_key='123456hh112312h')
+app.add_middleware(
+    CORSMiddleware,
+    allow_origins=["*", ],
+    allow_credentials=True,
+    allow_methods=["*"],
+    allow_headers=["*"],
+)
+
+
+@app.exception_handler(StarletteHTTPException)
+async def http_exception_handler(request, exc):
+    """接口服务异常捕获; text/plain"""
+    return PlainTextResponse(str(exc.detail), status_code=exc.status_code)
+
+
+@app.exception_handler(RequestValidationError)
+async def validation_exception_handler(request, exc):
+    """请求校验异常捕获; application/json """
+    json_response = JSONResponse({'message': '【 {} 】接口触发了 RequestValidationError 错误.'
+                                 .format(request.scope.get("endpoint").__name__, exc.errors()), 'status_code': 999,
+                                  "error_message": exc.errors(), "queryParasm": exc.body})
+    return json_response
+
+
+@app.exception_handler(Exception)
+async def validation_exception_handler(request, exc):
+    """请求校验异常捕获; application/json """
+    return JSONResponse({'message': "服务器内部错误", 'status_code': 500})
+
+
+@app.middleware("http")
+async def add_process_time_header(request: Request, call_next):
+    """接口响应中间键; 当前只支持 http 请求"""
+    start_time = time.time()
+    response = await call_next(request)
+    process_time = time.time() - start_time
+    response.headers["API-Process-Time"] = f"{process_time} seconds"
+    return response
+
+
+# http://localhost:8000/query?uid=1
+@app.get("/query")
+def query(uid):
+    msg = f'uid为{uid}'
+    return {'success': True, 'msg': msg}
+
+
+token = None
+
+
+def delete_token():
+    global token
+    time.sleep(60 * 60)
+    token = None
+
+
+notes = [
+    """我们曾如此渴望命运的波澜,到最后才发现,人生最曼妙的风景,竟是内心的淡定与从容;
+    我们曾如此期盼外界的认可,到最后才知道,世界是自己的,与他人毫无关系。""",
+    """做一个浪漫的人,对世界保持永远的好奇,但别让他轻易改变你。""",
+    """我们终其一生都是为了获得选择的权利和拒绝的底气。""",
+    """无聊是非常有必要的,一个人在空白时间所做的事,决定了这个人和他人根本的不同。""",
+    """当爱支配一切时,权力就不存在了;
+    当权力主宰一切时,爱就消失了。两者互为对方的影子。""",
+    """没有一个人是住在客观的世界里, 我们都居住在一个各自赋予其意义的主观的世界。""",
+    """每天安静地坐十五分钟,倾听你的气息,感觉它,感觉你自己,并且试着什么都不想。"""
+]
+
+
+def get_des():
+    li = random.choice(notes).split("\n")
+    li.append("点击进行复习。")
+    res = ''
+    for line in li:
+        res += "<div>" + line.strip() + "</div>"
+    return res
+
+
+def send_message(corp_id, agent_id, secret, userid_list=None, title=None):
+    global token
+
+    if userid_list is None:
+        userid_list = []
+
+    userid_str = "|".join(userid_list)
+    if token is None:
+        print("获取一次 token")
+        response = requests.get(f"https://qyapi.weixin.qq.com/cgi-bin/gettoken?corpid={corp_id}&corpsecret={secret}")
+        data = json.loads(response.text)
+        access_token = data['access_token']
+        token = access_token
+        threading.Thread(target=delete_token).start()
+    else:
+        access_token = token
+
+    json_dict = {
+        "touser": userid_str,
+        "msgtype": "textcard",
+        "agentid": agent_id,
+        "textcard": {
+            "title": title,
+            "description": get_des(),
+            "url": "https://memory.tianyunperfect.cn/",
+            "btntxt": "更多"
+        },
+        "safe": 0,
+        "enable_id_trans": 0,
+        "enable_duplicate_check": 0,
+        "duplicate_check_interval": 1800
+    }
+    json_str = json.dumps(json_dict)
+    response_send = requests.post(f"https://qyapi.weixin.qq.com/cgi-bin/message/send?access_token={access_token}",
+                                  data=json_str)
+    return json.loads(response_send.text)['errmsg'] == 'ok'
+
+
+@app.get("/push_user")
+def push_user(user_id):
+    return {
+        'success': send_message("ww582505d35b886cde", "1000002", "yDCwvVOHSvb_43Y3e17mZi4E7hEZ2Z3UDyDpuKxzPsQ",
+                                [str(user_id)], "念念不忘,必有回响")}
+
+
+@app.get("/push_msg")
+def push_msg(user_id, title):
+    return {
+        'success': send_message("ww582505d35b886cde", "1000002", "yDCwvVOHSvb_43Y3e17mZi4E7hEZ2Z3UDyDpuKxzPsQ",
+                                [str(user_id)], title)}
+
+
+if __name__ == '__main__':
+    uvicorn.run(app='push_user_app:app', host="0.0.0.0", port=8087, reload=True, debug=True)
+
+# uvicorn push_user_app:app --reload --host 0.0.0.0 --port 9100

+ 14 - 0
tmp/kafka/KafkaPushTest11.py

@@ -0,0 +1,14 @@
+import json
+import time
+from kafka import KafkaProducer
+
+producer = KafkaProducer(value_serializer=lambda v: json.dumps(v).encode('utf-8'),
+                         bootstrap_servers=['192.168.36.199:9092'])
+index = 0
+while True:
+    time.sleep(2)  # 每个5秒推送一条数据
+
+    producer.send('tag_atom_topic', json.dumps({
+        "msg": "nihao"
+    }), partition=0)  # 发送的topic为test
+    print("===")

+ 9 - 0
tmp/kafka/kafkaReceiveTest11.py

@@ -0,0 +1,9 @@
+from kafka import KafkaConsumer
+# tag_sink_topic\tag_atom_topic
+consumer = KafkaConsumer('tag_atom_topic',
+                         group_id='test11',  # 一个组消费一次
+                         auto_offset_reset='latest',  # 从最新数据读取,earliest,latest
+                         bootstrap_servers=['192.168.36.199:9092']
+                         )
+for msg in consumer:
+    print(msg.value)