123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100 |
- import os
- import sys
- import time
- from pathlib import Path
- import uvicorn
- import subprocess
- from fastapi import FastAPI, UploadFile, File, Form
- from fastapi import Request
- from fastapi.middleware.cors import CORSMiddleware
- from starlette.middleware.sessions import SessionMiddleware
- from starlette.responses import JSONResponse, RedirectResponse, FileResponse
# =============== Basic configuration ===============
app = FastAPI(title="project name ", description="通用系统 ", version="v 0.0.0")

# Session middleware makes `request.session` available to the handlers.
# The signing key can be overridden through the SESSION_SECRET_KEY environment
# variable so a deployment does not have to rely on the value committed in
# source; the original literal is kept only as a fallback for existing setups.
app.add_middleware(
    SessionMiddleware,
    secret_key=os.environ.get("SESSION_SECRET_KEY", '123456hhh'),
)

# NOTE(review): every origin, method and header is allowed, with credentials
# enabled — confirm this wide-open CORS policy is intended outside development.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
@app.exception_handler(Exception)
async def validation_exception_handler(request, exc):
    """Catch-all handler that turns an unhandled exception into a JSON error.

    Despite its name, the handler is registered for ``Exception`` and is
    therefore not limited to request-validation failures.

    The HTTP status line is set to 500 so that it agrees with the
    ``status_code`` field carried in the JSON body; without it the response
    would be sent with the default 200 status.

    :param request: the request that was being processed (unused).
    :param exc: the exception that was raised (unused).
    :return: a ``JSONResponse`` with a generic error message and status 500.
    """
    return JSONResponse(
        {'message': "服务器内部错误", 'status_code': 500},
        status_code=500,
    )
@app.middleware("http")
async def add_process_time_header(request: Request, call_next):
    """HTTP middleware that reports how long the request took to process.

    The elapsed time is written to the ``API-Process-Time`` response header
    as ``"<seconds> seconds"``. Registered for ``"http"`` only.

    :param request: the incoming request.
    :param call_next: callable that passes the request on and returns the
        response.
    :return: the response, with the timing header added.
    """
    # perf_counter() is monotonic, so the measured duration cannot be skewed
    # (or go negative) when the system clock is adjusted mid-request.
    started = time.perf_counter()
    response = await call_next(request)
    elapsed = time.perf_counter() - started
    response.headers["API-Process-Time"] = f"{elapsed} seconds"
    return response
# =============== Code ===============
@app.get("/call_sh")
def call_sh(path: str):
    """Start the shell script at ``path`` in the background and return at once.

    Example::

        echo "$(curl 127.0.0.1:9999/call_sh?path=/Users/mlamp/IdeaProjects/python-base/tmp.sh)"

    :param path: path of the script to run with ``sh``.
    :return: a fixed message asking the caller to wait for the result.
    """
    # SECURITY: `path` comes straight from the query string. It is passed as a
    # single argv entry (no shell involved), so shell metacharacters in it are
    # not interpreted and paths containing spaces work. `--` stops `sh` from
    # treating a value that starts with "-" as an option.
    # NOTE(review): the endpoint still runs any script the caller names —
    # confirm it is only reachable by trusted clients.
    #
    # The process is intentionally not waited on: the request returns
    # immediately while the script keeps running.
    subprocess.Popen(["sh", "--", path])
    return "请等待结果"
def run_shell(shell):
    """Run a command line through the shell and return its exit status.

    The child's stdout and stderr are connected to this process's
    ``sys.stdout`` and ``sys.stderr``. Its stdin is a pipe to which nothing
    is written before ``communicate()`` closes it.

    :param shell: the command line to execute (run with ``shell=True``).
    :return: the exit code of the finished command.
    """
    popen_options = {
        "shell": True,
        "stdin": subprocess.PIPE,
        "stdout": sys.stdout,
        "stderr": sys.stderr,
        "close_fds": True,
        "universal_newlines": True,
        "bufsize": 1,  # line-buffered
    }
    process = subprocess.Popen(shell, **popen_options)
    # Blocks until the command has finished.
    process.communicate()
    exit_status = process.returncode
    return exit_status
@app.post("/upload")
def upload(file: UploadFile = File(...), path: str = Form(...)):
    """Save an uploaded file under ``/tmp/upload``.

    Example::

        curl 127.0.0.1:9999/upload -F "file=@稻香.flac" -F 'path=/Users/mlamp/tmp'

    :param file: the uploaded file. Only the final component of the
        client-supplied file name is used for the destination.
    :param path: required form field; it is accepted but not used by this
        handler (files always go to ``/tmp/upload``).
    :return: ``"OK"`` once the file has been written.
    :raises HTTPException: 400 when the upload carries no usable file name.
    """
    import shutil

    from fastapi import HTTPException

    # The file name is chosen by the client. Keeping only its last component
    # prevents names such as "../../etc/cron.d/x" from escaping the upload
    # directory.
    safe_name = Path(file.filename or "").name
    if safe_name in ("", ".", ".."):
        raise HTTPException(status_code=400, detail="invalid file name")

    upload_dir = Path("/tmp/upload")
    # exist_ok avoids the check-then-create race between concurrent uploads.
    upload_dir.mkdir(parents=True, exist_ok=True)

    with open(upload_dir / safe_name, 'wb') as f:
        # Stream in chunks rather than loading the whole upload into memory.
        shutil.copyfileobj(file.file, f)
    return "OK"
@app.get("/download")
def download(path: str):
    """Send the file at ``path`` back to the client as a download.

    Example::

        wget 127.0.0.1:9999/download?path=/Users/mlamp/tmp/稻香.flac

    :param path: filesystem path of the file to send.
    :return: a ``FileResponse`` whose download name is the last component of
        ``path``, or a JSON 404 response when ``path`` is not an existing
        regular file.
    """
    # NOTE(review): any path the server process can read may be requested —
    # confirm this endpoint is only reachable by trusted clients.
    target = Path(path)
    if not target.is_file():
        # Answer explicitly instead of letting the response fail later on a
        # missing file or a directory.
        return JSONResponse(
            {'message': "文件不存在", 'status_code': 404},
            status_code=404,
        )
    return FileResponse(path, filename=target.name)
if __name__ == '__main__':
    # Command-line alternative noted by the author:
    #   uvicorn call_sh:app --reload --port 19999
    # NOTE(review): that command names module `call_sh` and port 19999, while
    # the call below uses `call_sh_app` and 9999 — confirm which matches the
    # actual file name.
    # NOTE(review): newer uvicorn releases appear to have dropped the `debug`
    # option — confirm the installed version still accepts it.
    server_options = {
        "host": "0.0.0.0",
        "port": 9999,
        "reload": True,
        "debug": True,
    }
    uvicorn.run(app='call_sh_app:app', **server_options)
|