import datetime
import json
from typing import Any, Dict, List, Optional

import requests


class MyFlink:
    """Small client for a handful of Flink REST endpoints.

    Every method issues one or more HTTP requests with ``requests`` against
    ``host`` and returns data decoded from the JSON response body.
    """

    def __init__(self, host: str, timeout: Optional[float] = None):
        """Store the connection settings.

        Args:
            host: Base URL of the REST endpoint, e.g. ``"http://localhost:8081"``.
            timeout: Seconds passed to every ``requests`` call. ``None`` (the
                default) keeps the ``requests`` default of waiting indefinitely.
        """
        self.host = host
        self.timeout = timeout

    def _url(self, path: str) -> str:
        """Join ``path`` (starting with ``/``) onto the host without doubling slashes."""
        return self.host.rstrip("/") + path

    @staticmethod
    def _summarize_job(job: Dict[str, Any]) -> Dict[str, Any]:
        """Convert one entry of the ``/jobs/overview`` ``jobs`` array into a flat dict."""
        # "start-time" is divided by 1000 before being treated as a POSIX
        # timestamp, i.e. it is handled as epoch milliseconds.
        started = datetime.datetime.fromtimestamp(
            job["start-time"] / 1000, tz=datetime.timezone.utc
        )
        return {
            "job_id": job["jid"],
            "job_name": job["name"],
            "start_time": started.strftime("%Y-%m-%d %H:%M:%S UTC"),
            "duration": str(datetime.timedelta(milliseconds=job["duration"])),
            "tasks": job["tasks"]["total"],
            "status": job["state"],
        }

    def list_job1(self) -> List[str]:
        """Return the ``id`` of every entry in the ``jobs`` array of ``GET /jobs``."""
        response = requests.get(self._url("/jobs"), timeout=self.timeout)
        return [job["id"] for job in response.json()["jobs"]]

    def list_jobs(self) -> List[Dict[str, Any]]:
        """Return a summary dict for every job listed by ``GET /jobs/overview``.

        Each dict has the keys ``job_id``, ``job_name``, ``start_time``
        (formatted as ``"%Y-%m-%d %H:%M:%S UTC"``), ``duration`` (the string
        form of a ``timedelta``), ``tasks`` and ``status``.
        """
        response = requests.get(self._url("/jobs/overview"), timeout=self.timeout)
        return [self._summarize_job(job) for job in response.json()["jobs"]]

    def stop_all(self) -> Any:
        """POST to ``/jobs/stop`` and return the decoded JSON response."""
        # NOTE(review): confirm that the target Flink version actually exposes
        # a `/jobs/stop` endpoint.
        response = requests.post(self._url("/jobs/stop"), timeout=self.timeout)
        return response.json()

    def stop_by_id(self, job_id: str) -> Any:
        """POST to ``/jobs/{job_id}/stop`` and return the decoded JSON response."""
        response = requests.post(
            self._url(f"/jobs/{job_id}/stop"), timeout=self.timeout
        )
        return response.json()

    def upload_jar(self, jar_path: str) -> Optional[str]:
        """Upload the jar at ``jar_path`` via ``POST /jars/upload``.

        Returns:
            The last ``/``-separated component of the ``filename`` field in the
            response when the response ``status`` is ``"success"``; otherwise
            ``None``.
        """
        # The context manager closes the jar even if the request raises.
        with open(jar_path, "rb") as jar_file:
            response = requests.post(
                self._url("/jars/upload"),
                files={"file": jar_file},
                timeout=self.timeout,
            )
        payload = response.json()
        if payload.get("status") != "success":
            return None
        return payload["filename"].split("/")[-1]

    def list_jars(self) -> List[Dict[str, Any]]:
        """Return the ``files`` array from ``GET /jars``."""
        response = requests.get(self._url("/jars"), timeout=self.timeout)
        return response.json()["files"]

    def delete_jar(self, jar_id: str) -> Any:
        """Send ``DELETE /jars/{jar_id}`` and return the decoded JSON response."""
        response = requests.delete(self._url("/jars/" + jar_id), timeout=self.timeout)
        return response.json()

    def start_job(self, jar_id: str, program_args: Any) -> Any:
        """Run an uploaded jar and return the decoded JSON response.

        The entry class is read from ``GET /jars/{jar_id}/plan`` and then sent,
        together with ``program_args``, to ``POST /jars/{jar_id}/run``.

        Example ``program_args``::

            [
                "--input", "/path/to/input/file",
                "--output", "/path/to/output/file",
            ]
        """
        # NOTE(review): confirm that the plan response carries an `entryClass`
        # key, and that the server accepts a list under `programArgs`, for the
        # Flink version in use.
        plan_response = requests.get(
            self._url(f"/jars/{jar_id}/plan"), timeout=self.timeout
        )
        entry_class = plan_response.json()["entryClass"]

        json_data = {
            "entryClass": entry_class,
            "programArgs": program_args,
        }
        run_response = requests.post(
            self._url(f"/jars/{jar_id}/run"), json=json_data, timeout=self.timeout
        )
        return run_response.json()


def print_json_pretty(obj):
    """Print ``obj`` as JSON indented by 4 spaces, leaving non-ASCII characters unescaped."""
    print(json.dumps(obj, indent=4, ensure_ascii=False))


if __name__ == '__main__':
    # flink = MyFlink("https://flink.rxdpdev.k5.bigtree.tech")
    # print(flink.upload_jar("/Users/alvin/bigtree/rxdp-dm-jobs/rxdp-dm-jobs-flink/target/rxdp-dm-jobs-flink-1.0-SNAPSHOT.jar"))
    # print_json_pretty(flink.list_jars())
    flink = MyFlink("http://121.37.84.96:8082")
    jars = flink.list_jars()
    # print(jars)
    # for jar in jars:
    #     jar_id = jar['id']
    #     if "SNAPSHOT" not in jar_id:
    #         print(jar_id)
    #         print(flink.delete_jar(jar_id))
    # print(flink.delete_jar("e9d0ec97-bcfd-4501-848e-23be742c5038_main.jar"))