setName('UpdateRechargeDD')
            ->addArgument('startOrderId', Argument::REQUIRED, "订单起始id")
            ->addArgument('endOrderId', Argument::REQUIRED, "订单结束id")
            ->setDescription('通过订单表的kl字段,刷新recharge表的dd字段');
    }

    /**
     * Pages through the `orders` rows whose id lies in [startOrderId, endOrderId]
     * (filtered on deduct = 1 and state = '1'), buffers them via preOrderCache() and
     * hands the buffer to autoOrderRecharge(). A final forced flush runs once a page
     * comes back empty.
     *
     * @param Input  $input  console input; reads the `startOrderId` / `endOrderId` arguments
     * @param Output $output console output used for progress and error lines
     */
    protected function execute(Input $input, Output $output)
    {
        Request::instance()->module('admin');

        $startOrderId = $input->getArgument('startOrderId');
        $endOrderId = $input->getArgument('endOrderId');
        $output->writeln('订单起始id:' . $startOrderId);
        $output->writeln('订单结束id:' . $endOrderId);

        $this->_orderCache['data'] = [];
        $this->_orderCache['count'] = 0;

        if ($startOrderId > $endOrderId) {
            $output->writeln('参数错误,起始id必须小于结束id');
            // Stop here: continuing with an inverted range only issues a pointless query.
            return;
        }

        $pageSize = 1000;
        // Keyset pagination: each page selects id > $lastOrderId.
        $lastOrderId = $startOrderId - 1;

        try {
            while (true) {
                Log::info('beginId:' . $startOrderId);

                $orderObjs = model('orders')
                    ->where('id', '>', $lastOrderId)
                    ->where('id', '<=', $endOrderId)
                    ->where('deduct', 1)
                    ->where('state', '1')
                    // Explicit ordering so the last row of a page really carries the highest id.
                    ->order('id', 'asc')
                    ->limit($pageSize)
                    ->field(['id', 'user_id', 'type', 'finishtime'])
                    ->select();

                // empty() is always false for objects, so a Countable result set
                // (e.g. a collection) needs an explicit count check to end the loop.
                $noMoreOrders = empty($orderObjs)
                    || (($orderObjs instanceof \Countable) && count($orderObjs) === 0);

                if ($noMoreOrders) {
                    $this->autoOrderRecharge(true);
                    Log::info('ref_finish_lastId:' . $lastOrderId);
                    $output->writeln('ref_finish_lastId:' . $lastOrderId);
                    break;
                }

                // Start from an empty list on every page; otherwise earlier pages would be
                // buffered again and processed more than once.
                $orders = [];
                foreach ($orderObjs as $orderObj) {
                    $orders[] = $orderObj->toArray();
                }

                $this->preOrderCache($orders);

                $lastOrder = end($orders);
                $lastOrderId = $lastOrder['id'];

                $this->autoOrderRecharge(false);
                Log::info('ref_lastId:' . $lastOrderId);
                $output->writeln('ref_lastId:' . $lastOrderId);
            }
        } catch (\Exception $e) {
            Log::error($e->getMessage());
            // Also surface the failure on the console so the operator sees why the run stopped.
            $output->writeln($e->getMessage());
        }
    }

    /**
     * Adds the given orders to the in-memory buffer, bucketed by `user_id % 512`,
     * and increments the buffered-order counter once per order.
     *
     * @param array $orders list of order rows; each row must contain `user_id`
     */
    public function preOrderCache($orders)
    {
        foreach ($orders as $order) {
            // presumably 512 matches the number of user shards — TODO confirm against the db config
            $mod = $order['user_id'] % 512;
            $this->_orderCache['data'][$mod][] = $order;
            $this->_orderCache['count']++;
        }
    }

    /**
     * Processes the buffered orders bucket by bucket: for each order it looks up at most
     * two matching rows, collects their ids and runs one statement per bucket for the
     * collected ids. Nothing happens until more than 10000 orders are buffered, unless
     * $forceUpdate is true. The buffer is cleared after a flush.
     *
     * @param bool $forceUpdate flush the buffer regardless of how many orders it holds
     * @throws \think\Exception
     * @throws \think\exception\PDOException
     */
    public function autoOrderRecharge($forceUpdate = false)
    {
        if ($this->_orderCache['count'] == 0) {
            return;
        }
        if ($this->_orderCache['count'] <= 10000 && !$forceUpdate) {
            return;
        }

        foreach ($this->_orderCache['data'] as $mod => $orders) {
            $db = $this->dbConnect($mod);
            $rechargeIds = [];

            foreach ($orders as $order) {
                $userId = $order['user_id'];
                $type = $order['type'];
                $finishtime = $order['finishtime'];
                // NOTE(review): the heredoc opener and the beginning of this query are missing
                // from the reviewed source (text between "<<" and ">= $finishtime" was lost).
                // The fragment is kept verbatim; restore it from the canonical file.
                $sql = <<= $finishtime order by createtime limit 2; SQL;
                $res = $db->query($sql);
                $count = count($res);

                if ($count == 1) {
                    $_recharge = current($res);
                    $rechargeIds[] = $_recharge['id'];
                } elseif ($count == 2) {
                    // For one user, two recharge records created within 2s of each other
                    // are treated as the recharge of a single order.
                    $sub = abs($res[0]['createtime'] - $res[1]['createtime']);
                    $rechargeIds[] = $res[0]['id'];
                    if ($sub < 3) {
                        $rechargeIds[] = $res[1]['id'];
                    }
                }
            }

            if (!empty($rechargeIds)) {
                $strRechargeIds = implode(',', $rechargeIds);
                // NOTE(review): the heredoc body of this statement and the "$db->" receiver of
                // execute() are missing from the reviewed source. The fragment is kept verbatim;
                // restore it from the canonical file.
                $sql = <<execute($sql);
            }
            $db->close();
        }

        Log::info('data_to_db_count:' . $this->_orderCache['count']);
        // Reset to the same empty shape execute() initialises the buffer with.
        $this->_orderCache['data'] = [];
        $this->_orderCache['count'] = 0;
    }

    /**
     * Opens a database connection for the given shard number.
     *
     * @param string|int $param  shard number (value the shard is derived from)
     * @param string     $deploy business / shard-group prefix
     * @return \think\db\Connection
     * @throws \think\Exception when no shard configuration can be resolved
     */
    private function dbConnect($param, $deploy = 'user')
    {
        $db_config = $this->get_db_deploy($param, $deploy);

        // get_db_deploy() returns [] on failure. Connecting with an empty config would not
        // target the intended shard, so fail loudly instead.
        if (empty($db_config)) {
            throw new \think\Exception("No shard database config found for param {$param} (deploy: {$deploy})");
        }

        return Db::connect($db_config);
    }

    /**
     * Resolves the connection settings of the shard responsible for $param.
     *
     * Reads the `db` config group (`<deploy>_num`, `<deploy>_list`, `<deploy>_database`),
     * reduces $param modulo `<deploy>_num` and picks the `<deploy>_list` entry whose
     * range contains the result.
     *
     * @param string|int $param  value the modulo is taken from
     * @param string     $deploy shard-group prefix
     * @return array connection config, or an empty array when no entry matches
     */
    public function get_db_deploy($param, $deploy = 'user')
    {
        $db = Config::get('db');

        // A missing or zero shard count would make the modulo below divide by zero.
        $shardCount = isset($db[$deploy . '_num']) ? (int) $db[$deploy . '_num'] : 0;
        if ($shardCount === 0) {
            Log::error("分库获取失败!");
            return [];
        }

        $mod = $param % $shardCount;
        $mod = abs($mod);
        $list = explode(';', $db[$deploy . '_list']);
        Log::info("分库配置参数:{$param} 标志:{$deploy} 库数量:" . $db[$deploy . '_num'] . " 模:{$mod}");

        foreach ($list as $item) {
            // Entry layout: [0]=shard range (e.g. 0-191), [1]=master IP, [2]=master port,
            // [3]=slave IP, [4]=slave port.
            $con = explode(':', $item);
            if (count($con) >= 3) {
                // Shard range: [0]=first shard number, [1]=last shard number.
                $c = explode('-', $con[0]);
                if (count($c) >= 2) {
                    if ($c[0] <= $mod && $mod <= $c[1]) {
                        $database = Config::get('database');
                        if ($database['deploy'] == 1 && count($con) >= 5) {
                            // Master/slave enabled and the entry carries slave settings,
                            // e.g. hostname "192.168.1.149,192.168.1.150", hostport "3306,3306".
                            $database['hostname'] = $con[1] . ',' . $con[3];
                            $database['hostport'] = $con[2] . ',' . $con[4];
                        } else {
                            // Master only.
                            $database['hostname'] = $con[1];
                            $database['hostport'] = $con[2];
                        }
                        Log::info("分库获取成功 IP:{$database['hostname']} port: {$database['hostport']}");
                        $database['database'] = str_replace('$mod', $mod, $db[$deploy . '_database']);
                        return $database;
                    }
                }
            }
        }

        Log::error("分库获取失败!");
        return [];
    }
}