1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253 |
- import concurrent.futures
- import requests
- import time
- import statistics
def call_api(url, method, params, json, timeout=30.0):
    """Send one HTTP request and measure how long it took.

    Args:
        url: Target URL.
        method: HTTP method name passed to ``requests.request``.
        params: Query-string parameters, or ``None``.
        json: JSON-serialisable request body, or ``None``.
        timeout: Seconds to wait for the server before giving up. Without a
            timeout a stalled server would block the calling thread forever.

    Returns:
        A tuple ``(elapsed_ms, status_code, content_length)`` where
        ``elapsed_ms`` is the request duration in milliseconds and
        ``content_length`` is the size of the response body in bytes.

    Raises:
        requests.RequestException: If the request fails or times out.
    """
    # perf_counter is monotonic, so the measurement is not affected by
    # system clock adjustments made while the request is in flight.
    start = time.perf_counter()
    response = requests.request(
        method, url, params=params, json=json, timeout=timeout
    )
    end = time.perf_counter()
    return (end - start) * 1000, response.status_code, len(response.content)
def test_api_tps(url, method, params=None, json=None, concurrency=1, duration=1):
    """Load-test an HTTP endpoint for a fixed time window and print a summary.

    Keeps ``concurrency`` calls to ``call_api`` in flight on a thread pool,
    submitting a replacement each time one finishes, until ``duration``
    seconds have elapsed. Requests still in flight when the window closes
    are not counted; leaving the executor's ``with`` block waits for them to
    finish before the summary is printed.

    Args:
        url: Target URL.
        method: HTTP method name.
        params: Query-string parameters, or ``None``.
        json: JSON-serialisable request body, or ``None``.
        concurrency: Number of worker threads / simultaneous requests.
        duration: Length of the measurement window in seconds.

    Returns:
        None. The summary (TPS, mean/min/max latency in ms, success and
        failure counts, total response bytes) is written to stdout.
    """
    successes = 0
    failures = 0
    times = []
    data_lengths = []

    with concurrent.futures.ThreadPoolExecutor(max_workers=concurrency) as executor:
        deadline = time.monotonic() + duration
        pending = {
            executor.submit(call_api, url, method, params, json)
            for _ in range(concurrency)
        }
        while True:
            remaining = deadline - time.monotonic()
            if remaining <= 0:
                break
            # Block until a request finishes or the window closes, instead
            # of polling with timeout=0 and spinning a CPU core.
            completed, pending = concurrent.futures.wait(
                pending,
                timeout=remaining,
                return_when=concurrent.futures.FIRST_COMPLETED,
            )
            for future in completed:
                try:
                    elapsed, status, length = future.result()
                except requests.RequestException:
                    # A request that never produced a response is a failed
                    # transaction; it has no timing or body to record.
                    failures += 1
                else:
                    times.append(elapsed)
                    data_lengths.append(length)
                    if status == 200:
                        successes += 1
                    else:
                        failures += 1
                # Keep the number of in-flight requests constant.
                pending.add(executor.submit(call_api, url, method, params, json))

    total = successes + failures
    # A non-positive duration yields an empty window; report 0 TPS rather
    # than dividing by zero.
    tps = total / duration if duration > 0 else 0.0
    if times:
        mean = statistics.mean(times)
        min_time = min(times)
        max_time = max(times)
    else:
        # No response arrived inside the window, so there are no latencies
        # to aggregate; NaN prints as "nan" instead of a misleading 0.
        mean = min_time = max_time = float('nan')

    print(f'TPS:{tps}; ', end=' ')
    print(f'平均时间(ms): {mean:.0f}; ', end=' ')
    print(f'最小时间(ms): {min_time:.0f}; ', end=' ')
    print(f'最大时间(ms): {max_time:.0f}; ', end=' ')
    print(f'成功数: {successes}; ', end=' ')
    print(f'失败数: {failures}; ', end=' ')
    print(f'总数据长度: {sum(data_lengths)}; ', end=' ')
if __name__ == '__main__':
    # Script entry point: run a two-second GET load test against the demo
    # endpoint with a single query parameter.
    demo_url = 'https://httpbin.tianyunperfect.cn/get'
    demo_query = {"a": 1}
    test_api_tps(demo_url, 'get', params=demo_query, duration=2)
|