flink.py

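"""Thin client for the Flink JobManager REST API: list and stop jobs,
upload and delete jars, and submit a job from an uploaded jar."""
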
import datetime
import unittest

import requests


class MyFlink:
    def __init__(self, host):
        self.host = host

    # Return all job ids
    def list_job_id(self):
        url = self.host + "/jobs"
        response = requests.get(url)
        return [job["id"] for job in response.json()["jobs"]]
    # List all jobs with their details (via the /jobs/overview endpoint)
    def list_jobs(self):
        url = self.host + "/jobs/overview"
        response = requests.get(url)
        jobs = response.json()["jobs"]
        job_details = []
        for job in jobs:
            job_id = job["jid"]
            job_name = job["name"]
            # Flink reports epoch milliseconds; convert to a UTC timestamp string
            start_time = datetime.datetime.fromtimestamp(
                job["start-time"] / 1000, tz=datetime.timezone.utc
            ).strftime("%Y-%m-%d %H:%M:%S UTC")
            duration = str(datetime.timedelta(milliseconds=job["duration"]))
            tasks = job["tasks"]["total"]
            status = job["state"]
            job_detail = {
                "job_id": job_id,
                "job_name": job_name,
                "start_time": start_time,
                "duration": duration,
                "tasks": tasks,
                "status": status,
            }
            job_details.append(job_detail)
        return job_details
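
    # For reference, one element of the list returned above looks like this
    # (shape per the code; the values are illustrative only):
    # {"job_id": "d83a0b6f...", "job_name": "my-job",
    #  "start_time": "2024-01-01 00:00:00 UTC", "duration": "0:05:00",
    #  "tasks": 4, "status": "RUNNING"}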

    # Stop all jobs
    def stop_all(self):
        job_ids = self.list_job_id()
        for job_id in job_ids:
            self.stop_by_id(job_id)
            print(f"Stopped job: {job_id}")

    # Stop a job by its job id
    def stop_by_id(self, job_id):
        url = self.host + f"/jobs/{job_id}/stop"
        response = requests.post(url)
        return response.json()
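
    # Hypothetical helper (a minimal sketch, not part of the original class):
    # POST /jobs/{id}/stop is asynchronous and its response carries a trigger
    # id under "request-id", so a caller that must be sure a job has fully
    # stopped could poll the savepoint-status endpoint like this. The method
    # name and polling interval are assumptions.
    def wait_for_stop(self, job_id, trigger_id, interval_seconds=2):
        import time  # local import keeps the sketch self-contained
        url = self.host + f"/jobs/{job_id}/savepoints/{trigger_id}"
        while True:
            status = requests.get(url).json()["status"]["id"]
            if status == "COMPLETED":
                return
            time.sleep(interval_seconds)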

    # Upload a jar file; returns the jar id on success, None otherwise
    def upload_jar(self, jar_path):
        url = self.host + "/jars/upload"
        with open(jar_path, "rb") as jar_file:
            files = {"file": jar_file}
            response = requests.post(url, files=files, timeout=1000)
            print(response.json())
            if response.json()["status"] == "success":
                # The jar id is the last segment of the returned filename
                return response.json()["filename"].split("/")[-1]
            return None
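
    # For reference, a successful /jars/upload response typically looks like
    # (illustrative, abbreviated path):
    # {"filename": "/tmp/flink-web-.../uploaded-jar-id.jar", "status": "success"}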

    # List all uploaded jars
    def list_jars(self):
        url = self.host + "/jars"
        response = requests.get(url)
        return response.json()["files"]

    # Delete an uploaded jar by its jar id
    def delete_jar(self, jar_id):
        url = self.host + "/jars/" + jar_id
        response = requests.delete(url)
        return response.json()
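
    # Hypothetical helper (a minimal sketch, not part of the original class),
    # mirroring stop_all() above for jar cleanup:
    def delete_all_jars(self):
        for jar in self.list_jars():
            self.delete_jar(jar["id"])
            print(f"Deleted jar: {jar['id']}")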

    # Start a job from an uploaded jar with the given program arguments
    def start_job(self, jar_id, program_args):
        # Collapse all whitespace (including newlines) into single spaces
        program_args = " ".join(program_args.strip().split())
        print(f"program_args: {program_args}")
        jars = self.list_jars()
        for jar in jars:
            if jar["id"] == jar_id:
                entry_class = jar["entry"][0]["name"]
                url = self.host + f"/jars/{jar_id}/run"
                json_data = {
                    "entryClass": entry_class,
                    "programArgs": program_args,
                }
                response = requests.post(url, json=json_data)
                return response.json()
        return "Jar not found"


class MyTest(unittest.TestCase):
    def test_stop_all(self):
        flink = MyFlink("https://flink.rxdptest.k5.bigtree.tech")
        flink.stop_all()

    def test_upload_start(self):
        flink = MyFlink("https://flink.rxdptest.k5.bigtree.tech")
        jar_id = flink.upload_jar("/Users/alvin/bigtree/rxdp-dm-jobs/rxdp-dm-jobs-flink/target/rxdp-dm-jobs-flink-1.0-SNAPSHOT.jar")
        print(f"jar_id: {jar_id}")
        flink.stop_all()
        print(flink.start_job(jar_id, """
            --kafkaServer kafka-0.kafka-headless.rxdptest.svc.k5.bigtree.zone:9092,kafka-1.kafka-headless.rxdptest.svc.k5.bigtree.zone:9092,kafka-2.kafka-headless.rxdptest.svc.k5.bigtree.zone:9092
            --esServer elasticsearch-master.rxdptest.svc.k5.bigtree.zone
            --esPort 9200
            --pythonUrl http://py-invoke-svc:8000
            --redisIp redis-master.rxdptest.svc.k5.bigtree.zone
            --redisPassword SB6vdem
        """))


if __name__ == '__main__':
    pass