UpdateRechargeDD.php 7.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196
  1. <?php
  2. namespace app\admin\command;
  3. use think\Config;
  4. use think\console\Command;
  5. use think\console\Input;
  6. use think\console\input\Argument;
  7. use think\console\Output;
  8. use think\Db;
  9. use think\Log;
  10. use think\Request;
  11. class UpdateRechargeDD extends Command
  12. {
  13. private $_orderCache = [];
  14. protected function configure()
  15. {
  16. $this->setName('UpdateRechargeDD')
  17. ->addArgument('startOrderId', Argument::REQUIRED, "订单起始id")
  18. ->addArgument('endOrderId', Argument::REQUIRED, "订单结束id")
  19. ->setDescription('通过订单表的kl字段,刷新recharge表的dd字段');
  20. }
  21. protected function execute(Input $input, Output $output)
  22. {
  23. Request::instance()->module('admin');
  24. $startOrderId = $input->getArgument('startOrderId');
  25. $endOrderId = $input->getArgument('endOrderId');
  26. $output->writeln('订单起始id:' . $startOrderId);
  27. $output->writeln('订单结束id:' . $endOrderId);
  28. $this->_orderCache['data'] = [];
  29. $this->_orderCache['count'] = 0;
  30. if ($startOrderId > $endOrderId) {
  31. $output->writeln('参数错误,起始id必须小于结束id');
  32. }
  33. $pageSize = 1000;
  34. $lastOrderId = $startOrderId - 1;
  35. try {
  36. while (true) {
  37. Log::info('beginId:' . $startOrderId);
  38. $orderModel = model('orders');
  39. $orderObjs = $orderModel->where('id', '>', $lastOrderId)
  40. ->where('id', '<=', $endOrderId)
  41. ->where('deduct', 1)
  42. ->where('state', '1')
  43. ->limit($pageSize)
  44. ->field(['id', 'user_id', 'type', 'finishtime'])->select();
  45. if (empty($orderObjs)) {
  46. $this->autoOrderRecharge(true);
  47. Log::info('ref_finish_lastId:' . $lastOrderId);
  48. $output->writeln('ref_finish_lastId:' . $lastOrderId);
  49. break;
  50. }
  51. foreach ($orderObjs as $orderObj) {
  52. $orders[] = $orderObj->toArray();
  53. }
  54. $this->preOrderCache($orders);
  55. $lastOrder = end($orders);
  56. $lastOrderId = $lastOrder['id'];
  57. $this->autoOrderRecharge(false);
  58. Log::info('ref_lastId:' . $lastOrderId);
  59. $output->writeln('ref_lastId:' . $lastOrderId);
  60. }
  61. } catch (\Exception $e) {
  62. Log::error($e->getMessage());
  63. }
  64. }
  65. /**
  66. * @param $orders
  67. */
  68. function preOrderCache($orders)
  69. {
  70. foreach ($orders as $order) {
  71. $mod = $order['user_id'] % 512;
  72. $this->_orderCache['data'][$mod][] = $order;
  73. $this->_orderCache['count']++;
  74. }
  75. }
  76. /**
  77. * 将订单插入充值记录表
  78. * @param bool $forceUpdate 强制将变量缓存的信息插入数据库
  79. * @throws \think\Exception
  80. * @throws \think\exception\PDOException
  81. */
  82. function autoOrderRecharge($forceUpdate = false)
  83. {
  84. if ($this->_orderCache['count'] == 0) {
  85. return;
  86. }
  87. if ($this->_orderCache['count'] > 10000 || $forceUpdate) {
  88. foreach ($this->_orderCache['data'] as $mod => $orders) {
  89. $db = $this->dbConnect($mod);
  90. $rechargeIds = [];
  91. foreach ($orders as $order) {
  92. $userId = $order['user_id'];
  93. $type = $order['type'];
  94. $finishtime = $order['finishtime'];
  95. $sql = <<<SQL
  96. select id, createtime from recharge where user_id = $userId and type = '$type' and createtime >= $finishtime order by createtime limit 2;
  97. SQL;
  98. $res = $db->query($sql);
  99. $count = count($res);
  100. if ($count == 0) {
  101. continue;
  102. } elseif ($count == 1) {
  103. $_recharge = current($res);
  104. $rechargeIds[] = $_recharge['id'];
  105. } elseif ($count == 2) {//用一个用户,2个recharge记录相差2s以内算作一个订单的充值
  106. $sub = abs($res[0]['createtime'] - $res[1]['createtime']);
  107. if ($sub < 3) {
  108. $rechargeIds[] = $res[0]['id'];
  109. $rechargeIds[] = $res[1]['id'];
  110. } else {
  111. $rechargeIds[] = $res[0]['id'];
  112. }
  113. }
  114. }
  115. if (!empty($rechargeIds)) {
  116. $strRechargeIds = implode(',', $rechargeIds);
  117. $sql = <<<SQL
  118. update recharge set dd=1 where id in ($strRechargeIds) ;
  119. SQL;
  120. $db->execute($sql);
  121. }
  122. $db->close();
  123. }
  124. Log::info('data_to_db_count:' . $this->_orderCache['count']);
  125. $this->_orderCache['data'] = null;
  126. $this->_orderCache['count'] = 0;
  127. }
  128. }
  129. /**
  130. * 获取数据库连接
  131. * @param $param 编号
  132. * @param $deploy 业务
  133. * @return \think\db\Connection
  134. * @throws \think\Exception
  135. */
  136. private function dbConnect($param, $deploy = 'user')
  137. {
  138. $db_config = $this->get_db_deploy($param, $deploy);
  139. $db = Db::connect($db_config);
  140. return $db;
  141. }
  142. /**
  143. * 获取db分库的配置参数
  144. *
  145. * @param string|int $param 取模值
  146. * @param string $deploy 分库前缀
  147. * @return array
  148. */
  149. function get_db_deploy($param, $deploy = 'user')
  150. {
  151. $db = Config::get('db');
  152. $mod = $param % $db[$deploy . '_num'];
  153. $mod = abs($mod);
  154. $list = explode(';', $db[$deploy . '_list']);
  155. Log::info("分库配置参数:{$param} 标志:{$deploy} 库数量:" . $db[$deploy . '_num'] . " 模:{$mod}");
  156. foreach ($list as $item) {
  157. $con = explode(':', $item); // 0=0-191库编号 1=192.168.1.149主IP 2=3306主端口 3=192.168.1.150从IP 4=3306从端口
  158. if (count($con) >= 3) {
  159. $c = explode('-', $con[0]); //库编号 0开始 1结束
  160. if (count($c) >= 2) {
  161. if ($c[0] <= $mod && $mod <= $c[1]) {
  162. $database = Config::get('database');
  163. if ($database['deploy'] == 1 && count($con) >= 5) { //开启主从 & 带主从配置
  164. $database['hostname'] = $con[1] . ',' . $con[3]; //192.168.1.149,192.168.1.150
  165. $database['hostport'] = $con[2] . ',' . $con[4]; //3306,3306
  166. } else { //只有主库
  167. $database['hostname'] = $con[1];
  168. $database['hostport'] = $con[2];
  169. }
  170. Log::info("分库获取成功 IP:{$database['hostname']} port: {$database['hostport']}");
  171. $database['database'] = str_replace('$mod', $mod, $db[$deploy . '_database']);
  172. return $database;
  173. }
  174. }
  175. }
  176. }
  177. Log::error("分库获取失败!");
  178. return [];
  179. }
  180. }