FlushStatisticalDataToDb.php 8.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197
  1. <?php
  2. /**
  3. * Created by: PhpStorm
  4. * User: lytian
  5. * Date: 2019/10/31
  6. * Time: 19:14
  7. */
  8. namespace app\admin\command;
  9. use app\common\library\Redis;
  10. use app\main\constants\CustomConstants;
  11. use think\Config;
  12. use think\console\Command;
  13. use think\console\Input;
  14. use think\console\Output;
  15. use think\Db;
  16. use think\Env;
  17. use think\Log;
  18. use think\Request;
  /**
   * Console command that drains the customer-service statistics queue from
   * Redis and persists the aggregated counters into `custom_url_collect`.
   */
  class FlushStatisticalDataToDb extends Command
  {
      /** Message type: unique-visitor (UV) event. */
      const CUSTOM_MSG_TYPE_UV = 1;
      /** Message type: recharge (payment) event. */
      const CUSTOM_MSG_TYPE_PAY = 2;

      /**
       * Registers the command name and description with the console.
       */
      protected function configure()
      {
          $this->setName('FlushStatisticalDataToDb')
              ->setDescription('客服消息统计数据入库');
      }

      /**
       * Command entry point: flushes the queued statistics into the database.
       *
       * @param Input  $input
       * @param Output $output
       */
      protected function execute(Input $input, Output $output)
      {
          // The current module cannot be detected in CLI mode, so set it explicitly.
          Request::instance()->module('admin');
          $this->flushCustomToDb();
      }

      /**
       * Flushes one batch of the "collect_custom_list_new" queue:
       * the messages are aggregated first, then written to the database.
       */
      private function flushCustomToDb()
      {
          $this->handleQueueTask(
              'flushCustomToDb',
              "collect_custom_list_new",
              function ($stats) {
                  $this->writeCustomStats($stats);
              },
              function ($messages) {
                  return $this->aggregateCustomMessages($messages);
              }
          );
      }

      /**
       * Aggregates raw queue messages into per-(custom_id, idx) counters.
       *
       * Messages that are not JSON objects, or that lack a scalar `custom_id` /
       * `idx` or a `type`, are logged and skipped so that a single malformed
       * message cannot abort a batch that has already been removed from Redis.
       *
       * @param array $messages raw JSON strings popped from the queue
       * @return array counters shaped as [custom_id => [idx => [field => value]]]
       */
      private function aggregateCustomMessages($messages)
      {
          $data = [];
          foreach ($messages as $message) {
              Log::info("flushCustomToDb fromredis:" . $message);

              $info = json_decode($message, true);
              if (
                  !is_array($info)
                  || !isset($info['custom_id'], $info['idx'], $info['type'])
                  || !is_scalar($info['custom_id'])
                  || !is_scalar($info['idx'])
              ) {
                  Log::error("flushCustomToDb: skip malformed message:" . $message);
                  continue;
              }

              $customId = $info['custom_id'];
              $customIdx = $info['idx'];
              $type = intval($info['type']);

              if ($type === self::CUSTOM_MSG_TYPE_UV) {
                  // UV is an absolute value read from the HyperLogLog counter, not a delta.
                  $uvKey = CustomConstants::getCustomUvKey($customId, $customIdx);
                  $data[$customId][$customIdx]['uv'] = Redis::instance()->pfcount($uvKey) ?? 0;
              } elseif ($type === self::CUSTOM_MSG_TYPE_PAY) {
                  // Recharge amount and order count are deltas summed over the batch.
                  foreach (['recharge_money', 'recharge_orders'] as $field) {
                      // Only numeric deltas are accepted: the sum is later embedded
                      // into an SQL update expression.
                      $delta = (isset($info[$field]) && is_numeric($info[$field])) ? $info[$field] + 0 : 0;
                      if (isset($data[$customId][$customIdx][$field])) {
                          $data[$customId][$customIdx][$field] += $delta;
                      } else {
                          $data[$customId][$customIdx][$field] = $delta;
                      }
                  }
              }
          }
          return $data;
      }

      /**
       * Persists aggregated counters: updates the row when one exists for
       * (custom_id, idx), inserts a new row otherwise.
       *
       * NOTE(review): the existence check runs against the connection returned by
       * getMainSlaveDbConfig() while writes go through the model; if that
       * connection is a lagging replica a freshly inserted row may not be visible
       * yet -- confirm this is acceptable.
       *
       * @param array $stats counters shaped as [custom_id => [idx => [field => value]]]
       */
      private function writeCustomStats($stats)
      {
          $lookupDb = Db::connect($this->getMainSlaveDbConfig());

          foreach ($stats as $customId => $byIdx) {
              foreach ($byIdx as $customIdx => $values) {
                  // Bound parameters instead of string interpolation: the ids
                  // originate from queue messages.
                  $rows = $lookupDb->query(
                      'select id from custom_url_collect where custom_id = ? and idx = ?',
                      [$customId, $customIdx]
                  );

                  if ($rows) {
                      $this->updateCustomStat($rows[0]['id'], $customId, $customIdx, $values);
                  } else {
                      $this->insertCustomStat($customId, $customIdx, $values);
                  }
              }
          }
      }

      /**
       * Applies one aggregated counter set to an existing row.
       * `uv` overwrites the stored value; every other field is added to it.
       *
       * @param mixed $id        primary key of the row to update
       * @param mixed $customId
       * @param mixed $customIdx
       * @param array $values    field => value pairs for this (custom_id, idx)
       */
      private function updateCustomStat($id, $customId, $customIdx, $values)
      {
          $updateData = [];
          foreach ($values as $field => $value) {
              if ($field === 'uv') {
                  $updateData['uv'] = $value;
                  continue;
              }
              if (!is_numeric($value)) {
                  // Never embed a non-numeric value into the SQL expression below.
                  continue;
              }
              $updateData[$field] = ['exp', "($field+$value)"];
          }

          if (!$updateData) {
              return;
          }

          $updateData['updatetime'] = date("Y-m-d H:i:s");
          Log::info($updateData);

          $tag = "flushCustomToDb: custom_id:{$customId} custom_idx:{$customIdx} Data:" . json_encode($values);
          if (model("CustomUrlCollect")->update($updateData, ['id' => $id])) {
              Log::info($tag . " Update Db Success");
          } else {
              Log::error($tag . " Update Db Fail");
          }
      }

      /**
       * Inserts a new row for a (custom_id, idx) pair.
       *
       * The row is built from scratch on every call so that fields of a
       * previously inserted row can never leak into this one.
       *
       * @param mixed $customId
       * @param mixed $customIdx
       * @param array $values    field => value pairs for this (custom_id, idx)
       */
      private function insertCustomStat($customId, $customIdx, $values)
      {
          $item = array_merge(['custom_id' => $customId, 'idx' => $customIdx], $values);
          $res = model("CustomUrlCollect")->allowField(true)->insert($item);

          $tag = "flushCustomToDb: custom_id:{$customId} custom_idx:{$customIdx} Data:" . json_encode($item);
          if ($res) {
              Log::info($tag . " Insert Db Success");
          } else {
              Log::error($tag . " Insert Db Fail");
          }
      }

      /**
       * Processes one batch of a Redis list queue.
       *
       * Reads the queue length, pops a batch, optionally reshapes it with
       * $formatData and hands the result to $writeDb. Any error is logged and
       * swallowed so the command itself never fails.
       *
       * @param string        $title      label used as log prefix
       * @param string        $keyName    Redis list key
       * @param callable      $writeDb    receives the (formatted) batch and persists it
       * @param callable|null $formatData optional; receives the raw batch and returns
       *                                  the data for $writeDb (an empty result stops processing)
       */
      private function handleQueueTask($title, $keyName, callable $writeDb, callable $formatData = null)
      {
          try {
              // Queue length before processing.
              $list_size = Redis::instance()->lLen($keyName);
              Log::info("{$title}: Timer Task Sizes:{$list_size} Start");

              if ($list_size) {
                  // Pop a batch; nothing to do when it is empty.
                  $result = $this->getRedisListData($keyName);
                  if (!$result) {
                      return;
                  }

                  if ($formatData !== null) {
                      $result = call_user_func($formatData, $result);
                      if (!$result) {
                          return;
                      }
                  }

                  call_user_func($writeDb, $result);

                  // Remaining queue length after processing.
                  $list_size = Redis::instance()->lLen($keyName);
              }
              Log::info("{$title}: Timer Task Size:{$list_size} End");
          } catch (\Throwable $e) {
              // \Throwable covers both \Exception and \Error.
              $err_info = [
                  'code' => $e->getCode(),
                  'msg' => $e->getMessage(),
                  'line' => $e->getLine(),
                  'file' => $e->getFile(),
                  'trace' => $e->getTraceAsString(),
              ];
              Log::error("{$title}: Error:" . json_encode($err_info, JSON_UNESCAPED_UNICODE));
          }
      }

      /**
       * Reads up to $limit entries from the head of a Redis list and removes
       * them. Producers are expected to enqueue with rpush.
       *
       * NOTE(review): the read and the trim are two separate commands; this
       * presumably relies on a single consumer per key -- confirm.
       *
       * @param string $key_name list key
       * @param int    $limit    maximum number of entries to take from the head
       * @return array the entries read; empty when there is nothing to read
       * @throws \Exception
       */
      private function getRedisListData($key_name, $limit = 5000)
      {
          $limit = (int) $limit;
          if ($limit <= 0) {
              return [];
          }

          $redis = Redis::instance();
          // LRANGE's stop index is inclusive, hence $limit - 1.
          $result = $redis->lRange($key_name, 0, $limit - 1);
          if (!is_array($result) || !$result) {
              return [];
          }

          // Fewer than $limit entries may have been read; trim exactly what was taken.
          $redis->lTrim($key_name, count($result), -1);
          return $result;
      }

      /**
       * Builds the database config used for lookups: the second host/port of
       * the comma-separated `database.admin_hostname` / `database.admin_hostport`
       * settings when at least two hosts are configured, otherwise the first.
       *
       * @return array
       */
      private function getMainSlaveDbConfig()
      {
          $hostArr = explode(',', (string) Env::get('database.admin_hostname'));
          $portArr = explode(',', (string) Env::get('database.admin_hostport', '3306,3306'));

          // Index 1 is the replica when present; index 0 is the primary.
          $index = count($hostArr) >= 2 ? 1 : 0;
          // Fall back to the first port when no separate port is configured for that host.
          $port = $portArr[$index] ?? $portArr[0];

          return array_merge(Config::get("database"), ['hostname' => $hostArr[$index], 'hostport' => $port]);
      }
  }