setName('UpdateConsumeForRechargeInfo') ->setDescription('消费记录的充值信息和订单信息脚本') ->addArgument('beginNum', Argument::REQUIRED, '数据库得开始编号') ->addArgument('endNum', Argument::REQUIRED, '数据库结束编号'); }

    /** @var int|string Database number currently being processed; starts at the `beginNum` argument. */
    private $beginNum;

    /** @var int|string Last database number to process (inclusive); taken from the `endNum` argument. */
    private $endNum;

    /**
     * Walks every database number from `beginNum` to `endNum` and, for each one,
     * pages through the users that have consume rows, matching their consume
     * rows against their recharge rows via checkRow().
     *
     * Users are read 1000 at a time (ordered by user_id) and processed in
     * batches of 10. The method always ends the process with exit().
     *
     * @param Input  $input  Console input carrying the `beginNum` / `endNum` arguments.
     * @param Output $output Console output used for progress and error messages.
     */
    public function execute(Input $input, Output $output)
    {
        Request::instance()->module('admin');
        $this->beginNum = $input->getArgument('beginNum');
        $this->endNum = $input->getArgument('endNum');

        try {
            while ($this->beginNum <= $this->endNum) {
                // Connection settings for the shard database (holds the consume table).
                $shardDbConfig = $this->getDbDeploy($this->beginNum, 'shard');
                if (!$shardDbConfig) {
                    // getDbDeploy() returns [] when no configured range covers this number;
                    // fail loudly instead of querying a table literally named ".consume".
                    throw new Exception('未找到 shard 库配置');
                }
                $consumeTable = $shardDbConfig['table'].'.consume';
                unset($shardDbConfig['table']);

                // Connection settings for the user database (holds the recharge table).
                $userDbConfig = $this->getDbDeploy($this->beginNum, 'user');
                if (!$userDbConfig) {
                    throw new Exception('未找到 user 库配置');
                }
                $rechargeTable = $userDbConfig['table'].'.recharge';
                unset($userDbConfig['table']);

                $lastUserId = 0;
                $do = true;
                while ($do) {
                    // Next page of distinct user IDs that have consume rows.
                    $sql = "SELECT user_id FROM {$consumeTable} WHERE user_id > {$lastUserId} GROUP BY user_id ORDER BY user_id ASC LIMIT 1000 ";
                    $userIds = Db::connect($shardDbConfig)->query($sql);
                    if ($userIds) {
                        $userIdsArr = array_column($userIds, 'user_id');

                        // Advance the paging cursor to the highest ID of this page. It must not be
                        // derived from the per-batch rows below: those are grouped in createtime
                        // order, so their last key is not necessarily the largest user ID.
                        $lastUserId = max($userIdsArr);

                        // Process 10 users per round trip.
                        foreach (array_chunk($userIdsArr, 10) as $someUserIds) {
                            $userIdsStr = implode(',', $someUserIds);

                            // Consume rows of this batch, oldest first, grouped by user.
                            $consumeSql = "SELECT id,user_id,kandian,free_kandian,createtime,dd_kandian,dd_free_kandian FROM {$consumeTable} WHERE user_id in({$userIdsStr}) ORDER BY createtime ASC";
                            $consumeRows = Db::connect($shardDbConfig)->query($consumeSql);
                            $consumeRows = ArrayHelper::index($consumeRows, null, 'user_id');

                            // Recharge rows of this batch, oldest first, grouped by user.
                            $rechargeSql = "SELECT id,user_id,kandian,free_kandian,createtime,orders_id,free_endtime,dd FROM {$rechargeTable} WHERE user_id in ($userIdsStr) ORDER BY createtime ASC";
                            $rechargeRows = Db::connect($userDbConfig)->query($rechargeSql);
                            $rechargeRows = ArrayHelper::index($rechargeRows, null, 'user_id');

                            foreach ($consumeRows as $userId => $userConsumeRows) {
                                $output->write("开始处理用户 {$userId}");
                                if (!isset($rechargeRows[$userId])) {
                                    $output->error("用户 {$userId}, 充值记录为空");
                                    continue;
                                }
                                $this->checkRow($rechargeRows[$userId], $userConsumeRows, $output);
                                $output->write("完成处理用户 {$userId}");
                            }
                            unset($consumeRows);
                            unset($rechargeRows);
                        }
                        unset($userIdsArr);
                    } else {
                        // No more users in this database number.
                        $do = false;
                    }
                    unset($userIds);
                }
                $this->beginNum++;
            }
        } catch (Exception $e) {
            $output->error($this->beginNum.'__匹配消费记录和充值记录---error '.$e->getMessage(). ' line:'.$e->getLine());
        }
        $output->writeln("任务 结束");
        exit();
    }

    /**
     * Replays one user's consume rows against that user's recharge rows and
     * rewrites `dd_kandian` / `dd_free_kandian` on any consume row whose stored
     * values differ from the recomputed ones.
     *
     * Recharge rows with a negative `kandian` are treated as consumption: each is
     * turned into a pseudo consume row (flagged `is_recharge`) and merged into the
     * consume list by `createtime`. Pseudo rows take part in the deduction but are
     * never written back.
     *
     * @param array  $rechargeRows The user's recharge rows, ordered by createtime ascending.
     * @param array  $consumeRows  The user's consume rows as a 0-based list, ordered by createtime ascending.
     * @param Output $output       Console output used for progress and error messages.
     * @return array Map keyed by each visited row ID; every value is an empty array.
     */
    private function checkRow($rechargeRows, $consumeRows, Output $output)
    {
        $result = [];

        // Merge negative recharge rows into the consume list at their chronological position.
        foreach ($rechargeRows as $rechargeRow) {
            if ($rechargeRow['kandian'] < 0) {
                $row = [
                    'id' => $rechargeRow['id'],
                    'user_id' => $rechargeRow['user_id'],
                    'kandian' => abs($rechargeRow['kandian']),
                    'free_kandian' => 0,
                    'createtime' => $rechargeRow['createtime'],
                    'is_recharge' => 1,
                ];

                // Default to the front of the list: when no slot is found below, the row is
                // older than every consume row (the list is sorted ascending).
                $m = 0;
                foreach ($consumeRows as $k => $consumeRow) {
                    if ($rechargeRow['createtime'] < $consumeRow['createtime']) {
                        continue;
                    }
                    $next = $k + 1;
                    // Slot found: right after $k when $k is the last row, or when the
                    // following row is not older than the row being inserted.
                    if (!isset($consumeRows[$next])
                        || $rechargeRow['createtime'] <= $consumeRows[$next]['createtime']
                    ) {
                        $m = $next;
                        break;
                    }
                }
                array_splice($consumeRows, $m, 0, [$row]);
            }
        }

        foreach ($consumeRows as $consumeRow) {
            $user_id = $consumeRow['user_id'];
            $currentConsumeId = $consumeRow['id'];
            $result[$currentConsumeId] = [];

            if (count($rechargeRows) <= 0) {
                // Every recharge row has been used up; nothing left to deduct from.
                $output->error("刷用户消费记录 用户id:{$user_id} 充值记录不够扣 消费记录id: {$currentConsumeId}");
                break;
            }

            try {
                // reduce() also shrinks $rechargeRows (passed by reference).
                $res = $this->reduce($consumeRow, $rechargeRows);
                if ($res) {
                    $update = [];
                    if (isset($consumeRow['is_recharge'])) {
                        // Pseudo row built from a negative recharge: deducted, but never persisted.
                        continue;
                    }

                    $ddKandian = $ddFreeKandian = 0;
                    foreach ($res['recharge'] as $detail) {
                        $ddKandian += $detail['dd_kandian'];
                        $ddFreeKandian += $detail['dd_free_kandian'];
                    }

                    // Keep the stored dd ("扣量") amounts in sync with the recomputed ones.
                    if ($consumeRow['dd_kandian'] != $ddKandian || $consumeRow['dd_free_kandian'] != $ddFreeKandian) {
                        $update['dd_kandian'] = $ddKandian;
                        $update['dd_free_kandian'] = $ddFreeKandian;
                        $output->write("扣量信息异常,需重新刷入, user_id: {$consumeRow['user_id']}, id: {$consumeRow['id']} 原始数据: dd_kandian:{$consumeRow['dd_kandian']} dd_free_kandian:{$consumeRow['dd_free_kandian']}");
                        model("Consume")
                            ->setConnect($user_id)
                            ->update($update, ['id' => $currentConsumeId]);
                    }
                }
            } catch (Exception $e) {
                // A failed match only skips this consume row; the remaining rows are still processed.
                $output->error("消费记录更新字段 处理异常:{$e->getMessage()} line:{$e->getLine()}");
            }
        }

        unset($consumeRows);
        unset($rechargeRows);

        return $result;
    }

    /**
     * Returns the rows whose `id` is NOT listed in $unsetRows.
     *
     * IDs are compared loosely (non-strict in_array), so an integer ID and its
     * numeric-string form are considered equal.
     *
     * @param array $rows      Rows, each carrying an `id` key.
     * @param array $unsetRows IDs to drop.
     * @return array Re-indexed list of the remaining rows.
     */
    private function arrayRechargeFilter($rows, $unsetRows)
    {
        $result = [];
        foreach ($rows as $row) {
            if (!in_array($row['id'], $unsetRows)) {
                $result[] = $row;
            }
        }

        return $result;
    }

    /**
     * Deducts one consume row from the recharge rows available at that time.
     *
     * Free coins (`free_kandian`) are deducted first, from recharge rows that
     * carry no paid amount and have not expired; paid coins (`kandian`) are then
     * deducted from recharge rows that carry no free amount. Recharge rows are
     * used in the order given. $rechargeRows is updated in place: used amounts
     * are subtracted and exhausted or expired rows are removed.
     *
     * NOTE(review): a free-coin shortfall raises code 101, but a paid-coin
     * shortfall is silently accepted — confirm that asymmetry is intended.
     *
     * @param array $consume      Consume row (`id`, `kandian`, `free_kandian`, `createtime`).
     * @param array $rechargeRows The user's remaining recharge rows; modified by reference.
     * @return array `recharge` => list of per-recharge deduction details
     *               (`id`, `kandian`, `free_kandian`, `dd_kandian`, `dd_free_kandian`),
     *               `orders_id` => unique order IDs of the recharge rows that were used.
     * @throws Exception Code 103 when no recharge row predates the consume row;
     *                   code 101 when the free coins cannot be fully covered.
     */
    private function reduce($consume, &$rechargeRows)
    {
        // Candidates: non-negative recharge rows created no later than the consume row.
        $release = [];
        $del = [];
        foreach ($rechargeRows as $candidate) {
            if ($candidate['kandian'] < 0) {
                continue;
            }
            if ($candidate['createtime'] <= $consume['createtime']) {
                $release[] = $candidate;
            }
        }

        if (!$release) {
            throw new Exception("匹配异常 consume_id:{$consume['id']} 未找到可匹配的充值记录", 103);
        }

        $recharge = $order = [];
        $kandian = $consume['kandian'];
        $freeKandian = $consume['free_kandian'];
        $update = []; // recharge id => amounts taken from that recharge row

        if ($freeKandian > 0) {
            // Free coins first.
            foreach ($release as $item) {
                if ($item['kandian'] > 0) {
                    // Paid recharge row: not a source of free coins.
                    continue;
                }
                if ($consume['createtime'] > $item['free_endtime']) {
                    // Already expired when the consumption happened: drop it.
                    $del[] = $item['id'];
                    continue;
                }
                if ($freeKandian > $item['free_kandian']) {
                    // This recharge row cannot cover the rest: take all of it and go on.
                    $freeKandian -= $item['free_kandian'];
                    $recharge[] = [
                        'id' => $item['id'],
                        'kandian' => 0,
                        'free_kandian' => $item['free_kandian'],
                        'dd_kandian' => 0,
                        'dd_free_kandian' => $item['dd'] == 1 ? $item['free_kandian'] : 0,
                    ];
                    if ($item['orders_id']) {
                        $order[] = $item['orders_id'];
                    }
                    $update[$item['id']] = [
                        'free_kandian' => $item['free_kandian'],
                        'kandian' => 0,
                    ];
                } else {
                    // This recharge row covers the rest: take what is needed and stop.
                    $recharge[] = [
                        'id' => $item['id'],
                        'kandian' => 0,
                        'free_kandian' => $freeKandian,
                        'dd_kandian' => 0,
                        'dd_free_kandian' => $item['dd'] == 1 ? $freeKandian : 0,
                    ];
                    if ($item['orders_id']) {
                        $order[] = $item['orders_id'];
                    }
                    $update[$item['id']] = [
                        'free_kandian' => $freeKandian,
                        'kandian' => 0,
                    ];
                    $freeKandian = 0;
                    break;
                }
            }

            // Anything left over means the free coins could not be covered.
            if ($freeKandian > 0) {
                throw new Exception("匹配异常 consume_id:{$consume['id']} 免费书币不够扣", 101);
            }
        }

        if ($kandian > 0) {
            // Paid (permanent) coins.
            foreach ($release as $item) {
                if ($item['free_kandian'] > 0) {
                    // Free recharge row: not a source of paid coins.
                    continue;
                }
                if ($kandian > $item['kandian']) {
                    // This recharge row cannot cover the rest: take all of it and go on.
                    $kandian = $kandian - $item['kandian'];
                    $recharge[] = [
                        'id' => $item['id'],
                        'kandian' => $item['kandian'],
                        'free_kandian' => 0,
                        'dd_kandian' => $item['dd'] == 1 ? $item['kandian'] : 0,
                        'dd_free_kandian' => 0,
                    ];
                    $update[$item['id']] = [
                        'free_kandian' => 0,
                        'kandian' => $item['kandian'],
                    ];
                    if ($item['orders_id']) {
                        $order[] = $item['orders_id'];
                    }
                } else {
                    // This recharge row covers the rest: take what is needed and stop.
                    $recharge[] = [
                        'id' => $item['id'],
                        'kandian' => $kandian,
                        'free_kandian' => 0,
                        'dd_kandian' => $item['dd'] == 1 ? $kandian : 0,
                        'dd_free_kandian' => 0,
                    ];
                    if ($item['orders_id']) {
                        $order[] = $item['orders_id'];
                    }
                    $update[$item['id']] = [
                        'free_kandian' => 0,
                        'kandian' => $kandian,
                    ];
                    break;
                }
            }
        }

        // Apply the deductions to the caller's recharge rows.
        if ($update) {
            foreach ($rechargeRows as &$rechargeRow) {
                $id = $rechargeRow['id'];
                if (isset($update[$id])) {
                    if ($rechargeRow['kandian'] > 0) {
                        $rechargeRow['kandian'] = $rechargeRow['kandian'] - $update[$id]['kandian'];
                        if ($rechargeRow['kandian'] <= 0) {
                            // Exhausted: remove it from the pool.
                            $del[] = $id;
                        }
                    }
                    if ($rechargeRow['free_kandian'] > 0) {
                        $rechargeRow['free_kandian'] = $rechargeRow['free_kandian'] - $update[$id]['free_kandian'];
                        if ($rechargeRow['free_kandian'] <= 0) {
                            // Exhausted: remove it from the pool.
                            $del[] = $id;
                        }
                    }
                }
            }
            // Break the reference so later use of the name cannot touch the last element.
            unset($rechargeRow);
        }

        $rechargeRows = $this->arrayRechargeFilter($rechargeRows, $del);

        return [
            'recharge' => $recharge,
            'orders_id' => array_unique($order),
        ];
    }

    /**
     * Builds the connection settings for the database that serves a given number.
     *
     * Reads `db.<deploy>_num`, `db.<deploy>_list` and `db.<deploy>_database`
     * from the configuration. The list is a `;`-separated set of entries shaped
     * `first-last:masterHost:masterPort[:slaveHost:slavePort]`; the entry whose
     * range contains `abs($param % <deploy>_num)` is used.
     *
     * @param int|string $param  Number used to pick the database.
     * @param string     $deploy Configuration prefix, e.g. 'shard' or 'user'.
     * @return array Settings based on the `database` config, with an extra `table`
     *               key holding the resolved database name; [] when no entry matches.
     */
    private function getDbDeploy($param, $deploy = 'shard')
    {
        $db = Config::get('db');
        $mod = $param % $db[$deploy . '_num'];
        $mod = abs($mod);
        $list = explode(';', $db[$deploy . '_list']);

        foreach ($list as $item) {
            // 0 = number range (e.g. 0-191), 1 = master IP, 2 = master port,
            // 3 = slave IP, 4 = slave port.
            $con = explode(':', $item);
            if (count($con) < 3) {
                continue;
            }

            // 0 = first number of the range, 1 = last number of the range.
            $c = explode('-', $con[0]);
            if (count($c) < 2) {
                continue;
            }
            if (!($c[0] <= $mod && $mod <= $c[1])) {
                continue;
            }

            $database = Config::get('database');
            if (count($con) >= 5) {
                // Master and slave both configured: enable read/write separation.
                $database['deploy'] = 1;
                $database['rw_separate'] = true;
                $database['hostname'] = $con[1] . ',' . $con[3];
                $database['hostport'] = $con[2] . ',' . $con[4];
            } else {
                // Master only.
                $database['hostname'] = $con[1];
                $database['hostport'] = $con[2];
            }
            $database['database'] = 'mysql';
            // '$mod' is a literal placeholder in the configured name, not a PHP variable.
            $database['table'] = str_replace('$mod', $mod, $db[$deploy . '_database']);

            return $database;
        }

        return [];
    }
}