KafkaService.php 3.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135
  1. <?php
  2. /**
  3. * Created by PhpStorm.
  4. * User: Bear
  5. * Date: 2020/5/6
  6. * Time: 下午2:10
  7. */
  8. namespace app\main\service;
  9. use app\main\model\object\BaseObject;
  10. use think\Config;
  11. use RdKafka\Conf;
  12. use RdKafka\Producer;
  13. use RdKafka\Consumer;
  /**
   * Kafka helper service built on the php-rdkafka extension.
   *
   * Broker list and topic name are read from the `kafka` configuration entry
   * (keys `servers` and `topic`).
   *
   * @package app\main\service
   */
  class KafkaService extends BaseService
  {
      /**
       * Lazily created shared instance.
       *
       * @var KafkaService|null
       */
      protected static $self = null;

      /**
       * Return the shared instance, creating it on first use.
       *
       * @return BaseService|KafkaService
       */
      public static function instance()
      {
          if (self::$self === null) {
              self::$self = new self();
          }
          return self::$self;
      }

      /**
       * Placeholder producer.
       *
       * NOTE(review): $msg is currently ignored and nothing is sent to Kafka;
       * the method only reports success. Confirm whether callers rely on this
       * before wiring in a real producer.
       *
       * @param mixed $msg Message to publish (currently unused).
       * @return \app\main\model\object\ReturnObject
       */
      public function produceMsg($msg)
      {
          return $this->setData(true)->getReturn();
      }

      /**
       * Low-level consumer.
       *
       * Reads partition 0 of the configured topic from the beginning and logs
       * every payload. The loop ends on the first real consumer error; any
       * \Exception is logged rather than propagated. Consumption of the
       * partition is always stopped before the method returns.
       *
       * @return void
       */
      public function consumeMsg()
      {
          $topic = null;
          $started = false;

          try {
              $setting = Config::get('kafka');

              $conf = new Conf();
              $conf->set('log_level', (string)LOG_INFO);

              $rk = new Consumer($conf);
              $rk->addBrokers($setting['servers']);

              $topic = $rk->newTopic($setting['topic']);
              $topic->consumeStart(0, RD_KAFKA_OFFSET_BEGINNING);
              $started = true;

              while (true) {
                  // First argument: partition. Second argument: timeout.
                  $msg = $topic->consume(0, 1000);

                  if (null === $msg || $msg->err === RD_KAFKA_RESP_ERR__PARTITION_EOF) {
                      // The EOF constant check is required by librdkafka 0.11.6;
                      // newer librdkafka versions return NULL instead.
                      continue;
                  }

                  if ($msg->err) {
                      LogService::error($msg->errstr());
                      break;
                  }

                  LogService::info("get kafka msg:".$msg->payload);
              }
          } catch (\Exception $e) {
              LogService::error($e->getMessage());
          } finally {
              // Pair every successful consumeStart() with a consumeStop() so the
              // partition is released however the loop was left.
              if ($started) {
                  try {
                      $topic->consumeStop(0);
                  } catch (\Exception $e) {
                      LogService::error($e->getMessage());
                  }
              }
          }
      }

      /**
       * High-level (consumer-group) consumer.
       *
       * Subscribes to the configured topic as group `myConsumerGroup` and logs
       * partition, offset and payload of every successfully consumed message.
       * Partition-EOF and timeout results are ignored; any other error ends
       * the loop with an exception.
       *
       * @return void
       * @throws \RdKafka\Exception
       * @throws \Exception On an unexpected rebalance event or consumer error.
       */
      public function consumeHigh()
      {
          $setting = Config::get('kafka');
          $conf = new Conf();

          // Partition rebalance: apply the new assignment, or clear it on revoke.
          $conf->setRebalanceCb(function (\RdKafka\KafkaConsumer $kafka, $err, array $partitions = null) {
              switch ($err) {
                  case RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS:
                      $kafka->assign($partitions);
                      break;
                  case RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS:
                      $kafka->assign(null);
                      break;
                  default:
                      // Report a readable message and keep the numeric code.
                      throw new \Exception(rd_kafka_err2str($err), $err);
              }
          });

          $conf->set('group.id', 'myConsumerGroup');
          $conf->set('metadata.broker.list', $setting['servers']);
          $conf->set('auto.offset.reset', 'smallest');

          $consumer = new \RdKafka\KafkaConsumer($conf);
          $consumer->subscribe([$setting['topic']]);

          while (true) {
              $message = $consumer->consume(5000);

              switch ($message->err) {
                  case RD_KAFKA_RESP_ERR_NO_ERROR:
                      // Log only real messages. Checking the error code (instead
                      // of payload truthiness) keeps payloads such as "0" and
                      // keeps error descriptions out of the data log.
                      LogService::debug('partition:'.$message->partition);
                      LogService::debug('offset:'.$message->offset);
                      LogService::debug('get kafka data:'.$message->payload);
                      break;
                  case RD_KAFKA_RESP_ERR__PARTITION_EOF:
                  case RD_KAFKA_RESP_ERR__TIMED_OUT:
                      // Nothing to process right now; poll again.
                      break;
                  default:
                      throw new \Exception($message->errstr(), $message->err);
              }
          }
      }
  }