UpdateConsumeForRechargeInfo.php 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415
  1. <?php
  2. /**
  3. * Created by PhpStorm.
  4. * User: Bear
  5. * Date: 2019/1/7
  6. * Time: 下午3:49
  7. */
  8. namespace app\admin\command;
  9. use app\main\helper\ArrayHelper;
  10. use think\Config;
  11. use think\console\Command;
  12. use think\console\Input;
  13. use think\console\input\Argument;
  14. use think\console\Output;
  15. use think\Db;
  16. use think\Exception;
  17. use think\Log;
  18. use think\Request;
  19. class UpdateConsumeForRechargeInfo extends Command
  20. {
  /**
   * Declares the command: its name, its description and the two required
   * positional arguments (beginNum and endNum) that bound the run.
   */
  public function Configure()
  {
      // Each call is made on the value returned by the previous one, exactly
      // like the fluent chain it replaces.
      $command = $this->setName('UpdateConsumeForRechargeInfo');
      $command = $command->setDescription('消费记录的充值信息和订单信息脚本');
      $command = $command->addArgument('beginNum', Argument::REQUIRED, '数据库得开始编号');
      $command->addArgument('endNum', Argument::REQUIRED, '数据库结束编号');
  }
  /** @var int Number currently being processed; starts at the beginNum argument and is incremented per pass. */
  private $beginNum;

  /** @var int Last number to process (inclusive), taken from the endNum argument. */
  private $endNum;

  /**
   * Runs the reconciliation for every number in [beginNum, endNum].
   *
   * For each number the shard and user connection configs are resolved, the
   * distinct user ids of the consume table are paged through 1000 at a time,
   * and the users are handled in groups of 10: their consume and recharge
   * rows are loaded, grouped by user id and handed to checkRow().
   *
   * Any think\Exception aborts the remaining work and is reported on the
   * output; the process is then terminated with exit().
   *
   * @param Input  $input  Console input carrying the beginNum / endNum arguments.
   * @param Output $output Console output used for progress and error messages.
   */
  public function execute(Input $input, Output $output)
  {
      Request::instance()->module('admin');

      // Console arguments arrive as strings; make the counters real integers.
      $this->beginNum = (int) $input->getArgument('beginNum');
      $this->endNum = (int) $input->getArgument('endNum');

      try {
          while ($this->beginNum <= $this->endNum) {
              // Resolve the shard-side and user-side connections for this number.
              $shardDbConfig = $this->getDbDeploy($this->beginNum, 'shard');
              $userDbConfig = $this->getDbDeploy($this->beginNum, 'user');
              if (!isset($shardDbConfig['table']) || !isset($userDbConfig['table'])) {
                  // getDbDeploy() returns [] when no configured range matches.
                  throw new Exception("未找到数据库配置");
              }

              // 'table' carries the database name used to qualify table names;
              // it is not a connection option, so strip it before connecting.
              $consumeTable = $shardDbConfig['table'].'.consume';
              unset($shardDbConfig['table']);
              $rechargeTable = $userDbConfig['table'].'.recharge';
              unset($userDbConfig['table']);

              // Keyset pagination cursor over the distinct user ids.
              $lastUserId = 0;
              while (true) {
                  $sql = "SELECT user_id FROM {$consumeTable} WHERE user_id > {$lastUserId} GROUP BY user_id ORDER BY user_id ASC LIMIT 1000 ";
                  $userIds = Db::connect($shardDbConfig)->query($sql);
                  if (!$userIds) {
                      break;
                  }

                  $userIdsArr = array_column($userIds, 'user_id');
                  // The page is ordered by user_id ASC, so its last entry is the
                  // highest id seen. Advance the cursor from here rather than from
                  // the processing loop below, whose iteration order follows
                  // createtime and would otherwise cause users to be re-processed.
                  $lastUserId = $userIdsArr[count($userIdsArr) - 1];

                  // Handle 10 users per round trip.
                  foreach (array_chunk($userIdsArr, 10) as $someUserIds) {
                      $userIdsStr = implode(',', $someUserIds);

                      // Consume rows of these users, oldest first, grouped by user id.
                      $consumeSql = "SELECT id,user_id,kandian,free_kandian,createtime,dd_kandian,dd_free_kandian FROM {$consumeTable} WHERE user_id in({$userIdsStr}) ORDER BY createtime ASC";
                      $consumeRows = Db::connect($shardDbConfig)->query($consumeSql);
                      $consumeRows = ArrayHelper::index($consumeRows, null, 'user_id');

                      // Recharge rows of the same users, oldest first, grouped by user id.
                      $rechargeSql = "SELECT id,user_id,kandian,free_kandian,createtime,orders_id,free_endtime,dd FROM {$rechargeTable} WHERE user_id in ($userIdsStr) ORDER BY createtime ASC";
                      $rechargeRows = Db::connect($userDbConfig)->query($rechargeSql);
                      $rechargeRows = ArrayHelper::index($rechargeRows, null, 'user_id');

                      foreach ($consumeRows as $userId => $userConsumeRows) {
                          $output->write("开始处理用户 {$userId}");
                          if (!isset($rechargeRows[$userId])) {
                              $output->error("用户 {$userId}, 充值记录为空");
                              continue;
                          }
                          $this->checkRow($rechargeRows[$userId], $userConsumeRows, $output);
                          $output->write("完成处理用户 {$userId}");
                      }
                  }
              }

              $this->beginNum++;
          }
      } catch (Exception $e) {
          $output->error($this->beginNum.'__匹配消费记录和充值记录---error '.$e->getMessage(). ' line:'.$e->getLine());
      }

      $output->writeln("任务 结束");
      // Kept from the original: the command ends the PHP process itself.
      exit();
  }
  /**
   * Replays one user's consume history against that user's recharge rows and
   * rewrites dd_kandian / dd_free_kandian on consume rows whose stored values
   * differ from the recomputed ones.
   *
   * Recharge rows with a negative kandian are treated as consumption: each is
   * converted to a pseudo consume row (flagged with 'is_recharge') and placed
   * into the consume list by createtime. Pseudo rows reduce the recharge
   * balances like any other row but are never written back.
   *
   * Matching errors raised by reduce() are reported on $output and the next
   * consume row is processed.
   *
   * @param array  $rechargeRows The user's recharge rows, ordered by createtime ascending.
   * @param array  $consumeRows  The user's consume rows, ordered by createtime ascending.
   * @param Output $output       Console output for diagnostics.
   * @return array Map of processed row id => empty array (no details are stored in it).
   */
  private function checkRow($rechargeRows, $consumeRows, Output $output)
  {
      $result = [];

      // Turn negative recharge rows into pseudo consume rows, in time order.
      foreach ($rechargeRows as $rechargeRow) {
          if ($rechargeRow['kandian'] >= 0) {
              continue;
          }
          $row = [
              'id' => $rechargeRow['id'],
              'user_id' => $rechargeRow['user_id'],
              'kandian' => abs($rechargeRow['kandian']),
              'free_kandian' => 0,
              'createtime' => $rechargeRow['createtime'],
              'is_recharge' => 1,
          ];
          $position = $this->findConsumeInsertPosition($consumeRows, $rechargeRow['createtime']);
          array_splice($consumeRows, $position, 0, [$row]);
      }

      foreach ($consumeRows as $consumeRow) {
          $user_id = $consumeRow['user_id'];
          $currentConsumeId = $consumeRow['id'];
          $result[$currentConsumeId] = [];

          // reduce() drops exhausted recharge rows; nothing left means nothing to match.
          if (count($rechargeRows) <= 0) {
              $output->error("刷用户消费记录 用户id:{$user_id} 充值记录不够扣 消费记录id: {$currentConsumeId}");
              break;
          }

          try {
              // Must run for pseudo rows too: it is what lowers the balances.
              $res = $this->reduce($consumeRow, $rechargeRows);
              if (!$res || isset($consumeRow['is_recharge'])) {
                  // Pseudo rows have no consume record to update.
                  continue;
              }

              $ddKandian = $ddFreeKandian = 0;
              foreach ($res['recharge'] as $detail) {
                  $ddKandian += $detail['dd_kandian'];
                  $ddFreeKandian += $detail['dd_free_kandian'];
              }

              // Loose comparison on purpose: stored values come back from the
              // database while the recomputed ones are numeric sums.
              if ($consumeRow['dd_kandian'] != $ddKandian
                  || $consumeRow['dd_free_kandian'] != $ddFreeKandian
              ) {
                  $update = [
                      'dd_kandian' => $ddKandian,
                      'dd_free_kandian' => $ddFreeKandian,
                  ];
                  $output->write("扣量信息异常,需重新刷入, user_id: {$consumeRow['user_id']}, id: {$consumeRow['id']} 原始数据: dd_kandian:{$consumeRow['dd_kandian']} dd_free_kandian:{$consumeRow['dd_free_kandian']}");
                  model("Consume")
                      ->setConnect($user_id)
                      ->update($update, ['id' => $currentConsumeId]);
              }
          } catch (Exception $e) {
              $output->error("消费记录更新字段 处理异常:{$e->getMessage()} line:{$e->getLine()}");
          }
      }

      return $result;
  }

  /**
   * Finds the offset at which a row with the given createtime must be spliced
   * into a createtime-ascending list of consume rows.
   *
   * The row goes right after the first entry that is not newer than it and
   * whose successor (if any) is not older than it. A row older than every
   * entry goes to the front (offset 0); previously that case ended up just
   * before the last entry, which broke the chronological order.
   *
   * @param array $consumeRows Consume rows ordered by createtime ascending.
   * @param mixed $createtime  createtime of the row to insert.
   * @return int Offset suitable for array_splice().
   */
  private function findConsumeInsertPosition(array $consumeRows, $createtime)
  {
      $rows = array_values($consumeRows);
      $total = count($rows);

      for ($k = 0; $k < $total; $k++) {
          if ($createtime < $rows[$k]['createtime']) {
              continue;
          }
          $isLast = ($k === $total - 1);
          if ($isLast || $createtime <= $rows[$k + 1]['createtime']) {
              return $k + 1;
          }
      }

      return 0;
  }
  /**
   * Returns the rows whose 'id' is not listed in $unsetRows, re-indexed from 0.
   *
   * @param array $rows      Rows, each carrying an 'id' key.
   * @param array $unsetRows Ids of the rows to drop.
   * @return array The surviving rows in their original order.
   */
  private function arrayRechargeFilter($rows, $unsetRows)
  {
      $survivors = array_filter($rows, function ($candidate) use ($unsetRows) {
          // Non-strict membership test, same as before.
          return !in_array($candidate['id'], $unsetRows);
      });

      // array_filter keeps the source keys; callers expect a plain list.
      return array_values($survivors);
  }
  /**
   * Matches one consume row against the recharge rows and deducts what it used.
   *
   * Only recharge rows with a non-negative kandian and a createtime not later
   * than the consume row are candidates. Free coins are taken first, from
   * candidates without paid coins and not past free_endtime; paid coins are
   * then taken from candidates without free coins. Candidates are used in the
   * order they appear in $rechargeRows.
   *
   * On success $rechargeRows is updated in place: balances are lowered and
   * rows that are exhausted (or found expired during the free pass) are removed.
   * Nothing is modified when an exception is thrown.
   *
   * NOTE(review): a shortfall of paid coins is not reported, unlike a shortfall
   * of free coins (code 101). Code 102 is unused — confirm whether that case
   * should raise as well.
   *
   * @param array $consume      Consume row (id, kandian, free_kandian, createtime).
   * @param array $rechargeRows Remaining recharge rows of the user; modified by reference.
   * @return array ['recharge' => per-recharge deduction details, 'orders_id' => distinct order ids touched]
   * @throws Exception Code 103 when no candidate recharge row exists,
   *                   code 101 when the free coins do not cover the consume row.
   */
  private function reduce($consume, &$rechargeRows)
  {
      // Narrow down to the recharge rows this consume row may draw from.
      $release = [];
      foreach ($rechargeRows as $candidate) {
          if ($candidate['kandian'] < 0) {
              continue;
          }
          if ($candidate['createtime'] <= $consume['createtime']) {
              $release[] = $candidate;
          }
      }
      if (!$release) {
          throw new Exception("匹配异常 consume_id:{$consume['id']} 未找到可匹配的充值记录", 103);
      }

      $del = [];       // ids of recharge rows to remove afterwards
      $recharge = [];  // deduction details, one entry per recharge row used
      $order = [];     // order ids of the recharge rows used
      $update = [];    // recharge id => amounts to subtract from that row
      $kandian = $consume['kandian'];
      $freeKandian = $consume['free_kandian'];

      // Free coins first.
      if ($freeKandian > 0) {
          foreach ($release as $item) {
              if ($item['kandian'] > 0) {
                  // Rows holding paid coins are handled in the paid pass.
                  continue;
              }
              if ($consume['createtime'] > $item['free_endtime']) {
                  // Already expired when the consumption happened: drop it.
                  $del[] = $item['id'];
                  continue;
              }

              // Either drain the row completely or take just what is still needed.
              $drained = $freeKandian > $item['free_kandian'];
              $used = $drained ? $item['free_kandian'] : $freeKandian;

              $recharge[] = [
                  'id' => $item['id'],
                  'kandian' => 0,
                  'free_kandian' => $used,
                  'dd_kandian' => 0,
                  'dd_free_kandian' => $item['dd'] == 1 ? $used : 0,
              ];
              if ($item['orders_id']) {
                  $order[] = $item['orders_id'];
              }
              $update[$item['id']] = [
                  'free_kandian' => $used,
                  'kandian' => 0,
              ];

              if (!$drained) {
                  $freeKandian = 0;
                  break;
              }
              $freeKandian -= $used;
          }

          if ($freeKandian > 0) {
              throw new Exception("匹配异常 consume_id:{$consume['id']} 免费书币不够扣", 101);
          }
      }

      // Then paid (permanent) coins.
      if ($kandian > 0) {
          foreach ($release as $item) {
              if ($item['free_kandian'] > 0) {
                  // Rows holding free coins are not used for paid consumption.
                  continue;
              }

              $drained = $kandian > $item['kandian'];
              $used = $drained ? $item['kandian'] : $kandian;

              $recharge[] = [
                  'id' => $item['id'],
                  'kandian' => $used,
                  'free_kandian' => 0,
                  'dd_kandian' => $item['dd'] == 1 ? $used : 0,
                  'dd_free_kandian' => 0,
              ];
              if ($item['orders_id']) {
                  $order[] = $item['orders_id'];
              }
              $update[$item['id']] = [
                  'free_kandian' => 0,
                  'kandian' => $used,
              ];

              if (!$drained) {
                  break;
              }
              $kandian -= $used;
          }
      }

      // Apply the deductions to the caller's rows and collect exhausted ones.
      if ($update) {
          foreach ($rechargeRows as &$rechargeRow) {
              $id = $rechargeRow['id'];
              if (!isset($update[$id])) {
                  continue;
              }
              if ($rechargeRow['kandian'] > 0) {
                  $rechargeRow['kandian'] = $rechargeRow['kandian'] - $update[$id]['kandian'];
                  if ($rechargeRow['kandian'] <= 0) {
                      $del[] = $id;
                  }
              }
              if ($rechargeRow['free_kandian'] > 0) {
                  $rechargeRow['free_kandian'] = $rechargeRow['free_kandian'] - $update[$id]['free_kandian'];
                  if ($rechargeRow['free_kandian'] <= 0) {
                      $del[] = $id;
                  }
              }
          }
          // Break the reference so the last row cannot be overwritten by accident.
          unset($rechargeRow);
      }

      $rechargeRows = $this->arrayRechargeFilter($rechargeRows, $del);

      return [
          'recharge' => $recharge,
          'orders_id' => array_unique($order),
      ];
  }
  /**
   * Builds the connection config for the database that serves a given number.
   *
   * The number is reduced modulo the configured '<deploy>_num'; the entry of
   * '<deploy>_list' whose range contains the result supplies host and port.
   * List entries are separated by ';' and have the form
   * "first-last:masterIp:masterPort[:slaveIp:slavePort]".
   *
   * The returned array is the base 'database' config with hostname/hostport
   * filled in (plus read/write splitting when a slave is listed). Its
   * 'database' key is set to 'mysql'; the actual database name — the
   * '<deploy>_database' template with the literal '$mod' replaced — is
   * returned under 'table', which execute() in this class uses to qualify
   * table names.
   *
   * @param int|string $param  Number to route.
   * @param string     $deploy Config prefix, e.g. 'shard' or 'user'.
   * @return array Connection config, or [] when the settings are missing,
   *               the count is 0, or no range matches.
   */
  private function getDbDeploy($param, $deploy = 'shard')
  {
      $db = Config::get('db');

      // Without a usable count the modulo below would divide by zero.
      $shardCount = isset($db[$deploy . '_num']) ? (int) $db[$deploy . '_num'] : 0;
      if ($shardCount === 0 || !isset($db[$deploy . '_list'], $db[$deploy . '_database'])) {
          return [];
      }

      $mod = abs($param % $shardCount);

      foreach (explode(';', $db[$deploy . '_list']) as $item) {
          // 0 = number range (e.g. 0-191), 1 = master IP, 2 = master port,
          // 3 = slave IP, 4 = slave port
          $con = explode(':', $item);
          if (count($con) < 3) {
              continue;
          }

          // 0 = first number of the range, 1 = last number of the range
          $c = explode('-', $con[0]);
          if (count($c) < 2) {
              continue;
          }
          if (!($c[0] <= $mod && $mod <= $c[1])) {
              continue;
          }

          $database = Config::get('database');
          if (count($con) >= 5) {
              // A slave is listed: enable master/slave deployment with read/write splitting.
              $database['deploy'] = 1;
              $database['rw_separate'] = true;
              $database['hostname'] = $con[1] . ',' . $con[3];
              $database['hostport'] = $con[2] . ',' . $con[4];
          } else {
              // Master only.
              $database['hostname'] = $con[1];
              $database['hostport'] = $con[2];
          }
          $database['database'] = 'mysql';
          // '$mod' is a literal placeholder inside the configured name template.
          $database['table'] = str_replace('$mod', (string) $mod, $db[$deploy . '_database']);

          return $database;
      }

      return [];
  }
  396. }