type; if (is_array($type)) { foreach ($type as $t) { $analysisObject->type = $t; $data = $this->processMsg($user_id, $analysisObject)->data; KafkaService::instance()->produceMsg($data); } } else { $data = $this->processMsg($user_id, $analysisObject)->data; KafkaService::instance()->produceMsg($data); } }catch (\Exception $e) { LogService::error($e->getMessage()); } } /** * 获取发送数据 * @param $user_id int * @param AnalysisObject $analysisObject * @return \app\main\model\object\ReturnObject */ public function processMsg($user_id, AnalysisObject $analysisObject) { $aUser = UserService::instance()->getUserModel()->getUserInfo($user_id); $oUser = (new UserObject())->bind($aUser); $analysisObject->send_time = time(); $analysisObject->user = $this->getUser($oUser)->data; if (!$analysisObject->data) { $analysisObject->data = NULL; } if (!$analysisObject->user_from) { $analysisObject->user_from = NULL; } $post = $analysisObject->toArray(); return $this->setData($post)->getReturn(); } /** * 获取打点用户信息 * @param UserObject $userObject * @return \app\main\model\object\ReturnObject */ public function getUser(UserObject $userObject) { $user = [ 'id' => $userObject->id, ]; $groupId = AdminService::instance()->getAuthGroupAccessModel()->getGroupId($userObject->channel_id); if ($groupId == AdminConstants::ADMIN_GROUP_ID_AGENT) { $adminExtend = AdminService::instance()->getAdminExtendModel()->getInfo($userObject->channel_id); $channelId = $adminExtend['create_by']; $user['channel_id'] = $adminExtend['create_by']; $user['agent_id'] = $channelId; $user['peihao'] = 1; } else { $user['channel_id'] = $userObject->channel_id; $user['agent_id'] = $userObject->agent_id; $user['peihao'] = 0; } $user['is_new'] = (int)($userObject->createtime > strtotime(date('Y-m-d'))); return $this->setData($user)->getReturn(); } }