tps_util.py 1.8 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253
  1. import concurrent.futures
  2. import requests
  3. import time
  4. import statistics
  5. def call_api(url, method, params, json):
  6. start = time.time()
  7. response = requests.request(method, url, params=params, json=json)
  8. # print(response.json())
  9. end = time.time()
  10. return (end - start) * 1000, response.status_code, len(response.content)
  11. def test_api_tps(url, method, params=None, json=None, concurrency=1, duration=1):
  12. successes = 0
  13. failures = 0
  14. times = []
  15. data_lengths = []
  16. with concurrent.futures.ThreadPoolExecutor(max_workers=concurrency) as executor:
  17. start = time.time()
  18. futures = [executor.submit(call_api, url, method, params, json) for i in range(concurrency)]
  19. while time.time() - start < duration:
  20. completed, futures = concurrent.futures.wait(futures, timeout=0)
  21. for future in completed:
  22. elapsed, status, length = future.result()
  23. times.append(elapsed)
  24. data_lengths.append(length)
  25. if status == 200:
  26. successes += 1
  27. else:
  28. failures += 1
  29. futures.add(executor.submit(call_api, url, method, params, json))
  30. total = successes + failures
  31. tps = total / duration
  32. mean = statistics.mean(times)
  33. min_time = min(times)
  34. max_time = max(times)
  35. print(f'TPS:{tps}; ', end=' ')
  36. print(f'平均时间(ms): {mean:.0f}; ', end=' ')
  37. print(f'最小时间(ms): {min_time:.0f}; ', end=' ')
  38. print(f'最大时间(ms): {max_time:.0f}; ', end=' ')
  39. print(f'成功数: {successes}; ', end=' ')
  40. print(f'失败数: {failures}; ', end=' ')
  41. print(f'总数据长度: {sum(data_lengths)}; ', end=' ')
  42. if __name__ == '__main__':
  43. test_api_tps('https://httpbin.tianyunperfect.cn/get', 'get', params={"a": 1}, duration=2)