# flink.py — client helpers and manual integration tests for a Flink cluster's REST API.
import datetime
import unittest

import requests
  4. class MyFlink:
  5. def __init__(self, host):
  6. self.host = host
  7. # 返回所有 job id
  8. def list_job_id(self):
  9. url = self.host + "/jobs"
  10. response = requests.get(url)
  11. return [job["id"] for job in response.json()['jobs']]
  12. # 查看所有jar包
  13. def list_jobs(self):
  14. url = self.host + "/jobs/overview"
  15. response = requests.get(url)
  16. jobs = response.json()["jobs"]
  17. job_details = []
  18. for job in jobs:
  19. job_id = job["jid"]
  20. job_name = job["name"]
  21. start_time = datetime.datetime.utcfromtimestamp(job["start-time"] / 1000).strftime("%Y-%m-%d %H:%M:%S UTC")
  22. duration = str(datetime.timedelta(milliseconds=job["duration"]))
  23. tasks = job["tasks"]["total"]
  24. status = job["state"]
  25. job_detail = {
  26. "job_id": job_id,
  27. "job_name": job_name,
  28. "start_time": start_time,
  29. "duration": duration,
  30. "tasks": tasks,
  31. "status": status
  32. }
  33. job_details.append(job_detail)
  34. return job_details
  35. # 停止所有job
  36. def stop_all(self):
  37. job_ids = self.list_job_id()
  38. for job_id in job_ids:
  39. self.stop_by_id(job_id)
  40. # 根据job id 停止任务
  41. def stop_by_id(self, job_id):
  42. url = self.host + f"/jobs/{job_id}/stop"
  43. response = requests.post(url)
  44. return response.json()
  45. # 上传jar包
  46. def upload_jar(self, jar_path):
  47. url = self.host + "/jars/upload"
  48. with open(jar_path, 'rb') as jar_file:
  49. files = {'file': jar_file}
  50. response = requests.post(url, files=files, timeout=1000)
  51. print(response.json())
  52. if response.json()['status'] == 'success':
  53. return response.json()["filename"].split("/")[-1]
  54. def list_jars(self):
  55. url = self.host + "/jars"
  56. response = requests.get(url)
  57. return response.json()['files']
  58. def delete_jar(self, jar_id):
  59. url = self.host + "/jars/" + jar_id
  60. response = requests.delete(url)
  61. return response.json()
  62. def start_job(self, jar_id, program_args):
  63. program_args = " ".join(program_args.strip().split())
  64. print(f"program_args 入参:{program_args}")
  65. jars = self.list_jars()
  66. for jar in jars:
  67. if jar['id'] == jar_id:
  68. entry_class = jar['entry'][0]['name']
  69. url = self.host + f"/jars/{jar_id}/run"
  70. json_data = {
  71. "entryClass": entry_class,
  72. "programArgs": program_args
  73. }
  74. response = requests.post(url, json=json_data)
  75. return response.json()
  76. return "未找到jar包"
  77. class MyTest(unittest.TestCase):
  78. def test_stop_all(self):
  79. flink = MyFlink("https://flink.rxdptest.k5.bigtree.tech")
  80. print(flink.stop_all())
  81. def test_upload_start(self):
  82. flink = MyFlink("https://flink.rxdptest.k5.bigtree.tech")
  83. jar_id = flink.upload_jar("/Users/alvin/bigtree/rxdp-dm-jobs/rxdp-dm-jobs-flink/target/rxdp-dm-jobs-flink-1.0-SNAPSHOT.jar")
  84. print(f"jar_id: {jar_id}")
  85. jar_id = "b33b6c52-0e68-4f11-b6af-aeb7ffb116bf_rxdp-dm-jobs-flink-1.0-SNAPSHOT.jar"
  86. print(flink.start_job(jar_id, """
  87. --kafkaServer kafka-0.kafka-headless.rxdptest.svc.k5.bigtree.zone:9092,kafka-1.kafka-headless.rxdptest.svc.k5.bigtree.zone:9092,kafka-2.kafka-headless.rxdptest.svc.k5.bigtree.zone:9092
  88. --esServer elasticsearch-master.rxdptest.svc.k5.bigtree.zone
  89. --esPort 9200
  90. --pythonUrl http://py-invoke-svc:8000
  91. --redisIp redis-master.rxdptest.svc.k5.bigtree.zone
  92. --redisPassword SB6vdem
  93. """))
  94. if __name__ == '__main__':
  95. pass