import base64
import json
from typing import Any, Optional

import requests


class EsHttp:
    """Small HTTP client for the ``_search`` endpoint of a single index.

    The target URL (``<host>:<port>/<index_name>/_search``) and the HTTP
    Basic-auth headers are computed once at construction time and reused
    for every call to :meth:`query`.
    """

    def __init__(self, host: str, port: int, index_name: str,
                 user_name: str, passwd: str, *,
                 timeout: Optional[float] = 30.0):
        """Build the search URL and the authentication headers.

        :param host: Host name, with or without an ``http://`` / ``https://``
            prefix. ``http://`` is prepended when no scheme is present.
        :param port: TCP port appended to the host.
        :param index_name: Name of the index whose ``_search`` endpoint
            is queried.
        :param user_name: User name for HTTP Basic authentication.
        :param passwd: Password for HTTP Basic authentication.
        :param timeout: Seconds passed to ``requests.post`` as its
            ``timeout``. ``None`` disables the timeout (the request may then
            block indefinitely).
        """
        # Test for a real scheme prefix: a bare host that merely starts with
        # the letters "http" (e.g. "httpbin.local") still needs one.
        if not host.startswith(("http://", "https://")):
            host = "http://" + host
        # A trailing slash would otherwise end up in front of the port
        # ("http://host/:9200/...").
        host = host.rstrip("/")
        self.__url = "{}:{}/{}/_search".format(host, port, index_name)
        self.__timeout = timeout
        self.__headers = self.__get_header(user_name, passwd)

    def query(self, data: Any) -> Any:
        """POST ``data`` as a JSON body to the search URL.

        :param data: JSON-serialisable query body.
        :return: The response body decoded from JSON.

        The HTTP status code is not checked here; whatever JSON body the
        server sends back is returned as-is. Connection problems and timeouts
        surface as exceptions raised by ``requests``.
        """
        res = requests.post(
            self.__url,
            headers=self.__headers,
            data=json.dumps(data),
            timeout=self.__timeout,
        )
        return res.json()

    @staticmethod
    def __get_header(user_name: str, passwd: str) -> dict:
        """Return the headers used to access ES.

        :param user_name: User name for HTTP Basic authentication.
        :param passwd: Password for HTTP Basic authentication.
        :return: Dict with the ``Authorization`` and ``Content-Type`` headers.
        """
        credentials = EsHttp.__get_base64(user_name + ":" + passwd)
        return {
            'Authorization': "Basic " + credentials,
            'Content-Type': 'application/json',
        }

    @staticmethod
    def __get_base64(string: str) -> str:
        """Return the Base64 encoding of ``string`` (UTF-8 encoded) as text."""
        return base64.b64encode(string.encode("utf-8")).decode("ascii")


if __name__ == '__main__':
    # Manual smoke test against a live server.
    # NOTE(review): the credentials below are hard-coded in source; they
    # should be moved to environment variables or a secrets store.
    es_http = EsHttp("http://eip-es-2.qa.mlamp.cn", 9200, "voc_feed_card",
                     'voc_feed_read', 'Voc_feed_read@123456')
    print(es_http.query({
        "query": {
            "match_all": {}
        },
        "from": 0,
        "size": 1000
    }))
    print(123)