123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103 |
- import datetime
- import json
- import requests
- class MyFlink:
- def __init__(self, host):
- self.host = host
- # 返回id数组
- def list_job1(self):
- url = self.host + "/jobs"
- response = requests.get(url)
- return [job["id"] for job in response.json()['jobs']]
- def list_jobs(self):
- url = self.host + "/jobs/overview"
- response = requests.get(url)
- jobs = response.json()["jobs"]
- job_details = []
- for job in jobs:
- job_id = job["jid"]
- job_name = job["name"]
- start_time = datetime.datetime.utcfromtimestamp(job["start-time"] / 1000).strftime("%Y-%m-%d %H:%M:%S UTC")
- duration = str(datetime.timedelta(milliseconds=job["duration"]))
- tasks = job["tasks"]["total"]
- status = job["state"]
- job_detail = {
- "job_id": job_id,
- "job_name": job_name,
- "start_time": start_time,
- "duration": duration,
- "tasks": tasks,
- "status": status
- }
- job_details.append(job_detail)
- return job_details
- def stop_all(self):
- url = self.host + "/jobs/stop"
- response = requests.post(url)
- return response.json()
- def stop_by_id(self, job_id):
- url = self.host + f"/jobs/{job_id}/stop"
- response = requests.post(url)
- return response.json()
- def upload_jar(self, jar_path):
- url = self.host + "/jars/upload"
- files = {'file': open(jar_path, 'rb')}
- response = requests.post(url, files=files)
- if response.json()['status'] == 'success':
- return ["filename"].split("/")[-1]
- def list_jars(self):
- url = self.host + "/jars"
- response = requests.get(url)
- return response.json()['files']
- def delete_jar(self, jar_id):
- url = self.host + "/jars/" + jar_id
- response = requests.delete(url)
- return response.json()
- # 示例
- # [
- # "--input", "/path/to/input/file",
- # "--output", "/path/to/output/file",
- # ]
- def start_job(self, jar_id, program_args):
- url = f"{self.host}/jars/{jar_id}/plan"
- response = requests.get(url)
- entry_class = response.json()["entryClass"]
- url = self.host + f"/jars/${jar_id}/run"
- json_data = {
- "entryClass": entry_class,
- "programArgs": program_args
- }
- response = requests.post(url, json=json_data)
- return response.json()
- def print_json_pretty(obj):
- print(json.dumps(obj, indent=4, ensure_ascii=False))
- if __name__ == '__main__':
- # flink = MyFlink("https://flink.rxdpdev.k5.bigtree.tech")
- # print(flink.upload_jar("/Users/alvin/bigtree/rxdp-dm-jobs/rxdp-dm-jobs-flink/target/rxdp-dm-jobs-flink-1.0-SNAPSHOT.jar"))
- # print_json_pretty(flink.list_jars())
- flink = MyFlink("http://121.37.84.96:8082")
- jars = flink.list_jars()
- # print(jars)
- # for jar in jars:
- # jar_id = jar['id']
- # if "SNAPSHOT" not in jar_id:
- # print(jar_id)
- # print(flink.delete_jar(jar_id))
- # print(flink.delete_jar("e9d0ec97-bcfd-4501-848e-23be742c5038_main.jar"))
|