setName('UpdateRechargeOrdersId') ->addArgument('startUserId', Argument::REQUIRED, "开始用户ID") ->addArgument('endUserId', Argument::REQUIRED, "结束用户ID") ->setDescription('刷充值记录的orders_id字段'); } protected function execute(Input $input, Output $output) { Request::instance()->module('admin'); $startUserId = $input->getArgument('startUserId'); $endUserId = $input->getArgument('endUserId'); $output->writeln('开始用户ID:' . $startUserId); $output->writeln('结束用户ID:' . $endUserId); if ($startUserId > $endUserId) { $output->writeln('参数错误,起始id必须小于结束id'); } $pageSize = 1000; $do = true; $mainSlaveDb = Db::connect($this->getMainSlaveDbConfig()); while ($do) { $sql = "SELECT DISTINCT user_id FROM orders FORCE INDEX(orders_user_id) WHERE user_id > {$startUserId} AND user_id <= {$endUserId} AND state = '1' LIMIT {$pageSize};"; $userIds = $mainSlaveDb->query($sql); if ($userIds) { $userIds = array_column($userIds, 'user_id'); $userIdsStr = implode(',', $userIds); $sql = "SELECT user_id, id, kandian, free_kandian, day, type, finishtime, deduct, money FROM orders WHERE user_id in ({$userIdsStr}) and state = '1'"; $orderRows = $mainSlaveDb->query($sql); //订单按照user_id分组 $orderRows = ArrayHelper::index($orderRows, null, 'user_id'); foreach ($userIds as $userId) { try { $startUserId = $userId; $userOrderRows = $orderRows[$userId]; $userDbConfig = $this->getDbDeploy($userId, 'user'); $userTable = $userDbConfig['table'].'.recharge'; unset($userDbConfig['table']); //拉取所有的recharge记录 $sql = "SELECT id, kandian, free_kandian, day, createtime FROM {$userTable} WHERE user_id = {$userId} AND type in('1', '2') ORDER BY createtime ASC"; $rechargeRows = Db::connect($userDbConfig)->query($sql); foreach ($userOrderRows as $userOrderRow) { //开始进行匹配 $result = $this->checkRow($userOrderRow, $rechargeRows); if ($result) { Log::info("匹配结果: user_id:{$userId} order_id: {$userOrderRow['id']} recharge: ".json_encode($result, JSON_UNESCAPED_UNICODE)); $rechargeIds = array_values($result); $condition = [ 'id' => ['in', implode(',', $rechargeIds)], ]; $res = model("Recharge") ->setConnect($userId) ->update([ 'orders_id' => $userOrderRow['id'], 'dd' => $userOrderRow['deduct'] ], $condition); if ($res) { Log::info("更新结果成功: user_id:{$userId} order_id: {$userOrderRow['id']} recharge: ".json_encode($result, JSON_UNESCAPED_UNICODE)); } else { Log::error("匹配结果失败: user_id:{$userId} order_id: {$userOrderRow['id']} recharge: ".json_encode($result, JSON_UNESCAPED_UNICODE)); } } else { //没匹配到 需要记录一下 看情况人工处理 Log::error("未匹配到结果: user_id:{$userId} order_id: {$userOrderRow['id']}"); } } unset($userOrderRows); unset($rechargeRows); } catch (Exception $e) { Log::error("脚本异常 user_id: {$userId} ". $e); } } unset($orderRows); } else { $do = false; } unset($userIds); } } /** * 匹配记录 * @param $orderRow * @param $rechargeRows * @param array $notIn * @return array */ private function checkRow($orderRow, &$rechargeRows) { //先精准匹配时间和看点数 $orderTime = $orderRow['finishtime']; $hitData = []; foreach ($rechargeRows as $key => $rechargeRow) { if ($rechargeRow['createtime'] != $orderTime) { continue; } //看点充值 if ($orderRow['kandian'] > 0 && $orderRow['kandian'] == $rechargeRow['kandian'] ) { //永久看点 $hitData[] = $rechargeRow['id']; unset($rechargeRows[$key]); } if ($orderRow['free_kandian'] > 0 && $orderRow['free_kandian'] == $rechargeRow['free_kandian'] ) { //免费看点 $hitData[] = $rechargeRow['id']; unset($rechargeRows[$key]); } //vip充值 if ($orderRow['day'] > 0 && $orderRow['day'] == $rechargeRow['day'] ) { $hitData[] = $rechargeRow['id']; unset($rechargeRows[$key]); } } //模糊匹配一下时间 往后推3s $extraData = []; reset($rechargeRows); $endTime = $orderTime + 3; foreach ($rechargeRows as $k => $rechargeRow) { //往后推3s if ($rechargeRow['createtime'] >= $orderTime && $rechargeRow['createtime'] <= $endTime) { $extraData[] = $rechargeRow['id']; unset($rechargeRows[$k]); } } if (!empty($extraData)) { $hitData = array_merge($hitData, $extraData); unset($extraData); Log::error("加购结果: user_id:{$orderRow['user_id']} order_id: {$orderRow['id']}"); } else { if ($orderRow['type'] == '1' && (((int)($orderRow['money'] * 100)) > ((int)$orderRow['kandian']))) { //加购 Log::error("加购异常结果失败: user_id:{$orderRow['user_id']} order_id: {$orderRow['id']}"); } } return $hitData; } /** * 从库配置 * * @param $param * @param string $deploy * @return array|mixed */ private function getDbDeploy($param, $deploy = 'shard') { $db = Config::get('db'); $mod = $param % $db[$deploy . '_num']; $mod = abs($mod); if (isset($this->db[$deploy.'_'.$mod])) { return $this->db[$deploy.'_'.$mod]; } $list = explode(';', $db[$deploy . '_list']); foreach ($list as $item) { $con = explode(':', $item); // 0=0-191库编号 1=192.168.1.149主IP 2=3306主端口 3=192.168.1.150从IP 4=3306从端口 if (count($con) >= 3) { $c = explode('-', $con[0]); //库编号 0开始 1结束 if (count($c) >= 2) { if ($c[0] <= $mod && $mod <= $c[1]) { $database = Config::get('database'); if (count($con) >= 5) { //开启主从 & 带主从配置 $database['deploy'] = 1; $database['rw_separate'] = true; $database['hostname'] = $con[1] . ',' . $con[3]; //192.168.1.149,192.168.1.150 $database['hostport'] = $con[2] . ',' . $con[4]; //3306,3306 } else { //只有主库 $database['hostname'] = $con[1]; $database['hostport'] = $con[2]; } $database['database'] = 'mysql'; $database['table'] = str_replace('$mod', $mod, $db[$deploy . '_database']); $this->db[$deploy.'_'.$mod] = $database; return $database; } } } } return []; } /** * 获取主库的从库配置 从库不存在返回主库 * * @return array */ private function getMainSlaveDbConfig() { $hostArr = explode(',', Env::get('database.admin_hostname')); $portArr = explode(',', Env::get('database.admin_hostport', '3306,3306')); //默认主库 $mainDbConfig = array_merge(Config::get("database"), ['hostname' =>$hostArr[0], 'hostport' => $portArr[0]]); if (count($hostArr) >= 2) { //从库 if (isset($hostArr[2])) { //阳光从库2 $mainDbConfig = array_merge( Config::get("database"), ['hostname' =>$hostArr[2], 'hostport' => $portArr[2]] ); } else { //其他平台从库1 $mainDbConfig = array_merge( Config::get("database"), ['hostname' =>$hostArr[1], 'hostport' => $portArr[1]] ); } } return $mainDbConfig; } }