123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362 |
- <?php
- namespace app\admin\command;
- use app\common\library\Redis;
- use app\common\model\CustomQrcode;
- use app\common\model\AdminConfig;
- use app\common\model\Admin;
- use app\common\model\BookCollect;
- use app\common\model\UserCollect;
- use app\common\library\Ssdb;
- use think\console\Command;
- use think\console\Input;
- use think\console\input\Option;
- use think\console\Output;
- use think\Db;
- use think\Log;
- use think\Request;
/**
 * Console command `ssdbToMysql`.
 *
 * Moves counters that are accumulated in SSDB either into MySQL tables
 * (`--type`) or into Redis / MySQL "refresh" targets (`--refresh`).
 * When `--refresh` is given it takes precedence and `--type` is ignored.
 */
class SsdbToMysql extends Command
{
    /**
     * Registers the command name, its options and its description.
     */
    protected function configure()
    {
        $this
            ->setName('ssdbToMysql')
            ->addOption('type', 't', Option::VALUE_OPTIONAL, '要处理的缓存类型,all:默认处理全部,quv:二维码UV统计,uc:用户统计,bc:小说统计:', 'all')
            ->addOption('refresh', 'r', Option::VALUE_OPTIONAL, '刷新Ssdb数据到Redis,quv:自定义二维码刷新到redis,kl:刷新KL信息到Redis,pay:刷新渠道新增付费用户到Mysql,ref:刷新推广链接当天金额')
            ->addOption('process', 'p', Option::VALUE_OPTIONAL, '进程数,只对ref起作用')
            ->setDescription('将ssdb缓存中的数据存入mysql数据库');
    }

    /**
     * Dispatches to the handler selected by `--refresh` or, failing that, `--type`.
     *
     * @param Input  $input
     * @param Output $output
     */
    protected function execute(Input $input, Output $output)
    {
        // The current module cannot be detected in CLI mode, so set it explicitly.
        Request::instance()->module('admin');

        $process = $input->getOption('process');
        $refresh = $input->getOption('refresh');

        if ($refresh) {
            switch ($refresh) {
                case 'quv':
                    $this->refresh_quv($input, $output);
                    break;
                case 'kl':
                    $this->refresh_kl($input, $output);
                    break;
                case 'pay':
                    $this->refresh_pay($input, $output);
                    break;
                case 'ref':
                    Redis::instance()->del("fork_ref_lock");
                    $this->refresh_ref_money($input, $output, intval($process));
                    break;
                default:
                    $output->writeln("Type: {$refresh} 无法识别的类型");
                    break;
            }

            return;
        }

        $type = $input->getOption('type');
        switch ($type) {
            case 'all':
                $this->quv($input, $output);
                $this->uc($input, $output);
                $this->bc($input, $output);
                break;
            case 'quv':
                $this->quv($input, $output);
                break;
            case 'uc':
                $this->uc($input, $output);
                break;
            case 'bc':
                $this->bc($input, $output);
                break;
            default:
                $output->writeln("Type: {$type} 无法识别的类型");
        }
    }

    /**
     * Drains the per-QR-code SSDB hashes `quv:{admin_id}:{index}` into the Redis
     * keys `QR_UV:{admin_id}:{index}` via `pfAdd`, one open_id at a time.
     *
     * Every migrated open_id is deleted from the SSDB hash, so the hash is empty
     * once its row has been processed. Exceptions are caught and reported.
     *
     * @param Input  $input  unused
     * @param Output $output
     */
    private function refresh_quv(Input $input, Output $output)
    {
        try {
            $output->writeln("刷新自定义二维码UV统计---开始");
            Log::info("刷新自定义二维码UV统计---开始");

            $ssdb  = Ssdb::instance();
            $redis = Redis::instance();

            Db::table('custom_qrcode')->chunk(1000, function ($rows) use ($redis, $ssdb, $output) {
                foreach ($rows as $qrcode) {
                    $label     = "刷新自定义二维码UV统计: id:{$qrcode['id']} admin_id:{$qrcode['admin_id']} index:{$qrcode['index']}";
                    $hash_key  = "quv:{$qrcode['admin_id']}:{$qrcode['index']}";
                    $redis_key = "QR_UV:{$qrcode['admin_id']}:{$qrcode['index']}";

                    // Fetch up to 2000 keys per round; the loop ends when the hash is
                    // empty, which relies on hdel actually removing each key.
                    while ($open_ids = $ssdb->hkeys($hash_key, '', '', 2000)) {
                        foreach ($open_ids as $open_id) {
                            $redis->pfAdd($redis_key, $open_id);
                            $ssdb->hdel($hash_key, $open_id);

                            $message = "{$label} open_id:{$open_id}";
                            $output->info($message);
                            Log::info($message);
                        }
                    }

                    $output->info("{$label} refresh success");
                    Log::info("{$label} refresh success");
                }
            });

            $output->info("刷新自定义二维码UV统计---完成");
            Log::info("刷新自定义二维码UV统计---完成");
        } catch (\Exception $e) {
            $output->writeln('刷新自定义二维码UV统计---失败' . $e->getMessage());
            Log::error('刷新自定义二维码UV统计---失败' . $e->getMessage());
        }
    }

    /**
     * Copies six per-channel SSDB values into Redis for every `admin_config` row.
     *
     * For each prefix P in AV, AN, CV, CN, DM, AM the SSDB key `P:{admin_id}` is
     * read and its integer value is written to the Redis key `KP:{admin_id}`.
     * The SSDB values are left in place. Exceptions are caught and reported.
     *
     * @param Input  $input  unused
     * @param Output $output
     */
    private function refresh_kl(Input $input, Output $output)
    {
        try {
            $output->writeln("刷新KL信息到Redis---开始");
            Log::info("刷新KL信息到Redis---开始");

            $ssdb  = Ssdb::instance();
            $redis = Redis::instance();

            Db::table('admin_config')->chunk(1000, function ($rows) use ($redis, $ssdb, $output) {
                $prefixes = ['AV', 'AN', 'CV', 'CN', 'DM', 'AM'];

                foreach ($rows as $channel) {
                    $admin_id = $channel['admin_id'];
                    $summary  = [];

                    foreach ($prefixes as $prefix) {
                        $raw = $ssdb->get("{$prefix}:{$admin_id}");
                        $redis->set("K{$prefix}:{$admin_id}", intval($raw));
                        // The message reports the raw SSDB value, not the stored integer.
                        $summary[] = "{$prefix}:{$raw}";
                    }

                    $message = "刷新KL信息到Redis: admin_id:{$admin_id} " . implode(' ', $summary) . " success";
                    $output->info($message);
                    Log::info($message);
                }
            });

            $output->info("刷新KL信息到Redis---完成");
            Log::info("刷新KL信息到Redis---完成");
        } catch (\Exception $e) {
            $output->writeln('刷新KL信息到Redis---失败' . $e->getMessage());
            Log::error('刷新KL信息到Redis---失败' . $e->getMessage());
        }
    }

    /**
     * Placeholder without implementation; nothing in this class calls it.
     *
     * Notes kept from the original author:
     *   CR:<promotion link id> --> KCR
     *   IG:  ---> KIP
     *   IU:  ---> USER is_white
     *   M-T: ---> M-T:
     *   promotion link counters
     *
     * @param Input  $input
     * @param Output $output
     */
    private function refresh_ref_cr(Input $input, Output $output)
    {
    }

    /**
     * Placeholder for `--refresh=ref`; currently performs no work.
     *
     * @param Input  $input
     * @param Output $output
     * @param int    $process value of `--process`, already cast to int by execute()
     */
    private function refresh_ref_money(Input $input, Output $output, $process)
    {
        // Key format hint kept from the original author:
        // $dayMTkey = "M-T:".$v['id'].":".date("d");
    }

    /**
     * Writes today's paying-user counter of every channel, plus admin id 0, into
     * `user_collect.increase_recharge`.
     *
     * The counter is field `P` of the SSDB hash `U-C:{day}:{admin_id}`. Unlike the
     * other refresh handlers this one does not catch exceptions.
     *
     * @param Input  $input  unused
     * @param Output $output
     */
    private function refresh_pay(Input $input, Output $output)
    {
        $output->writeln("刷新渠道付费用户到mysql---开始");
        Log::info("刷新渠道付费用户到mysql---开始");

        $ssdb = Ssdb::instance();
        // Resolve the date once so a run that crosses midnight stays on one day.
        $day        = date("d");
        $createdate = date('Ymd');
        $self       = $this;

        Db::table('admin_config')->chunk(1000, function ($rows) use ($ssdb, $output, $day, $createdate, $self) {
            foreach ($rows as $channel) {
                $pay = $ssdb->hget("U-C:" . $day . ":" . $channel['admin_id'], 'P');
                $self->save_increase_recharge($channel['admin_id'], $createdate, $pay, $output);
            }
        });

        // Administrator totals are kept under admin id 0.
        $admin_pay = $ssdb->hget("U-C:" . $day . ":0", 'P');
        $this->save_increase_recharge(0, $createdate, $admin_pay, $output);

        $output->info("刷新渠道付费用户到mysql---完成");
        Log::info("刷新渠道付费用户到mysql---完成");
    }

    /**
     * Sets `increase_recharge` on the type-1 `user_collect` row of one admin and
     * day, inserting the row when it does not exist yet.
     *
     * Public only so the chunk callback in refresh_pay() can call it on PHP
     * versions that do not bind `$this` inside closures.
     *
     * @param int    $admin_id
     * @param string $createdate day in `Ymd` format
     * @param mixed  $pay        raw SSDB value; stored as intval($pay)
     * @param Output $output
     */
    public function save_increase_recharge($admin_id, $createdate, $pay, Output $output)
    {
        $existing = model('UserCollect')
            ->where('admin_id', $admin_id)
            ->where('createdate', $createdate)
            ->where('type', 1)
            ->find();

        if ($existing) {
            model('UserCollect')
                ->where('admin_id', $admin_id)
                ->where('createdate', $createdate)
                ->where('type', 1)
                ->update(['increase_recharge' => intval($pay)]);
            $action = 'update';
        } else {
            model('UserCollect')->insert([
                'createdate'        => $createdate,
                'type'              => 1,
                'admin_id'          => $admin_id,
                'increase_recharge' => intval($pay),
            ]);
            $action = 'insert';
        }

        $message = "刷新渠道付费用户到mysql: {$action} admin_id:{$admin_id} Pay:{$pay} success";
        $output->info($message);
        Log::info($message);
    }

    /**
     * User statistics.
     *
     * Reads yesterday's SSDB hash `U-C:{day}:{admin_id}` for every channel plus
     * admin id 0, clears it, and adds the counters to the matching type-1
     * `user_collect` row (batched UPDATE) or creates that row (batched INSERT,
     * ten rows per statement). Exceptions are caught and reported.
     *
     * @param Input  $input  unused
     * @param Output $output
     */
    private function uc(Input $input, Output $output)
    {
        $output->info("SSDB_TO_MYSQL uc info start run");
        try {
            $ssdb        = Ssdb::instance();
            $userCollect = new UserCollect();
            $unixTime    = time();
            // Resolve "yesterday" once so both formats describe the same day.
            $yesterday   = strtotime("-1 day");
            $timed       = date("d", $yesterday);
            $timeYmd     = date("Ymd", $yesterday);

            $adminConfig = new AdminConfig();
            // Without any channel nothing is processed, admin id 0 included.
            if (!$channel_list = $adminConfig->select()) {
                return;
            }

            $insert_sql = "INSERT INTO `user_collect`(`admin_id` , `type` , `createdate` , `increase_f` , `increase_m` , `increase_fllow` , `increase` , `increase_recharge`, `createtime`, `updatetime`) VALUES";
            $update_sql = "UPDATE `user_collect` SET `increase_f` = (CASE `id` %s END), `increase_m` = (CASE `id` %s END), `increase_fllow` = (CASE `id` %s END), `increase` = (CASE `id` %s END), `increase_recharge` = (CASE `id` %s END) , `updatetime` = {$unixTime}";

            $update_ids       = [];
            $update_where     = [];
            $insert_sql_array = [];

            // Add the administrator (admin id 0) to the list.
            array_push($channel_list, ['admin_id' => 0]);

            // Collect the statistics.
            foreach ($channel_list as $channel) {
                $hash_name   = 'U-C:' . $timed . ':' . $channel['admin_id'];
                $channel_map = ['admin_id' => $channel['admin_id'], 'type' => 1, 'createdate' => $timeYmd];

                // Followers: suffix 0 = unknown, 1 = boy, 2 = girl.
                $followUnknown = intval($ssdb->hget($hash_name, 'F:0'));
                $followBoy     = intval($ssdb->hget($hash_name, 'F:1'));
                $followGirl    = intval($ssdb->hget($hash_name, 'F:2'));
                // New users, same suffixes.
                $increaseUnknown = intval($ssdb->hget($hash_name, 'A:0'));
                $increaseBoy     = intval($ssdb->hget($hash_name, 'A:1'));
                $increaseGirl    = intval($ssdb->hget($hash_name, 'A:2'));
                // Paying users.
                $pay = intval($ssdb->hget($hash_name, 'P'));

                // Delete the hash.
                // NOTE(review): this happens before the values are persisted, so a
                // failing SQL statement below loses them — confirm that is acceptable.
                $ssdb->hclear($hash_name);

                // Nothing to do when every counter is zero.
                if ($followUnknown === 0 && $followBoy === 0 && $followGirl === 0
                    && $increaseUnknown === 0 && $increaseBoy === 0 && $increaseGirl === 0
                    && $pay === 0
                ) {
                    continue;
                }

                // Key order must match the column order of both SQL templates above.
                $channel_params = [
                    'increase_f'        => $followGirl + $increaseGirl,
                    'increase_m'        => $followBoy + $increaseBoy,
                    'increase_fllow'    => $followUnknown + $followGirl + $followBoy,
                    'increase'          => $increaseUnknown + $increaseGirl + $increaseBoy,
                    'increase_recharge' => $pay,
                ];
                Log::info("SSDB_TO_MYSQL->UC->SSDB: admin_id:{$channel['admin_id']} createdate:{$timeYmd} data:" . json_encode($channel_params));

                if ($collect = $userCollect->where($channel_map)->find()) {
                    // Existing row: add to each column through one CASE clause per column.
                    array_push($update_ids, $collect['id']);
                    foreach ($channel_params as $key => $val) {
                        $clause = sprintf(" WHEN %d THEN `{$key}`+%d", $collect['id'], $val);
                        $update_where[$key] = isset($update_where[$key]) ? $update_where[$key] . $clause : $clause;
                    }
                } else {
                    // No row yet: queue a VALUES tuple.
                    array_push(
                        $insert_sql_array,
                        vsprintf(
                            " (%d , %d , '%s' , %d , %d , %d , %d , %d , %d , %d)",
                            array_merge($channel_map, $channel_params, ['createtime' => $unixTime, 'updatetime' => $unixTime])
                        )
                    );
                }
            }

            // Run the INSERT statements, ten rows at a time.
            foreach (array_chunk($insert_sql_array, 10) as $value) {
                $sql = $insert_sql . ' ' . implode(',', $value);
                Log::info("SSDB_TO_MYSQL uc insert SQL: " . $sql);
                if (!$userCollect->execute($sql)) {
                    $output->error("SSDB_TO_MYSQL uc error INSERT_SQL execution failed");
                    Log::error("SSDB_TO_MYSQL uc error INSERT_SQL execution failed");
                } else {
                    $output->info("SSDB_TO_MYSQL uc info INSERT_SQL execution success");
                    Log::info("SSDB_TO_MYSQL uc info INSERT_SQL execution success");
                }
            }

            // Run the UPDATE statement.
            if (!empty($update_ids)) {
                $update_sql = vsprintf($update_sql, $update_where) . " WHERE id IN (" . implode(',', $update_ids) . ")";
                Log::info("SSDB_TO_MYSQL uc update SQL: " . $update_sql);
                if (false === $userCollect->execute($update_sql)) {
                    $output->error("SSDB_TO_MYSQL uc error UPDATE_SQL execution failed");
                    Log::error("SSDB_TO_MYSQL uc error UPDATE_SQL execution failed");
                } else {
                    $output->info("SSDB_TO_MYSQL uc info UPDATE_SQL execution success");
                    Log::info("SSDB_TO_MYSQL uc info UPDATE_SQL execution success");
                }
            }

            $output->info("SSDB_TO_MYSQL uc info end run");
        } catch (\Exception $e) {
            $output->error('SSDB_TO_MYSQL uc error ' . $e->getMessage());
            Log::error('SSDB_TO_MYSQL uc error ' . $e->getMessage());
        }
    }

    /**
     * QR code follow statistics.
     *
     * Sets `custom_qrcode.UV` to the size of the SSDB hash `quv:{admin_id}:{index}`
     * for every QR code whose hash is non-empty, using a single UPDATE statement.
     * The hashes themselves are not modified. Exceptions are caught and reported.
     *
     * @param Input  $input  unused
     * @param Output $output
     */
    private function quv(Input $input, Output $output)
    {
        $output->info("SSDB_TO_MYSQL quv info start run");
        try {
            $ssdb          = Ssdb::instance();
            $coustomQrcode = new CustomQrcode();
            $source        = $coustomQrcode->field('id,admin_id,index')->select();

            if ($source) {
                $cases = '';
                $ids   = [];
                foreach ($source as $val) {
                    $uv = $ssdb->hsize('quv:' . $val['admin_id'] . ':' . $val['index']);
                    if ($uv != 0) {
                        $cases .= sprintf("WHEN %d THEN %d ", $val['id'], $uv);
                        array_push($ids, $val['id']);
                    }
                }

                if ($ids) {
                    $update_sql = "UPDATE `custom_qrcode` SET `UV` = CASE `id` " . $cases
                        . "END WHERE id IN (" . implode(',', $ids) . ")";
                    Log::info("SSDB_TO_MYSQL quv update SQL: " . $update_sql);
                    if (false === $coustomQrcode->execute($update_sql)) {
                        $output->error("SSDB_TO_MYSQL quv error UPDATE_SQL execution failed");
                        Log::error("处理二维码关注统计---写入数据库失败\r\nSQL:" . $update_sql);
                    } else {
                        $output->info("SSDB_TO_MYSQL quv info UPDATE_SQL execution success");
                        Log::info("SSDB_TO_MYSQL quv info UPDATE_SQL execution success");
                    }
                }
            }

            $output->info("SSDB_TO_MYSQL quv info end run");
        } catch (\Exception $e) {
            $output->error('SSDB_TO_MYSQL quv error ' . $e->getMessage());
            Log::error('SSDB_TO_MYSQL quv error ' . $e->getMessage());
        }
    }

    /**
     * Book statistics (`bc`).
     *
     * execute() dispatches here for `--type=all` (the default) and `--type=bc`,
     * but this class contains no implementation. The stub reports that the step
     * is skipped so those invocations finish instead of failing on an undefined
     * method.
     *
     * @param Input  $input  unused
     * @param Output $output
     */
    private function bc(Input $input, Output $output)
    {
        $output->writeln("SSDB_TO_MYSQL bc info not implemented, skipped");
        Log::info("SSDB_TO_MYSQL bc info not implemented, skipped");
    }
}
|