123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135 |
- <?php
- /**
- * Created by PhpStorm.
- * User: Bear
- * Date: 2020/5/6
- * Time: 下午2:10
- */
- namespace app\main\service;
- use app\main\model\object\BaseObject;
- use think\Config;
- use RdKafka\Conf;
- use RdKafka\Producer;
- use RdKafka\Consumer;
- /**
- * Class KafkaService
- * @package app\main\service
- */
- class KafkaService extends BaseService
- {
- /**
- * @var KafkaService
- */
- protected static $self = null;
- /**
- * @return BaseService|KafkaService
- */
- public static function instance()
- {
- if (self::$self == null) {
- self::$self = new self();
- }
- return self::$self;
- }
- /**
- * @param $msg
- * @return \app\main\model\object\ReturnObject
- */
- public function produceMsg($msg)
- {
- return $this->setData(true)->getReturn();
- }
- /**
- * 低级消费者
- */
- public function consumeMsg()
- {
- try {
- $setting = Config::get('kafka');
- $conf = new Conf();
- $conf->set('log_level', (string)LOG_INFO);
- // $conf->set('debug', 'all');
- $rk = new Consumer($conf);
- $rk->addBrokers($setting['servers']);
- $topic = $rk->newTopic($setting['topic']);
- $topic->consumeStart(0, RD_KAFKA_OFFSET_BEGINNING);
- while (true) {
- // The first argument is the partition (again).
- // The second argument is the timeout.
- $msg = $topic->consume(0, 1000);
- if (null === $msg || $msg->err === RD_KAFKA_RESP_ERR__PARTITION_EOF) {
- // Constant check required by librdkafka 0.11.6. Newer librdkafka versions will return NULL instead.
- continue;
- } elseif ($msg->err) {
- LogService::error($msg->errstr());
- break;
- } else {
- LogService::info("get kafka msg:".$msg->payload);
- }
- }
- } catch (\Exception $e) {
- LogService::error($e->getMessage());
- }
- }
- /**
- * 高级消费者
- * @throws \RdKafka\Exception
- */
- public function consumeHigh()
- {
- $setting = Config::get('kafka');
- $conf = new \RdKafka\Conf();
- /**
- * 切换分片
- */
- $conf->setRebalanceCb(function (\RdKafka\KafkaConsumer $kafka, $err, array $partitions = null) {
- switch ($err) {
- case RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS:
- $kafka->assign($partitions);
- break;
- case RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS:
- $kafka->assign(NULL);
- break;
- default:
- throw new \Exception($err);
- }
- });
- $conf->set('group.id', 'myConsumerGroup');
- $conf->set('metadata.broker.list', $setting['servers']);
- $conf->set('auto.offset.reset', 'smallest');
- $consumer = new \RdKafka\KafkaConsumer($conf);
- $consumer->subscribe([$setting['topic']]);
- while (true) {
- $message = $consumer->consume(5000);
- if ($message->payload) {
- LogService::debug('partition:'.$message->partition);
- LogService::debug('offset:'.$message->offset);
- LogService::debug('get kafka data:'.$message->payload);
- }
- switch ($message->err) {
- case RD_KAFKA_RESP_ERR_NO_ERROR:
- LogService::debug($message->errstr());
- break;
- case RD_KAFKA_RESP_ERR__PARTITION_EOF:
- break;
- case RD_KAFKA_RESP_ERR__TIMED_OUT:
- break;
- default:
- throw new \Exception($message->errstr(), $message->err);
- break;
- }
- }
- }
- }
|