flink.py 3.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103
  1. import datetime
  2. import json
  3. import requests
  4. class MyFlink:
  5. def __init__(self, host):
  6. self.host = host
  7. # 返回id数组
  8. def list_job1(self):
  9. url = self.host + "/jobs"
  10. response = requests.get(url)
  11. return [job["id"] for job in response.json()['jobs']]
  12. def list_jobs(self):
  13. url = self.host + "/jobs/overview"
  14. response = requests.get(url)
  15. jobs = response.json()["jobs"]
  16. job_details = []
  17. for job in jobs:
  18. job_id = job["jid"]
  19. job_name = job["name"]
  20. start_time = datetime.datetime.utcfromtimestamp(job["start-time"] / 1000).strftime("%Y-%m-%d %H:%M:%S UTC")
  21. duration = str(datetime.timedelta(milliseconds=job["duration"]))
  22. tasks = job["tasks"]["total"]
  23. status = job["state"]
  24. job_detail = {
  25. "job_id": job_id,
  26. "job_name": job_name,
  27. "start_time": start_time,
  28. "duration": duration,
  29. "tasks": tasks,
  30. "status": status
  31. }
  32. job_details.append(job_detail)
  33. return job_details
  34. def stop_all(self):
  35. url = self.host + "/jobs/stop"
  36. response = requests.post(url)
  37. return response.json()
  38. def stop_by_id(self, job_id):
  39. url = self.host + f"/jobs/{job_id}/stop"
  40. response = requests.post(url)
  41. return response.json()
  42. def upload_jar(self, jar_path):
  43. url = self.host + "/jars/upload"
  44. files = {'file': open(jar_path, 'rb')}
  45. response = requests.post(url, files=files)
  46. if response.json()['status'] == 'success':
  47. return ["filename"].split("/")[-1]
  48. def list_jars(self):
  49. url = self.host + "/jars"
  50. response = requests.get(url)
  51. return response.json()['files']
  52. def delete_jar(self, jar_id):
  53. url = self.host + "/jars/" + jar_id
  54. response = requests.delete(url)
  55. return response.json()
  56. # 示例
  57. # [
  58. # "--input", "/path/to/input/file",
  59. # "--output", "/path/to/output/file",
  60. # ]
  61. def start_job(self, jar_id, program_args):
  62. url = f"{self.host}/jars/{jar_id}/plan"
  63. response = requests.get(url)
  64. entry_class = response.json()["entryClass"]
  65. url = self.host + f"/jars/${jar_id}/run"
  66. json_data = {
  67. "entryClass": entry_class,
  68. "programArgs": program_args
  69. }
  70. response = requests.post(url, json=json_data)
  71. return response.json()
  72. def print_json_pretty(obj):
  73. print(json.dumps(obj, indent=4, ensure_ascii=False))
  74. if __name__ == '__main__':
  75. # flink = MyFlink("https://flink.rxdpdev.k5.bigtree.tech")
  76. # print(flink.upload_jar("/Users/alvin/bigtree/rxdp-dm-jobs/rxdp-dm-jobs-flink/target/rxdp-dm-jobs-flink-1.0-SNAPSHOT.jar"))
  77. # print_json_pretty(flink.list_jars())
  78. flink = MyFlink("http://121.37.84.96:8082")
  79. jars = flink.list_jars()
  80. # print(jars)
  81. # for jar in jars:
  82. # jar_id = jar['id']
  83. # if "SNAPSHOT" not in jar_id:
  84. # print(jar_id)
  85. # print(flink.delete_jar(jar_id))
  86. # print(flink.delete_jar("e9d0ec97-bcfd-4501-848e-23be742c5038_main.jar"))