import datetime
import unittest

import requests


class MyFlink:
    """Minimal client for the REST API of a Flink JobManager at `host`."""

    def __init__(self, host):
        self.host = host

    # Return the ids of all jobs
    def list_job_id(self):
        url = self.host + "/jobs"
        response = requests.get(url)
        # /jobs returns {"jobs": [{"id": "<jobid>", "status": "..."}, ...]}
        return [job["id"] for job in response.json()["jobs"]]
    # List all jobs with summary details
    def list_jobs(self):
        url = self.host + "/jobs/overview"
        response = requests.get(url)
        jobs = response.json()["jobs"]
        job_details = []
        for job in jobs:
            # Flink reports timestamps in milliseconds since the epoch
            start_time = datetime.datetime.fromtimestamp(
                job["start-time"] / 1000, tz=datetime.timezone.utc
            ).strftime("%Y-%m-%d %H:%M:%S UTC")
            duration = str(datetime.timedelta(milliseconds=job["duration"]))
            job_details.append({
                "job_id": job["jid"],
                "job_name": job["name"],
                "start_time": start_time,
                "duration": duration,
                "tasks": job["tasks"]["total"],
                "status": job["state"],
            })
        return job_details

    # Stop all running jobs
    def stop_all(self):
        for job_id in self.list_job_id():
            self.stop_by_id(job_id)

    # Stop a job by its id (POST /jobs/<id>/stop stops the job with a savepoint)
    def stop_by_id(self, job_id):
        url = self.host + f"/jobs/{job_id}/stop"
        response = requests.post(url)
        return response.json()
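
    # Sketch of an extra helper, not part of the original client: look up one
    # job's state via GET /jobs/<id> (a documented Flink REST endpoint), e.g.
    # to verify that a stop request actually took effect.
    def job_status(self, job_id):
        url = self.host + f"/jobs/{job_id}"
        response = requests.get(url)
        return response.json().get("state")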

    # Upload a jar; returns the jar id on success
    def upload_jar(self, jar_path):
        url = self.host + "/jars/upload"
        with open(jar_path, "rb") as jar_file:
            # The Flink docs name the multipart field "jarfile";
            # the generous timeout allows for large jars
            files = {"jarfile": jar_file}
            response = requests.post(url, files=files, timeout=1000)
        result = response.json()
        print(result)
        if result["status"] == "success":
            # "filename" is a server-side path; its basename is the jar id
            return result["filename"].split("/")[-1]
        raise RuntimeError(f"jar upload failed: {result}")

    # List uploaded jars
    def list_jars(self):
        url = self.host + "/jars"
        response = requests.get(url)
        return response.json()["files"]

    # Delete an uploaded jar by its id
    def delete_jar(self, jar_id):
        url = self.host + "/jars/" + jar_id
        response = requests.delete(url)
        return response.json()

    # Run a job from an uploaded jar
    def start_job(self, jar_id, program_args):
        # Collapse the whitespace-separated args into a single line
        program_args = " ".join(program_args.strip().split())
        print(f"program_args: {program_args}")
        for jar in self.list_jars():
            if jar["id"] == jar_id:
                # Use the entry class recorded in the jar's manifest
                entry_class = jar["entry"][0]["name"]
                url = self.host + f"/jars/{jar_id}/run"
                json_data = {
                    "entryClass": entry_class,
                    "programArgs": program_args,
                }
                response = requests.post(url, json=json_data)
                return response.json()
        raise ValueError(f"jar not found: {jar_id}")


class MyTest(unittest.TestCase):
    def test_stop_all(self):
        flink = MyFlink("https://flink.rxdptest.k5.bigtree.tech")
        flink.stop_all()

    def test_upload_start(self):
        flink = MyFlink("https://flink.rxdptest.k5.bigtree.tech")
        jar_id = flink.upload_jar("/Users/alvin/bigtree/rxdp-dm-jobs/rxdp-dm-jobs-flink/target/rxdp-dm-jobs-flink-1.0-SNAPSHOT.jar")
        print(f"jar_id: {jar_id}")
        # To reuse a previously uploaded jar instead, hardcode its id:
        # jar_id = "b33b6c52-0e68-4f11-b6af-aeb7ffb116bf_rxdp-dm-jobs-flink-1.0-SNAPSHOT.jar"
        print(flink.start_job(jar_id, """
            --kafkaServer kafka-0.kafka-headless.rxdptest.svc.k5.bigtree.zone:9092,kafka-1.kafka-headless.rxdptest.svc.k5.bigtree.zone:9092,kafka-2.kafka-headless.rxdptest.svc.k5.bigtree.zone:9092
            --esServer elasticsearch-master.rxdptest.svc.k5.bigtree.zone
            --esPort 9200
            --pythonUrl http://py-invoke-svc:8000
            --redisIp redis-master.rxdptest.svc.k5.bigtree.zone
            --redisPassword SB6vdem
        """))


if __name__ == '__main__':
    unittest.main()