setData(true)->getReturn(); }

    /**
     * Low-level consumer: reads partition 0 of the configured topic, starting
     * from the beginning of the partition, and logs every payload it receives.
     *
     * The broker list and topic name are read from the 'kafka' config entry
     * ('servers' and 'topic' keys). The loop keeps polling until a message
     * carries an error other than partition EOF; that error is logged and the
     * method returns. Any \Exception raised along the way is logged and
     * swallowed, so this method never throws \Exception to its caller.
     */
    public function consumeMsg()
    {
        try {
            $setting = Config::get('kafka');

            $conf = new Conf();
            $conf->set('log_level', (string)LOG_INFO);
            // Uncomment for verbose librdkafka debug output:
            // $conf->set('debug', 'all');

            $rk = new Consumer($conf);
            $rk->addBrokers($setting['servers']);

            $topic = $rk->newTopic($setting['topic']);
            $topic->consumeStart(0, RD_KAFKA_OFFSET_BEGINNING);

            // From here on the partition consumer is running, so make sure it
            // is stopped again no matter how the loop is left.
            try {
                while (true) {
                    // First argument: partition. Second argument: timeout in ms.
                    $msg = $topic->consume(0, 1000);

                    // The EOF constant check is required by librdkafka 0.11.6;
                    // newer librdkafka versions return NULL instead.
                    if (null === $msg || $msg->err === RD_KAFKA_RESP_ERR__PARTITION_EOF) {
                        continue;
                    }

                    if ($msg->err) {
                        LogService::error($msg->errstr());
                        break;
                    }

                    LogService::info("get kafka msg:".$msg->payload);
                }
            } finally {
                $topic->consumeStop(0);
            }
        } catch (\Exception $e) {
            LogService::error($e->getMessage());
        }
    }

    /**
     * High-level consumer: joins the consumer group 'myConsumerGroup',
     * subscribes to the configured topic and logs every message received.
     *
     * The broker list and topic name are read from the 'kafka' config entry
     * ('servers' and 'topic' keys). The polling loop has no normal exit: it
     * only ends when an exception is thrown.
     *
     * @throws \RdKafka\Exception
     * @throws \Exception when the rebalance callback receives an unexpected
     *                    error code, or a consumed message carries an error
     *                    other than partition EOF / timeout
     */
    public function consumeHigh()
    {
        $setting = Config::get('kafka');
        $conf = new \RdKafka\Conf();

        /**
         * Rebalance callback: applies the partition assignment handed out by
         * the group, or drops the current assignment when it is revoked.
         */
        $conf->setRebalanceCb(function (\RdKafka\KafkaConsumer $kafka, $err, array $partitions = null) {
            switch ($err) {
                case RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS:
                    $kafka->assign($partitions);
                    break;

                case RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS:
                    $kafka->assign(null);
                    break;

                default:
                    // $err is an integer error code: use its textual form as
                    // the message and keep the code itself as exception code.
                    throw new \Exception(rd_kafka_err2str($err), $err);
            }
        });

        $conf->set('group.id', 'myConsumerGroup');
        $conf->set('metadata.broker.list', $setting['servers']);
        $conf->set('auto.offset.reset', 'smallest');

        $consumer = new \RdKafka\KafkaConsumer($conf);
        $consumer->subscribe([$setting['topic']]);

        while (true) {
            // Argument: timeout in ms.
            $message = $consumer->consume(5000);

            if ($message->payload) {
                LogService::debug('partition:'.$message->partition);
                LogService::debug('offset:'.$message->offset);
                LogService::debug('get kafka data:'.$message->payload);
            }

            switch ($message->err) {
                case RD_KAFKA_RESP_ERR_NO_ERROR:
                    LogService::debug($message->errstr());
                    break;

                case RD_KAFKA_RESP_ERR__PARTITION_EOF:
                case RD_KAFKA_RESP_ERR__TIMED_OUT:
                    // Nothing to read right now; poll again.
                    break;

                default:
                    throw new \Exception($message->errstr(), $message->err);
            }
        }
    }
}