SyncUser.php 35 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685
  1. <?php
  2. namespace app\admin\command;
  3. use app\common\library\Rabbitmq;
  4. use app\common\library\Redis;
  5. use app\common\library\WeChatObject;
  6. use app\main\service\FinancialService;
  7. use EasyWeChat\Factory;
  8. use PhpAmqpLib\Message\AMQPMessage;
  9. use Symfony\Component\Cache\Simple\RedisCache;
  10. use think\Config;
  11. use think\console\input\Option;
  12. use think\console\Command;
  13. use think\console\Input;
  14. use think\console\Output;
  15. use app\common\model\Config as dbconfig;
  16. use think\Exception;
  17. use think\Log;
  18. use think\Request;
  19. use think\Exception as thinkException;
/**
 * Console command that synchronises the followers of a WeChat official account
 * into the CPS system.
 *
 * The --type option selects one of three modes: "cmd" (one channel, from the
 * command line), "mq" (consume pull tasks from RabbitMQ) and "handle" (consume
 * the openid chunks produced by a pull and create the users).
 */
class SyncUser extends BaseCommand {
    // Limit compared against the per-channel pull_max_number counter; once it is
    // reached getWeChatUsers() stops pulling and sync() reports failure.
    const PULL_MAX_FAIL_NUMBER = 5;
    // Queue consumed by executeMQ(); each message names a channel to pull.
    const SYNC_MQ_QUEUE_NAME = 'Q_SYNC_USER';
    // NOTE(review): not referenced in this file — presumably the exchange paired with Q_SYNC_USER.
    const SYNC_MQ_EXCHANGE_NAME = 'E_SYNC_USER';
    // Queue that sync() publishes openid chunks to and syncHandler() consumes.
    const SYNC_SCRIPT_MQ_QUEUE_NAME = 'Q_SYNC_USER_SCRIPT';
    // Exchange passed to Rabbitmq::send() together with the queue above.
    const SYNC_SCRIPT_EXCHANGE_QUEUE_NAME = 'E_SYNC_USER_SCRIPT';
    // Console output handle, assigned in execute().
    protected $consoleOut;
    // Console input handle, assigned in execute().
    protected $consoleIn;
    // Per-channel counter (keyed by channel id) checked against PULL_MAX_FAIL_NUMBER.
    protected $pull_max_number;
    // Per-channel counter (keyed by channel id) of failed pull attempts.
    protected $pull_fail_number;
    // Message of the last exception caught while calling WeChat; forwarded by sync() to sendWorkMsg().
    protected $err_message;
    // Last AMQP message received by a consumer callback; assigned but not read in this file.
    protected $err_mq_msg;
  32. protected function configure(){
  33. $this->setName('SyncUser')
  34. ->addOption('type', 't', Option::VALUE_REQUIRED, '操作类型: cmd:命令行模式,mq:RabbitMQ模式,handle:RabbitMQ处理拉取的微信关注数据', 'mq')
  35. ->addOption('channel_id','c',Option::VALUE_OPTIONAL,'命令行模式请传递渠道ID')
  36. ->addOption('open_id','o',Option::VALUE_OPTIONAL,'开始的OpenID')
  37. ->setDescription('同步微信服务号关注用户到CPS系统');
  38. }
  39. /**
  40. * 入口方法
  41. * @param Input $input
  42. * @param Output $output
  43. * @return int|null|void
  44. */
  45. protected function execute(Input $input, Output $output){
  46. //设置输出
  47. $this->consoleOut = $output;
  48. //设置输入
  49. $this->consoleIn = $input;
  50. //cli模式下无法获取到当前的项目模块,手动指定一下
  51. Request::instance()->module('admin');
  52. $type = $input->getOption('type');
  53. switch ($type) {
  54. case 'cmd':
  55. $this->executeCMD();
  56. break;
  57. case 'mq':
  58. $this->executeMQ();
  59. case 'handle':
  60. $this->syncHandler();
  61. break;
  62. default:
  63. $output->error("无法识别的参数:".$type);
  64. }
  65. }
  66. /**
  67. * 命令行处理方法
  68. */
  69. protected function executeCMD(){
  70. if(!$channel_id = $this->consoleIn->getOption('channel_id')){
  71. $this->consoleOut->error('获取渠道ID失败,请传入渠道ID');
  72. return;
  73. }
  74. if(!is_numeric($channel_id)){
  75. $this->consoleOut->error('获取渠道ID失败,渠道ID为整数类型');
  76. return;
  77. }
  78. $open_id = $this->consoleIn->getOption('open_id');
  79. //启动进程
  80. $this->sync($channel_id,$open_id);
  81. }
  82. /**
  83. * MQ队列处理方法
  84. */
  85. protected function executeMQ(){
  86. try {
  87. $mq = new Rabbitmq();
  88. $mq->receive(self::SYNC_MQ_QUEUE_NAME, function (AMQPMessage $msg){
  89. $this->err_mq_msg = $msg;
  90. $data = json_decode($msg->body, true);
  91. $channel = $msg->delivery_info['channel'];
  92. if(isset($data['channel_id']) && !empty($data['channel_id'])){
  93. try{
  94. if($is_end = $this->sync($data['channel_id'])){
  95. $channel->basic_ack($msg->delivery_info['delivery_tag']);
  96. }else{
  97. //拒绝消息,并丢弃
  98. $channel->basic_nack($msg->delivery_info['delivery_tag'],false,false);
  99. //五秒后重新拉取
  100. sleep(5);
  101. }
  102. }catch (\Exception $e){
  103. Log::error('SyncUser->Info: ChannelID:'.$data['channel_id'].' 添加拉取任务失败,丢弃任务:'.$msg->body.',Error:'.$e->getMessage());
  104. $this->consoleOut->error('SyncUser->Info: ChannelID:'.$data['channel_id'].' 添加拉取任务失败,丢弃任务:'.$msg->body.',Error:'.$e->getMessage());
  105. $channel->basic_nack($msg->delivery_info['delivery_tag'],false,false);
  106. }catch (thinkException $e){
  107. Log::error('SyncUser->Info: ChannelID:'.$data['channel_id'].' 添加拉取任务失败,丢弃任务:'.$msg->body.',Error:'.$e->getMessage());
  108. $this->consoleOut->error('SyncUser->Info: ChannelID:'.$data['channel_id'].' 添加拉取任务失败,丢弃任务:'.$msg->body.',Error:'.$e->getMessage());
  109. $channel->basic_nack($msg->delivery_info['delivery_tag'],false,false);
  110. }catch (\Error $e){
  111. Log::error('SyncUser->Info: ChannelID:'.$data['channel_id'].' 添加拉取任务失败,丢弃任务:'.$msg->body.',Error:'.$e->getMessage());
  112. $this->consoleOut->error('SyncUser->Info: ChannelID:'.$data['channel_id'].' 添加拉取任务失败,丢弃任务:'.$msg->body.',Error:'.$e->getMessage());
  113. $channel->basic_nack($msg->delivery_info['delivery_tag'],false,false);
  114. }catch (\Throwable $e){
  115. Log::error('SyncUser->Info: ChannelID:'.$data['channel_id'].' 添加拉取任务失败,丢弃任务:'.$msg->body.',Error:'.$e->getMessage());
  116. $this->consoleOut->error('SyncUser->Info: ChannelID:'.$data['channel_id'].' 添加拉取任务失败,丢弃任务:'.$msg->body.',Error:'.$e->getMessage());
  117. $channel->basic_nack($msg->delivery_info['delivery_tag'],false,false);
  118. }
  119. }else{
  120. Log::error('SyncUser->Info: 渠道信息获取失败,丢弃任务:'.$msg->body);
  121. $this->consoleOut->error('SyncUser->Info: 渠道信息获取失败,丢弃任务:'.$msg->body);
  122. $channel->basic_nack($msg->delivery_info['delivery_tag'],false,false);
  123. }
  124. });
  125. } catch (\Exception $e) {
  126. Log::error("MQ 同步用户 触发异常!message:" . $e->getMessage());
  127. $this->consoleOut->writeln("MQ 同步用户 触发异常!message:" . $e->getMessage());
  128. exit(0);
  129. }
  130. }
  131. /**
  132. * 同步用户数据
  133. * @param $channel_id
  134. * @param $next_open_id
  135. * @return bool
  136. * @throws
  137. */
  138. protected function sync($channel_id,$next_open_id = ''){
  139. $adminConfig = model('AdminConfig')->getAdminInfoAll($channel_id);
  140. print_r(model('AdminConfig')->query("select connection_id();"));
  141. model('AdminConfig')->getConnection()->free();
  142. model('AdminConfig')->getConnection()->close();
  143. if(!$adminConfig){
  144. $this->consoleOut->error('SyncUser->Info: ChannelID:'.$channel_id.' 获取渠道信息失败,丢弃任务');
  145. Log::error('SyncUser->Info: ChannelID:'.$channel_id.' 获取渠道信息失败,丢弃任务');
  146. $this->sendWorkMsg($channel_id,'获取渠道信息失败,丢弃任务',$next_open_id);
  147. return false;
  148. }
  149. if(!$adminConfig['appid'] || !$adminConfig['refresh_token']){
  150. $this->sendWorkMsg($channel_id,'获取渠道 appid or refresh_token 失败,丢弃任务',$next_open_id);
  151. Log::error('SyncUser->Info: ChannelID:'.$channel_id.' 获取渠道 appid or refresh_token 失败,丢弃任务');
  152. $this->consoleOut->error('SyncUser->Info: ChannelID:'.$channel_id.' 获取渠道 appid or refresh_token 失败,丢弃任务');
  153. return false;
  154. }
  155. if(!$this->insertSyncData($channel_id,$adminConfig['appid'])){
  156. $this->sendWorkMsg($channel_id,'写同步数据信息失败,丢弃任务',$next_open_id);
  157. Log::error('SyncUser->Info: ChannelID:'.$channel_id.' 写同步数据信息失败,丢弃任务');
  158. $this->consoleOut->error('SyncUser->Info: ChannelID:'.$channel_id.' 写同步数据信息失败,丢弃任务');
  159. return false;
  160. }
  161. $this->pull_max_number[$channel_id] = 0;
  162. $this->pull_fail_number[$channel_id] = 0;
  163. while($fans_list = $this->getWeChatUsers($channel_id,$next_open_id)){
  164. if(isset($fans_list['total'])){
  165. model('SyncUser')->where(['admin_id'=>$channel_id,'appid'=>$adminConfig['appid']])->update(['fans_total'=>$fans_list['total']]);
  166. model('SyncUser')->getConnection()->free();
  167. model('SyncUser')->getConnection()->close();
  168. }
  169. if(isset($fans_list['data']['openid'])){
  170. $mq = new Rabbitmq();
  171. if($open_chunk = array_chunk($fans_list['data']['openid'],1000)){
  172. foreach($open_chunk as $val){
  173. $fans_list['count'] = count($val);
  174. $fans_list['data']['openid'] = $val;
  175. model('SyncUser')->where(['admin_id'=>$channel_id,'appid'=>$adminConfig['appid']])->setInc('queue_total');
  176. model('SyncUser')->where(['admin_id'=>$channel_id,'appid'=>$adminConfig['appid']])->setInc('pull_success_number');
  177. model('SyncUser')->getConnection()->free();
  178. model('SyncUser')->getConnection()->close();
  179. $mq->send(['channel_id'=>$channel_id,'fans_list'=>$fans_list],self::SYNC_SCRIPT_MQ_QUEUE_NAME,self::SYNC_SCRIPT_EXCHANGE_QUEUE_NAME);
  180. }
  181. }
  182. $next_open_id = $fans_list['next_openid'];
  183. }
  184. }
  185. if(isset($this->pull_max_number[$channel_id]) && $this->pull_max_number[$channel_id] >= self::PULL_MAX_FAIL_NUMBER){
  186. $this->sendWorkMsg($channel_id,$this->err_message,$next_open_id);
  187. return false; //拉取失败次数超过固定值时判定为失败
  188. }
  189. return true;
  190. }
  191. public function sendWorkMsg($channel_id,$errMsg = '',$next_open_id =''){
  192. try{
  193. $configModel = new dbconfig();
  194. $siteconfig = $configModel->getConfigSiteArr();
  195. $theme = $siteconfig['theme'];
  196. $adminConfig = model('AdminConfig')->getAdminInfoAll($channel_id);
  197. if($adminExtend = model('AdminExtend')->getInfo($channel_id)){
  198. $extend_info = model('Admin')->where('id',$adminExtend['create_by'])->find();
  199. model('Admin')->getConnection()->free();
  200. model('Admin')->getConnection()->close();
  201. }
  202. model('AdminConfig')->getConnection()->free();
  203. model('AdminConfig')->getConnection()->close();
  204. model('AdminExtend')->getConnection()->free();
  205. model('AdminExtend')->getConnection()->close();
  206. $configModel->getConnection()->free();
  207. $configModel->getConnection()->close();
  208. $k_nickname = isset($extend_info) ? $extend_info['nickname'] ?? '' : '';
  209. $k_username = isset($extend_info) ? $extend_info['username'] ?? '' : '';
  210. $k_id = isset($adminExtend) ? $adminExtend['create_by'] ?? '' : '';
  211. $admin_info = model('AdminConfig')
  212. ->join('admin', 'admin.id=admin_config.admin_id','LEFT')
  213. ->field('admin_config.*,admin.id,admin.username,admin.nickname')
  214. ->where('admin.id','=',$channel_id)
  215. ->find();
  216. model('AdminConfig')->getConnection()->free();
  217. model('AdminConfig')->getConnection()->close();
  218. $themeName = '';
  219. switch($theme){
  220. case 'qy':
  221. $themeName = '袋鼠';
  222. break;
  223. case 'sf':
  224. $themeName = '沙发';
  225. break;
  226. case 'ms':
  227. $themeName = '美书';
  228. break;
  229. case '':
  230. $themeName = '西瓜';
  231. break;
  232. default:
  233. $themeName = '';
  234. }
  235. $msg = "发现异常!" . date('Y-m- H:i:s');
  236. $msg .= "\n公众号:" . ($adminConfig['json']['authorizer_info']['nick_name']??'');
  237. $msg .= "\nAPPID:".$adminConfig['appid'] ?? '';
  238. $msg .= "\n原始ID:" . ($adminConfig['json']['authorizer_info']['user_name']??'');
  239. $msg .= "\n渠道ID:" . $adminConfig['admin_id'] ?? $channel_id;
  240. $msg .= "\n渠道账号:".$admin_info['username'] ?? '';
  241. $msg .= "\n渠道昵称:".$admin_info['nickname'] ?? '';
  242. $msg .= "\n开户人ID:".$k_id;
  243. $msg .= "\n开户人账号:".$k_username;
  244. $msg .= "\n开户人昵称:".$k_nickname;
  245. $msg .= "\nnext_openid:".$next_open_id;
  246. $msg .="\n模板:".$themeName;
  247. $msg .= "\n错误信息:".$errMsg;
  248. $this->sendWorkChatMessage($msg);
  249. }catch (\Exception $e){
  250. Log::error('SyncUser->Info:发送企业消息失败,ChannelID:'.$channel_id);
  251. }
  252. }
  253. protected function insertSyncData($channel_id,$appid){
  254. if(!model('SyncUser')->where(['admin_id'=>$channel_id,'appid'=>$appid])->find()){
  255. if(!$ins_id = model('SyncUser')->insertGetId(['admin_id'=>$channel_id,'appid'=>$appid,'sync_status'=>1,'createtime'=>time(),'updatetime'=>time()])){
  256. model('SyncUser')->getConnection()->free();
  257. model('SyncUser')->getConnection()->close();
  258. $this->consoleOut->error('syncUser->writeMysql: ChannelID:'.$channel_id.' appid:'.$appid.' Write Mysql Fail');
  259. Log::error('syncUser->writeMysql: ChannelID:'.$channel_id.' appid:'.$appid.' Write Mysql Fail');
  260. return false;
  261. }else{
  262. model('SyncUser')->getConnection()->free();
  263. model('SyncUser')->getConnection()->close();
  264. $this->consoleOut->info('syncUser->writeMysql: ChannelID:'.$channel_id.' appid:'.$appid.' Write Mysql Success');
  265. Log::info('syncUser->writeMysql: ChannelID:'.$channel_id.' appid:'.$appid.' Write Mysql Success');
  266. return true;
  267. }
  268. }else{
  269. //有数据时,重置数据
  270. model('SyncUser')->where(['admin_id'=>$channel_id,'appid'=>$appid])->update([
  271. 'sync_status'=>1,
  272. 'fans_total'=>0,
  273. 'pull_success_number'=>0,
  274. 'pull_fail_number'=>0,
  275. 'queue_total'=>0,
  276. 'queue_run' => 0,
  277. 'sync_status' => 0,
  278. 'sync_success_number' => 0,
  279. 'sync_fail_number' => 0,
  280. 'sync_skip_number' => 0,
  281. 'updatetime' => time()
  282. ]);
  283. model('SyncUser')->getConnection()->free();
  284. model('SyncUser')->getConnection()->close();
  285. }
  286. return true;
  287. }
  288. protected function syncHandler(){
  289. try {
  290. $mq = new Rabbitmq();
  291. $mq->receive(self::SYNC_SCRIPT_MQ_QUEUE_NAME, function (AMQPMessage $msg){
  292. $this->err_mq_msg = $msg;
  293. $channel = $msg->delivery_info['channel'];
  294. $data = json_decode($msg->body, true);
  295. $channel_id = isset($data['channel_id']) ? $data['channel_id'] : null;
  296. $fans_list = isset($data['fans_list']) ? $data['fans_list'] : [];
  297. $this->consoleOut->info('SyncUser->Info:接收到消费信息 Channel_id:'.$channel_id);
  298. Log::info('SyncUser->Info:接收到消费信息 Channel_id:'.$channel_id);
  299. if($channel_id && $fans_list && isset($fans_list['data']['openid'])){
  300. try{
  301. $adminConfig = model('AdminConfig')->getAdminInfoAll($channel_id);
  302. model('AdminConfig')->getConnection()->free();
  303. model('AdminConfig')->getConnection()->close();
  304. if($adminConfig['appid'] && $adminConfig['refresh_token']){
  305. foreach ($fans_list['data']['openid'] as $key => $open_id){
  306. Log::info('SyncUser->Info: Total:'.$fans_list['total'].' Count:'.$fans_list['count'].' Current:'.$key);
  307. $this->consoleOut->info('SyncUser->Info: Total:'.$fans_list['total'].' Count:'.$fans_list['count'].' Current:'.$key);
  308. if($wechat = $this->getChannelWechat($channel_id)){
  309. $this->weChatInsertUser($channel_id,$open_id,$wechat);
  310. }else{
  311. Log::error('SyncUser->Info: 获取渠道信息失败,丢弃消息 ChannelID:'.$channel_id.' Fans:'.json_encode($fans_list));
  312. $this->consoleOut->error('SyncUser->Info: 获取渠道信息失败,丢弃消息 ChannelID:'.$channel_id.' Fans:'.json_encode($fans_list));
  313. }
  314. }
  315. $channel->basic_ack($msg->delivery_info['delivery_tag']);
  316. $adminConfig = model('AdminConfig')->getAdminInfoAll($channel_id);
  317. model('SyncUser')->where(['admin_id'=>$channel_id,'appid'=>$adminConfig['appid']])->setInc('queue_run');
  318. $queue_total = model('SyncUser')->where(['admin_id'=>$channel_id,'appid'=>$adminConfig['appid']])->value('queue_total');
  319. $queue_run = model('SyncUser')->where(['admin_id'=>$channel_id,'appid'=>$adminConfig['appid']])->value('queue_run');
  320. if($queue_total >= $queue_run){
  321. model('SyncUser')->where(['admin_id'=>$channel_id,'appid'=>$adminConfig['appid']])->update(['sync_status'=>2]);
  322. model('SyncUser')->getConnection()->free();
  323. model('SyncUser')->getConnection()->close();
  324. }
  325. model('AdminConfig')->getConnection()->free();
  326. model('AdminConfig')->getConnection()->close();
  327. model('SyncUser')->getConnection()->free();
  328. model('SyncUser')->getConnection()->close();
  329. }else{
  330. Log::error('SyncUser->Info: 获取 appid or refresh_token 失败 ChannelID:'.$channel_id.' 丢弃任务:'.$msg->body);
  331. $this->consoleOut->error('SyncUser->Info: 获取 appid or refresh_token 失败 ChannelID:'.$channel_id.' 丢弃任务:'.$msg->body);
  332. $channel->basic_ack($msg->delivery_info['delivery_tag']);
  333. }
  334. }catch (\Exception $e){
  335. Log::error('SyncUser->Info: ChannelID:'.$channel_id.' 丢弃任务:'.$msg->body.' Handle异常:'.$e->getMessage());
  336. $channel->basic_ack($msg->delivery_info['delivery_tag']);
  337. }catch (thinkException $e){
  338. Log::error('SyncUser->Info: ChannelID:'.$channel_id.' 丢弃任务:'.$msg->body.' Handle异常:'.$e->getMessage());
  339. $channel->basic_ack($msg->delivery_info['delivery_tag']);
  340. }catch (\Error $e){
  341. Log::error('SyncUser->Info: ChannelID:'.$channel_id.' 丢弃任务:'.$msg->body.' Handle异常:'.$e->getMessage());
  342. $channel->basic_ack($msg->delivery_info['delivery_tag']);
  343. }catch (\Throwable $e){
  344. Log::error('SyncUser->Info: ChannelID:'.$channel_id.' 丢弃任务:'.$msg->body.' Handle异常:'.$e->getMessage());
  345. $channel->basic_ack($msg->delivery_info['delivery_tag']);
  346. }
  347. }
  348. $this->consoleOut->info('SyncUser->Info:消费信息处理完毕 Channel_id:'.$channel_id);
  349. Log::info('SyncUser->Info:消费信息处理完毕 Channel_id:'.$channel_id);
  350. exit(0);
  351. });
  352. } catch (\Exception $e) {
  353. Log::error("MQ 同步用户数据处理异常 触发异常!message:" . $e->getMessage());
  354. $this->consoleOut->writeln("MQ 同步用户数据处理异常 触发异常!message:" . $e->getMessage());
  355. exit(0);
  356. }
  357. }
  358. /**
  359. * 获取要处理的粉丝总数
  360. * @param $channel_id
  361. * @param $next_open_id
  362. * @return bool|mixed
  363. */
  364. protected function getWeChatUsers($channel_id,$next_open_id){
  365. try{
  366. if(isset($this->pull_max_number[$channel_id]) && $this->pull_max_number[$channel_id] >= self::PULL_MAX_FAIL_NUMBER){
  367. return false; //拉取失败次数超过固定值时不再重试
  368. }
  369. if(!$officialAccount = $this->getChannelWechat($channel_id)){
  370. $this->pull_fail_number[$channel_id]++;
  371. $adminConfig = model('AdminConfig')->getAdminInfoAll($channel_id);
  372. model('SyncUser')->where(['admin_id'=>$channel_id,'appid'=>$adminConfig['appid']])->setInc('pull_fail_number');
  373. model('AdminConfig')->getConnection()->free();
  374. model('AdminConfig')->getConnection()->close();
  375. model('SyncUser')->getConnection()->free();
  376. model('SyncUser')->getConnection()->close();
  377. $this->consoleOut->error('获取渠道微信配置失败!');
  378. return true; //获取失败时重新获取
  379. }
  380. if(!$result = $officialAccount->user->list($next_open_id)){
  381. $this->pull_fail_number[$channel_id]++;
  382. $adminConfig = model('AdminConfig')->getAdminInfoAll($channel_id);
  383. model('SyncUser')->where(['admin_id'=>$channel_id,'appid'=>$adminConfig['appid']])->setInc('pull_fail_number');
  384. model('AdminConfig')->getConnection()->free();
  385. model('AdminConfig')->getConnection()->close();
  386. model('SyncUser')->getConnection()->free();
  387. model('SyncUser')->getConnection()->close();
  388. $this->consoleOut->error('获取渠道关注用户失败!');
  389. return true; //拉取失败时,重新拉取
  390. }
  391. if(isset($result['count']) && !empty($result['count'])){
  392. $this->pull_max_number[$channel_id] = 0;
  393. return $result;
  394. }
  395. if(isset($result['count']) && empty($result['count'])){
  396. $this->pull_max_number[$channel_id] = 0;
  397. return false; //处理数据为空时停止循环
  398. }
  399. }catch (\Exception $e){
  400. $this->pull_fail_number[$channel_id]++;
  401. $this->pull_max_number[$channel_id]++;
  402. $adminConfig = model('AdminConfig')->getAdminInfoAll($channel_id);
  403. model('SyncUser')->where(['admin_id'=>$channel_id,'appid'=>$adminConfig['appid']])->setInc('pull_fail_number');
  404. model('AdminConfig')->getConnection()->free();
  405. model('AdminConfig')->getConnection()->close();
  406. model('SyncUser')->getConnection()->free();
  407. model('SyncUser')->getConnection()->close();
  408. $this->err_message = $e->getMessage();
  409. $this->consoleOut->error("调用微信接口异常,Error:".$e->getMessage());
  410. Log::error("调用微信接口异常,Error:".$e->getMessage());
  411. return true; //异常时,从新执行
  412. }
  413. }
  414. /**
  415. * 获取渠道对象
  416. * @param $channel_id
  417. * @return \EasyWeChat\OpenPlatform\Authorizer\OfficialAccount\Application
  418. */
  419. private function getChannelWechat($channel_id){
  420. try{
  421. $adminConfig = model('AdminConfig')->getAdminInfoAll($channel_id);
  422. print_r(model('AdminConfig')->query("select connection_id();"));
  423. model('AdminConfig')->getConnection()->free();
  424. model('AdminConfig')->getConnection()->close();
  425. if(!$adminConfig){
  426. $this->consoleOut->error('获取渠道配置信息失败');
  427. return false;
  428. }
  429. $wechat = new WeChatObject($adminConfig);
  430. $officialAccount = $wechat->getOfficialAccount();
  431. return $officialAccount;
  432. }catch (\Exception $e){
  433. $this->consoleOut->error('获取OfficialAccount失败 ChannelID:'.$channel_id);
  434. Log::error('SyncUser->Info: 获取OfficialAccount失败 ChannelID:'.$channel_id);
  435. return null;
  436. }
  437. }
  438. /**
  439. * 获取内存使用情况
  440. * @return string
  441. */
  442. protected function memory_usage() {
  443. $memory = ( ! function_exists('memory_get_usage')) ? '0' : round(memory_get_usage()/1024/1024, 2).'MB';
  444. return $memory;
  445. }
  446. /**
  447. * 微信关注时创建用户
  448. * @param $channel_id 渠道ID
  449. * @param $open_id OPENID
  450. * @param $weChat \EasyWeChat\OpenPlatform\Authorizer\OfficialAccount\Application EasyWeChat对象
  451. * @return bool
  452. */
  453. public function weChatInsertUser($channel_id,$open_id,\EasyWeChat\OpenPlatform\Authorizer\OfficialAccount\Application $weChat){
  454. try{
  455. //获取基本信息
  456. $user_id = model('openid')->getUserId($channel_id,$open_id);
  457. model('openid')->getConnection()->free();
  458. model('openid')->getConnection()->close();
  459. $user_info = null;
  460. if($user_id){
  461. $user_info = model('user')->setConnect($user_id)->getUserInfo($user_id);
  462. model('user')->setConnect($user_id)->getConnection()->free();
  463. model('user')->setConnect($user_id)->getConnection()->close();
  464. }
  465. if($user_id && $user_info){
  466. $adminConfig = model('AdminConfig')->getAdminInfoAll($channel_id);
  467. //跳过用户加一
  468. model('SyncUser')->where(['admin_id'=>$channel_id,'appid'=>$adminConfig['appid']])->setInc('sync_skip_number');
  469. model('SyncUser')->getConnection()->free();
  470. model('SyncUser')->getConnection()->close();
  471. $this->consoleOut->info('syncUser->createUser: ChannelID:'.$channel_id.' OpenID:'.$open_id.' UserID:'.$user_id.' Exists Skip Next User');
  472. Log::info('syncUser->createUser: ChannelID:'.$channel_id.' OpenID:'.$open_id.' UserID:'.$user_id.' Exists Skip Next User');
  473. }
  474. $time = time();
  475. //Redis 加锁
  476. $redis = Redis::instanceCache();
  477. $nxKey = 'L:'.$channel_id.':'.$open_id;
  478. if(!$user_id || !$user_info){
  479. if(!$nxRes = $redis->setnx($nxKey,0)){ //已经存在
  480. return false;
  481. }
  482. $redis->expire($nxKey,20); //20秒过期
  483. }
  484. //用户ID不存在时创建用户ID
  485. if(!$user_id){
  486. $adminConfig = model('AdminConfig')->getAdminInfoAll($channel_id);
  487. if(!$user_id = $this->createUserID($channel_id,$open_id,$time)){
  488. $redis->del($nxKey);
  489. model('SyncUser')->where(['admin_id'=>$channel_id,'appid'=>$adminConfig['appid']])->setInc('sync_fail_number');
  490. model('SyncUser')->getConnection()->free();
  491. model('SyncUser')->getConnection()->close();
  492. model('AdminConfig')->getConnection()->free();
  493. model('AdminConfig')->getConnection()->close();
  494. $this->consoleOut->error('syncUser->createUserID: ChannelID:'.$channel_id.' OpenID:'.$open_id.' Create Fail');
  495. Log::error('syncUser->createUserID: ChannelID:'.$channel_id.' OpenID:'.$open_id.' Create Fail');
  496. return false;
  497. }
  498. $this->consoleOut->info('syncUser->createUserID: ChannelID:'.$channel_id.' OpenID:'.$open_id.' UserID:'.$user_id.' Create Success');
  499. Log::info('syncUser->createUserID: ChannelID:'.$channel_id.' OpenID:'.$open_id.' UserID:'.$user_id.' Create Success');
  500. }
  501. //用户基本信息不存在时创建用户基本信息
  502. if(!$user_info){
  503. $adminConfig = model('AdminConfig')->getAdminInfoAll($channel_id);
  504. if(!$info_id = $this->createUserInfo($weChat,$channel_id,$open_id,$user_id,$time)){
  505. $redis->del($nxKey);
  506. model('SyncUser')->where(['admin_id'=>$channel_id,'appid'=>$adminConfig['appid']])->setInc('sync_fail_number');
  507. model('AdminConfig')->getConnection()->free();
  508. model('AdminConfig')->getConnection()->close();
  509. model('SyncUser')->getConnection()->free();
  510. model('SyncUser')->getConnection()->close();
  511. $this->consoleOut->error('syncUser->createUserInfo: ChannelID:'.$channel_id.' OpenID:'.$open_id.' UserID:'.$user_id.' Create Fail');
  512. Log::error('syncUser->createUserInfo: ChannelID:'.$channel_id.' OpenID:'.$open_id.' UserID:'.$user_id.' Create Fail');
  513. return false;
  514. }
  515. model('SyncUser')->where(['admin_id'=>$channel_id,'appid'=>$adminConfig['appid']])->setInc('sync_success_number');
  516. model('AdminConfig')->getConnection()->free();
  517. model('AdminConfig')->getConnection()->close();
  518. model('SyncUser')->getConnection()->free();
  519. model('SyncUser')->getConnection()->close();
  520. $this->consoleOut->info('syncUser->createUserInfo: ChannelID:'.$channel_id.' OpenID:'.$open_id.' UserID:'.$user_id.' Create Success');
  521. Log::info('syncUser->createUserInfo: ChannelID:'.$channel_id.' OpenID:'.$open_id.' UserID:'.$user_id.' Create Success');
  522. }
  523. $redis->del($nxKey);
  524. model('AdminConfig')->getConnection()->free();
  525. model('AdminConfig')->getConnection()->close();
  526. return true;
  527. }catch (\Exception $e){
  528. $adminConfig = model('AdminConfig')->getAdminInfoAll($channel_id);
  529. model('SyncUser')->where(['admin_id'=>$channel_id,'appid'=>$adminConfig['appid']])->setInc('sync_fail_number');
  530. model('SyncUser')->getConnection()->free();
  531. model('SyncUser')->getConnection()->close();
  532. model('AdminConfig')->getConnection()->free();
  533. model('AdminConfig')->getConnection()->close();
  534. Log::error('weChatInsertUser Error:'.$e->getMessage());
  535. return false;
  536. }
  537. }
  538. /**
  539. * 创建用户ID
  540. * @param $channel_id 渠道ID
  541. * @param $open_id 微信OPEN_ID
  542. * @param $time 创建时间
  543. * @return int|null
  544. */
  545. public function createUserID($channel_id,$open_id,$time){
  546. try{
  547. $redisAuto = Redis::instanceAuto();
  548. //redis自增返回新的user_id
  549. $userId = $redisAuto->incr('UID');
  550. $data = ['channel_openid' => $channel_id . '_' . $open_id,'user_id' => $userId,'createtime' => $time,'updatetime' => $time];
  551. if($autoId = model('openid')->setConnect($channel_id, $open_id)->insertGetId($data)){
  552. model('openid')->setConnect($channel_id, $open_id)->getConnection()->free();
  553. model('openid')->setConnect($channel_id, $open_id)->getConnection()->close();
  554. return $userId;
  555. }
  556. model('openid')->setConnect($channel_id, $open_id)->getConnection()->free();
  557. model('openid')->setConnect($channel_id, $open_id)->getConnection()->close();
  558. Log::error('syncUser->createUserID: 创建用户ID失败,OPENID表写入失败 Data:'.json_encode($data));
  559. }catch (\Exception $e){
  560. Log::error('syncUser->createUserID: 创建用户ID失败,Error:'.$e->getMessage());
  561. }
  562. return null;
  563. }
  564. /**
  565. * 创建用户基本信息
  566. * @param $weChat EasyWeChat对象
  567. * @param $channel_id 渠道ID
  568. * @param $open_id 微信OPEN_ID
  569. * @param $user_id 用户ID
  570. * @param $time 创建时间
  571. * @return null
  572. */
  573. public function createUserInfo($weChat,$channel_id,$open_id,$user_id,$time){
  574. try{
  575. $weChatUser = $weChat->user->get($open_id);
  576. if(!empty($weChatUser) && !isset($weChatUser['errcode'])) {
  577. //获取注册赠送看
  578. $data = [
  579. 'id' => $user_id,
  580. 'openid' => $open_id,
  581. 'nickname' => $weChatUser['nickname'] ?? '书友',
  582. 'sex' => $weChatUser['sex'] ?? 0,
  583. 'first_cancel_pay' => 2,
  584. 'avatar' => $weChatUser['headimgurl'] ?? cdnurl('/assets/img/frontend/icon/nav_icon_4.png'),
  585. 'is_subscribe' => $weChatUser['subscribe'] ?? 0,
  586. 'subscribe_time' => $weChatUser['subscribe_time'] ?? 0,
  587. 'channel_id' => $channel_id,
  588. 'country' => $weChatUser['country'] ?? '',
  589. 'province' => $weChatUser['province'] ?? '',
  590. 'city' => $weChatUser['city'] ?? '',
  591. 'createtime' => $time,
  592. 'updatetime' => $time
  593. ];
  594. if($user_info_id = model('User')->setConnect($data['id'])->insertGetId($data)){
  595. model('User')->setConnect($data['id'])->getConnection()->free();
  596. model('User')->setConnect($data['id'])->getConnection()->close();
  597. try {
  598. $ids = explode(',', Config::get('site.give_shubi_channel_id'));
  599. if (in_array($channel_id, $ids)) {
  600. $kandian_reg = 3000;
  601. $endTime = intval(Config::get('site.kandian_free_day')) * 86400;
  602. $rdata = [];
  603. $rdata['free_kandian'] = $kandian_reg;
  604. $rdata['remain_free_kandian'] = $kandian_reg;
  605. $rdata['type'] = 3;
  606. $rdata['user_id'] = $user_info_id;
  607. $rdata['free_endtime'] = $endTime + $time;
  608. $rdata['notes'] = '新用户注册赠送';
  609. $rdata['createtime'] = $time;
  610. $rdata['updatetime'] = $time;
  611. FinancialService::instance()->getRechargeModel()->setConnect($user_info_id)->insertGetId($rdata);
  612. Redis::instance()->del('ZR:' . $user_info_id);
  613. }
  614. } catch (\Exception $e) {
  615. Log::error('赠送书币失败!!');
  616. }
  617. return $user_info_id;
  618. }
  619. model('User')->setConnect($data['id'])->getConnection()->free();
  620. model('User')->setConnect($data['id'])->getConnection()->close();
  621. Log::error('syncUser->createUserInfo: 创建用户基本信息失败,USER表写入失败 Data:' . json_encode($data));
  622. }
  623. }catch (\Exception $e){
  624. Log::error('syncUser->createUserInfo: 创建用户基本信息失败,Error:' . $e->getMessage());
  625. }
  626. return null;
  627. }
  628. //发企业微信
  629. public function sendWorkChatMessage($content)
  630. {
  631. if (empty($content)) {
  632. return false;
  633. }
  634. $wechat = Config::get('wechat');
  635. $wechat['http']['base_uri'] = $wechat['work']['base_uri'];
  636. $wechat['http']['timeout'] = 20;
  637. $wechat['corp_id'] = $wechat['work']['corp_id'];
  638. $wechat['secret'] = $wechat['work']['sync_secret'];
  639. $app = Factory::work($wechat);
  640. $app['cache'] = new RedisCache(Redis::instanceCache());
  641. $res = $app->message
  642. ->message($content)
  643. ->ofAgent($wechat['work']['sync_agent_id'])
  644. ->toParty($wechat['work']['sync_party_id'])
  645. ->send();
  646. return $res;
  647. }
  648. }