SsdbToMysql.php 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362
  1. <?php
  2. namespace app\admin\command;
  3. use app\common\library\Redis;
  4. use app\common\model\CustomQrcode;
  5. use app\common\model\AdminConfig;
  6. use app\common\model\Admin;
  7. use app\common\model\BookCollect;
  8. use app\common\model\UserCollect;
  9. use app\common\library\Ssdb;
  10. use think\console\Command;
  11. use think\console\Input;
  12. use think\console\input\Option;
  13. use think\console\Output;
  14. use think\Db;
  15. use think\Log;
  16. use think\Request;
  17. class SsdbToMysql extends Command
  18. {
  19. protected function configure()
  20. {
  21. $this
  22. ->setName('ssdbToMysql')
  23. ->addOption('type', 't', Option::VALUE_OPTIONAL, '要处理的缓存类型,all:默认处理全部,quv:二维码UV统计,uc:用户统计,bc:小说统计:', 'all')
  24. ->addOption('refresh','r',Option::VALUE_OPTIONAL,'刷新Ssdb数据到Redis,quv:自定义二维码刷新到redis,pay:刷新渠道新增付费用户到Mysql,ref:刷新推广链接当天金额')
  25. ->addOption('process','p',Option::VALUE_OPTIONAL,'进程数,只对ref起作用')
  26. ->setDescription('将ssdb缓存中的数据存入mysql数据库');
  27. }
  28. protected function execute(Input $input, Output $output)
  29. {
  30. Request::instance()->module('admin'); //cli模式下无法获取到当前的项目模块,手动指定一下
  31. $process = $input->getOption('process');
  32. if($refresh = $input->getOption('refresh')){
  33. switch ($refresh){
  34. case 'quv':
  35. $this->refresh_quv($input,$output);
  36. break;
  37. case 'kl':
  38. $this->refresh_kl($input,$output);
  39. break;
  40. case 'pay':
  41. $this->refresh_pay($input,$output);
  42. break;
  43. case 'ref':
  44. Redis::instance()->del("fork_ref_lock");
  45. $this->refresh_ref_money($input,$output,intval($process));
  46. break;
  47. default:
  48. $output->writeln("Type: {$refresh} 无法识别的类型");
  49. break;
  50. }
  51. }else{
  52. $type = $input->getOption('type');
  53. switch ($type) {
  54. case 'all':
  55. $this->quv($input,$output);
  56. $this->uc($input,$output);
  57. $this->bc($input,$output);
  58. break;
  59. case 'quv':
  60. $this->quv($input, $output);
  61. break;
  62. case 'uc':
  63. $this->uc($input,$output);
  64. break;
  65. case 'bc':
  66. $this->bc($input,$output);
  67. break;
  68. default:
  69. $output->writeln("Type: {$type} 无法识别的类型");
  70. }
  71. }
  72. }
  73. private function refresh_quv(Input $input,Output $output){
  74. try {
  75. $output->writeln("刷新自定义二维码UV统计---开始");
  76. Log::info("刷新自定义二维码UV统计---开始");
  77. $ssdb = Ssdb::instance();
  78. $redis = Redis::instance();
  79. Db::table('custom_qrcode')->chunk(1000, function ($result) use ($redis,$ssdb,$output) {
  80. foreach ($result as $val) {
  81. $hash_key = "quv:{$val['admin_id']}:{$val['index']}";
  82. $redis_key = "QR_UV:{$val['admin_id']}:{$val['index']}";
  83. while($list = $ssdb->hkeys($hash_key,'','',2000)){
  84. foreach($list as $open_id){
  85. $redis->pfAdd($redis_key,$open_id);
  86. $ssdb->hdel($hash_key,$open_id);
  87. $output->info("刷新自定义二维码UV统计: id:{$val['id']} admin_id:{$val['admin_id']} index:{$val['index']} open_id:{$open_id}");
  88. Log::info("刷新自定义二维码UV统计: id:{$val['id']} admin_id:{$val['admin_id']} index:{$val['index']} open_id:{$open_id}");
  89. }
  90. }
  91. $output->info("刷新自定义二维码UV统计: id:{$val['id']} admin_id:{$val['admin_id']} index:{$val['index']} refresh success");
  92. Log::info("刷新自定义二维码UV统计: id:{$val['id']} admin_id:{$val['admin_id']} index:{$val['index']} refresh success");
  93. }
  94. });
  95. $output->info("刷新自定义二维码UV统计---完成");
  96. Log::info("刷新自定义二维码UV统计---完成");
  97. }catch (\Exception $e){
  98. $output->writeln('刷新自定义二维码UV统计---失败' . $e->getMessage());
  99. Log::error('刷新自定义二维码UV统计---失败' . $e->getMessage());
  100. }
  101. }
  102. private function refresh_kl(Input $input,Output $output){
  103. try {
  104. $output->writeln("刷新KL信息到Redis---开始");
  105. Log::info("刷新KL信息到Redis---开始");
  106. $ssdb = Ssdb::instance();
  107. $redis = Redis::instance();
  108. Db::table('admin_config')->chunk(1000,function($result) use($redis,$ssdb,$output){
  109. foreach($result as $channel){
  110. //转移AV
  111. $av = $ssdb->get("AV:{$channel['admin_id']}");
  112. $redis->set("KAV:{$channel['admin_id']}",intval($av));
  113. //转移AN
  114. $an = $ssdb->get("AN:{$channel['admin_id']}");
  115. $redis->set("KAN:{$channel['admin_id']}",intval($an));
  116. //转移CV
  117. $cv = $ssdb->get("CV:{$channel['admin_id']}");
  118. $redis->set("KCV:{$channel['admin_id']}",intval($cv));
  119. //转移CN
  120. $cn = $ssdb->get("CN:{$channel['admin_id']}");
  121. $redis->set("KCN:{$channel['admin_id']}",intval($cn));
  122. //转移DM
  123. $dm = $ssdb->get("DM:{$channel['admin_id']}");
  124. $redis->set("KDM:{$channel['admin_id']}",intval($dm));
  125. //转移AM
  126. $am = $ssdb->get("AM:{$channel['admin_id']}");
  127. $redis->set("KAM:{$channel['admin_id']}",intval($am));
  128. $output->info("刷新KL信息到Redis: admin_id:{$channel['admin_id']} AV:{$av} AN:{$an} CV:{$cv} CN:{$cn} DM:{$dm} AM:{$am} success");
  129. Log::info("刷新KL信息到Redis: admin_id:{$channel['admin_id']} AV:{$av} AN:{$an} CV:{$cv} CN:{$cn} DM:{$dm} AM:{$am} success");
  130. }
  131. });
  132. $output->info("刷新KL信息到Redis---完成");
  133. Log::info("刷新KL信息到Redis---完成");
  134. }catch (\Exception $e){
  135. $output->writeln('刷新KL信息到Redis---失败' . $e->getMessage());
  136. Log::error('刷新KL信息到Redis---失败' . $e->getMessage());
  137. }
  138. }
  139. private function refresh_ref_cr(Input $input,Output $output){
  140. //CR:推广链接ID --> KCR
  141. //IG: ---> KIP
  142. //IU: ---> USER is_white
  143. //M-T: ---> M-T:
  144. //推广链接计数
  145. }
  146. private function refresh_ref_money(Input $input,Output $output,$process){
  147. //$dayMTkey = "M-T:".$v['id'].":".date("d");
  148. }
  149. private function refresh_pay(Input $input,Output $output){
  150. $output->writeln("刷新渠道付费用户到mysql---开始");
  151. Log::info("刷新渠道付费用户到mysql---开始");
  152. $ssdb = Ssdb::instance();
  153. Db::table('admin_config')->chunk(1000,function($result) use($ssdb,$output){
  154. foreach($result as $channel){
  155. $hash_key = "U-C:".date("d").":".$channel['admin_id'];
  156. $pay = $ssdb->hget($hash_key,'P');
  157. $user_collect = model('UserCollect')
  158. ->where('admin_id',$channel['admin_id'])
  159. ->where('createdate',date('Ymd'))
  160. ->where('type',1)->find();
  161. if($user_collect){
  162. model('UserCollect')
  163. ->where('admin_id',$channel['admin_id'])
  164. ->where('createdate',date('Ymd'))
  165. ->where('type',1)
  166. ->update(['increase_recharge'=>intval($pay)]);
  167. $output->info("刷新KL信息到Redis: update admin_id:{$channel['admin_id']} Pay:{$pay} success");
  168. Log::info("刷新KL信息到Redis: update admin_id:{$channel['admin_id']} Pay:{$pay} success");
  169. }else{
  170. $data = [
  171. 'createdate' => date('Ymd'),
  172. 'type' => 1,
  173. 'admin_id' => $channel['admin_id'],
  174. 'increase_recharge' => intval($pay)
  175. ];
  176. model('UserCollect')->insert($data);
  177. $output->info("刷新KL信息到Redis: insert admin_id:{$channel['admin_id']} Pay:{$pay} success");
  178. Log::info("刷新KL信息到Redis: insert admin_id:{$channel['admin_id']} Pay:{$pay} success");
  179. }
  180. }
  181. });
  182. //统计管理员的
  183. $admin_hash_key = "U-C:".date("d").":0";
  184. $admin_pay = $ssdb->hget($admin_hash_key,'P');
  185. $admin_user_collect = model('UserCollect')
  186. ->where('admin_id',0)
  187. ->where('createdate',date('Ymd'))
  188. ->where('type',1)->find();
  189. if($admin_user_collect){
  190. model('UserCollect')
  191. ->where('admin_id',0)
  192. ->where('createdate',date('Ymd'))
  193. ->where('type',1)
  194. ->update(['increase_recharge'=>intval($admin_pay)]);
  195. $output->info("刷新KL信息到Redis: update admin_id:0 Pay:{$pay} success");
  196. Log::info("刷新KL信息到Redis: update admin_id:0 Pay:{$pay}} success");
  197. }else{
  198. $data = [
  199. 'createdate' => date('Ymd'),
  200. 'type' => 1,
  201. 'admin_id' => 0,
  202. 'increase_recharge' => intval($admin_pay)
  203. ];
  204. model('UserCollect')->insert($data);
  205. $output->info("刷新KL信息到Redis: insert admin_id:0 Pay:{$admin_pay} success");
  206. Log::info("刷新KL信息到Redis: insert admin_id:0 Pay:{$admin_pay} success");
  207. }
  208. }
  209. /**
  210. * 用户统计
  211. * @param Input $input
  212. * @param Output $output
  213. */
  214. private function uc(Input $input, Output $output){
  215. $output->info("SSDB_TO_MYSQL uc info start run");
  216. try{
  217. $ssdb = Ssdb::instance();
  218. $userCollect = new UserCollect();
  219. $unixTime = time();
  220. $timed = date("d",strtotime("-1 day"));
  221. $timeYmd = date("Ymd",strtotime("-1 day"));
  222. $adminConfig = new AdminConfig();
  223. if(!$channel_list = $adminConfig->select()){
  224. return;
  225. }
  226. $insert_sql = "INSERT INTO `user_collect`(`admin_id` , `type` , `createdate` , `increase_f` , `increase_m` , `increase_fllow` , `increase` , `increase_recharge`, `createtime`, `updatetime`) VALUES";
  227. $update_sql = "UPDATE `user_collect` SET `increase_f` = (CASE `id` %s END), `increase_m` = (CASE `id` %s END), `increase_fllow` = (CASE `id` %s END), `increase` = (CASE `id` %s END), `increase_recharge` = (CASE `id` %s END) , `updatetime` = {$unixTime}";
  228. $update_ids = [];
  229. $update_where = [];
  230. $is_insert = false;
  231. $insert_sql_array = [];
  232. //添加管理员
  233. array_push($channel_list,['admin_id'=>0]);
  234. //获取统计数据
  235. foreach($channel_list as $channel){
  236. $hash_name = 'U-C:'.$timed.':'.$channel['admin_id'];
  237. $channel_map = ['admin_id'=>$channel['admin_id'],'type'=>1,'createdate'=>$timeYmd];
  238. //获取关注用户数量
  239. $followUnknown = intval($ssdb->hget($hash_name,'F:0'));
  240. $followBoy = intval($ssdb->hget($hash_name,'F:1'));
  241. $followGirl = intval($ssdb->hget($hash_name,'F:2'));
  242. //获取新增用户数量
  243. $increaseUnknown = intval($ssdb->hget($hash_name,'A:0'));
  244. $increaseBoy = intval($ssdb->hget($hash_name,'A:1'));
  245. $increaseGirl = intval($ssdb->hget($hash_name,'A:2'));
  246. //获取支付用户数量
  247. $pay = intval($ssdb->hget($hash_name,'P'));
  248. //删除HASH
  249. $ssdb->hclear($hash_name);
  250. if(empty($followUnknown) && empty($followBoy) && empty($followGirl) && empty($increaseUnknown) && empty($increaseBoy) && empty($increaseGirl) && empty($pay)){
  251. //都为空时,不处理
  252. continue;
  253. }else{
  254. $channel_params = [
  255. 'increase_f' => $followGirl + $increaseGirl,
  256. 'increase_m' => $followBoy + $increaseBoy,
  257. 'increase_fllow' => $followUnknown + $followGirl + $followBoy,
  258. 'increase' => $increaseUnknown + $increaseGirl + $increaseBoy,
  259. 'increase_recharge' => $pay
  260. ];
  261. Log::info("SSDB_TO_MYSQL->UC->SSDB: admin_id:{$channel['admin_id']} createdate:{$timeYmd} data:".json_encode($channel_params));
  262. if($collect = $userCollect->where($channel_map)->find()){
  263. array_push($update_ids,$collect['id']);
  264. foreach($channel_params as $key => $val){
  265. if(!isset($update_where[$key])){
  266. $update_where[$key] = sprintf(" WHEN %d THEN `{$key}`+%d",$collect['id'],$val);
  267. }else{
  268. $update_where[$key] .= sprintf(" WHEN %d THEN `{$key}`+%d",$collect['id'],$val);
  269. }
  270. }
  271. }else{
  272. $is_insert = true;
  273. array_push($insert_sql_array,vsprintf(" (%d , %d , '%s' , %d , %d , %d , %d , %d , %d , %d)", array_merge($channel_map,$channel_params,array('createtime'=>time(),'updatetime'=>time()))));
  274. }
  275. }
  276. }
  277. //执行SQL插入语句
  278. if($is_insert){
  279. if($exec_array = array_chunk($insert_sql_array,10)){
  280. foreach($exec_array as $value){
  281. Log::info("SSDB_TO_MYSQL uc insert SQL: ".$insert_sql.' '.implode(',',$value));
  282. if(!$userCollect->execute($insert_sql.' '.implode(',',$value))){
  283. $output->error("SSDB_TO_MYSQL uc error INSERT_SQL execution failed");
  284. Log::error("SSDB_TO_MYSQL uc error INSERT_SQL execution failed");
  285. }else{
  286. $output->info("SSDB_TO_MYSQL uc info INSERT_SQL execution success");
  287. Log::info("SSDB_TO_MYSQL uc info INSERT_SQL execution success");
  288. }
  289. }
  290. }
  291. }
  292. //执行更新语句
  293. if(!empty($update_ids)){
  294. $update_sql = vsprintf($update_sql,$update_where)." WHERE id IN (".implode(',', $update_ids).")";
  295. Log::info("SSDB_TO_MYSQL uc update SQL: ".$update_sql);
  296. if(false === $userCollect->execute($update_sql)){
  297. $output->error("SSDB_TO_MYSQL uc error UPDATE_SQL execution failed");
  298. Log::error("SSDB_TO_MYSQL uc error UPDATE_SQL execution failed");
  299. }else{
  300. $output->info("SSDB_TO_MYSQL uc info UPDATE_SQL execution success");
  301. Log::info("SSDB_TO_MYSQL uc info UPDATE_SQL execution success");
  302. }
  303. }
  304. $output->info("SSDB_TO_MYSQL uc info end run");
  305. }catch (\Exception $e){
  306. $output->error('SSDB_TO_MYSQL uc error '.$e->getMessage());
  307. }
  308. }
  309. /**
  310. * 二维码关注统计
  311. * @param Input $input
  312. * @param Output $output
  313. */
  314. private function quv(Input $input, Output $output){
  315. $output->info("SSDB_TO_MYSQL quv info start run");
  316. try{
  317. $ssdb = Ssdb::instance();
  318. $coustomQrcode = new CustomQrcode();
  319. $source = $coustomQrcode->field('id,admin_id,index')->select();
  320. $update_sql = "UPDATE `custom_qrcode` SET `UV` = CASE `id` ";
  321. if($source){
  322. $ids = array();
  323. foreach($source as $val){
  324. $key = 'quv:'.$val['admin_id'].':'.$val['index'];
  325. $uv = $ssdb->hsize($key);
  326. if($uv != 0){
  327. $update_sql .= sprintf("WHEN %d THEN %d ", $val['id'], $uv);
  328. array_push($ids,$val['id']);
  329. }
  330. }
  331. if($ids){
  332. $update_sql .= "END WHERE id IN (".implode(',', $ids).")";
  333. Log::info("SSDB_TO_MYSQL quv update SQL: ".$update_sql);
  334. if(false === $coustomQrcode->execute($update_sql)){
  335. $output->error("SSDB_TO_MYSQL quv error UPDATE_SQL execution failed");
  336. Log::error("处理二维码关注统计---写入数据库失败\r\nSQL:".$update_sql);
  337. }else{
  338. $output->info("SSDB_TO_MYSQL quv info UPDATE_SQL execution success");
  339. Log::info("SSDB_TO_MYSQL quv info UPDATE_SQL execution success");
  340. }
  341. }
  342. }
  343. $output->info("SSDB_TO_MYSQL quv info end run");
  344. }catch (\Exception $e){
  345. $output->error('SSDB_TO_MYSQL quv error '.$e->getMessage());
  346. }
  347. }
  348. }