Pārlūkot izejas kodu

添加kafka、es demo

tianyun 3 gadi atpakaļ
vecāks
revīzija
723b35a73e
6 mainītis faili ar 135 papildinājumiem un 0 dzēšanām
  1. 1 0
      .gitignore
  2. 29 0
      tmp/es/es.py
  3. 49 0
      tmp/es/es_http.py
  4. 24 0
      tmp/kafka/AKafkaPushTest.py
  5. 14 0
      tmp/kafka/KafkaPushTest.py
  6. 18 0
      tmp/kafka/kafkaReceiveTest.py

+ 1 - 0
.gitignore

@@ -0,0 +1 @@
+*.iml

+ 29 - 0
tmp/es/es.py

@@ -0,0 +1,29 @@
+import datetime
+import json
+
+from elasticsearch import Elasticsearch
+from elasticsearch import helpers
+
+index_name = 'voc_feed_card_dev'
+es = Elasticsearch(
+    ['http://eip-es-2.qa.mlamp.cn:9200/'],
+    http_auth=('voc_feed_read', 'Voc_feed_read@123456'),
+    port=9200,
+    use_ssl=False
+)
+print("索引情况")
+print(es.cat.indices())
+#
+print("mapping情况")
+res = es.indices.get_mapping(index=index_name)
+print(json.dumps(res, indent=2))
+
+print("查询情况")
+body = {
+    "query": {
+        "match_all": {}
+    }
+}
+
+res = es.search(index=index_name, doc_type="_doc", body=body)
+print(json.dumps(res, indent=2))

+ 49 - 0
tmp/es/es_http.py

@@ -0,0 +1,49 @@
+import base64
+import json
+
+import requests
+
+
+class EsHttp:
+    def __init__(self, host: str, port: int, index_name: str, user_name: str, passwd: str):
+        if not host.startswith("http"):
+            host = "http://" + host
+        self.__url = host + ":" + str(port) + "/" + index_name + "/" + "_search"
+        self.__headers = self.__get_header(user_name, passwd)
+
+    def query(self, data):
+        res = requests.post(self.__url, headers=self.__headers, data=json.dumps(data))
+        return res.json()
+
+    @staticmethod
+    def __get_header(user_name, passwd):
+        """
+        获取访问es的header
+        :param user_name:
+        :param passwd:
+        :return:
+        """
+        headers = {
+            'Authorization': 'Basic cmVjb21tZW5kOlJlY29tbWVuZEAxMjM0NTY=',
+            'Content-Type': 'application/json'
+        }
+        authorization = "Basic " + EsHttp.__get_base64(user_name + ":" + passwd)
+        headers['Authorization'] = authorization
+        return headers
+
+    @staticmethod
+    def __get_base64(string):
+        return base64.b64encode(string.encode()).decode()
+
+
+if __name__ == '__main__':
+    es_http = EsHttp("http://eip-es-2.qa.mlamp.cn", 9200, "voc_feed_card", 'voc_feed_read', 'Voc_feed_read@123456')
+    print(es_http.query({
+        "query": {
+            "match_all": {
+            }
+        },
+        "from": 0,
+        "size": 1000
+    }))
+    print(123)

+ 24 - 0
tmp/kafka/AKafkaPushTest.py

@@ -0,0 +1,24 @@
+# class AsyncKafka(CfgBaseInit):
+#     """异步请求模块
+#     """
+#     _async_kafka_session = None
+#     _instance_lock = threading.Lock()
+#
+#     @classmethod
+#     async def __set_async_session(cls):
+#         if not cls._async_kafka_session:
+#             with cls._instance_lock:
+#                 producer = AIOKafkaProducer(loop=cls.get_my_event_loop(),
+#                                             value_serializer=lambda v: json.dumps(v).encode('utf-8'),
+#                                             bootstrap_servers=CfgBaseInit.kafka_server)
+#                 await producer.start()
+#                 cls._async_kafka_session = producer
+#         return cls._async_kafka_session
+#
+#     @classmethod
+#     async def async_push(cls, payload):
+#         """异步请求接口
+#         """
+#         request_session = cls._async_kafka_session or await cls.__set_async_session()
+#         print(request_session)
+#         await request_session.send_and_wait(CfgBaseInit.topic, payload)

+ 14 - 0
tmp/kafka/KafkaPushTest.py

@@ -0,0 +1,14 @@
+import json
+import time
+from kafka import KafkaProducer
+
+producer = KafkaProducer(value_serializer=lambda v: json.dumps(v).encode('utf-8'),
+                         bootstrap_servers='localhost:9092')
+index = 0
+while True:
+    msg = "相信未来----------{}------------".format(index).encode('GBK')  # 发送内容,必须是bytes类型,GBK可显示中文
+    time.sleep(5)  # 每个5秒推送一条数据
+    index += 1
+    print(msg.decode('GBK'))  # 防止中文乱码
+
+    producer.send('test', msg, partition=0)  # 发送的topic为test

+ 18 - 0
tmp/kafka/kafkaReceiveTest.py

@@ -0,0 +1,18 @@
+from kafka import KafkaConsumer
+
+# consumer = KafkaConsumer('eip_rec_behave',
+#                          group_id='test1',  # 一个组消费一次
+#                          auto_offset_reset='earliest',  # 从最新数据读取,earliest,latest
+#                          enable_auto_commit=False,  # 关闭自动提醒
+#                          bootstrap_servers=['eip-kafka-2.qa.mlamp.cn']
+#                          )
+# for msg in consumer:
+#     print(msg.value)
+#     consumer.commit()
+consumer = KafkaConsumer('eip_rec_behave',
+                         group_id='test1',  # 一个组消费一次
+                         auto_offset_reset='latest',  # 从最新数据读取,earliest,latest
+                         bootstrap_servers=['eip-kafka-2.qa.mlamp.cn']
+                         )
+for msg in consumer:
+    print(msg.value)