KafkaDotService.php 3.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108
  1. <?php
  2. /**
  3. * Created by PhpStorm.
  4. * User: Bear
  5. * Date: 2020/5/7
  6. * Time: 上午11:05
  7. */
  8. namespace app\main\service;
  9. use app\main\constants\AdminConstants;
  10. use app\main\model\object\AnalysisObject;
  11. use app\main\model\object\UserObject;
  /**
   * Class KafkaDotService
   *
   * Builds tracking ("dot") payloads from an AnalysisObject plus details of
   * the user it belongs to, and passes them to KafkaService::produceMsg().
   *
   * @package app\main\service
   */
  class KafkaDotService extends BaseService
  {
      /**
       * Shared instance handed out by instance(); null until first use.
       *
       * @var KafkaDotService
       */
      protected static $self = null;

      /**
       * Return the shared instance, creating it on the first call.
       *
       * @return BaseService|KafkaDotService
       */
      public static function instance()
      {
          // Strict comparison: only the "not created yet" state builds a new object.
          if (self::$self === null) {
              self::$self = new self();
          }

          return self::$self;
      }

      /**
       * Send tracking data.
       *
       * When $analysisObject->type is an array, one message is produced per
       * element, with `type` set to that element while the message is built.
       * Otherwise a single message is produced.
       *
       * Any \Exception raised while building or producing a message is logged
       * through LogService::error() and not rethrown. Because the try block
       * wraps the whole loop, a failure on one type also skips the remaining
       * types.
       *
       * The array value of `type` is restored on the passed object before
       * returning, so the same object can be sent again and still fan out to
       * every type. Other fields written by processMsg() stay modified.
       *
       * @param mixed          $user_id        Id of the user the event belongs to.
       * @param AnalysisObject $analysisObject Event to send.
       */
      public function sendMsg($user_id, AnalysisObject $analysisObject)
      {
          // Keep the caller's value: the loop below overwrites `type` per element.
          $originalType = $analysisObject->type;
          $isFanOut = is_array($originalType);

          try {
              if ($isFanOut) {
                  foreach ($originalType as $singleType) {
                      $analysisObject->type = $singleType;
                      $data = $this->processMsg($user_id, $analysisObject)->data;
                      KafkaService::instance()->produceMsg($data);
                  }
              } else {
                  $data = $this->processMsg($user_id, $analysisObject)->data;
                  KafkaService::instance()->produceMsg($data);
              }
          } catch (\Exception $e) {
              LogService::error($e->getMessage());
          }

          // Undo the per-element overwrite, including after a caught failure.
          if ($isFanOut) {
              $analysisObject->type = $originalType;
          }
      }

      /**
       * Build the data to send for one event.
       *
       * Loads the user via UserService, then fills in the passed object in
       * place: `send_time` is set to the current Unix timestamp, `user` to the
       * data returned by getUser(), and falsy `data` / `user_from` values are
       * replaced with null. The object's toArray() result is wrapped with
       * setData()->getReturn().
       *
       * @param int            $user_id        Id of the user the event belongs to.
       * @param AnalysisObject $analysisObject Event to fill in; modified by this call.
       * @return \app\main\model\object\ReturnObject
       */
      public function processMsg($user_id, AnalysisObject $analysisObject)
      {
          $userRow = UserService::instance()->getUserModel()->getUserInfo($user_id);
          $userObject = (new UserObject())->bind($userRow);

          $analysisObject->send_time = time();
          $analysisObject->user = $this->getUser($userObject)->data;

          // Any falsy value (not only "unset") is collapsed to null.
          if (!$analysisObject->data) {
              $analysisObject->data = null;
          }
          if (!$analysisObject->user_from) {
              $analysisObject->user_from = null;
          }

          return $this->setData($analysisObject->toArray())->getReturn();
      }

      /**
       * Get the user information attached to a tracking event.
       *
       * The result data holds `id`, `channel_id`, `agent_id`, `peihao` and
       * `is_new`. If the group looked up for the user's channel_id equals
       * AdminConstants::ADMIN_GROUP_ID_AGENT, both `channel_id` and `agent_id`
       * are taken from the admin-extend record's `create_by` and `peihao` is 1.
       * Otherwise they are copied from the user and `peihao` is 0.
       * `is_new` is 1 when the user's createtime is later than today's midnight
       * (as computed by strtotime(date('Y-m-d'))), else 0.
       *
       * @param UserObject $userObject User to describe.
       * @return \app\main\model\object\ReturnObject
       */
      public function getUser(UserObject $userObject)
      {
          $user = [
              'id' => $userObject->id,
          ];

          $groupId = AdminService::instance()->getAuthGroupAccessModel()->getGroupId($userObject->channel_id);

          // Loose comparison kept on purpose: the type getGroupId() returns is
          // not visible here - TODO confirm before switching to ===.
          if ($groupId == AdminConstants::ADMIN_GROUP_ID_AGENT) {
              $adminExtend = AdminService::instance()->getAdminExtendModel()->getInfo($userObject->channel_id);
              $createdBy = $adminExtend['create_by'];

              $user['channel_id'] = $createdBy;
              $user['agent_id'] = $createdBy;
              $user['peihao'] = 1;
          } else {
              $user['channel_id'] = $userObject->channel_id;
              $user['agent_id'] = $userObject->agent_id;
              $user['peihao'] = 0;
          }

          $todayMidnight = strtotime(date('Y-m-d'));
          $user['is_new'] = (int)($userObject->createtime > $todayMidnight);

          return $this->setData($user)->getReturn();
      }
  }