PostData.php 32 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783
  1. <?php
  2. /**
  3. * Created by: PhpStorm
  4. * User: lytian
  5. * Date: 2019/10/14
  6. * Time: 11:45
  7. */
  8. namespace app\admin\command;
  9. use app\main\helper\ArrayHelper;
  10. use fast\Http;
  11. use think\Config;
  12. use think\console\Command;
  13. use think\console\Input;
  14. use think\console\input\Option;
  15. use think\console\Output;
  16. use think\Db;
  17. use think\Env;
  18. use think\Exception;
  19. use think\Request;
  20. class PostData extends Command
  21. {
  22. protected $selectBeginTime;
  23. protected $selectEndTime;
  24. protected $books = [];
  25. protected $host = 'http://crawler-api.mokagm.com';
  26. protected $apiList = [
  27. 'order' => '/api/v1/yifengaf/receive/order',
  28. 'user' => '/api/v1/yifengaf/receive/user',
  29. 'recharge' => '/api/v1/yifengaf/receive/recharge',
  30. 'consume' => '/api/v1/yifengaf/receive/consume',
  31. 'read' => '/api/v1/yifengaf/receive/read',
  32. 'check' => '/api/v1/yifengaf/receive/check',
  33. ];
  34. private $userDbArr = [];
  35. private $shardDbArr = [];
  36. public function configure()
  37. {
  38. $this->setName('PostData')
  39. ->addOption('keyword', 'k', Option::VALUE_OPTIONAL, '渠道昵称关键词', '希果')
  40. ->addOption('endtime', 'e', Option::VALUE_OPTIONAL, '结束时间戳', null)
  41. ->setDescription('推送数据-梦嘉');
  42. }
  43. public function execute(Input $input, Output $output)
  44. {
  45. $keyword = $input->getOption('keyword');
  46. if (!$keyword) {
  47. echo "缺少渠道信息关键字";
  48. exit;
  49. }
  50. $endDate = $input->getOption('endtime');
  51. if ($endDate && is_string($endDate)) {
  52. $endTime = strtotime($endDate);
  53. if (!$endTime || !is_int($endTime) || $endTime < strtotime('2019-01-01 00:00:00') || $endTime > strtotime(date('Ymd'))) {
  54. echo '时间戳格式错误或超出范围';
  55. exit;
  56. }
  57. $this->selectEndTime = $endTime;
  58. } else {
  59. $this->selectEndTime = strtotime(date('Ymd'));
  60. }
  61. //@todo 测试地址 上线前删除
  62. /* $this->host = 'http://mp.koread.cn';
  63. $this->apiList = [
  64. 'order' => '/api/receivetest/index',
  65. 'user' => '/api/receivetest/index',
  66. 'recharge' => '/api/receivetest/index',
  67. 'consume' => '/api/receivetest/index',
  68. 'read' => '/api/receivetest/index',
  69. 'check' => '/api/receivetest/index',
  70. ];*/
  71. if (empty($this->host)) {
  72. echo '请设置接口域名';
  73. exit;
  74. }
  75. /**脚本执行开始**/
  76. $beginTime = time();
  77. $lastLog = Db::table('postdata_log')->order("id", 'desc')->limit(1)->find();
  78. if (!$lastLog) {
  79. //首次执行
  80. $this->selectBeginTime = 0;
  81. } else {
  82. if ($lastLog['status'] == 0) {
  83. echo "上次脚本还未执行结束";
  84. exit;
  85. }
  86. $this->selectBeginTime = $lastLog['select_end_time'];
  87. }
  88. //开始执行
  89. $id = Db::table('postdata_log')->insertGetId([
  90. 'begin_time' => $beginTime,
  91. 'select_begin_time' => $this->selectBeginTime,
  92. 'select_end_time' => $this->selectEndTime,
  93. 'status' => 0,
  94. 'createtime' => time(),
  95. 'updatetime' => time(),
  96. ]);
  97. //定位渠道和代理
  98. $mainSlaveDbConfig = $this->getMainSlaveDbConfig();
  99. $adminIdsRes = Db::connect($mainSlaveDbConfig)->query("select admin.id, ac.appid, JSON_EXTRACT(ac.json,'$.authorizer_info.nick_name') as app_nickname from admin left join admin_config ac on ac.admin_id = admin.id where admin.nickname like '%{$keyword}%'");
  100. if (!$adminIdsRes) {
  101. echo "渠道不存在 执行结束" . PHP_EOL;
  102. }
  103. $channelIds = array_column($adminIdsRes, 'id');
  104. $channelTree = ArrayHelper::index($adminIdsRes, 'id');
  105. $channelIdsStr = implode(',', $channelIds);
  106. $agentIdsRes = Db::connect($mainSlaveDbConfig)->query("select admin_id from admin_extend where create_by in ({$channelIdsStr})");
  107. if(!$agentIdsRes) {
  108. $agentIds = [];
  109. } else {
  110. $agentIds = array_column($agentIdsRes, 'admin_id');
  111. }
  112. //先执行订单
  113. $adminIds = array_merge($channelIds, $agentIds);
  114. $totalOrders = $totalUsers = $totalRecharge = $totalConsume = $totalRecent = 0;
  115. $totalSuccessOrders = $totalSuccessUsers = $totalSuccessRecharge = $totalSuccessConsume = $totalSuccessRecent = 0;
  116. $orderLimit = $userLimit = $rechargeLimit = $consumeLimit = $recentLimit = 100;
  117. //发送的数据集合
  118. $postUserData = $postConsumeData = $postRechargeData = $postRecnetData = $postData = [];
  119. $p = true;
  120. while($p) {
  121. //100个渠道一起跑
  122. $adminIdsArr = array_splice($adminIds, 0, 100);
  123. $adminIdsStr = implode(',', $adminIdsArr);
  124. $do = true;
  125. $j = 1;
  126. $orderId = 0;
  127. while ($do) {
  128. $sql = "SELECT id, user_id, out_trade_no as merchant_id, transaction_id, type, money, state, business_line as `from`, createtime as create_time, finishtime as finish_time, book_id, referral_id, referral_id_permanent FROM orders FORCE INDEX(createtime) WHERE id > {$orderId} AND admin_id in({$adminIdsStr}) AND createtime >= {$this->selectBeginTime} AND createtime < {$this->selectEndTime} ORDER BY createtime ASC LIMIT {$orderLimit}";
  129. $date = date("Y-m-d H:i:s");
  130. try {
  131. if (Config::get('polardb.orders_apply_polardb')) {
  132. $rows = Db::connect(Config::get('polardb'))->query($sql);
  133. } else {
  134. $rows = Db::connect($mainSlaveDbConfig)->query($sql);
  135. }
  136. $total = count($rows);
  137. $totalOrders += $total;
  138. if ($total < 1) {
  139. $do = false;
  140. } else {
  141. //开始
  142. $common = [
  143. 'push_time' => time(),
  144. 'user_id' => ''
  145. ];
  146. foreach ($rows as $row) {
  147. try {
  148. $orderId = $row['id'];
  149. $bookId = $row['book_id'];
  150. $referralId = $row['referral_id'] ?: ($row['referral_id_permanent'] ?: 0);
  151. unset($row['id']);
  152. unset($row['book_id']);
  153. unset($row['referral_id']);
  154. unset($row['referral_id_permanent']);
  155. $bookName = $bookTags = $referralUrl = '';
  156. if ($bookId) {
  157. list($bookName, $bookTags) = $this->getBook($bookId);
  158. }
  159. if ($referralId) {
  160. $referralUrl = '/t/'.$referralId;
  161. }
  162. $item = array_merge($common,$row, ['book_name' => $bookName, 'book_tags' => $bookTags, 'referral_url' => $referralUrl]);
  163. //转换一下数据类型为string
  164. $this->toString($item);
  165. $postData[] = $item;
  166. unset($item);
  167. } catch (Exception $e) {
  168. echo "订单记录异常:用户ID:{$row['user_id']} --记录ID:{$orderId} --时间:{$date}". $e->getMessage() .PHP_EOL;
  169. }
  170. }
  171. if (count($postData) >= $orderLimit) {
  172. if ($this->doPost($this->apiList['order'], $postData)) {
  173. $totalSuccessOrders += count($postData);
  174. }
  175. $postData = [];
  176. }
  177. if ($total < $orderLimit) {
  178. $do = false;
  179. }
  180. }
  181. unset($rows);
  182. } catch (Exception $e) {
  183. echo "用户充值记录异常:SQL:{$sql} --时间:{$date}". $e->getMessage() .PHP_EOL;
  184. }
  185. $j++;
  186. }
  187. if (empty($adminIds)) {
  188. $p = false;
  189. }
  190. }
  191. //剩余的一次发送完
  192. if ($postData) {
  193. if ($this->doPost($this->apiList['order'], $postData)) {
  194. $totalSuccessOrders += count($postData);
  195. }
  196. unset($postData);
  197. }
  198. unset($adminIds);
  199. unset($agentIds);
  200. $continue = true;
  201. while($continue) {
  202. //100个渠道一起跑
  203. $channelIdArr = array_splice($channelIds, 0, 100);
  204. $channelIdStr = implode(',', $channelIdArr);
  205. //用户 1 至 512
  206. $userDbcount = Config::get('db.user_num');
  207. for($i = 1; $i<=$userDbcount; $i++) {
  208. $userDbConfig = $this->getDbDeploy($i, 'user');
  209. $userTable = $userDbConfig['table'].'.user';
  210. unset($userDbConfig['table']);
  211. $do = true;
  212. $userId = 0;
  213. while ($do) {
  214. $sql = "SELECT id as user_id, nickname, openid, mobile, province, city, sex, is_subscribe, operate_time as operatetime, createtime as registertime, channel_id FROM {$userTable} FORCE INDEX(user_updatetime) WHERE channel_id in($channelIdStr) AND id > {$userId} and updatetime >= {$this->selectBeginTime} AND updatetime < {$this->selectEndTime} ORDER BY id ASC LIMIT {$userLimit}";
  215. $date = date("Y-m-d H:i:s");
  216. try {
  217. $rows = Db::connect($userDbConfig)->query($sql);
  218. $total = count($rows);
  219. $totalUsers += $total;
  220. if ($total < 1) {
  221. $do = false;
  222. } else {
  223. $common = [
  224. 'push_time' => time(),
  225. 'user_id' => ''
  226. ];
  227. $userIdArr = array_column($rows, 'user_id');
  228. foreach ($rows as $row) {
  229. try {
  230. $userId = $row['user_id'];
  231. $userChannelId = $row['channel_id'];
  232. unset($row['channel_id']);
  233. $item = array_merge($common, $row, [
  234. 'authorizer_nickname' => str_replace(['"'], [''], $channelTree[$userChannelId]['app_nickname']),
  235. 'authorizer_appid' => $channelTree[$userChannelId]['appid']
  236. ]);
  237. $this->toString($item);
  238. $postUserData[] = $item;
  239. unset($item);
  240. } catch (Exception $e) {
  241. echo "用户记录异常:用户ID:{$row['user_id']} --记录ID:{$row['user_id']} --时间:{$date}". $e->getMessage() .PHP_EOL;
  242. }
  243. }
  244. if (count($postUserData) >= $userLimit) {
  245. if ($this->doPost($this->apiList['user'], $postUserData)) {
  246. $totalSuccessUsers += count($postUserData);
  247. };
  248. //充值记录
  249. $this->selectRecharge($postUserData,$totalRecharge, $totalSuccessRecharge, $rechargeLimit, $postRechargeData);
  250. //消费记录
  251. $this->selectConsume($postUserData, $totalConsume, $totalSuccessConsume, $consumeLimit, $postConsumeData);
  252. //阅读记录
  253. $this->selectRecent($postUserData, $totalRecent, $totalSuccessRecent, $recentLimit, $postRecnetData);
  254. $postUserData = [];
  255. }
  256. if ($total < $userLimit) {
  257. $do = false;
  258. }
  259. }
  260. unset($rows);
  261. } catch (Exception $e) {
  262. echo "用户记录异常:SQL:{$sql} --时间:{$date}". $e->getMessage() .PHP_EOL;
  263. }
  264. }
  265. }
  266. if (empty($channelIds)) {
  267. $continue = false;
  268. }
  269. }
  270. if ($postUserData) {
  271. //剩余发送一下
  272. if ($this->doPost($this->apiList['user'], $postUserData)) {
  273. $totalSuccessUsers += count($postUserData);
  274. };
  275. //充值记录
  276. $this->selectRecharge($postUserData,$totalRecharge, $totalSuccessRecharge, $rechargeLimit, $postRechargeData);
  277. //消费记录
  278. $this->selectConsume($postUserData, $totalConsume, $totalSuccessConsume, $consumeLimit, $postConsumeData);
  279. //阅读记录
  280. $this->selectRecent($postUserData, $totalRecent, $totalSuccessRecent, $recentLimit, $postRecnetData);
  281. unset($postUserData);
  282. }
  283. if ($postRechargeData) {
  284. if ($this->doPost($this->apiList['recharge'], $postRechargeData)) {
  285. $totalSuccessRecharge += count($postRechargeData);
  286. }
  287. unset($postRechargeData);
  288. }
  289. if ($postConsumeData) {
  290. if ($this->doPost($this->apiList['consume'], $postConsumeData)) {
  291. $totalSuccessConsume += count($postConsumeData);
  292. }
  293. unset($postConsumeData);
  294. }
  295. if ($postRecnetData) {
  296. if ($this->doPost($this->apiList['read'], $postRecnetData)) {
  297. $totalSuccessRecent += count($postRecnetData);
  298. }
  299. unset($postRecnetData);
  300. }
  301. //发送成功的结果
  302. if ($totalSuccessOrders || $totalSuccessUsers || $totalSuccessRecharge || $totalSuccessConsume || $totalSuccessRecent) {
  303. $postCheckData = [
  304. 'user_info_num' => $totalSuccessUsers,
  305. 'consume_info_num' => $totalSuccessConsume,
  306. 'order_info_num' => $totalSuccessOrders,
  307. 'read_info_num' => $totalSuccessRecent,
  308. 'recharge_info_num' => $totalSuccessRecharge,
  309. 'push_start_time' => $beginTime,
  310. 'push_end_time' => time(),
  311. ];
  312. $this->toString($postCheckData);
  313. $this->doPost($this->apiList['check'], $postCheckData);
  314. }
  315. unset($channelTree);
  316. unset($channelIds);
  317. /**脚本执行结束**/
  318. $endTime = time();
  319. Db::table('postdata_log')->where('id', 'eq', $id)->update([
  320. 'end_time' => $endTime,
  321. 'status' => 1,
  322. 'updatetime' => time(),
  323. 'user_count' => $totalUsers,
  324. 'order_count' => $totalOrders,
  325. 'recharge_count' => $totalRecharge,
  326. 'consume_count' => $totalConsume,
  327. 'recent_count' => $totalRecent,
  328. 'success_user_count' => $totalSuccessUsers,
  329. 'success_order_count' => $totalSuccessOrders,
  330. 'success_recharge_count' => $totalSuccessRecharge,
  331. 'success_consume_count' => $totalSuccessConsume,
  332. 'success_recent_count' => $totalSuccessRecent,
  333. ]);
  334. $output->write("任务 结束");
  335. }
  336. /**
  337. * 充值记录
  338. */
  339. private function selectRecharge($postUserData, &$totalRecharge, &$totalSuccessRecharge, $rechargeLimit = 100, &$postRechargeData)
  340. {
  341. $userIds = array_column($postUserData,'user_id');
  342. if ($userIds) {
  343. //先取模分组吧 减少查询
  344. $userGroup = [];
  345. $db = Config::get('db');
  346. foreach ($userIds as $userId) {
  347. $mod = $userId % $db['user_num'];
  348. $mod = abs($mod);
  349. $userGroup[$mod][] = $userId;
  350. }
  351. foreach ($userGroup as $dbId=>$userIdArr) {
  352. if (!isset($this->userDbArr[$dbId])) {
  353. $userDbConfig = $this->getDbDeploy($dbId, 'user');
  354. $this->userDbArr[$dbId] = $userDbConfig;
  355. } else {
  356. $userDbConfig = $this->userDbArr[$dbId];
  357. }
  358. $rechargeTable = $userDbConfig['table'].'.recharge';
  359. unset($userDbConfig['table']);
  360. $rechargeId = 0;
  361. $common = [
  362. 'push_time' => time(),
  363. 'user_id' => ''
  364. ];
  365. $userIdStr = implode(',', $userIdArr);
  366. $do = true;
  367. while($do) {
  368. $rechargeSql = "SELECT id as recharge_id, user_id, type, kandian as shubi, free_kandian as free_shubi, free_endtime, day, hour, createtime as create_time, vip_starttime FROM {$rechargeTable} FORCE INDEX(createtime) WHERE id > {$rechargeId} AND user_id in({$userIdStr}) AND createtime >= {$this->selectBeginTime} AND createtime < {$this->selectEndTime} ORDER BY id asc limit {$rechargeLimit}";
  369. $date = date("Y-m-d H:i:s");
  370. try {
  371. $rechargeRows = Db::connect($userDbConfig)->query($rechargeSql);
  372. if ($rechargeRows) {
  373. $totalRecharge += count($rechargeRows);
  374. foreach ($rechargeRows as $rechargeRow) {
  375. try {
  376. $rechargeId = $rechargeRow['recharge_id'];
  377. $vipEndtime = '';
  378. if ($rechargeRow['day'] || $rechargeRow['hour']) {
  379. $day = $rechargeRow['day'];
  380. $hour = $rechargeRow['hour'];
  381. $vipEndtime = $rechargeRow['vip_starttime'] + 86400 * $day + $hour * 3600;
  382. }
  383. $item = array_merge($common, $rechargeRow, ['vip_endtime' => $vipEndtime]);
  384. $this->toString($item);
  385. $postRechargeData[] = $item;
  386. unset($item);
  387. } catch (Exception $e) {
  388. echo "用户充值记录异常:用户ID:{$rechargeRow['user_id']} --记录ID:{$rechargeId} --时间:{$date}" . $e->getMessage() .PHP_EOL;
  389. }
  390. }
  391. if (count($postRechargeData) >= $rechargeLimit) {
  392. if ($this->doPost($this->apiList['recharge'], $postRechargeData)) {
  393. $totalSuccessRecharge += count($postRechargeData);
  394. }
  395. $postRechargeData = [];
  396. }
  397. if (count($rechargeRows) < $rechargeLimit) {
  398. $do = false;
  399. }
  400. } else {
  401. $do = false;
  402. }
  403. unset($rechargeRows);
  404. } catch (Exception $e) {
  405. echo "用户充值记录异常:SQL:{$rechargeSql} --时间:{$date}" .$e->getMessage() .PHP_EOL;
  406. }
  407. }
  408. }
  409. }
  410. }
  411. /**
  412. * 消费记录
  413. */
  414. private function selectConsume($postUserData, &$totalConsume, &$totalSuccessConsume, $consumeLimit = 100, &$postConsumeData)
  415. {
  416. $userIds = array_column($postUserData,'user_id');
  417. if ($userIds) {
  418. //先取模分组吧 减少查询
  419. $userGroup = [];
  420. $db = Config::get('db');
  421. foreach ($userIds as $userId) {
  422. $mod = $userId % $db['shard_num'];
  423. $mod = abs($mod);
  424. $userGroup[$mod][] = $userId;
  425. }
  426. foreach ($userGroup as $dbId=>$userIdArr) {
  427. if (!isset($this->shardDbArr[$dbId])) {
  428. $shardDbConfig = $this->getDbDeploy($dbId, 'shard');
  429. $this->shardDbArr[$dbId] = $shardDbConfig;
  430. } else {
  431. $shardDbConfig = $this->shardDbArr[$dbId];
  432. }
  433. $consumeTable = $shardDbConfig['table'].'.consume';
  434. unset($shardDbConfig['table']);
  435. $consumeId = 0;
  436. $common = [
  437. 'push_time' => time(),
  438. 'user_id' => ''
  439. ];
  440. $userIdStr = implode(',', $userIdArr);
  441. $do = true;
  442. while($do) {
  443. $consumeSql = "SELECT id, user_id, createtime as consume_time, book_id, book_name, chapter_id, chapter_name, kandian as shubi, free_kandian as free_shubi FROM {$consumeTable} FORCE INDEX(consume_createtime) WHERE id > {$consumeId} AND user_id in ({$userIdStr}) AND createtime >= {$this->selectBeginTime} AND createtime < {$this->selectEndTime} ORDER BY id asc LIMIT 500";
  444. $date = date("Y-m-d H:i:s");
  445. try {
  446. $consumeRows = Db::connect($shardDbConfig)->query($consumeSql);
  447. if ($consumeRows) {
  448. $totalConsume += count($consumeRows);
  449. foreach ($consumeRows as $consumeRow) {
  450. try {
  451. $consumeId = $consumeRow['id'];
  452. unset($consumeRow['id']);
  453. $item = array_merge($common, $consumeRow);
  454. $this->toString($item);
  455. $postConsumeData[] = $item;
  456. unset($item);
  457. } catch (Exception $e) {
  458. echo "用户消费记录异常:用户ID:{$consumeRow['user_id']} --记录ID:{$consumeId} --时间:{$date}". $e->getMessage() .PHP_EOL;
  459. }
  460. }
  461. if (count($postConsumeData) >= $consumeLimit) {
  462. if ($this->doPost($this->apiList['consume'], $postConsumeData)) {
  463. $totalSuccessConsume += count($postConsumeData);
  464. }
  465. $postConsumeData = [];
  466. }
  467. if (count($consumeRows) < $consumeLimit) {
  468. $do = false;
  469. }
  470. } else {
  471. $do = false;
  472. }
  473. unset($consumeRows);
  474. } catch (Exception $e) {
  475. echo "用户消费记录异常:SQL:{$consumeSql} --时间:{$date} error:" . $e->getMessage() .PHP_EOL;
  476. }
  477. }
  478. }
  479. }
  480. }
  481. /**
  482. * 阅读记录
  483. */
  484. private function selectRecent($postUserData, &$totalRecent, &$totalSuccessRecent, $recentLimit = 100, &$postRecnetData)
  485. {
  486. $userIds = array_column($postUserData,'user_id');
  487. if ($userIds) {
  488. //先取模分组吧 减少查询
  489. $userGroup = [];
  490. $db = Config::get('db');
  491. foreach ($userIds as $userId) {
  492. $mod = $userId % $db['shard_num'];
  493. $mod = abs($mod);
  494. $userGroup[$mod][] = $userId;
  495. }
  496. foreach ($userGroup as $dbId=>$userIdArr) {
  497. if (!isset($this->shardDbArr[$dbId])) {
  498. $shardDbConfig = $this->getDbDeploy($dbId, 'shard');
  499. $this->shardDbArr[$dbId] = $shardDbConfig;
  500. } else {
  501. $shardDbConfig = $this->shardDbArr[$dbId];
  502. }
  503. $recentTable = $shardDbConfig['table'].'.user_recently_read';
  504. unset($shardDbConfig['table']);
  505. $recentId = 0;
  506. $common = [
  507. 'push_time' => time(),
  508. 'user_id' => ''
  509. ];
  510. $userIdStr = implode(',', $userIdArr);
  511. $do = true;
  512. while($do) {
  513. $recentSql = "SELECT id, user_id, book_id, chapter_id, chapter_name, flag, updatetime as update_time FROM {$recentTable} FORCE INDEX(sign_updatetime) WHERE id > {$recentId} AND user_id in ({$userIdStr}) AND updatetime >= {$this->selectBeginTime} AND updatetime < {$this->selectEndTime} ORDER BY id asc LIMIT 500";
  514. $date = date("Y-m-d H:i:s");
  515. try {
  516. $recentRows = Db::connect($shardDbConfig)->query($recentSql);
  517. if ($recentRows) {
  518. $totalRecent += count($recentRows);
  519. foreach ($recentRows as $recentRow) {
  520. try {
  521. $recentId = $recentRow['id'];
  522. unset($recentRow['id']);
  523. list($bookName, $bookTags) = $this->getBook($recentRow['book_id']);
  524. unset($bookTags);
  525. $item = array_merge($common, $recentRow, ['book_name' => $bookName]);
  526. unset($bookName);
  527. $this->toString($item);
  528. $postRecnetData[] = $item;
  529. unset($item);
  530. } catch (Exception $e) {
  531. echo "用户阅读记录异常:用户ID:{$recentRow['user_id']} --记录ID:{$recentId} --时间:{$date}". $e->getMessage() .PHP_EOL;
  532. }
  533. }
  534. if (count($postRecnetData) >= $recentLimit) {
  535. if ($this->doPost($this->apiList['read'], $postRecnetData)) {
  536. $totalSuccessRecent += count($postRecnetData);
  537. }
  538. $postRecnetData = [];
  539. }
  540. if (count($recentRows) < $recentLimit) {
  541. $do = false;
  542. }
  543. } else {
  544. $do = false;
  545. }
  546. unset($recentRows);
  547. } catch (Exception $e) {
  548. echo "用户用户记录异常:SQL:{$recentSql} --时间:{$date}". $e->getMessage() .PHP_EOL;
  549. }
  550. }
  551. }
  552. }
  553. }
  554. /**
  555. * 发送数据
  556. * @param $url
  557. * @param $postData
  558. * @param int $times
  559. * @return bool
  560. */
  561. private function doPost($url, $postData, $times = 1)
  562. {
  563. $sleep = 60*1000000/120;
  564. usleep($sleep);
  565. if ($times >=3) {
  566. echo "重试次数过多 直接跳过" .PHP_EOL;
  567. return false;
  568. }
  569. $times++;
  570. $token = 'hzQhFpVrCXRvZfXl';
  571. $strs="QWERTYUIOPASDFGHJKLZXCVBNM1234567890qwertyuiopasdfghjklzxcvbnm";
  572. $nonce=substr(str_shuffle($strs),mt_rand(0,strlen($strs)-11),6);
  573. $query = [
  574. 'nonce' => $nonce, // 正式环境使用随机生成
  575. 'timestamp' => time(),
  576. 'client_id' => 1005,
  577. ];
  578. $query['signature'] = sha1($token.$query['timestamp'].$query['client_id'].$query['nonce']);
  579. $http = new Http();
  580. $ret = $http->post($this->host.$url.'?'.http_build_query($query),json_encode($postData, JSON_UNESCAPED_SLASHES));
  581. //$dataJson = json_encode($postData, 256);
  582. if ($ret) {
  583. $res = json_decode($ret, true);
  584. if ($res['code'] != 200) {
  585. //失败 休息一下重试一次
  586. sleep(2);
  587. echo "进行重试 访问次数:{$times}".PHP_EOL;
  588. $this->doPost($url, $postData, $times);
  589. }
  590. //echo "推送结果:{$ret} 推送数据:{$dataJson}".PHP_EOL;
  591. echo "推送结果:{$ret} , URL:{$url}".PHP_EOL;
  592. } else {
  593. echo "进行重试 访问次数:{$times} 失败返回结果:{$ret}".PHP_EOL;
  594. $this->doPost($url, $postData, $times);
  595. }
  596. return true;
  597. }
  598. /**
  599. * 转换数据
  600. * @param $data
  601. */
  602. private function toString(&$data)
  603. {
  604. if (is_array($data)) {
  605. array_walk($data, function (&$val, $key) {$val = strval($val);});
  606. }
  607. }
  608. /**
  609. * 从库配置
  610. * @param $param
  611. * @param string $deploy
  612. * @return array|mixed
  613. */
  614. private function getDbDeploy($param, $deploy = 'shard')
  615. {
  616. $db = Config::get('db');
  617. $mod = $param % $db[$deploy . '_num'];
  618. $mod = abs($mod);
  619. $list = explode(';', $db[$deploy . '_list']);
  620. foreach ($list as $item) {
  621. $con = explode(':', $item); // 0=0-191库编号 1=192.168.1.149主IP 2=3306主端口 3=192.168.1.150从IP 4=3306从端口
  622. if (count($con) >= 3) {
  623. $c = explode('-', $con[0]); //库编号 0开始 1结束
  624. if (count($c) >= 2) {
  625. if ($c[0] <= $mod && $mod <= $c[1]) {
  626. $database = Config::get('database');
  627. if (count($con) >= 5) { //开启主从 & 带主从配置
  628. $database['deploy'] = 1;
  629. $database['rw_separate'] = true;
  630. $database['hostname'] = $con[1] . ',' . $con[3]; //192.168.1.149,192.168.1.150
  631. $database['hostport'] = $con[2] . ',' . $con[4]; //3306,3306
  632. } else { //只有主库
  633. $database['hostname'] = $con[1];
  634. $database['hostport'] = $con[2];
  635. }
  636. $database['database'] = 'mysql';
  637. $database['table'] = str_replace('$mod', $mod, $db[$deploy . '_database']);
  638. return $database;
  639. }
  640. }
  641. }
  642. }
  643. return [];
  644. }
  645. /**
  646. * 返回书籍
  647. * @param $bookId
  648. * @return mixed
  649. * @throws Exception
  650. * @throws \think\db\exception\BindParamException
  651. * @throws \think\exception\PDOException
  652. */
  653. private function getBook($bookId)
  654. {
  655. if (!isset($this->books[$bookId])) {
  656. //查询
  657. $mainSlaveDbConfig = $this->getMainSlaveDbConfig();
  658. $bookSql = "select book.name, bc.name as category_name from book left join book_category as bc on bc.id = book.book_category_id where book.id = {$bookId}";
  659. $bookRow = Db::connect($mainSlaveDbConfig)->query($bookSql);
  660. $this->books[$bookId] = array_values($bookRow[0]);
  661. }
  662. return $this->books[$bookId];
  663. }
  664. /**
  665. * 获取主库的从库配置 从库不存在返回主库
  666. * @return array
  667. */
  668. private function getMainSlaveDbConfig()
  669. {
  670. $hostArr = explode(',', Env::get('database.admin_hostname'));
  671. $portArr = explode(',', Env::get('database.admin_hostport', '3306,3306'));
  672. //默认主库
  673. $mainDbConfig = array_merge(Config::get("database"), ['hostname' =>$hostArr[0], 'hostport' => $portArr[0]]);
  674. if (count($hostArr) >= 2) {
  675. //从库
  676. $mainDbConfig = array_merge(Config::get("database"), ['hostname' =>$hostArr[1], 'hostport' => $portArr[1]]);
  677. }
  678. return $mainDbConfig;
  679. }
  680. }