setName('FlushStatisticalDataToDb') ->setDescription('客服消息统计数据入库'); } protected function execute(Input $input, Output $output) { Request::instance()->module('admin'); //cli模式下无法获取到当前的项目模块,手动指定一下 $this->flushCustomToDb(); } /** * 更新客服消息统计 */ private function flushCustomToDb(){ $this->handleQueueTask('flushCustomToDb',"collect_custom_list_new", function($result){ //使用主库的从库 $mainSlaveDbConfig = $this->getMainSlaveDbConfig(); foreach($result as $custom_id => $list){ foreach($list as $custom_idx => $values){ $rowRes = Db::connect($mainSlaveDbConfig)->query("select id from custom_url_collect where custom_id = {$custom_id} and idx = {$custom_idx} "); if($rowRes){ //存在更新数据 $id = $rowRes[0]['id']; $updateData = []; foreach($values as $d_key => $d_val){ if($d_key == 'uv'){ $updateData['uv'] = $d_val; }else{ $updateData[$d_key] = ['exp', "($d_key+$d_val)"]; } } if ($updateData) { $updateData['updatetime'] = date("Y-m-d H:i:s"); Log::info($updateData); if(model("CustomUrlCollect")->update($updateData,['id' => $id])){ Log::info("flushCustomToDb: custom_id:{$custom_id} custom_idx:{$custom_idx} Data:".json_encode($values)." Update Db Success"); }else{ Log::error("flushCustomToDb: custom_id:{$custom_id} custom_idx:{$custom_idx} Data:".json_encode($values)." Update Db Fail"); } } }else{ $item['custom_id'] = $custom_id; $item['idx'] = $custom_idx; $item = array_merge($item,$values); $res = model("CustomUrlCollect")->allowField(true)->insert($item); if($res){ Log::info("flushCustomToDb: custom_id:{$custom_id} custom_idx:{$custom_idx} Data:".json_encode($item)." Insert Db Success"); }else{ Log::error("flushCustomToDb: custom_id:{$custom_id} custom_idx:{$custom_idx} Data:".json_encode($item)." Insert Db Fail"); } } } } unset($result); },function($result){ $data = []; foreach($result as $message){ Log::info("flushCustomToDb fromredis:".$message); if(!$custom_info = json_decode($message,true)){ continue; } $custom_id = $custom_info['custom_id']; $custom_idx = $custom_info['idx']; //UV处理 if(intval($custom_info['type']) == self::CUSTOM_MSG_TYPE_UV ){ $uv_key = CustomConstants::getCustomUvKey($custom_id,$custom_idx); $custom_uv = Redis::instance()->pfcount($uv_key) ?? 0; $data[$custom_id][$custom_idx]['uv'] = $custom_uv; } //充值处理 if(intval($custom_info['type']) == self::CUSTOM_MSG_TYPE_PAY ){ //充值金额 if(isset($data[$custom_id][$custom_idx]['recharge_money'])){ $data[$custom_id][$custom_idx]['recharge_money'] += $custom_info['recharge_money']; }else{ $data[$custom_id][$custom_idx]['recharge_money'] = $custom_info['recharge_money']; } //充值笔数 if(isset($data[$custom_id][$custom_idx]['recharge_orders'])){ $data[$custom_id][$custom_idx]['recharge_orders'] += $custom_info['recharge_orders']; }else{ $data[$custom_id][$custom_idx]['recharge_orders'] = $custom_info['recharge_orders']; } } } unset($result); return $data; }); } /** * 处理队列 * @param $title * @param $keyName * @param callable $writeDb * @param callable $formatData */ private function handleQueueTask($title,$keyName,callable $writeDb,callable $formatData = null){ try { //获取队列长度 $list_size = Redis::instance()->lLen($keyName); Log::info("{$title}: Timer Task Sizes:{$list_size} Start"); //size不为0时处理 if($list_size){ //获取队列数据,数据为空时直接返回 if (!$result = $this->getRedisListData($keyName)) { return; } //格式化数据 if($formatData && is_callable($formatData)){ if(!$result = call_user_func($formatData,$result)){ return; } } //调用回调函数写入数据库 if($writeDb && is_callable($writeDb)){ call_user_func($writeDb,$result); } //获取队列长度 $list_size = Redis::instance()->lLen($keyName); } Log::info("{$title}: Timer Task Size:{$list_size} End"); }catch (\Exception $e){ $err_info = ['code' => $e->getCode(), 'msg' => $e->getMessage(), 'line' => $e->getLine(), 'file' => $e->getFile(), 'trace' => $e->getTraceAsString()]; Log::error("{$title}: Error:".json_encode($err_info,JSON_UNESCAPED_UNICODE)); }catch (\Throwable $e){ $err_info = ['code' => $e->getCode(), 'msg' => $e->getMessage(), 'line' => $e->getLine(), 'file' => $e->getFile(), 'trace' => $e->getTraceAsString()]; Log::error("{$title}: Error:".json_encode($err_info,JSON_UNESCAPED_UNICODE)); } } /** * 获取队列里面的数据,请注意入队使用rpush * @param string $key_name 队列名称 * @param int $limit 获取队列前面多少条,并删除 * @return array * @throws \Exception */ private function getRedisListData($key_name,$limit = 5000){ $result = Redis::instance()->lRange($key_name,0,$limit); //删除时可能不够5000条,按取出来的条数进行删除 Redis::instance()->lTrim($key_name,count($result),-1); return $result; } /** * 获取主库的从库配置 从库不存在返回主库 * @return array */ private function getMainSlaveDbConfig() { $hostArr = explode(',', Env::get('database.admin_hostname')); $portArr = explode(',', Env::get('database.admin_hostport', '3306,3306')); //默认主库 $mainDbConfig = array_merge(Config::get("database"), ['hostname' =>$hostArr[0], 'hostport' => $portArr[0]]); if (count($hostArr) >= 2) { //从库 $mainDbConfig = array_merge(Config::get("database"), ['hostname' =>$hostArr[1], 'hostport' => $portArr[1]]); } return $mainDbConfig; } }