tianyun há 3 anos atrás
pai
commit
872fccd3fe
1 ficheiros alterados com 11 adições e 10 exclusões
  1. 11 10
      tmp/kafka/kafkaReceiveTest.py

+ 11 - 10
tmp/kafka/kafkaReceiveTest.py

@@ -1,18 +1,19 @@
 from kafka import KafkaConsumer
 from kafka import KafkaConsumer
 
 
-# consumer = KafkaConsumer('eip_rec_behave',
-#                          group_id='test1',  # 一个组消费一次
-#                          auto_offset_reset='earliest',  # 从最新数据读取,earliest,latest
-#                          enable_auto_commit=False,  # 关闭自动提醒
-#                          bootstrap_servers=['eip-kafka-2.qa.mlamp.cn']
-#                          )
-# for msg in consumer:
-#     print(msg.value)
-#     consumer.commit()
 consumer = KafkaConsumer('eip_rec_behave',
 consumer = KafkaConsumer('eip_rec_behave',
                          group_id='test1',  # 一个组消费一次
                          group_id='test1',  # 一个组消费一次
-                         auto_offset_reset='latest',  # 从最新数据读取,earliest,latest
+                         auto_offset_reset='earliest',  # 从最新数据读取,earliest,latest
+                         enable_auto_commit=False,  # 关闭自动提醒
                          bootstrap_servers=['eip-kafka-2.qa.mlamp.cn']
                          bootstrap_servers=['eip-kafka-2.qa.mlamp.cn']
                          )
                          )
 for msg in consumer:
 for msg in consumer:
     print(msg.value)
     print(msg.value)
+    consumer.commit()
+
+# consumer = KafkaConsumer('eip_rec_behave',
+#                          group_id='test1',  # 一个组消费一次
+#                          auto_offset_reset='latest',  # 从最新数据读取,earliest,latest
+#                          bootstrap_servers=['eip-kafka-2.qa.mlamp.cn']
+#                          )
+# for msg in consumer:
+#     print(msg.value)