import unittest import requests from util import req, reqGet, reqForm, reqDelete, reqPut def get_token(): headers = { 'authority': 'edsp.rxdpdev.k5.bigtree.tech', 'accept': 'application/json, text/plain, */*', 'accept-language': 'zh-CN,zh;q=0.9,en;q=0.8', } json_data = { 'username': 'jitiantian', 'password': 'Aa123456', 'token': True, } response = requests.post('https://edsp.rxdpdev.k5.bigtree.tech/api/rxdp-auth-web/auth/login', headers=headers, json=json_data) return response.json()['token'] class MyTest(unittest.TestCase): def setUp(self) -> None: # self.host = "https://gateway.rxdpdev.k5.bigtree.tech/rxdp-tag/rxdp-tag-polymer" self.host = "https://gateway.rxdptest.k5.bigtree.tech/rxdp-tag/rxdp-tag-polymer" # self.host = "http://127.1:8081/rxdp-tag-polymer" self.head = { "Access-Token": get_token() } # 分页 def test_page(self): reqGet(self.host + "/page?page=1&limit=10", None, self.head) # 删除 def test_del(self): reqDelete(self.host + "/5", None, self.head) # 更新状态 def test_update_state(self): reqPut(self.host + "/state?id=6&tagState=1", None, self.head) # 上传配置 def test_upload(self): with open('/Users/alvin/Downloads/______.json', 'rb') as file: files = {'file': file} response = requests.post(self.host + "/importData", files=files, headers=self.head) print(response.json()) # 执行打标 def test_task_exec(self): req(self.host + "/org/tag/sceneTaskCompute", { "sceneTaskCode": "Make_958769264633017865" }) # 新增、更新 def test_upsert(self): req(self.host, { "tagCnName": "juTest", "tagEnName": "juTest", "ruleJson": "{\"id\":\"cb4c9fdc4b43\",\"conjunction\":\"and\",\"children\":[{\"id\":\"fc745095df81\",\"left\":{\"type\":\"field\",\"field\":\"Id\"},\"op\":\"is_not_empty\"}]}", "tagDataSource": "A" }, self.head) def test_upsert2(self): req(self.host, { "tagCnName": "聚合测试", "tagEnName": "jutest", "ruleJson": "{\"id\":\"4a1ac2b8b824\",\"conjunction\":\"and\",\"children\":[{\"id\":\"fa849b3a732d\",\"left\":{\"type\":\"field\",\"field\":\"myEnName\"},\"op\":\"is_not_empty\"}]}", "tagDataSource": "A" }, self.head)