123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415 |
- <?php
- /**
- * Created by PhpStorm.
- * User: Bear
- * Date: 2019/1/7
- * Time: 下午3:49
- */
- namespace app\admin\command;
- use app\main\helper\ArrayHelper;
- use think\Config;
- use think\console\Command;
- use think\console\Input;
- use think\console\input\Argument;
- use think\console\Output;
- use think\Db;
- use think\Exception;
- use think\Log;
- use think\Request;
- class UpdateConsumeForRechargeInfo extends Command
- {
- public function Configure()
- {
- $this->setName('UpdateConsumeForRechargeInfo')
- ->setDescription('消费记录的充值信息和订单信息脚本')
- ->addArgument('beginNum', Argument::REQUIRED, '数据库得开始编号')
- ->addArgument('endNum', Argument::REQUIRED, '数据库结束编号');
- }
- private $beginNum;
- private $endNum;
- public function execute(Input $input, Output $output)
- {
- Request::instance()->module('admin');
- $this->beginNum = $input->getArgument('beginNum');
- $this->endNum = $input->getArgument('endNum');
- try {
- while ($this->beginNum <= $this->endNum) {
- //连接shard库
- $shardDbConfig = $this->getDbDeploy($this->beginNum, 'shard');
- $consumeTable = $shardDbConfig['table'].'.consume';
- unset($shardDbConfig['table']);
- //链接user库
- $userDbConfig = $this->getDbDeploy($this->beginNum, 'user');
- $rechargeTable = $userDbConfig['table'].'.recharge';
- unset($userDbConfig['table']);
- $lastUserId = 0;
- $do = true;
- while ($do) {
- //取出消费记录得user_id
- $sql = "SELECT user_id FROM {$consumeTable} WHERE user_id > {$lastUserId} GROUP BY user_id ORDER BY user_id ASC LIMIT 1000 ";
- $userIds = Db::connect($shardDbConfig)->query($sql);
- if ($userIds) {
- $userIdsArr = array_column($userIds, 'user_id');
- while (true) {
- if (!$userIdsArr) {
- unset($userIdsArr);
- break;
- }
- //一次跑10个用户
- $someUserIds = array_splice($userIdsArr, 0, 10);
- $userIdsStr = implode(',', $someUserIds);
- //拉取消费记录
- $consumeSql = "SELECT id,user_id,kandian,free_kandian,createtime,dd_kandian,dd_free_kandian FROM {$consumeTable} WHERE user_id in({$userIdsStr}) ORDER BY createtime ASC";
- $consumeRows = Db::connect($shardDbConfig)->query($consumeSql);
- $consumeRows = ArrayHelper::index($consumeRows, null, 'user_id');
- //拉取充值记录
- $rechargeSql = "SELECT id,user_id,kandian,free_kandian,createtime,orders_id,free_endtime,dd FROM {$rechargeTable} WHERE user_id in ($userIdsStr) ORDER BY createtime ASC";
- $rechargeRows = Db::connect($userDbConfig)->query($rechargeSql);
- $rechargeRows = ArrayHelper::index($rechargeRows, null, 'user_id');
- foreach ($consumeRows as $lastUserId => $userConsumeRows) {
- $output->write("开始处理用户 {$lastUserId}");
- if (!isset($rechargeRows[$lastUserId])) {
- $output->error("用户 {$lastUserId}, 充值记录为空");
- continue;
- }
- $result = $this->checkRow($rechargeRows[$lastUserId], $userConsumeRows, $output);
- $output->write("完成处理用户 {$lastUserId}");
- unset($result);
- }
- unset($consumeRows);
- unset($rechargeRows);
- }
- } else {
- $do = false;
- }
- unset($userIds);
- }
- $this->beginNum++;
- }
- } catch (Exception $e) {
- $output->error($this->beginNum.'__匹配消费记录和充值记录---error '.$e->getMessage(). ' line:'.$e->getLine());
- }
- $output->writeln("任务 结束");
- exit();
- }
- /**
- * @param $rechargeRows
- * @param $consumeRows
- * @return array
- */
- private function checkRow($rechargeRows, $consumeRows, Output $output)
- {
- $result = [];
- //将负数的充值记录认为是一条消费记录 按时间插入到消费记录中去
- foreach ($rechargeRows as $rechargeRow) {
- if ($rechargeRow['kandian'] < 0) {
- $row = [
- 'id' => $rechargeRow['id'],
- 'user_id' => $rechargeRow['user_id'],
- 'kandian' => abs($rechargeRow['kandian']),
- 'free_kandian' => 0,
- 'createtime' => $rechargeRow['createtime'],
- 'is_recharge' => 1,
- ];
- $m = 0;
- foreach ($consumeRows as $k => $consumeRow) {
- $next = $k+1;
- if (isset($consumeRows[$next])) {
- if ($rechargeRow['createtime'] >= $consumeRow['createtime'] && $rechargeRow['createtime'] <= $consumeRows[$next]['createtime']) {
- //插入$k后面
- $m = $k+1;
- break;
- }
- } else {
- if ($rechargeRow['createtime'] >= $consumeRow['createtime']) {
- //插入$k后面
- $m = $k+1;
- break;
- } else {
- //插入$k前面
- $m = $k;
- }
- }
- }
- array_splice($consumeRows, $m, 0, [$row]);
- }
- }
- foreach ($consumeRows as $consumeRow) {
- $user_id = $consumeRow['user_id'];
- $currentConsumeId = $consumeRow['id'];
- $result[$currentConsumeId] = [];
- if (count($rechargeRows) <=0) {
- $output->error("刷用户消费记录 用户id:{$user_id} 充值记录不够扣 消费记录id: {$currentConsumeId}");
- break;
- }
- try {
- $res = $this->reduce($consumeRow, $rechargeRows);
- if ($res) {
- $update = [];
- if (isset($consumeRow['is_recharge'])) {
- //是负数记录 继续下一条
- continue;
- }
- $ddKandian = $ddFreeKandian = 0;
- foreach ($res['recharge'] as $detail) {
- $ddKandian += $detail['dd_kandian'];
- $ddFreeKandian += $detail['dd_free_kandian'];
- }
- //维护一下扣量信息
- if ($consumeRow['dd_kandian'] != $ddKandian
- || $consumeRow['dd_free_kandian'] != $ddFreeKandian
- ) {
- //需要重置扣量书币字段
- $update['dd_kandian'] = $ddKandian;
- $update['dd_free_kandian'] = $ddFreeKandian;
- $output->write("扣量信息异常,需重新刷入, user_id: {$consumeRow['user_id']}, id: {$consumeRow['id']} 原始数据: dd_kandian:{$consumeRow['dd_kandian']} dd_free_kandian:{$consumeRow['dd_free_kandian']}");
- model("Consume")
- ->setConnect($user_id)
- ->update($update, ['id' => $currentConsumeId]);
- }
- }
- } catch (Exception $e) {
- $output->error("消费记录更新字段 处理异常:{$e->getMessage()} line:{$e->getLine()}");
- }
- }
- unset($consumeRows);
- unset($rechargeRows);
- return $result;
- }
- /**
- * 过滤 返回不在目标内的元素
- * @param $rows
- * @param $unsetRows
- * @return array
- */
- private function arrayRechargeFilter($rows, $unsetRows)
- {
- $result = [];
- foreach ($rows as $row) {
- if (!in_array($row['id'], $unsetRows)) {
- $result[] = $row;
- }
- }
- return $result;
- }
- /**
- * 匹配扣减
- * @param $consume
- * @param $rechargeRows
- * @return array
- * @throws Exception
- */
- private function reduce($consume, &$rechargeRows)
- {
- //初步确定范围
- $release = [];
- $del = [];
- foreach ($rechargeRows as $rechargeRow) {
- if ($rechargeRow['kandian'] < 0) {
- continue;
- }
- if ($rechargeRow['createtime'] <= $consume['createtime']) {
- $release[] = $rechargeRow;
- }
- }
- //进行扣减
- if ($release) {
- $recharge = $order = [];
- $kandian = $consume['kandian'];
- $freeKandian = $consume['free_kandian'];
- $update = []; //有变动的充值记录
- //开始进行消费记录匹配
- if ($freeKandian > 0) {
- //先处理免费看点
- foreach ($release as $key => $item) {
- if ($item['kandian'] > 0) {
- continue;
- }
- if ($consume['createtime'] > $item['free_endtime']) {
- //判断是否过期 过期删除
- $del[] = $item['id'];
- continue;
- }
- if ($freeKandian > $item['free_kandian']) {
- //recharge不够扣的
- $freeKandian -= $item['free_kandian'];
- $recharge[] = [
- 'id' => $item['id'],
- 'kandian' => 0,
- 'free_kandian' => $item['free_kandian'],
- 'dd_kandian' => 0,
- 'dd_free_kandian' => $item['dd'] == 1 ? $item['free_kandian'] : 0,
- ];
- if ($item['orders_id']) {
- $order[] = $item['orders_id'];
- }
- $update[$item['id']] = [
- 'free_kandian' => $item['free_kandian'],
- 'kandian' => 0,
- ];
- } else {
- //够扣
- $item['free_kandian'] = $item['free_kandian'] - $freeKandian;
- $recharge[] = [
- 'id' => $item['id'],
- 'kandian' => 0,
- 'free_kandian' => $freeKandian,
- 'dd_kandian' => 0,
- 'dd_free_kandian' => $item['dd'] == 1 ? $freeKandian : 0,
- ];
- if ($item['orders_id']) {
- $order[] = $item['orders_id'];
- }
- $update[$item['id']] = [
- 'free_kandian' => $freeKandian,
- 'kandian' => 0,
- ];
- $freeKandian = 0;
- break;
- }
- }
- //如果有剩余 抛出异常
- if ($freeKandian > 0) {
- throw new Exception("匹配异常 consume_id:{$consume['id']} 免费书币不够扣", 101);
- }
- }
- reset($release);
- if ($kandian > 0) {
- //永久书币
- foreach ($release as $key => $item) {
- if ($item['free_kandian'] > 0) {
- //免费的跳过
- continue;
- }
- if ($kandian > $item['kandian']) {
- //不够扣的
- $kandian = $kandian - $item['kandian'];
- $recharge[] = [
- 'id' => $item['id'],
- 'kandian' => $item['kandian'],
- 'free_kandian' => 0,
- 'dd_kandian' => $item['dd'] == 1 ? $item['kandian'] : 0,
- 'dd_free_kandian' => 0,
- ];
- $update[$item['id']] = [
- 'free_kandian' => 0,
- 'kandian' => $item['kandian'],
- ];
- if ($item['orders_id']) {
- $order[] = $item['orders_id'];
- }
- } else {
- //够扣
- $item['kandian'] = $item['kandian'] - $kandian;
- $recharge[] = [
- 'id' => $item['id'],
- 'kandian' => $kandian,
- 'free_kandian' => 0,
- 'dd_kandian' => $item['dd'] == 1 ? $kandian : 0,
- 'dd_free_kandian' => 0,
- ];
- if ($item['orders_id']) {
- $order[] = $item['orders_id'];
- }
- $update[$item['id']] = [
- 'free_kandian' => 0,
- 'kandian' => $kandian,
- ];
- break;
- }
- }
- }
- //更新rechargeRows
- if ($update) {
- foreach ($rechargeRows as $key => &$rechargeRow) {
- $id = $rechargeRow['id'];
- if (isset($update[$id])) {
- if ($rechargeRow['kandian'] > 0) {
- $rechargeRow['kandian'] = $rechargeRow['kandian'] - $update[$id]['kandian'];
- if ($rechargeRow['kandian'] <= 0) {
- //是否删除都可以
- $del[] = $id;
- }
- }
- if ($rechargeRow['free_kandian'] > 0) {
- $rechargeRow['free_kandian'] = $rechargeRow['free_kandian'] - $update[$id]['free_kandian'];
- if ($rechargeRow['free_kandian'] <= 0) {
- //是否删除都可以
- $del[] = $id;
- }
- }
- }
- }
- }
- $rechargeRows = $this->arrayRechargeFilter($rechargeRows, $del);
- $result = [
- 'recharge' => $recharge,
- 'orders_id' => array_unique($order),
- ];
- unset($release);
- unset($recharge);
- unset($order);
- unset($update);
- } else {
- throw new Exception("匹配异常 consume_id:{$consume['id']} 未找到可匹配的充值记录", 103);
- }
- return $result;
- }
- /**
- * 从库配置
- *
- * @param $param
- * @param string $deploy
- * @return array|mixed
- */
- private function getDbDeploy($param, $deploy = 'shard')
- {
- $db = Config::get('db');
- $mod = $param % $db[$deploy . '_num'];
- $mod = abs($mod);
- $list = explode(';', $db[$deploy . '_list']);
- foreach ($list as $item) {
- $con = explode(':', $item); // 0=0-191库编号 1=192.168.1.149主IP 2=3306主端口 3=192.168.1.150从IP 4=3306从端口
- if (count($con) >= 3) {
- $c = explode('-', $con[0]); //库编号 0开始 1结束
- if (count($c) >= 2) {
- if ($c[0] <= $mod && $mod <= $c[1]) {
- $database = Config::get('database');
- if (count($con) >= 5) { //开启主从 & 带主从配置
- $database['deploy'] = 1;
- $database['rw_separate'] = true;
- $database['hostname'] = $con[1] . ',' . $con[3]; //192.168.1.149,192.168.1.150
- $database['hostport'] = $con[2] . ',' . $con[4]; //3306,3306
- } else { //只有主库
- $database['hostname'] = $con[1];
- $database['hostport'] = $con[2];
- }
- $database['database'] = 'mysql';
- $database['table'] = str_replace('$mod', $mod, $db[$deploy . '_database']);
- return $database;
- }
- }
- }
- }
- return [];
- }
- }
|