import datetime
import unittest

import requests


class MyFlink:
    """Thin client for the Flink REST endpoints used by the deploy scripts.

    Every method issues one or more HTTP requests against ``host`` and
    returns data taken from the decoded JSON response.
    """

    def __init__(self, host):
        """Remember the base URL (scheme + host, no trailing path)."""
        self.host = host

    def list_job_id(self):
        """Return the ``id`` of every job listed by ``GET /jobs``."""
        url = self.host + "/jobs"
        response = requests.get(url)
        return [job["id"] for job in response.json()["jobs"]]

    def list_jobs(self):
        """Return a summary dict for every job listed by ``GET /jobs/overview``.

        Each dict has the keys ``job_id``, ``job_name``, ``start_time``,
        ``duration``, ``tasks`` and ``status``.
        """
        url = self.host + "/jobs/overview"
        response = requests.get(url)
        return [self._describe_job(job) for job in response.json()["jobs"]]

    @staticmethod
    def _describe_job(job):
        """Convert one ``/jobs/overview`` entry into the summary dict."""
        # ``start-time`` is divided by 1000 before conversion, i.e. it is
        # treated as epoch milliseconds. An aware UTC datetime replaces the
        # deprecated ``utcfromtimestamp``; the formatted text is the same.
        started = datetime.datetime.fromtimestamp(
            job["start-time"] / 1000, tz=datetime.timezone.utc
        )
        return {
            "job_id": job["jid"],
            "job_name": job["name"],
            "start_time": started.strftime("%Y-%m-%d %H:%M:%S UTC"),
            "duration": str(datetime.timedelta(milliseconds=job["duration"])),
            "tasks": job["tasks"]["total"],
            "status": job["state"],
        }

    def stop_all(self):
        """Request a stop for every job returned by :meth:`list_job_id`."""
        for job_id in self.list_job_id():
            self.stop_by_id(job_id)
            print(f"停止job:{job_id}")

    def stop_by_id(self, job_id):
        """POST to ``/jobs/{job_id}/stop`` and return the decoded JSON reply."""
        url = self.host + f"/jobs/{job_id}/stop"
        response = requests.post(url)
        return response.json()

    def upload_jar(self, jar_path):
        """Upload the jar at ``jar_path`` via ``POST /jars/upload``.

        Returns the last path component of the ``filename`` reported by the
        server when the reply's ``status`` is ``"success"``; otherwise
        returns ``None`` (including when the reply has no ``status`` key).
        """
        url = self.host + "/jars/upload"
        with open(jar_path, "rb") as jar_file:
            response = requests.post(url, files={"file": jar_file}, timeout=1000)
        # Decode the body once instead of re-parsing it for every access.
        payload = response.json()
        print(payload)
        if payload.get("status") == "success":
            return payload["filename"].split("/")[-1]
        return None

    def list_jars(self):
        """Return the ``files`` list from ``GET /jars``."""
        url = self.host + "/jars"
        response = requests.get(url)
        return response.json()["files"]

    def delete_jar(self, jar_id):
        """DELETE ``/jars/{jar_id}`` and return the decoded JSON reply."""
        url = self.host + "/jars/" + jar_id
        response = requests.delete(url)
        return response.json()

    def start_job(self, jar_id, program_args):
        """Run the uploaded jar ``jar_id`` with ``program_args``.

        ``program_args`` may be laid out over several lines; all runs of
        whitespace are collapsed to single spaces before sending. The entry
        class is taken from the first ``entry`` item of the matching jar in
        :meth:`list_jars`.

        Returns the decoded JSON reply of ``POST /jars/{jar_id}/run``, or the
        string ``"未找到jar包"`` when no listed jar has that id.
        """
        program_args = " ".join(program_args.split())
        print(f"program_args 入参:{program_args}")
        for jar in self.list_jars():
            if jar["id"] != jar_id:
                continue
            json_data = {}
            # A jar may list no entry class; in that case leave ``entryClass``
            # out of the request rather than failing with IndexError here.
            entries = jar.get("entry") or []
            if entries:
                json_data["entryClass"] = entries[0]["name"]
            json_data["programArgs"] = program_args
            url = self.host + f"/jars/{jar_id}/run"
            response = requests.post(url, json=json_data)
            return response.json()
        return "未找到jar包"


class MyTest(unittest.TestCase):
    """Manual deployment drivers; each test talks to a live Flink cluster."""

    _JAR_DIR = "/Users/alvin/bigtree/rxdp-dm-jobs/rxdp-dm-jobs-flink/target"

    def _upload_and_start(self, env, jar_version):
        """Upload the jar, stop all jobs in ``env``, then start the new job.

        ``env`` is the environment name embedded in every host name
        (``rxdptest``, ``rxdpdev``, ``rxdpsl``); ``jar_version`` selects the
        jar file inside ``_JAR_DIR``.
        """
        flink = MyFlink(f"https://flink.{env}.k5.bigtree.tech")
        jar_id = flink.upload_jar(
            f"{self._JAR_DIR}/rxdp-dm-jobs-flink-{jar_version}.jar"
        )
        print(f"jar_id: {jar_id}")
        flink.stop_all()
        kafka_servers = ",".join(
            f"kafka-{index}.kafka-headless.{env}.svc.k5.bigtree.zone:9092"
            for index in range(3)
        )
        # NOTE(review): the Redis password is committed in source; it should
        # come from the environment or a secret store instead.
        print(flink.start_job(jar_id, f"""
            --kafkaServer {kafka_servers}
            --esServer elasticsearch-master.{env}.svc.k5.bigtree.zone
            --esPort 9200
            --pythonUrl http://py-invoke-svc:8000
            --redisIp redis-master.{env}.svc.k5.bigtree.zone
            --redisPassword SB6vdem
        """))

    def test_stop_all(self):
        """Stop every job on the ``rxdptest`` cluster."""
        flink = MyFlink("https://flink.rxdptest.k5.bigtree.tech")
        print(flink.stop_all())

    def test_upload_start(self):
        """Deploy the 1.0-SNAPSHOT jar to ``rxdptest``."""
        self._upload_and_start("rxdptest", "1.0-SNAPSHOT")

    def test_upload_start_dev(self):
        """Deploy the 1.6.0-SNAPSHOT jar to ``rxdpdev``."""
        self._upload_and_start("rxdpdev", "1.6.0-SNAPSHOT")

    def test_upload_start_sl(self):
        """Deploy the 1.6.0-SNAPSHOT jar to ``rxdpsl``."""
        self._upload_and_start("rxdpsl", "1.6.0-SNAPSHOT")


if __name__ == '__main__':
    # Calling MyTest.test_upload_start() on the class raises TypeError because
    # no instance is bound; run that single test through the unittest runner.
    unittest.main(defaultTest="MyTest.test_upload_start")