123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197 |
- <?php
- /**
- * Created by: PhpStorm
- * User: lytian
- * Date: 2019/10/31
- * Time: 19:14
- */
- namespace app\admin\command;
- use app\common\library\Redis;
- use app\main\constants\CustomConstants;
- use think\Config;
- use think\console\Command;
- use think\console\Input;
- use think\console\Output;
- use think\Db;
- use think\Env;
- use think\Log;
- use think\Request;
- class FlushStatisticalDataToDb extends Command
- {
- const CUSTOM_MSG_TYPE_UV = 1;
- const CUSTOM_MSG_TYPE_PAY = 2;
- protected function configure()
- {
- $this->setName('FlushStatisticalDataToDb')
- ->setDescription('客服消息统计数据入库');
- }
- protected function execute(Input $input, Output $output)
- {
- Request::instance()->module('admin'); //cli模式下无法获取到当前的项目模块,手动指定一下
- $this->flushCustomToDb();
- }
- /**
- * 更新客服消息统计
- */
- private function flushCustomToDb(){
- $this->handleQueueTask('flushCustomToDb',"collect_custom_list_new", function($result){
- //使用主库的从库
- $mainSlaveDbConfig = $this->getMainSlaveDbConfig();
- foreach($result as $custom_id => $list){
- foreach($list as $custom_idx => $values){
- $rowRes = Db::connect($mainSlaveDbConfig)->query("select id from custom_url_collect where custom_id = {$custom_id} and idx = {$custom_idx} ");
- if($rowRes){
- //存在更新数据
- $id = $rowRes[0]['id'];
- $updateData = [];
- foreach($values as $d_key => $d_val){
- if($d_key == 'uv'){
- $updateData['uv'] = $d_val;
- }else{
- $updateData[$d_key] = ['exp', "($d_key+$d_val)"];
- }
- }
- if ($updateData) {
- $updateData['updatetime'] = date("Y-m-d H:i:s");
- Log::info($updateData);
- if(model("CustomUrlCollect")->update($updateData,['id' => $id])){
- Log::info("flushCustomToDb: custom_id:{$custom_id} custom_idx:{$custom_idx} Data:".json_encode($values)." Update Db Success");
- }else{
- Log::error("flushCustomToDb: custom_id:{$custom_id} custom_idx:{$custom_idx} Data:".json_encode($values)." Update Db Fail");
- }
- }
- }else{
- $item['custom_id'] = $custom_id;
- $item['idx'] = $custom_idx;
- $item = array_merge($item,$values);
- $res = model("CustomUrlCollect")->allowField(true)->insert($item);
- if($res){
- Log::info("flushCustomToDb: custom_id:{$custom_id} custom_idx:{$custom_idx} Data:".json_encode($item)." Insert Db Success");
- }else{
- Log::error("flushCustomToDb: custom_id:{$custom_id} custom_idx:{$custom_idx} Data:".json_encode($item)." Insert Db Fail");
- }
- }
- }
- }
- unset($result);
- },function($result){
- $data = [];
- foreach($result as $message){
- Log::info("flushCustomToDb fromredis:".$message);
- if(!$custom_info = json_decode($message,true)){
- continue;
- }
- $custom_id = $custom_info['custom_id'];
- $custom_idx = $custom_info['idx'];
- //UV处理
- if(intval($custom_info['type']) == self::CUSTOM_MSG_TYPE_UV ){
- $uv_key = CustomConstants::getCustomUvKey($custom_id,$custom_idx);
- $custom_uv = Redis::instance()->pfcount($uv_key) ?? 0;
- $data[$custom_id][$custom_idx]['uv'] = $custom_uv;
- }
- //充值处理
- if(intval($custom_info['type']) == self::CUSTOM_MSG_TYPE_PAY ){
- //充值金额
- if(isset($data[$custom_id][$custom_idx]['recharge_money'])){
- $data[$custom_id][$custom_idx]['recharge_money'] += $custom_info['recharge_money'];
- }else{
- $data[$custom_id][$custom_idx]['recharge_money'] = $custom_info['recharge_money'];
- }
- //充值笔数
- if(isset($data[$custom_id][$custom_idx]['recharge_orders'])){
- $data[$custom_id][$custom_idx]['recharge_orders'] += $custom_info['recharge_orders'];
- }else{
- $data[$custom_id][$custom_idx]['recharge_orders'] = $custom_info['recharge_orders'];
- }
- }
- }
- unset($result);
- return $data;
- });
- }
- /**
- * 处理队列
- * @param $title
- * @param $keyName
- * @param callable $writeDb
- * @param callable $formatData
- */
- private function handleQueueTask($title,$keyName,callable $writeDb,callable $formatData = null){
- try {
- //获取队列长度
- $list_size = Redis::instance()->lLen($keyName);
- Log::info("{$title}: Timer Task Sizes:{$list_size} Start");
- //size不为0时处理
- if($list_size){
- //获取队列数据,数据为空时直接返回
- if (!$result = $this->getRedisListData($keyName)) {
- return;
- }
- //格式化数据
- if($formatData && is_callable($formatData)){
- if(!$result = call_user_func($formatData,$result)){
- return;
- }
- }
- //调用回调函数写入数据库
- if($writeDb && is_callable($writeDb)){
- call_user_func($writeDb,$result);
- }
- //获取队列长度
- $list_size = Redis::instance()->lLen($keyName);
- }
- Log::info("{$title}: Timer Task Size:{$list_size} End");
- }catch (\Exception $e){
- $err_info = ['code' => $e->getCode(), 'msg' => $e->getMessage(), 'line' => $e->getLine(), 'file' => $e->getFile(), 'trace' => $e->getTraceAsString()];
- Log::error("{$title}: Error:".json_encode($err_info,JSON_UNESCAPED_UNICODE));
- }catch (\Throwable $e){
- $err_info = ['code' => $e->getCode(), 'msg' => $e->getMessage(), 'line' => $e->getLine(), 'file' => $e->getFile(), 'trace' => $e->getTraceAsString()];
- Log::error("{$title}: Error:".json_encode($err_info,JSON_UNESCAPED_UNICODE));
- }
- }
- /**
- * 获取队列里面的数据,请注意入队使用rpush
- * @param string $key_name 队列名称
- * @param int $limit 获取队列前面多少条,并删除
- * @return array
- * @throws \Exception
- */
- private function getRedisListData($key_name,$limit = 5000){
- $result = Redis::instance()->lRange($key_name,0,$limit);
- //删除时可能不够5000条,按取出来的条数进行删除
- Redis::instance()->lTrim($key_name,count($result),-1);
- return $result;
- }
- /**
- * 获取主库的从库配置 从库不存在返回主库
- * @return array
- */
- private function getMainSlaveDbConfig()
- {
- $hostArr = explode(',', Env::get('database.admin_hostname'));
- $portArr = explode(',', Env::get('database.admin_hostport', '3306,3306'));
- //默认主库
- $mainDbConfig = array_merge(Config::get("database"), ['hostname' =>$hostArr[0], 'hostport' => $portArr[0]]);
- if (count($hostArr) >= 2) {
- //从库
- $mainDbConfig = array_merge(Config::get("database"), ['hostname' =>$hostArr[1], 'hostport' => $portArr[1]]);
- }
- return $mainDbConfig;
- }
- }
|