RefreshSsdb.php 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391
  1. <?php
  2. /**
  3. * Created by PhpStorm.
  4. * User: Bear
  5. * Date: 2019/1/7
  6. * Time: 下午3:49
  7. */
  8. namespace app\admin\command;
  9. use app\common\library\Redis;
  10. use app\common\library\Ssdb;
  11. use think\console\Command;
  12. use think\console\Input;
  13. use think\console\input\Option;
  14. use think\console\Output;
  15. use think\Db;
  16. use think\Log;
  17. use think\Request;
  18. class RefreshSsdb extends Command
  19. {
  20. public function Configure()
  21. {
  22. $this
  23. ->setName('RefreshSsdb')
  24. ->addOption('type','t',Option::VALUE_OPTIONAL,'刷新Ssdb数据到Redis,a:自定义二维码UV,渠道新增付费用户,KL计数器,b:ID白名单,IP白名单,CR计数器,M-T统计金额')
  25. ->setDescription('转移SSDB数据到Redis');
  26. }
  27. /**
  28. * @param Input $input
  29. * @param Output $output
  30. * @return int|null|void
  31. * @throws \Exception
  32. */
  33. public function execute(Input $input, Output $output)
  34. {
  35. Request::instance()->module('admin');
  36. $type = $input->getOption('type');
  37. $limit = 10;
  38. switch ($type){
  39. case 'a':
  40. $this->refresh_quv($input,$output);
  41. $this->refresh_kl($input,$output);
  42. $this->refresh_pay($input,$output);
  43. break;
  44. case 'b':
  45. $this->refresh_user($input,$output);
  46. $this->refresh_ip($input,$output);
  47. $this->refresh_cr($input,$output);
  48. $this->refresh_mt($input,$output,$limit);
  49. case 'quv':
  50. $this->refresh_quv($input,$output);
  51. break;
  52. case 'kl':
  53. $this->refresh_kl($input,$output);
  54. break;
  55. case 'pay':
  56. $this->refresh_pay($input,$output);
  57. break;
  58. case 'user':
  59. $this->refresh_user($input,$output);
  60. break;
  61. case 'ip':
  62. $this->refresh_ip($input,$output);
  63. break;
  64. case 'cr':
  65. $this->refresh_cr($input,$output);
  66. break;
  67. case 'mt':
  68. $this->refresh_mt($input,$output);
  69. break;
  70. default:
  71. $output->writeln("Type: {$type} 无法识别的类型");
  72. break;
  73. }
  74. }
  75. private function refresh_mt(Input $input,Output $output){
  76. $ssdb = Ssdb::instance();
  77. $redis = Redis::instance();
  78. $this->success($output,"刷新推广链接M-T信息---开始");
  79. while($source = $ssdb->keys('M-T:','',5000)){
  80. $list = array_filter($source,function($k_name) use($output,$ssdb,$redis) {
  81. if(strpos($k_name,'M-T:') === 0){
  82. //刷新当天推广链接充值
  83. if($current_ref_money = $ssdb->get($k_name)){
  84. if(!$redis->exists($k_name)){
  85. $redis->set($k_name,$current_ref_money);
  86. }else{
  87. $redis->incrBy($k_name,$current_ref_money);
  88. }
  89. $redis->expire($k_name,86400);
  90. $this->success($output,"刷新推广链接M-T信息: {$k_name} Val:{$current_ref_money} success");
  91. $ssdb->del($k_name);
  92. }else{
  93. $this->success($output,"刷新推广链接M-T信息: {$k_name} Val:{$current_ref_money} skip");
  94. }
  95. return true;
  96. }
  97. });
  98. if(empty($list)){
  99. $this->success($output,"刷新推广链接M-T信息---结束");
  100. break;
  101. }
  102. unset($source);
  103. }
  104. }
  105. private function refresh_cr(Input $input,Output $output){
  106. $ssdb = Ssdb::instance();
  107. $redis = Redis::instance();
  108. $this->success($output,"刷新推广链接CR信息---开始");
  109. while($source = $ssdb->keys('CR:','',5000)){
  110. $list = array_filter($source,function($k_name) use($output,$ssdb,$redis) {
  111. if(strpos($k_name,'CR:') === 0){
  112. //刷新CR数据
  113. if($cr = intval($ssdb->get($k_name))){
  114. if(!$redis->exists("K{$k_name}")){
  115. $redis->set("K{$k_name}",$cr);
  116. }else{
  117. $redis->incrBy("K{$k_name}",$cr);
  118. }
  119. $ssdb->del($k_name);
  120. $this->success($output,"刷新推广链接CR信息: {$k_name} Val:{$cr} success");
  121. }else{
  122. $this->success($output,"刷新推广链接CR信息: {$k_name} Val:{$cr} skip");
  123. }
  124. return true;
  125. }
  126. });
  127. if(empty($list)){
  128. break;
  129. }
  130. unset($source);
  131. }
  132. $this->success($output,"刷新推广链接CR信息---结束");
  133. }
  134. private function refresh_ip(Input $input,Output $output){
  135. $ssdb = Ssdb::instance();
  136. $redis = Redis::instance();
  137. $this->success($output,"刷新用户IP白名单---开始");
  138. while($source = $ssdb->hlist('IG:','',5000)){
  139. $list = array_filter($source,function($hash_name) use($output,$ssdb,$redis) {
  140. if(strpos($hash_name,'IG:') === 0){
  141. $size = $ssdb->hsize($hash_name);
  142. $this->success($output,"刷新用户IP白名单: {$hash_name} Size:{$size}");
  143. while($ip_list = $ssdb->hkeys($hash_name, '', '', 5000)){
  144. array_walk($ip_list,function($ip) use($hash_name,$redis,$ssdb,$output){
  145. $ssdb->hdel($hash_name,$ip);
  146. list($key_name,$key_ip) = explode(':',$hash_name);
  147. $redis->sadd("KIP:{$key_ip}",$ip);
  148. $this->success($output,"刷新用户IP白名单: {$hash_name} IP:{$ip} update success");
  149. });
  150. }
  151. $this->success($output,"刷新用户IP白名单: {$hash_name} Size:{$size} End");
  152. return true;
  153. }
  154. });
  155. if(empty($list)){
  156. break;
  157. }
  158. unset($source);
  159. }
  160. $this->success($output,"刷新用户IP白名单---结束");
  161. }
  162. private function refresh_user(Input $input,Output $output){
  163. $ssdb = Ssdb::instance();
  164. $redis = Redis::instance();
  165. $this->success($output,"刷新用户ID白名单---开始");
  166. while($source = $ssdb->hlist('IU:','',5000)){
  167. $list = array_filter($source,function($hash_name) use($output,$ssdb,$redis) {
  168. if(strpos($hash_name,'IU:') === 0){
  169. $size = $ssdb->hsize($hash_name);
  170. $this->success($output,"刷新用户ID白名单: {$hash_name} Size:{$size}");
  171. while($user_id_list = $ssdb->hkeys($hash_name, '', '', 5000)){
  172. array_walk($user_id_list,function($user_id) use($hash_name,$redis,$ssdb,$output){
  173. model('User')
  174. ->setConnect($user_id)
  175. ->where('id',$user_id)
  176. ->update(['is_white'=>1]);
  177. $ssdb->hdel($hash_name,$user_id);
  178. $redis->del("UN:{$user_id}");
  179. $this->success($output,"刷新用户ID白名单: {$hash_name} UserId:{$user_id} update success");
  180. });
  181. }
  182. $this->success($output,"刷新用户ID白名单: {$hash_name} Size:{$size} End");
  183. return true;
  184. }
  185. });
  186. if(empty($list)){
  187. break;
  188. }
  189. unset($source);
  190. }
  191. $this->success($output,"刷新用户ID白名单---结束");
  192. }
  193. private function refresh_quv(Input $input,Output $output){
  194. try {
  195. $this->success($output,"刷新自定义二维码UV统计---开始");
  196. $ssdb = Ssdb::instance();
  197. $redis = Redis::instance();
  198. Db::table('custom_qrcode')->chunk(1000, function ($result) use ($redis,$ssdb,$output) {
  199. foreach ($result as $val) {
  200. $hash_key = "quv:{$val['admin_id']}:{$val['index']}";
  201. $redis_key = "QR_UV:{$val['admin_id']}:{$val['index']}";
  202. while($list = $ssdb->hkeys($hash_key,'','',2000)){
  203. foreach($list as $open_id){
  204. $redis->pfAdd($redis_key,$open_id);
  205. $ssdb->hdel($hash_key,$open_id);
  206. $this->success($output,"刷新自定义二维码UV统计: id:{$val['id']} admin_id:{$val['admin_id']} index:{$val['index']} open_id:{$open_id}");
  207. }
  208. }
  209. $this->success($output,"刷新自定义二维码UV统计: id:{$val['id']} admin_id:{$val['admin_id']} index:{$val['index']} refresh success");
  210. }
  211. });
  212. $this->success($output,"刷新自定义二维码UV统计---完成");
  213. }catch (\Exception $e){
  214. $this->error($output,'刷新自定义二维码UV统计---失败' . $e->getMessage());
  215. }
  216. }
  217. private function refresh_kl(Input $input,Output $output){
  218. try {
  219. $this->success($output,"刷新KL信息到Redis---开始");
  220. $ssdb = Ssdb::instance();
  221. $redis = Redis::instance();
  222. Db::table('admin')->chunk(1000,function($result) use($redis,$ssdb,$output){
  223. foreach($result as $channel){
  224. //转移AV
  225. if($av = $ssdb->get("AV:{$channel['id']}")){
  226. if(!$redis->exists("KAV:{$channel['id']}")){
  227. $redis->set("KAV:{$channel['id']}",intval($av));
  228. }else{
  229. $redis->incrBy("KAV:{$channel['id']}",intval($av));
  230. }
  231. }
  232. $ssdb->del("AV:{$channel['id']}");
  233. //转移AN
  234. if($an = $ssdb->get("AN:{$channel['id']}")){
  235. if(!$redis->exists("KAN:{$channel['id']}")){
  236. $redis->set("KAN:{$channel['id']}",intval($an));
  237. }else{
  238. $redis->incrBy("KAN:{$channel['id']}",intval($an));
  239. }
  240. }
  241. $ssdb->del("AN:{$channel['id']}");
  242. //转移CV
  243. if($cv = $ssdb->get("CV:{$channel['id']}")){
  244. if(!$redis->exists("KCV:{$channel['id']}")){
  245. $redis->set("KCV:{$channel['id']}",intval($cv));
  246. }else{
  247. $redis->incrBy("KCV:{$channel['id']}",intval($cv));
  248. }
  249. }
  250. $ssdb->del("CV:{$channel['id']}");
  251. //转移CN
  252. if($cn = $ssdb->get("CN:{$channel['id']}")){
  253. if(!$redis->exists("KCN:{$channel['id']}")){
  254. $redis->set("KCN:{$channel['id']}",intval($cn));
  255. }else{
  256. $redis->incrBy("KCN:{$channel['id']}",intval($cn));
  257. }
  258. }
  259. $ssdb->del("CN:{$channel['id']}");
  260. //获取当前时间的前24小时
  261. $before_time = $this->getBeforeTwentyFourTime();
  262. foreach($before_time as $val){
  263. $time = date('dH',$val);
  264. //转移DM
  265. $dm = $ssdb->get("DM:{$channel['id']}:{$time}");
  266. if(!$redis->exists("KDM:{$channel['id']}:{$time}")){
  267. $redis->set("KDM:{$channel['id']}:{$time}",intval($dm));
  268. }else{
  269. $redis->incrBy("KDM:{$channel['id']}:{$time}",intval($dm));
  270. }
  271. $redis->expire("KDM:{$channel['id']}:{$time}",86400);
  272. $ssdb->del("DM:{$channel['id']}:{$time}");
  273. //转移AM
  274. $am = $ssdb->get("AM:{$channel['id']}:{$time}");
  275. if(!$redis->exists("KAM:{$channel['id']}:{$time}")){
  276. $redis->set("KAM:{$channel['id']}:{$time}",intval($am));
  277. }else{
  278. $redis->incrBy("KAM:{$channel['id']}:{$time}",intval($am));
  279. }
  280. $redis->expire("KAM:{$channel['id']}:{$time}",86400);
  281. $ssdb->del("AM:{$channel['id']}:{$time}");
  282. $this->success($output,"刷新KL信息到Redis: admin_id:{$channel['id']} DM:{$channel['id']}:{$time} Val:{$dm} AM:{$channel['id']}:{$time} Val:{$am} success");
  283. }
  284. $this->success($output,"刷新KL信息到Redis: admin_id:{$channel['id']} AV:{$av} AN:{$an} CV:{$cv} CN:{$cn} success");
  285. }
  286. });
  287. $this->success($output,"刷新KL信息到Redis---完成");
  288. }catch (\Exception $e){
  289. $this->error($output,'刷新KL信息到Redis---失败' . $e->getMessage());
  290. }
  291. }
  292. private function refresh_pay(Input $input,Output $output){
  293. $this->success($output,"刷新渠道付费用户到mysql---开始");
  294. $ssdb = Ssdb::instance();
  295. Db::table('admin')->chunk(1000,function($result) use($ssdb,$output){
  296. foreach($result as $channel){
  297. $hash_key = "U-C:".date("d").":".$channel['id'];
  298. $pay = $ssdb->hget($hash_key,'P');
  299. if($pay){
  300. $user_collect = model('UserCollect')
  301. ->where('admin_id',$channel['id'])
  302. ->where('createdate',date('Ymd'))
  303. ->where('type',1)->find();
  304. if($user_collect){
  305. model('UserCollect')
  306. ->where('admin_id',$channel['id'])
  307. ->where('createdate',date('Ymd'))
  308. ->where('type',1)
  309. ->update(['increase_recharge'=>intval($pay)]);
  310. $this->success($output,"刷新KL信息到Redis: update admin_id:{$channel['id']} Pay:{$pay} success");
  311. }else{
  312. $data = [
  313. 'createdate' => date('Ymd'),
  314. 'type' => 1,
  315. 'admin_id' => $channel['id'],
  316. 'increase_recharge' => intval($pay)
  317. ];
  318. model('UserCollect')->insert($data);
  319. $this->success($output,"刷新KL信息到Redis: insert admin_id:{$channel['id']} Pay:{$pay} success");
  320. }
  321. }
  322. $ssdb->hdel($hash_key,'P');
  323. }
  324. });
  325. //统计管理员的
  326. $admin_hash_key = "U-C:".date("d").":0";
  327. $admin_pay = $ssdb->hget($admin_hash_key,'P');
  328. if($admin_pay){
  329. $admin_user_collect = model('UserCollect')
  330. ->where('admin_id',0)
  331. ->where('createdate',date('Ymd'))
  332. ->where('type',1)->find();
  333. if($admin_user_collect){
  334. model('UserCollect')
  335. ->where('admin_id',0)
  336. ->where('createdate',date('Ymd'))
  337. ->where('type',1)
  338. ->update(['increase_recharge'=>intval($admin_pay)]);
  339. $this->success($output,"刷新KL信息到Redis: update admin_id:0 Pay:{$admin_pay} success");
  340. }else{
  341. $data = [
  342. 'createdate' => date('Ymd'),
  343. 'type' => 1,
  344. 'admin_id' => 0,
  345. 'increase_recharge' => intval($admin_pay)
  346. ];
  347. model('UserCollect')->insert($data);
  348. $this->success($output,"刷新KL信息到Redis: insert admin_id:0 Pay:{$admin_pay} success");
  349. }
  350. }
  351. $ssdb->hdel($admin_hash_key,'P');
  352. }
  353. /**
  354. * 获取当前时间的前24小时,包含当前小时
  355. * @return array
  356. */
  357. public function getBeforeTwentyFourTime(){
  358. $before_array = [];
  359. $current_time = time();
  360. $first_time = strtotime('-1 day');
  361. for ($time=$first_time; $time<=$current_time; $time+=3600){
  362. array_push($before_array,$time);
  363. }
  364. return $before_array;
  365. }
  366. private function success(Output $output,$message){
  367. $output->info($message);
  368. Log::info($message);
  369. }
  370. private function error(Output $output,$message){
  371. $output->error($message);
  372. Log::error($message);
  373. }
  374. }