'/api/v1/yifengaf/receive/order',
        'user' => '/api/v1/yifengaf/receive/user',
        'recharge' => '/api/v1/yifengaf/receive/recharge',
        'consume' => '/api/v1/yifengaf/receive/consume',
        'read' => '/api/v1/yifengaf/receive/read',
        'check' => '/api/v1/yifengaf/receive/check',
    ];

    // Resolved connection configs for the "user" databases, keyed by database number (filled by selectRecharge).
    private $userDbArr = [];

    // Resolved connection configs for the "shard" databases, keyed by database number
    // (filled by selectConsume and selectRecent).
    private $shardDbArr = [];

    /**
     * Registers the command name, its two options (channel nickname keyword, end time) and its description.
     */
    public function configure()
    {
        $this->setName('PostData')
            ->addOption('keyword', 'k', Option::VALUE_OPTIONAL, '渠道昵称关键词', '希果')
            ->addOption('endtime', 'e', Option::VALUE_OPTIONAL, '结束时间戳', null)
            ->setDescription('推送数据-梦嘉');
    }

    /**
     * Pushes orders, users and the users' recharge / consume / reading records of the matching
     * channels to the remote API, for the time window [select_end_time of the previous run, end time).
     *
     * Steps:
     *  1. validate the options and derive the time window from the last `postdata_log` row;
     *  2. resolve the channels whose nickname contains the keyword, plus the agents they created;
     *  3. page through orders, then through users (per user database), posting batches as they fill;
     *  4. post a summary ("check") record and mark the `postdata_log` row as finished.
     *
     * @param Input  $input
     * @param Output $output
     */
    public function execute(Input $input, Output $output)
    {
        $keyword = $input->getOption('keyword');
        if (!$keyword) {
            echo "缺少渠道信息关键字";
            exit;
        }

        // NOTE(review): the option is described as a timestamp but is parsed with strtotime(),
        // i.e. it has to be a date string that strtotime() understands - confirm the intended format.
        $endDate = $input->getOption('endtime');
        if ($endDate && is_string($endDate)) {
            $endTime = strtotime($endDate);
            if (!$endTime || !is_int($endTime) || $endTime < strtotime('2019-01-01 00:00:00') || $endTime > strtotime(date('Ymd'))) {
                echo '时间戳格式错误或超出范围';
                exit;
            }
            $this->selectEndTime = $endTime;
        } else {
            // Default: today at 00:00:00.
            $this->selectEndTime = strtotime(date('Ymd'));
        }

        // TODO: test endpoints - delete before going live
        /*
        $this->host = 'http://mp.koread.cn';
        $this->apiList = [
            'order' => '/api/receivetest/index',
            'user' => '/api/receivetest/index',
            'recharge' => '/api/receivetest/index',
            'consume' => '/api/receivetest/index',
            'read' => '/api/receivetest/index',
            'check' => '/api/receivetest/index',
        ];*/

        if (empty($this->host)) {
            echo '请设置接口域名';
            exit;
        }

        /** script start **/
        $beginTime = time();
        $lastLog = Db::table('postdata_log')->order("id", 'desc')->limit(1)->find();
        if (!$lastLog) {
            // First run ever: start from the beginning of time.
            $this->selectBeginTime = 0;
        } else {
            if ($lastLog['status'] == 0) {
                echo "上次脚本还未执行结束";
                exit;
            }
            $this->selectBeginTime = $lastLog['select_end_time'];
        }

        // Resolve the channels BEFORE creating the log row: a failed lookup must not leave a
        // status = 0 row behind, because such a row blocks every later run.
        // The keyword is bound as a parameter instead of being interpolated into the SQL.
        $mainSlaveDbConfig = $this->getMainSlaveDbConfig();
        $adminIdsRes = Db::connect($mainSlaveDbConfig)->query(
            "select admin.id, ac.appid, JSON_EXTRACT(ac.json,'$.authorizer_info.nick_name') as app_nickname from admin left join admin_config ac on ac.admin_id = admin.id where admin.nickname like :keyword",
            ['keyword' => '%' . $keyword . '%']
        );
        if (!$adminIdsRes) {
            // Nothing to push; continuing would only build invalid "in ()" SQL.
            echo "渠道不存在 执行结束" . PHP_EOL;
            return;
        }

        // Register this run (status 0 = running).
        $id = Db::table('postdata_log')->insertGetId([
            'begin_time' => $beginTime,
            'select_begin_time' => $this->selectBeginTime,
            'select_end_time' => $this->selectEndTime,
            'status' => 0,
            'createtime' => time(),
            'updatetime' => time(),
        ]);

        $channelIds = array_column($adminIdsRes, 'id');
        $channelTree = ArrayHelper::index($adminIdsRes, 'id');
        $channelIdsStr = implode(',', $channelIds);

        // Agents created by those channels.
        $agentIdsRes = Db::connect($mainSlaveDbConfig)->query("select admin_id from admin_extend where create_by in ({$channelIdsStr})");
        $agentIds = $agentIdsRes ? array_column($agentIdsRes, 'admin_id') : [];

        // Orders are pushed first, for channels and agents alike.
        $adminIds = array_merge($channelIds, $agentIds);

        $totalOrders = $totalUsers = $totalRecharge = $totalConsume = $totalRecent = 0;
        $totalSuccessOrders = $totalSuccessUsers = $totalSuccessRecharge = $totalSuccessConsume = $totalSuccessRecent = 0;
        $orderLimit = $userLimit = $rechargeLimit = $consumeLimit = $recentLimit = 100;

        // Pending batches, one per record type.
        $postUserData = $postConsumeData = $postRechargeData = $postRecnetData = $postData = [];

        $p = true;
        while ($p) {
            // Handle 100 admins per query.
            $adminIdsArr = array_splice($adminIds, 0, 100);
            $adminIdsStr = implode(',', $adminIdsArr);

            $do = true;
            $orderId = 0;
            while ($do) {
                // NOTE(review): the cursor is `id` while the page is ordered by `createtime`;
                // this only pages correctly if both grow together - confirm.
                $sql = "SELECT id, user_id, out_trade_no as merchant_id, transaction_id, type, money, state, business_line as `from`, createtime as create_time, finishtime as finish_time, book_id, referral_id, referral_id_permanent FROM orders FORCE INDEX(createtime) WHERE id > {$orderId} AND admin_id in({$adminIdsStr}) AND createtime >= {$this->selectBeginTime} AND createtime < {$this->selectEndTime} ORDER BY createtime ASC LIMIT {$orderLimit}";
                $date = date("Y-m-d H:i:s");
                try {
                    if (Config::get('polardb.orders_apply_polardb')) {
                        $rows = Db::connect(Config::get('polardb'))->query($sql);
                    } else {
                        $rows = Db::connect($mainSlaveDbConfig)->query($sql);
                    }

                    $total = count($rows);
                    $totalOrders += $total;
                    if ($total < 1) {
                        $do = false;
                    } else {
                        $common = [
                            'push_time' => time(),
                            'user_id' => '',
                        ];
                        foreach ($rows as $row) {
                            try {
                                $orderId = $row['id'];
                                $bookId = $row['book_id'];
                                $referralId = $row['referral_id'] ?: ($row['referral_id_permanent'] ?: 0);
                                // Internal columns are not part of the pushed payload.
                                unset($row['id'], $row['book_id'], $row['referral_id'], $row['referral_id_permanent']);

                                $bookName = $bookTags = $referralUrl = '';
                                if ($bookId) {
                                    list($bookName, $bookTags) = $this->getBook($bookId);
                                }
                                if ($referralId) {
                                    $referralUrl = '/t/' . $referralId;
                                }

                                $item = array_merge($common, $row, [
                                    'book_name' => $bookName,
                                    'book_tags' => $bookTags,
                                    'referral_url' => $referralUrl,
                                ]);
                                // Every value is sent as a string.
                                $this->toString($item);
                                $postData[] = $item;
                                unset($item);
                            } catch (Exception $e) {
                                echo "订单记录异常:用户ID:{$row['user_id']} --记录ID:{$orderId} --时间:{$date}". $e->getMessage() .PHP_EOL;
                            }
                        }

                        if (count($postData) >= $orderLimit) {
                            if ($this->doPost($this->apiList['order'], $postData)) {
                                $totalSuccessOrders += count($postData);
                            }
                            $postData = [];
                        }
                        if ($total < $orderLimit) {
                            $do = false;
                        }
                    }
                    unset($rows);
                } catch (Exception $e) {
                    echo "订单记录异常:SQL:{$sql} --时间:{$date}". $e->getMessage() .PHP_EOL;
                    // The cursor did not move, so retrying the same SQL would loop forever.
                    $do = false;
                }
            }

            if (empty($adminIds)) {
                $p = false;
            }
        }

        // Send whatever is left of the order batch.
        if ($postData) {
            if ($this->doPost($this->apiList['order'], $postData)) {
                $totalSuccessOrders += count($postData);
            }
            unset($postData);
        }
        unset($adminIds, $agentIds);

        // Number of user databases; the loop below visits database numbers 1..user_num
        // (the original note says 1 to 512).
        $userDbcount = Config::get('db.user_num');

        $continue = true;
        while ($continue) {
            // Handle 100 channels per query.
            $channelIdArr = array_splice($channelIds, 0, 100);
            $channelIdStr = implode(',', $channelIdArr);

            for ($i = 1; $i <= $userDbcount; $i++) {
                $userDbConfig = $this->getDbDeploy($i, 'user');
                // 'table' carries the schema name; it is not a connection option.
                $userTable = $userDbConfig['table'] . '.user';
                unset($userDbConfig['table']);

                $do = true;
                $userId = 0;
                while ($do) {
                    $sql = "SELECT id as user_id, nickname, openid, mobile, province, city, sex, is_subscribe, operate_time as operatetime, createtime as registertime, channel_id FROM {$userTable} FORCE INDEX(user_updatetime) WHERE channel_id in($channelIdStr) AND id > {$userId} and updatetime >= {$this->selectBeginTime} AND updatetime < {$this->selectEndTime} ORDER BY id ASC LIMIT {$userLimit}";
                    $date = date("Y-m-d H:i:s");
                    try {
                        $rows = Db::connect($userDbConfig)->query($sql);
                        $total = count($rows);
                        $totalUsers += $total;
                        if ($total < 1) {
                            $do = false;
                        } else {
                            $common = [
                                'push_time' => time(),
                                'user_id' => '',
                            ];
                            foreach ($rows as $row) {
                                try {
                                    $userId = $row['user_id'];
                                    $userChannelId = $row['channel_id'];
                                    unset($row['channel_id']);

                                    $item = array_merge($common, $row, [
                                        // JSON_EXTRACT returns the nickname wrapped in double quotes.
                                        'authorizer_nickname' => str_replace(['"'], [''], $channelTree[$userChannelId]['app_nickname']),
                                        'authorizer_appid' => $channelTree[$userChannelId]['appid'],
                                    ]);
                                    $this->toString($item);
                                    $postUserData[] = $item;
                                    unset($item);
                                } catch (Exception $e) {
                                    echo "用户记录异常:用户ID:{$row['user_id']} --记录ID:{$row['user_id']} --时间:{$date}". $e->getMessage() .PHP_EOL;
                                }
                            }

                            if (count($postUserData) >= $userLimit) {
                                if ($this->doPost($this->apiList['user'], $postUserData)) {
                                    $totalSuccessUsers += count($postUserData);
                                }
                                // Recharge records of the users in this batch.
                                $this->selectRecharge($postUserData, $totalRecharge, $totalSuccessRecharge, $rechargeLimit, $postRechargeData);
                                // Consume records.
                                $this->selectConsume($postUserData, $totalConsume, $totalSuccessConsume, $consumeLimit, $postConsumeData);
                                // Reading records.
                                $this->selectRecent($postUserData, $totalRecent, $totalSuccessRecent, $recentLimit, $postRecnetData);
                                $postUserData = [];
                            }
                            if ($total < $userLimit) {
                                $do = false;
                            }
                        }
                        unset($rows);
                    } catch (Exception $e) {
                        echo "用户记录异常:SQL:{$sql} --时间:{$date}". $e->getMessage() .PHP_EOL;
                        // The cursor did not move, so retrying the same SQL would loop forever.
                        $do = false;
                    }
                }
            }

            if (empty($channelIds)) {
                $continue = false;
            }
        }

        // Send the remaining users and their dependent records.
        if ($postUserData) {
            if ($this->doPost($this->apiList['user'], $postUserData)) {
                $totalSuccessUsers += count($postUserData);
            }
            $this->selectRecharge($postUserData, $totalRecharge, $totalSuccessRecharge, $rechargeLimit, $postRechargeData);
            $this->selectConsume($postUserData, $totalConsume, $totalSuccessConsume, $consumeLimit, $postConsumeData);
            $this->selectRecent($postUserData, $totalRecent, $totalSuccessRecent, $recentLimit, $postRecnetData);
            unset($postUserData);
        }
        if ($postRechargeData) {
            if ($this->doPost($this->apiList['recharge'], $postRechargeData)) {
                $totalSuccessRecharge += count($postRechargeData);
            }
            unset($postRechargeData);
        }
        if ($postConsumeData) {
            if ($this->doPost($this->apiList['consume'], $postConsumeData)) {
                $totalSuccessConsume += count($postConsumeData);
            }
            unset($postConsumeData);
        }
        if ($postRecnetData) {
            if ($this->doPost($this->apiList['read'], $postRecnetData)) {
                $totalSuccessRecent += count($postRecnetData);
            }
            unset($postRecnetData);
        }

        // Report how many records were delivered, if any were.
        if ($totalSuccessOrders || $totalSuccessUsers || $totalSuccessRecharge || $totalSuccessConsume || $totalSuccessRecent) {
            $postCheckData = [
                'user_info_num' => $totalSuccessUsers,
                'consume_info_num' => $totalSuccessConsume,
                'order_info_num' => $totalSuccessOrders,
                'read_info_num' => $totalSuccessRecent,
                'recharge_info_num' => $totalSuccessRecharge,
                'push_start_time' => $beginTime,
                'push_end_time' => time(),
            ];
            $this->toString($postCheckData);
            $this->doPost($this->apiList['check'], $postCheckData);
        }
        unset($channelTree, $channelIds);

        /** script end **/
        $endTime = time();
        Db::table('postdata_log')->where('id', 'eq', $id)->update([
            'end_time' => $endTime,
            'status' => 1,
            'updatetime' => time(),
            'user_count' => $totalUsers,
            'order_count' => $totalOrders,
            'recharge_count' => $totalRecharge,
            'consume_count' => $totalConsume,
            'recent_count' => $totalRecent,
            'success_user_count' => $totalSuccessUsers,
            'success_order_count' => $totalSuccessOrders,
            'success_recharge_count' => $totalSuccessRecharge,
            'success_consume_count' => $totalSuccessConsume,
            'success_recent_count' => $totalSuccessRecent,
        ]);
        $output->write("任务 结束");
    }

    /**
     * Collects the recharge records of the given users inside the current time window and posts
     * them in batches.
     *
     * Users are grouped by `user_id % db.user_num` so that each user database is queried once per
     * page instead of once per user.
     *
     * NOTE: the default on $rechargeLimit can never take effect because a required parameter follows it.
     *
     * @param array $postUserData         user rows; only their 'user_id' is read
     * @param int   $totalRecharge        running count of rows read (updated in place)
     * @param int   $totalSuccessRecharge running count of rows posted successfully (updated in place)
     * @param int   $rechargeLimit        SQL page size and flush threshold of the batch
     * @param array $postRechargeData     pending batch, shared with the caller (updated in place)
     */
    private function selectRecharge($postUserData, &$totalRecharge, &$totalSuccessRecharge, $rechargeLimit = 100, &$postRechargeData)
    {
        $userIds = array_column($postUserData, 'user_id');
        if (!$userIds) {
            return;
        }

        // Group the users by database number to reduce the number of queries.
        $userGroup = [];
        $db = Config::get('db');
        foreach ($userIds as $userId) {
            $userGroup[abs($userId % $db['user_num'])][] = $userId;
        }

        foreach ($userGroup as $dbId => $userIdArr) {
            if (!isset($this->userDbArr[$dbId])) {
                $this->userDbArr[$dbId] = $this->getDbDeploy($dbId, 'user');
            }
            $userDbConfig = $this->userDbArr[$dbId];
            // 'table' carries the schema name; it is not a connection option.
            $rechargeTable = $userDbConfig['table'] . '.recharge';
            unset($userDbConfig['table']);

            $rechargeId = 0;
            $common = [
                'push_time' => time(),
                'user_id' => '',
            ];
            $userIdStr = implode(',', $userIdArr);

            $do = true;
            while ($do) {
                $rechargeSql = "SELECT id as recharge_id, user_id, type, kandian as shubi, free_kandian as free_shubi, free_endtime, day, hour, createtime as create_time, vip_starttime FROM {$rechargeTable} FORCE INDEX(createtime) WHERE id > {$rechargeId} AND user_id in({$userIdStr}) AND createtime >= {$this->selectBeginTime} AND createtime < {$this->selectEndTime} ORDER BY id asc limit {$rechargeLimit}";
                $date = date("Y-m-d H:i:s");
                try {
                    $rechargeRows = Db::connect($userDbConfig)->query($rechargeSql);
                    if ($rechargeRows) {
                        $totalRecharge += count($rechargeRows);
                        foreach ($rechargeRows as $rechargeRow) {
                            try {
                                $rechargeId = $rechargeRow['recharge_id'];
                                // VIP end = start + granted days and hours; empty when neither is granted.
                                $vipEndtime = '';
                                if ($rechargeRow['day'] || $rechargeRow['hour']) {
                                    $vipEndtime = $rechargeRow['vip_starttime'] + 86400 * $rechargeRow['day'] + $rechargeRow['hour'] * 3600;
                                }
                                $item = array_merge($common, $rechargeRow, ['vip_endtime' => $vipEndtime]);
                                $this->toString($item);
                                $postRechargeData[] = $item;
                                unset($item);
                            } catch (Exception $e) {
                                echo "用户充值记录异常:用户ID:{$rechargeRow['user_id']} --记录ID:{$rechargeId} --时间:{$date}" . $e->getMessage() .PHP_EOL;
                            }
                        }

                        if (count($postRechargeData) >= $rechargeLimit) {
                            if ($this->doPost($this->apiList['recharge'], $postRechargeData)) {
                                $totalSuccessRecharge += count($postRechargeData);
                            }
                            $postRechargeData = [];
                        }
                        if (count($rechargeRows) < $rechargeLimit) {
                            $do = false;
                        }
                    } else {
                        $do = false;
                    }
                    unset($rechargeRows);
                } catch (Exception $e) {
                    echo "用户充值记录异常:SQL:{$rechargeSql} --时间:{$date}" .$e->getMessage() .PHP_EOL;
                    // The cursor did not move, so retrying the same SQL would loop forever.
                    $do = false;
                }
            }
        }
    }

    /**
     * Collects the consume records of the given users inside the current time window and posts
     * them in batches.
     *
     * Users are grouped by `user_id % db.shard_num`; each shard is read in pages of 500 rows.
     *
     * NOTE: the default on $consumeLimit can never take effect because a required parameter follows it.
     *
     * @param array $postUserData        user rows; only their 'user_id' is read
     * @param int   $totalConsume        running count of rows read (updated in place)
     * @param int   $totalSuccessConsume running count of rows posted successfully (updated in place)
     * @param int   $consumeLimit        flush threshold of the batch
     * @param array $postConsumeData     pending batch, shared with the caller (updated in place)
     */
    private function selectConsume($postUserData, &$totalConsume, &$totalSuccessConsume, $consumeLimit = 100, &$postConsumeData)
    {
        $userIds = array_column($postUserData, 'user_id');
        if (!$userIds) {
            return;
        }

        // Must match the LIMIT in the SQL below: a page shorter than this is the last page.
        $pageSize = 500;

        // Group the users by shard number to reduce the number of queries.
        $userGroup = [];
        $db = Config::get('db');
        foreach ($userIds as $userId) {
            $userGroup[abs($userId % $db['shard_num'])][] = $userId;
        }

        foreach ($userGroup as $dbId => $userIdArr) {
            if (!isset($this->shardDbArr[$dbId])) {
                $this->shardDbArr[$dbId] = $this->getDbDeploy($dbId, 'shard');
            }
            $shardDbConfig = $this->shardDbArr[$dbId];
            // 'table' carries the schema name; it is not a connection option.
            $consumeTable = $shardDbConfig['table'] . '.consume';
            unset($shardDbConfig['table']);

            $consumeId = 0;
            $common = [
                'push_time' => time(),
                'user_id' => '',
            ];
            $userIdStr = implode(',', $userIdArr);

            $do = true;
            while ($do) {
                $consumeSql = "SELECT id, user_id, createtime as consume_time, book_id, book_name, chapter_id, chapter_name, kandian as shubi, free_kandian as free_shubi FROM {$consumeTable} FORCE INDEX(consume_createtime) WHERE id > {$consumeId} AND user_id in ({$userIdStr}) AND createtime >= {$this->selectBeginTime} AND createtime < {$this->selectEndTime} ORDER BY id asc LIMIT 500";
                $date = date("Y-m-d H:i:s");
                try {
                    $consumeRows = Db::connect($shardDbConfig)->query($consumeSql);
                    if ($consumeRows) {
                        $totalConsume += count($consumeRows);
                        foreach ($consumeRows as $consumeRow) {
                            try {
                                $consumeId = $consumeRow['id'];
                                unset($consumeRow['id']);
                                $item = array_merge($common, $consumeRow);
                                $this->toString($item);
                                $postConsumeData[] = $item;
                                unset($item);
                            } catch (Exception $e) {
                                echo "用户消费记录异常:用户ID:{$consumeRow['user_id']} --记录ID:{$consumeId} --时间:{$date}". $e->getMessage() .PHP_EOL;
                            }
                        }

                        if (count($postConsumeData) >= $consumeLimit) {
                            if ($this->doPost($this->apiList['consume'], $postConsumeData)) {
                                $totalSuccessConsume += count($postConsumeData);
                            }
                            $postConsumeData = [];
                        }
                        if (count($consumeRows) < $pageSize) {
                            $do = false;
                        }
                    } else {
                        $do = false;
                    }
                    unset($consumeRows);
                } catch (Exception $e) {
                    echo "用户消费记录异常:SQL:{$consumeSql} --时间:{$date} error:" . $e->getMessage() .PHP_EOL;
                    // The cursor did not move, so retrying the same SQL would loop forever.
                    $do = false;
                }
            }
        }
    }

    /**
     * Collects the recent-reading records of the given users inside the current time window and
     * posts them in batches, adding the book name to every record.
     *
     * Users are grouped by `user_id % db.shard_num`; each shard is read in pages of 500 rows.
     *
     * NOTE: the default on $recentLimit can never take effect because a required parameter follows it.
     *
     * @param array $postUserData       user rows; only their 'user_id' is read
     * @param int   $totalRecent        running count of rows read (updated in place)
     * @param int   $totalSuccessRecent running count of rows posted successfully (updated in place)
     * @param int   $recentLimit        flush threshold of the batch
     * @param array $postRecnetData     pending batch, shared with the caller (updated in place)
     */
    private function selectRecent($postUserData, &$totalRecent, &$totalSuccessRecent, $recentLimit = 100, &$postRecnetData)
    {
        $userIds = array_column($postUserData, 'user_id');
        if (!$userIds) {
            return;
        }

        // Must match the LIMIT in the SQL below: a page shorter than this is the last page.
        $pageSize = 500;

        // Group the users by shard number to reduce the number of queries.
        $userGroup = [];
        $db = Config::get('db');
        foreach ($userIds as $userId) {
            $userGroup[abs($userId % $db['shard_num'])][] = $userId;
        }

        foreach ($userGroup as $dbId => $userIdArr) {
            if (!isset($this->shardDbArr[$dbId])) {
                $this->shardDbArr[$dbId] = $this->getDbDeploy($dbId, 'shard');
            }
            $shardDbConfig = $this->shardDbArr[$dbId];
            // 'table' carries the schema name; it is not a connection option.
            $recentTable = $shardDbConfig['table'] . '.user_recently_read';
            unset($shardDbConfig['table']);

            $recentId = 0;
            $common = [
                'push_time' => time(),
                'user_id' => '',
            ];
            $userIdStr = implode(',', $userIdArr);

            $do = true;
            while ($do) {
                $recentSql = "SELECT id, user_id, book_id, chapter_id, chapter_name, flag, updatetime as update_time FROM {$recentTable} FORCE INDEX(sign_updatetime) WHERE id > {$recentId} AND user_id in ({$userIdStr}) AND updatetime >= {$this->selectBeginTime} AND updatetime < {$this->selectEndTime} ORDER BY id asc LIMIT 500";
                $date = date("Y-m-d H:i:s");
                try {
                    $recentRows = Db::connect($shardDbConfig)->query($recentSql);
                    if ($recentRows) {
                        $totalRecent += count($recentRows);
                        foreach ($recentRows as $recentRow) {
                            try {
                                $recentId = $recentRow['id'];
                                unset($recentRow['id']);
                                // Only the book name is pushed here; the tags are not needed.
                                $book = $this->getBook($recentRow['book_id']);
                                $item = array_merge($common, $recentRow, ['book_name' => $book[0]]);
                                $this->toString($item);
                                $postRecnetData[] = $item;
                                unset($item, $book);
                            } catch (Exception $e) {
                                echo "用户阅读记录异常:用户ID:{$recentRow['user_id']} --记录ID:{$recentId} --时间:{$date}". $e->getMessage() .PHP_EOL;
                            }
                        }

                        if (count($postRecnetData) >= $recentLimit) {
                            if ($this->doPost($this->apiList['read'], $postRecnetData)) {
                                $totalSuccessRecent += count($postRecnetData);
                            }
                            $postRecnetData = [];
                        }
                        if (count($recentRows) < $pageSize) {
                            $do = false;
                        }
                    } else {
                        $do = false;
                    }
                    unset($recentRows);
                } catch (Exception $e) {
                    echo "用户阅读记录异常:SQL:{$recentSql} --时间:{$date}". $e->getMessage() .PHP_EOL;
                    // The cursor did not move, so retrying the same SQL would loop forever.
                    $do = false;
                }
            }
        }
    }

    /**
     * Posts one batch as JSON to the remote API, retrying on failure.
     *
     * Each attempt is preceded by a 0.5 s pause. The request is signed with
     * sha1(token . timestamp . client_id . nonce). An attempt succeeds when the response decodes
     * to an array whose 'code' equals 200. With the default $times the batch is attempted twice.
     *
     * @param string $url      API path, appended to $this->host
     * @param array  $postData payload to encode as JSON
     * @param int    $times    attempt counter used by the retry recursion
     * @return bool true when an attempt succeeded, false when every attempt failed
     */
    private function doPost($url, $postData, $times = 1)
    {
        // Throttle: 500000 microseconds before every attempt.
        $sleep = 60 * 1000000 / 120;
        usleep($sleep);

        if ($times >= 3) {
            echo "重试次数过多 直接跳过" .PHP_EOL;
            return false;
        }
        $times++;

        $token = 'hzQhFpVrCXRvZfXl';
        $strs = "QWERTYUIOPASDFGHJKLZXCVBNM1234567890qwertyuiopasdfghjklzxcvbnm";
        // 6 random characters taken from a shuffled alphabet.
        $nonce = substr(str_shuffle($strs), mt_rand(0, strlen($strs) - 11), 6);
        $query = [
            'nonce' => $nonce,
            'timestamp' => time(),
            'client_id' => 1005,
        ];
        $query['signature'] = sha1($token . $query['timestamp'] . $query['client_id'] . $query['nonce']);

        $http = new Http();
        $ret = $http->post($this->host . $url . '?' . http_build_query($query), json_encode($postData, JSON_UNESCAPED_SLASHES));

        if (!$ret) {
            // No response at all: retry and report the outcome of the retry.
            echo "进行重试 访问次数:{$times} 失败返回结果:{$ret}".PHP_EOL;
            return $this->doPost($url, $postData, $times);
        }

        $res = json_decode($ret, true);
        if (is_array($res) && isset($res['code']) && $res['code'] == 200) {
            echo "推送结果:{$ret} , URL:{$url}".PHP_EOL;
            return true;
        }

        // Rejected or unparsable response: wait a little and retry. The result of the retry is
        // returned so that callers do not count a failed batch as delivered.
        sleep(2);
        echo "进行重试 访问次数:{$times}".PHP_EOL;
        $delivered = $this->doPost($url, $postData, $times);
        echo "推送结果:{$ret} , URL:{$url}".PHP_EOL;
        return $delivered;
    }

    /**
     * Casts every value of the given array to a string, in place. Non-arrays are left untouched.
     *
     * @param array $data
     */
    private function toString(&$data)
    {
        if (!is_array($data)) {
            return;
        }
        foreach ($data as $field => $value) {
            $data[$field] = strval($value);
        }
    }

    /**
     * Builds the connection config of the database that holds the given number.
     *
     * The number is reduced modulo `db.<deploy>_num`; `db.<deploy>_list` is a ';'-separated list of
     * "from-to:masterHost:masterPort[:slaveHost:slavePort]" entries, and the first entry whose range
     * contains the reduced number wins. The returned config carries the schema name (from
     * `db.<deploy>_database`, with '$mod' replaced) under the extra key 'table'.
     *
     * @param int    $param  database number or user id
     * @param string $deploy config prefix, 'shard' or 'user'
     * @return array connection config, or [] when no entry matches
     */
    private function getDbDeploy($param, $deploy = 'shard')
    {
        $db = Config::get('db');
        $mod = abs($param % $db[$deploy . '_num']);

        foreach (explode(';', $db[$deploy . '_list']) as $item) {
            // 0 = number range (e.g. 0-191), 1 = master host, 2 = master port,
            // 3 = slave host, 4 = slave port
            $con = explode(':', $item);
            if (count($con) < 3) {
                continue;
            }
            // Range of database numbers: 0 = first, 1 = last.
            $c = explode('-', $con[0]);
            if (count($c) < 2) {
                continue;
            }
            if ($c[0] <= $mod && $mod <= $c[1]) {
                $database = Config::get('database');
                if (count($con) >= 5) {
                    // Master/slave entry: enable read/write separation.
                    $database['deploy'] = 1;
                    $database['rw_separate'] = true;
                    $database['hostname'] = $con[1] . ',' . $con[3];
                    $database['hostport'] = $con[2] . ',' . $con[4];
                } else {
                    // Master only.
                    $database['hostname'] = $con[1];
                    $database['hostport'] = $con[2];
                }
                $database['database'] = 'mysql';
                $database['table'] = str_replace('$mod', $mod, $db[$deploy . '_database']);
                return $database;
            }
        }

        return [];
    }

    /**
     * Returns [book name, category name] for a book, cached per book id in $this->books.
     *
     * An unknown book yields ['', ''] instead of failing on the missing row.
     *
     * @param $bookId
     * @return mixed
     * @throws Exception
     * @throws \think\db\exception\BindParamException
     * @throws \think\exception\PDOException
     */
    private function getBook($bookId)
    {
        if (!isset($this->books[$bookId])) {
            $mainSlaveDbConfig = $this->getMainSlaveDbConfig();
            // The id is forced to an integer before it is placed in the SQL.
            $safeBookId = intval($bookId);
            $bookSql = "select book.name, bc.name as category_name from book left join book_category as bc on bc.id = book.book_category_id where book.id = {$safeBookId}";
            $bookRow = Db::connect($mainSlaveDbConfig)->query($bookSql);
            $this->books[$bookId] = !empty($bookRow[0]) ? array_values($bookRow[0]) : ['', ''];
        }

        return $this->books[$bookId];
    }

    /**
     * Returns the connection config of the main database's slave, or of the master when no slave
     * is configured.
     *
     * Hosts and ports are read from the comma-separated `database.admin_hostname` and
     * `database.admin_hostport` settings (master first). When a slave host exists but only one
     * port is configured, that port is used for the slave as well.
     *
     * @return array
     */
    private function getMainSlaveDbConfig()
    {
        $hostArr = explode(',', Env::get('database.admin_hostname'));
        $portArr = explode(',', Env::get('database.admin_hostport', '3306,3306'));

        $hasSlave = count($hostArr) >= 2;
        $hostname = $hasSlave ? $hostArr[1] : $hostArr[0];
        $hostport = ($hasSlave && isset($portArr[1])) ? $portArr[1] : $portArr[0];

        return array_merge(Config::get("database"), ['hostname' => $hostname, 'hostport' => $hostport]);
    }
}