|
@@ -0,0 +1,103 @@
|
|
|
+import datetime
|
|
|
+import json
|
|
|
+
|
|
|
+import requests
|
|
|
+
|
|
|
+
|
|
|
+class MyFlink:
|
|
|
+ def __init__(self, host):
|
|
|
+ self.host = host
|
|
|
+
|
|
|
+ # 返回id数组
|
|
|
+ def list_job1(self):
|
|
|
+ url = self.host + "/jobs"
|
|
|
+ response = requests.get(url)
|
|
|
+ return [job["id"] for job in response.json()['jobs']]
|
|
|
+
|
|
|
+ def list_jobs(self):
|
|
|
+ url = self.host + "/jobs/overview"
|
|
|
+ response = requests.get(url)
|
|
|
+ jobs = response.json()["jobs"]
|
|
|
+ job_details = []
|
|
|
+ for job in jobs:
|
|
|
+ job_id = job["jid"]
|
|
|
+ job_name = job["name"]
|
|
|
+ start_time = datetime.datetime.utcfromtimestamp(job["start-time"] / 1000).strftime("%Y-%m-%d %H:%M:%S UTC")
|
|
|
+ duration = str(datetime.timedelta(milliseconds=job["duration"]))
|
|
|
+ tasks = job["tasks"]["total"]
|
|
|
+ status = job["state"]
|
|
|
+ job_detail = {
|
|
|
+ "job_id": job_id,
|
|
|
+ "job_name": job_name,
|
|
|
+ "start_time": start_time,
|
|
|
+ "duration": duration,
|
|
|
+ "tasks": tasks,
|
|
|
+ "status": status
|
|
|
+ }
|
|
|
+ job_details.append(job_detail)
|
|
|
+ return job_details
|
|
|
+
|
|
|
+ def stop_all(self):
|
|
|
+ url = self.host + "/jobs/stop"
|
|
|
+ response = requests.post(url)
|
|
|
+ return response.json()
|
|
|
+
|
|
|
+ def stop_by_id(self, job_id):
|
|
|
+ url = self.host + f"/jobs/{job_id}/stop"
|
|
|
+ response = requests.post(url)
|
|
|
+ return response.json()
|
|
|
+
|
|
|
+ def upload_jar(self, jar_path):
|
|
|
+ url = self.host + "/jars/upload"
|
|
|
+ files = {'file': open(jar_path, 'rb')}
|
|
|
+ response = requests.post(url, files=files)
|
|
|
+ if response.json()['status'] == 'success':
|
|
|
+ return ["filename"].split("/")[-1]
|
|
|
+
|
|
|
+ def list_jars(self):
|
|
|
+ url = self.host + "/jars"
|
|
|
+ response = requests.get(url)
|
|
|
+ return response.json()['files']
|
|
|
+
|
|
|
+ def delete_jar(self, jar_id):
|
|
|
+ url = self.host + "/jars/" + jar_id
|
|
|
+ response = requests.delete(url)
|
|
|
+ return response.json()
|
|
|
+
|
|
|
+ # 示例
|
|
|
+ # [
|
|
|
+ # "--input", "/path/to/input/file",
|
|
|
+ # "--output", "/path/to/output/file",
|
|
|
+ # ]
|
|
|
+ def start_job(self, jar_id, program_args):
|
|
|
+ url = f"{self.host}/jars/{jar_id}/plan"
|
|
|
+ response = requests.get(url)
|
|
|
+ entry_class = response.json()["entryClass"]
|
|
|
+
|
|
|
+ url = self.host + f"/jars/${jar_id}/run"
|
|
|
+ json_data = {
|
|
|
+ "entryClass": entry_class,
|
|
|
+ "programArgs": program_args
|
|
|
+ }
|
|
|
+ response = requests.post(url, json=json_data)
|
|
|
+ return response.json()
|
|
|
+
|
|
|
+
|
|
|
+def print_json_pretty(obj):
|
|
|
+ print(json.dumps(obj, indent=4, ensure_ascii=False))
|
|
|
+
|
|
|
+
|
|
|
+if __name__ == '__main__':
|
|
|
+ # flink = MyFlink("https://flink.rxdpdev.k5.bigtree.tech")
|
|
|
+
|
|
|
+ # print(flink.upload_jar("/Users/alvin/bigtree/rxdp-dm-jobs/rxdp-dm-jobs-flink/target/rxdp-dm-jobs-flink-1.0-SNAPSHOT.jar"))
|
|
|
+ # print_json_pretty(flink.list_jars())
|
|
|
+ flink = MyFlink("http://121.37.84.96:8082")
|
|
|
+ jars = flink.list_jars()
|
|
|
+ # print(jars)
|
|
|
+ # for jar in jars:
|
|
|
+ # jar_id = jar['id']
|
|
|
+ # if "SNAPSHOT" not in jar_id:
|
|
|
+ # print(jar_id)
|
|
|
+ # print(flink.delete_jar(jar_id))
|
|
|
+ # print(flink.delete_jar("e9d0ec97-bcfd-4501-848e-23be742c5038_main.jar"))
|