setName('ssdbToMysql')
            ->addOption('type', 't', Option::VALUE_OPTIONAL, '要处理的缓存类型,all:默认处理全部,quv:二维码UV统计,uc:用户统计,bc:小说统计:', 'all')
            ->addOption('refresh','r',Option::VALUE_OPTIONAL,'刷新Ssdb数据到Redis,quv:自定义二维码刷新到redis,pay:刷新渠道新增付费用户到Mysql,ref:刷新推广链接当天金额')
            ->addOption('process','p',Option::VALUE_OPTIONAL,'进程数,只对ref起作用')
            ->setDescription('将ssdb缓存中的数据存入mysql数据库');
    }

    /**
     * Command entry point.
     *
     * When --refresh is given, runs the matching refresh job (quv, kl, pay, ref);
     * otherwise runs the statistics job(s) selected by --type (all, quv, uc, bc).
     *
     * @param Input  $input
     * @param Output $output
     */
    protected function execute(Input $input, Output $output)
    {
        // The current module cannot be resolved in CLI mode, so set it by hand.
        Request::instance()->module('admin');
        $process = $input->getOption('process');

        if ($refresh = $input->getOption('refresh')) {
            switch ($refresh) {
                case 'quv':
                    $this->refresh_quv($input, $output);
                    break;
                // NOTE(review): 'kl' is accepted here but is not listed in the --refresh help text.
                case 'kl':
                    $this->refresh_kl($input, $output);
                    break;
                case 'pay':
                    $this->refresh_pay($input, $output);
                    break;
                case 'ref':
                    // Drop the lock key before starting the job.
                    Redis::instance()->del("fork_ref_lock");
                    $this->refresh_ref_money($input, $output, intval($process));
                    break;
                default:
                    $output->writeln("Type: {$refresh} 无法识别的类型");
                    break;
            }
        } else {
            $type = $input->getOption('type');
            // NOTE(review): bc() is not defined in the visible part of this class —
            // confirm it exists, otherwise 'all' and 'bc' end in a fatal error.
            switch ($type) {
                case 'all':
                    $this->quv($input, $output);
                    $this->uc($input, $output);
                    $this->bc($input, $output);
                    break;
                case 'quv':
                    $this->quv($input, $output);
                    break;
                case 'uc':
                    $this->uc($input, $output);
                    break;
                case 'bc':
                    $this->bc($input, $output);
                    break;
                default:
                    $output->writeln("Type: {$type} 无法识别的类型");
            }
        }
    }

    /**
     * Moves custom-QR-code UV members from SSDB hashes into Redis HyperLogLog keys.
     *
     * For every row of `custom_qrcode`, the keys of the SSDB hash
     * "quv:{admin_id}:{index}" are read in batches of 2000, added to the Redis key
     * "QR_UV:{admin_id}:{index}" with pfAdd, and deleted from SSDB.
     *
     * @param Input  $input
     * @param Output $output
     */
    private function refresh_quv(Input $input, Output $output)
    {
        try {
            $output->writeln("刷新自定义二维码UV统计---开始");
            Log::info("刷新自定义二维码UV统计---开始");

            $ssdb  = Ssdb::instance();
            $redis = Redis::instance();

            Db::table('custom_qrcode')->chunk(1000, function ($result) use ($redis, $ssdb, $output) {
                foreach ($result as $val) {
                    $hash_key  = "quv:{$val['admin_id']}:{$val['index']}";
                    $redis_key = "QR_UV:{$val['admin_id']}:{$val['index']}";
                    $prefix    = "刷新自定义二维码UV统计: id:{$val['id']} admin_id:{$val['admin_id']} index:{$val['index']}";

                    // Each processed key is deleted, so the loop ends once the hash is empty.
                    while ($list = $ssdb->hkeys($hash_key, '', '', 2000)) {
                        foreach ($list as $open_id) {
                            $redis->pfAdd($redis_key, $open_id);
                            $ssdb->hdel($hash_key, $open_id);

                            $message = "{$prefix} open_id:{$open_id}";
                            $output->info($message);
                            Log::info($message);
                        }
                    }

                    $output->info("{$prefix} refresh success");
                    Log::info("{$prefix} refresh success");
                }
            });

            $output->info("刷新自定义二维码UV统计---完成");
            Log::info("刷新自定义二维码UV统计---完成");
        } catch (\Exception $e) {
            $output->writeln('刷新自定义二维码UV统计---失败' . $e->getMessage());
            Log::error('刷新自定义二维码UV统计---失败' . $e->getMessage());
        }
    }

    /**
     * Copies the per-channel counters AV, AN, CV, CN, DM and AM from SSDB to Redis.
     *
     * For every row of `admin_config`, the SSDB value "{NAME}:{admin_id}" is read
     * and written to the Redis key "K{NAME}:{admin_id}" as an integer.
     *
     * @param Input  $input
     * @param Output $output
     */
    private function refresh_kl(Input $input, Output $output)
    {
        try {
            $output->writeln("刷新KL信息到Redis---开始");
            Log::info("刷新KL信息到Redis---开始");

            $ssdb  = Ssdb::instance();
            $redis = Redis::instance();

            Db::table('admin_config')->chunk(1000, function ($result) use ($redis, $ssdb, $output) {
                foreach ($result as $channel) {
                    $admin_id = $channel['admin_id'];

                    // Transfer each counter in turn; the raw SSDB value is kept for the log line.
                    $raw = [];
                    foreach (['AV', 'AN', 'CV', 'CN', 'DM', 'AM'] as $name) {
                        $raw[$name] = $ssdb->get("{$name}:{$admin_id}");
                        $redis->set("K{$name}:{$admin_id}", intval($raw[$name]));
                    }

                    $message = "刷新KL信息到Redis: admin_id:{$admin_id} AV:{$raw['AV']} AN:{$raw['AN']} CV:{$raw['CV']} CN:{$raw['CN']} DM:{$raw['DM']} AM:{$raw['AM']} success";
                    $output->info($message);
                    Log::info($message);
                }
            });

            $output->info("刷新KL信息到Redis---完成");
            Log::info("刷新KL信息到Redis---完成");
        } catch (\Exception $e) {
            $output->writeln('刷新KL信息到Redis---失败' . $e->getMessage());
            Log::error('刷新KL信息到Redis---失败' . $e->getMessage());
        }
    }

    /**
     * Placeholder: not implemented. The notes below are the original author's plan.
     *
     * @param Input  $input
     * @param Output $output
     */
    private function refresh_ref_cr(Input $input, Output $output)
    {
        //CR:referral link ID --> KCR
        //IG: ---> KIP
        //IU: ---> USER is_white
        //M-T: ---> M-T:
        //referral link counters
    }

    /**
     * Placeholder: not implemented, so `--refresh ref` currently only deletes the lock key.
     *
     * @param Input  $input
     * @param Output $output
     * @param int    $process Value of the --process option, already cast to int by the caller.
     */
    private function refresh_ref_money(Input $input, Output $output, $process)
    {
        //$dayMTkey = "M-T:".$v['id'].":".date("d");
    }

    /**
     * Writes today's paying-user count ('P' field of the SSDB hash
     * "U-C:{day}:{admin_id}") into `user_collect.increase_recharge` for every
     * channel in `admin_config`, and then for the administrator (admin_id 0).
     *
     * @param Input  $input
     * @param Output $output
     */
    private function refresh_pay(Input $input, Output $output)
    {
        $output->writeln("刷新渠道付费用户到mysql---开始");
        Log::info("刷新渠道付费用户到mysql---开始");

        $ssdb = Ssdb::instance();
        $self = $this;

        Db::table('admin_config')->chunk(1000, function ($result) use ($ssdb, $output, $self) {
            foreach ($result as $channel) {
                $hash_key = "U-C:" . date("d") . ":" . $channel['admin_id'];
                $pay      = $ssdb->hget($hash_key, 'P');
                $self->upsert_pay_collect($channel['admin_id'], $pay, $output);
            }
        });

        // Administrator totals are stored under admin_id 0.
        $admin_hash_key = "U-C:" . date("d") . ":0";
        $admin_pay      = $ssdb->hget($admin_hash_key, 'P');
        // Fix: the original update branch logged the undefined $pay here instead of $admin_pay.
        $this->upsert_pay_collect(0, $admin_pay, $output);
    }

    /**
     * Updates today's type-1 `user_collect` row for one admin with the given
     * paying-user count, inserting the row if it does not exist yet.
     *
     * Public only so the chunk closure in refresh_pay() can call it on every
     * supported PHP version; not intended as external API.
     *
     * NOTE(review): the log prefix still says "刷新KL信息到Redis" although this
     * writes pay data to MySQL; left unchanged in case log searches rely on it.
     *
     * @param int|string $admin_id Channel admin id (0 for the administrator).
     * @param mixed      $pay      Raw value read from SSDB; stored via intval().
     * @param Output     $output
     */
    public function upsert_pay_collect($admin_id, $pay, Output $output)
    {
        $today = date('Ymd');

        $existing = model('UserCollect')
            ->where('admin_id', $admin_id)
            ->where('createdate', $today)
            ->where('type', 1)
            ->find();

        if ($existing) {
            model('UserCollect')
                ->where('admin_id', $admin_id)
                ->where('createdate', $today)
                ->where('type', 1)
                ->update(['increase_recharge' => intval($pay)]);
            $action = 'update';
        } else {
            $data = [
                'createdate'        => $today,
                'type'              => 1,
                'admin_id'          => $admin_id,
                'increase_recharge' => intval($pay)
            ];
            model('UserCollect')->insert($data);
            $action = 'insert';
        }

        $message = "刷新KL信息到Redis: {$action} admin_id:{$admin_id} Pay:{$pay} success";
        $output->info($message);
        Log::info($message);
    }

    /**
     * User statistics.
     *
     * Reads yesterday's per-channel counters from the SSDB hash
     * "U-C:{day}:{admin_id}" (fields F:0/F:1/F:2, A:0/A:1/A:2 and P), clears the
     * hash, and adds the values to the matching type-1 `user_collect` row
     * (batched UPDATE) or creates the row (batched INSERT, 10 rows per statement).
     * admin_id 0 is appended to the channel list for the administrator.
     *
     * @param Input  $input
     * @param Output $output
     */
    private function uc(Input $input, Output $output)
    {
        $output->info("SSDB_TO_MYSQL uc info start run");
        try {
            $ssdb        = Ssdb::instance();
            $userCollect = new UserCollect();
            $unixTime    = time();
            $timed       = date("d", strtotime("-1 day"));
            $timeYmd     = date("Ymd", strtotime("-1 day"));
            $adminConfig = new AdminConfig();

            if (!$channel_list = $adminConfig->select()) {
                return;
            }

            $insert_sql = "INSERT INTO `user_collect`(`admin_id` , `type` , `createdate` , `increase_f` , `increase_m` , `increase_fllow` , `increase` , `increase_recharge`, `createtime`, `updatetime`) VALUES";
            $update_sql = "UPDATE `user_collect` SET `increase_f` = (CASE `id` %s END), `increase_m` = (CASE `id` %s END), `increase_fllow` = (CASE `id` %s END), `increase` = (CASE `id` %s END), `increase_recharge` = (CASE `id` %s END) , `updatetime` = {$unixTime}";

            $update_ids       = [];
            $update_where     = [];
            $insert_sql_array = [];

            // Add the administrator (admin_id 0).
            array_push($channel_list, ['admin_id' => 0]);

            // Collect the statistics for every channel.
            foreach ($channel_list as $channel) {
                $hash_name   = 'U-C:' . $timed . ':' . $channel['admin_id'];
                $channel_map = ['admin_id' => $channel['admin_id'], 'type' => 1, 'createdate' => $timeYmd];

                // Number of users who followed.
                $followUnknown = intval($ssdb->hget($hash_name, 'F:0'));
                $followBoy     = intval($ssdb->hget($hash_name, 'F:1'));
                $followGirl    = intval($ssdb->hget($hash_name, 'F:2'));
                // Number of new users.
                $increaseUnknown = intval($ssdb->hget($hash_name, 'A:0'));
                $increaseBoy     = intval($ssdb->hget($hash_name, 'A:1'));
                $increaseGirl    = intval($ssdb->hget($hash_name, 'A:2'));
                // Number of paying users.
                $pay = intval($ssdb->hget($hash_name, 'P'));

                // Delete the hash.
                // NOTE(review): the hash is cleared before the values reach MySQL, so a
                // failed INSERT/UPDATE below loses these counters.
                $ssdb->hclear($hash_name);

                if (empty($followUnknown) && empty($followBoy) && empty($followGirl)
                    && empty($increaseUnknown) && empty($increaseBoy) && empty($increaseGirl)
                    && empty($pay)
                ) {
                    // Nothing recorded for this channel: skip it.
                    continue;
                }

                // Key order must match both the %s placeholders of $update_sql and the
                // column order of $insert_sql, because vsprintf consumes values in order.
                $channel_params = [
                    'increase_f'        => $followGirl + $increaseGirl,
                    'increase_m'        => $followBoy + $increaseBoy,
                    'increase_fllow'    => $followUnknown + $followGirl + $followBoy,
                    'increase'          => $increaseUnknown + $increaseGirl + $increaseBoy,
                    'increase_recharge' => $pay
                ];
                Log::info("SSDB_TO_MYSQL->UC->SSDB: admin_id:{$channel['admin_id']} createdate:{$timeYmd} data:" . json_encode($channel_params));

                if ($collect = $userCollect->where($channel_map)->find()) {
                    // Existing row: add one "WHEN id THEN col+delta" clause per column.
                    array_push($update_ids, $collect['id']);
                    foreach ($channel_params as $key => $val) {
                        if (!isset($update_where[$key])) {
                            $update_where[$key] = '';
                        }
                        $update_where[$key] .= sprintf(" WHEN %d THEN `{$key}`+%d", $collect['id'], $val);
                    }
                } else {
                    // No row yet: queue a VALUES tuple for the batched INSERT.
                    array_push($insert_sql_array, vsprintf(
                        " (%d , %d , '%s' , %d , %d , %d , %d , %d , %d , %d)",
                        array_merge($channel_map, $channel_params, ['createtime' => time(), 'updatetime' => time()])
                    ));
                }
            }

            // Run the INSERT statements, 10 rows at a time (no-op when nothing is queued).
            foreach (array_chunk($insert_sql_array, 10) as $value) {
                $sql = $insert_sql . ' ' . implode(',', $value);
                Log::info("SSDB_TO_MYSQL uc insert SQL: " . $sql);
                if (!$userCollect->execute($sql)) {
                    $output->error("SSDB_TO_MYSQL uc error INSERT_SQL execution failed");
                    Log::error("SSDB_TO_MYSQL uc error INSERT_SQL execution failed");
                } else {
                    $output->info("SSDB_TO_MYSQL uc info INSERT_SQL execution success");
                    Log::info("SSDB_TO_MYSQL uc info INSERT_SQL execution success");
                }
            }

            // Run the UPDATE statement.
            if (!empty($update_ids)) {
                // The " \n" before WHERE reproduces the original literal byte-for-byte.
                $update_sql = vsprintf($update_sql, $update_where) . " \nWHERE id IN (" . implode(',', $update_ids) . ")";
                Log::info("SSDB_TO_MYSQL uc update SQL: " . $update_sql);
                if (false === $userCollect->execute($update_sql)) {
                    $output->error("SSDB_TO_MYSQL uc error UPDATE_SQL execution failed");
                    Log::error("SSDB_TO_MYSQL uc error UPDATE_SQL execution failed");
                } else {
                    $output->info("SSDB_TO_MYSQL uc info UPDATE_SQL execution success");
                    Log::info("SSDB_TO_MYSQL uc info UPDATE_SQL execution success");
                }
            }

            $output->info("SSDB_TO_MYSQL uc info end run");
        } catch (\Exception $e) {
            $output->error('SSDB_TO_MYSQL uc error ' . $e->getMessage());
            // Also record the failure in the log, as the refresh_* jobs do.
            Log::error('SSDB_TO_MYSQL uc error ' . $e->getMessage());
        }
    }

    /**
     * QR-code follow statistics.
     *
     * For every `custom_qrcode` row, reads the size of the SSDB hash
     * "quv:{admin_id}:{index}" and, when it is non-zero, writes it to the row's
     * `UV` column using a single batched UPDATE ... CASE statement.
     *
     * @param Input  $input
     * @param Output $output
     */
    private function quv(Input $input, Output $output)
    {
        $output->info("SSDB_TO_MYSQL quv info start run");
        try {
            $ssdb          = Ssdb::instance();
            $coustomQrcode = new CustomQrcode();
            $source        = $coustomQrcode->field('id,admin_id,index')->select();
            $update_sql    = "UPDATE `custom_qrcode` SET `UV` = CASE `id` ";

            if ($source) {
                $ids = [];
                foreach ($source as $val) {
                    $key = 'quv:' . $val['admin_id'] . ':' . $val['index'];
                    $uv  = $ssdb->hsize($key);
                    // Loose comparison on purpose: the original treats any zero-ish result as "no data".
                    if ($uv != 0) {
                        $update_sql .= sprintf("WHEN %d THEN %d ", $val['id'], $uv);
                        array_push($ids, $val['id']);
                    }
                }

                if ($ids) {
                    $update_sql .= "END WHERE id IN (" . implode(',', $ids) . ")";
                    Log::info("SSDB_TO_MYSQL quv update SQL: " . $update_sql);
                    if (false === $coustomQrcode->execute($update_sql)) {
                        $output->error("SSDB_TO_MYSQL quv error UPDATE_SQL execution failed");
                        Log::error("处理二维码关注统计---写入数据库失败\r\nSQL:" . $update_sql);
                    } else {
                        $output->info("SSDB_TO_MYSQL quv info UPDATE_SQL execution success");
                        Log::info("SSDB_TO_MYSQL quv info UPDATE_SQL execution success");
                    }
                }
            }

            $output->info("SSDB_TO_MYSQL quv info end run");
        } catch (\Exception $e) {
            $output->error('SSDB_TO_MYSQL quv error ' . $e->getMessage());
            // Also record the failure in the log, as the refresh_* jobs do.
            Log::error('SSDB_TO_MYSQL quv error ' . $e->getMessage());
        }
    }
}