setName('SyncUser')
            ->addOption('type', 't', Option::VALUE_REQUIRED, '操作类型: cmd:命令行模式,mq:RabbitMQ模式,handle:RabbitMQ处理拉取的微信关注数据', 'mq')
            ->addOption('channel_id','c',Option::VALUE_OPTIONAL,'命令行模式请传递渠道ID')
            ->addOption('open_id','o',Option::VALUE_OPTIONAL,'开始的OpenID')
            ->setDescription('同步微信服务号关注用户到CPS系统');
    }

    /**
     * Command entry point: stores the console handles and dispatches on --type.
     *
     *  - cmd:    pull one channel's followers directly (see executeCMD)
     *  - mq:     consume pull tasks from RabbitMQ (see executeMQ)
     *  - handle: consume the follower batches produced by a pull (see syncHandler)
     *
     * @param Input $input
     * @param Output $output
     * @return int|null|void
     */
    protected function execute(Input $input, Output $output){
        // Keep the console handles so the other methods can report progress.
        $this->consoleOut = $output;
        $this->consoleIn = $input;
        // The current module cannot be detected in CLI mode, so set it explicitly.
        Request::instance()->module('admin');

        $type = $input->getOption('type');
        switch ($type) {
            case 'cmd':
                $this->executeCMD();
                break;
            case 'mq':
                $this->executeMQ();
                // This break was missing: once executeMQ() returned, control
                // fell through and started the batch handler as well.
                break;
            case 'handle':
                $this->syncHandler();
                break;
            default:
                $output->error("无法识别的参数:".$type);
        }
    }

    /**
     * Command-line mode: validates --channel_id and runs a single pull,
     * optionally resuming from --open_id.
     */
    protected function executeCMD(){
        $channel_id = $this->consoleIn->getOption('channel_id');
        if (!$channel_id) {
            $this->consoleOut->error('获取渠道ID失败,请传入渠道ID');
            return;
        }
        if (!is_numeric($channel_id)) {
            $this->consoleOut->error('获取渠道ID失败,渠道ID为整数类型');
            return;
        }
        // An absent option yields null; normalise to '' so the call matches
        // sync()'s own default and the MQ code path.
        $open_id = (string) $this->consoleIn->getOption('open_id');
        $this->sync($channel_id, $open_id);
    }

    /**
     * MQ mode: consumes pull tasks from SYNC_MQ_QUEUE_NAME and runs sync() for
     * each one. A task is acked when sync() succeeds and rejected without
     * requeue in every other case.
     */
    protected function executeMQ(){
        try {
            $mq = new Rabbitmq();
            $mq->receive(self::SYNC_MQ_QUEUE_NAME, function (AMQPMessage $msg){
                $this->err_mq_msg = $msg;
                $data = json_decode($msg->body, true);
                $channel = $msg->delivery_info['channel'];
                $tag = $msg->delivery_info['delivery_tag'];

                if (empty($data['channel_id'])) {
                    $this->reportError('SyncUser->Info: 渠道信息获取失败,丢弃任务:'.$msg->body);
                    $channel->basic_nack($tag, false, false);
                    return;
                }

                try {
                    if ($this->sync($data['channel_id'])) {
                        $channel->basic_ack($tag);
                    } else {
                        // Reject and drop the task, then pause before taking the next one.
                        $channel->basic_nack($tag, false, false);
                        sleep(5);
                    }
                } catch (\Throwable $e) {
                    // One handler replaces four identical catch blocks; \Throwable
                    // covers both \Exception and \Error.
                    $this->reportError('SyncUser->Info: ChannelID:'.$data['channel_id'].' 添加拉取任务失败,丢弃任务:'.$msg->body.',Error:'.$e->getMessage());
                    $channel->basic_nack($tag, false, false);
                }
            });
        } catch (\Exception $e) {
            Log::error("MQ 同步用户 触发异常!message:" . $e->getMessage());
            $this->consoleOut->writeln("MQ 同步用户 触发异常!message:" . $e->getMessage());
            exit(0);
        }
    }

    /**
     * Pulls a channel's follower openids page by page and publishes them to the
     * handler queue in chunks of 1000.
     *
     * @param mixed  $channel_id   channel (admin) id
     * @param string $next_open_id openid to resume from; '' starts from the beginning
     * @return bool false when the channel is unusable or the consecutive pull
     *              failures reached PULL_MAX_FAIL_NUMBER, true otherwise
     * @throws
     */
    protected function sync($channel_id,$next_open_id = ''){
        $adminConfig = model('AdminConfig')->getAdminInfoAll($channel_id);
        $this->releaseDb(model('AdminConfig'));

        if (!$adminConfig) {
            return $this->abortSync($channel_id, '获取渠道信息失败,丢弃任务', $next_open_id);
        }
        if (empty($adminConfig['appid']) || empty($adminConfig['refresh_token'])) {
            return $this->abortSync($channel_id, '获取渠道 appid or refresh_token 失败,丢弃任务', $next_open_id);
        }
        if (!$this->insertSyncData($channel_id, $adminConfig['appid'])) {
            return $this->abortSync($channel_id, '写同步数据信息失败,丢弃任务', $next_open_id);
        }

        $this->pull_max_number[$channel_id] = 0;
        $this->pull_fail_number[$channel_id] = 0;
        $where = ['admin_id' => $channel_id, 'appid' => $adminConfig['appid']];

        // getWeChatUsers() returns a page, true ("failed, try again") or a
        // falsy value ("stop"). A bare true has no 'total'/'data' keys, so both
        // isset() checks below skip it.
        while ($fans_list = $this->getWeChatUsers($channel_id, $next_open_id)) {
            if (isset($fans_list['total'])) {
                model('SyncUser')->where($where)->update(['fans_total' => $fans_list['total']]);
                $this->releaseDb(model('SyncUser'));
            }
            if (!isset($fans_list['data']['openid'])) {
                continue;
            }

            $mq = new Rabbitmq();
            foreach (array_chunk($fans_list['data']['openid'], 1000) as $chunk) {
                // Each message carries the page metadata plus one chunk of openids.
                $batch = $fans_list;
                $batch['count'] = count($chunk);
                $batch['data']['openid'] = $chunk;

                model('SyncUser')->where($where)->setInc('queue_total');
                model('SyncUser')->where($where)->setInc('pull_success_number');
                $this->releaseDb(model('SyncUser'));

                $mq->send(['channel_id' => $channel_id, 'fans_list' => $batch], self::SYNC_SCRIPT_MQ_QUEUE_NAME, self::SYNC_SCRIPT_EXCHANGE_QUEUE_NAME);
            }
            $next_open_id = $fans_list['next_openid'];
        }

        if (isset($this->pull_max_number[$channel_id]) && $this->pull_max_number[$channel_id] >= self::PULL_MAX_FAIL_NUMBER) {
            // Too many consecutive pull failures: report and mark the task failed.
            $this->sendWorkMsg($channel_id, $this->err_message, $next_open_id);
            return false;
        }
        return true;
    }

    /**
     * Sends a WeChat Work alert describing a failed sync for a channel.
     * Best effort: any failure while building or sending the alert is logged
     * and swallowed.
     *
     * @param mixed  $channel_id   channel (admin) id
     * @param string $errMsg       error text to include
     * @param string $next_open_id openid the pull had reached
     */
    public function sendWorkMsg($channel_id,$errMsg = '',$next_open_id =''){
        try {
            $configModel = new dbconfig();
            $siteconfig = $configModel->getConfigSiteArr();
            // A missing 'theme' key is treated as '' rather than aborting the alert.
            $theme = (string) ($siteconfig['theme'] ?? '');

            $adminConfig = model('AdminConfig')->getAdminInfoAll($channel_id);
            $extend_info = null;
            $adminExtend = model('AdminExtend')->getInfo($channel_id);
            if ($adminExtend) {
                $extend_info = model('Admin')->where('id', $adminExtend['create_by'])->find();
                $this->releaseDb(model('Admin'));
            }
            $this->releaseDb(model('AdminConfig'));
            $this->releaseDb(model('AdminExtend'));
            $this->releaseDb($configModel);

            $k_nickname = $extend_info['nickname'] ?? '';
            $k_username = $extend_info['username'] ?? '';
            $k_id = $adminExtend['create_by'] ?? '';

            $admin_info = model('AdminConfig')
                ->join('admin', 'admin.id=admin_config.admin_id','LEFT')
                ->field('admin_config.*,admin.id,admin.username,admin.nickname')
                ->where('admin.id','=',$channel_id)
                ->find();
            $this->releaseDb(model('AdminConfig'));

            // Display name per site theme; an unknown theme yields ''.
            $themeNames = ['qy' => '袋鼠', 'sf' => '沙发', 'ms' => '美书', '' => '西瓜'];
            $themeName = $themeNames[$theme] ?? '';

            // '.' binds tighter than '??', so every fallback needs its own
            // parentheses; without them the fallback never applied.
            $msg = "发现异常!" . date('Y-m-d H:i:s'); // the day ('d') was missing from the format
            $msg .= "\n公众号:" . ($adminConfig['json']['authorizer_info']['nick_name'] ?? '');
            $msg .= "\nAPPID:" . ($adminConfig['appid'] ?? '');
            $msg .= "\n原始ID:" . ($adminConfig['json']['authorizer_info']['user_name'] ?? '');
            $msg .= "\n渠道ID:" . ($adminConfig['admin_id'] ?? $channel_id);
            $msg .= "\n渠道账号:" . ($admin_info['username'] ?? '');
            $msg .= "\n渠道昵称:" . ($admin_info['nickname'] ?? '');
            $msg .= "\n开户人ID:" . $k_id;
            $msg .= "\n开户人账号:" . $k_username;
            $msg .= "\n开户人昵称:" . $k_nickname;
            $msg .= "\nnext_openid:" . $next_open_id;
            $msg .= "\n模板:" . $themeName;
            $msg .= "\n错误信息:" . $errMsg;

            $this->sendWorkChatMessage($msg);
        } catch (\Throwable $e) {
            Log::error('SyncUser->Info:发送企业消息失败,ChannelID:'.$channel_id.' Error:'.$e->getMessage());
        }
    }

    /**
     * Creates the SyncUser progress row for a channel/appid, or resets its
     * counters when the row already exists.
     *
     * @param mixed  $channel_id channel (admin) id
     * @param string $appid      official-account appid
     * @return bool false only when inserting a new row fails
     */
    protected function insertSyncData($channel_id,$appid){
        $where = ['admin_id' => $channel_id, 'appid' => $appid];

        if (model('SyncUser')->where($where)->find()) {
            // Row exists: reset every counter for a fresh run.
            // NOTE(review): the original array listed 'sync_status' twice (1, then 0).
            // PHP keeps the last value, so 0 is what was actually written and is
            // kept here. The insert branch below uses 1 — confirm which is intended.
            model('SyncUser')->where($where)->update([
                'fans_total' => 0,
                'pull_success_number' => 0,
                'pull_fail_number' => 0,
                'queue_total' => 0,
                'queue_run' => 0,
                'sync_status' => 0,
                'sync_success_number' => 0,
                'sync_fail_number' => 0,
                'sync_skip_number' => 0,
                'updatetime' => time(),
            ]);
            $this->releaseDb(model('SyncUser'));
            return true;
        }

        $ins_id = model('SyncUser')->insertGetId([
            'admin_id' => $channel_id,
            'appid' => $appid,
            'sync_status' => 1,
            'createtime' => time(),
            'updatetime' => time(),
        ]);
        $this->releaseDb(model('SyncUser'));

        if (!$ins_id) {
            $this->reportError('syncUser->writeMysql: ChannelID:'.$channel_id.' appid:'.$appid.' Write Mysql Fail');
            return false;
        }
        $this->reportInfo('syncUser->writeMysql: ChannelID:'.$channel_id.' appid:'.$appid.' Write Mysql Success');
        return true;
    }

    /**
     * Handler mode: consumes one follower batch from SYNC_SCRIPT_MQ_QUEUE_NAME,
     * creates the missing users, updates the progress row and then exits the
     * process (exit(0) runs after the first message).
     */
    protected function syncHandler(){
        try {
            $mq = new Rabbitmq();
            $mq->receive(self::SYNC_SCRIPT_MQ_QUEUE_NAME, function (AMQPMessage $msg){
                $this->err_mq_msg = $msg;
                $channel = $msg->delivery_info['channel'];
                $tag = $msg->delivery_info['delivery_tag'];
                $data = json_decode($msg->body, true);
                $channel_id = $data['channel_id'] ?? null;
                $fans_list = $data['fans_list'] ?? [];

                $this->reportInfo('SyncUser->Info:接收到消费信息 Channel_id:'.$channel_id);

                if ($channel_id && $fans_list && isset($fans_list['data']['openid'])) {
                    // Tracks whether the delivery was already acked, so the catch
                    // block never acks the same tag twice.
                    $settled = false;
                    try {
                        $adminConfig = model('AdminConfig')->getAdminInfoAll($channel_id);
                        $this->releaseDb(model('AdminConfig'));
                        $usable = !empty($adminConfig['appid']) && !empty($adminConfig['refresh_token']);

                        if (!$usable) {
                            $this->reportError('SyncUser->Info: 获取 appid or refresh_token 失败 ChannelID:'.$channel_id.' 丢弃任务:'.$msg->body);
                        } else {
                            // The account object does not depend on the openid, so
                            // build it once per batch instead of once per follower.
                            $wechat = $this->getChannelWechat($channel_id);
                            if ($wechat) {
                                foreach ((array) $fans_list['data']['openid'] as $key => $open_id) {
                                    $this->reportInfo('SyncUser->Info: Total:'.($fans_list['total'] ?? '').' Count:'.($fans_list['count'] ?? '').' Current:'.$key);
                                    $this->weChatInsertUser($channel_id, $open_id, $wechat);
                                }
                            } else {
                                $this->reportError('SyncUser->Info: 获取渠道信息失败,丢弃消息 ChannelID:'.$channel_id.' Fans:'.json_encode($fans_list));
                            }
                        }

                        $channel->basic_ack($tag);
                        $settled = true;

                        if ($usable) {
                            $where = ['admin_id' => $channel_id, 'appid' => $adminConfig['appid']];
                            model('SyncUser')->where($where)->setInc('queue_run');
                            $queue_total = (int) model('SyncUser')->where($where)->value('queue_total');
                            $queue_run = (int) model('SyncUser')->where($where)->value('queue_run');
                            // Finished once every published batch has been handled.
                            // The original compared total >= run, which is true from
                            // the first batch onward.
                            if ($queue_run >= $queue_total) {
                                model('SyncUser')->where($where)->update(['sync_status' => 2]);
                            }
                            $this->releaseDb(model('SyncUser'));
                        }
                    } catch (\Throwable $e) {
                        Log::error('SyncUser->Info: ChannelID:'.$channel_id.' 丢弃任务:'.$msg->body.' Handle异常:'.$e->getMessage());
                        if (!$settled) {
                            $channel->basic_ack($tag);
                        }
                    }
                } else {
                    // A malformed message used to be left unacknowledged and was
                    // redelivered after every exit; reject it without requeue,
                    // as executeMQ() does.
                    $this->reportError('SyncUser->Info: 渠道信息获取失败,丢弃任务:'.$msg->body);
                    $channel->basic_nack($tag, false, false);
                }

                $this->reportInfo('SyncUser->Info:消费信息处理完毕 Channel_id:'.$channel_id);
                exit(0);
            });
        } catch (\Exception $e) {
            Log::error("MQ 同步用户数据处理异常 触发异常!message:" . $e->getMessage());
            $this->consoleOut->writeln("MQ 同步用户数据处理异常 触发异常!message:" . $e->getMessage());
            exit(0);
        }
    }

    /**
     * Fetches one page of follower openids from WeChat.
     *
     * @param mixed  $channel_id   channel (admin) id
     * @param string $next_open_id openid to continue from
     * @return bool|mixed the API result when it contains followers; true when the
     *                    pull failed and the caller should retry; false when
     *                    there is nothing more to pull or the consecutive-failure
     *                    limit (PULL_MAX_FAIL_NUMBER) was reached
     */
    protected function getWeChatUsers($channel_id,$next_open_id){
        try {
            if (isset($this->pull_max_number[$channel_id]) && $this->pull_max_number[$channel_id] >= self::PULL_MAX_FAIL_NUMBER) {
                return false; // stop retrying once the failure limit is reached
            }

            // Every failure path below now counts towards the limit. Previously
            // only exceptions did, so a persistent non-exception failure made
            // sync() loop without bound.
            $officialAccount = $this->getChannelWechat($channel_id);
            if (!$officialAccount) {
                $this->err_message = '获取渠道微信配置失败!';
                $this->notePullFailure($channel_id);
                $this->consoleOut->error('获取渠道微信配置失败!');
                return true;
            }

            $result = $officialAccount->user->list($next_open_id);
            if (!$result) {
                $this->err_message = '获取渠道关注用户失败!';
                $this->notePullFailure($channel_id);
                $this->consoleOut->error('获取渠道关注用户失败!');
                return true;
            }

            if (isset($result['count'])) {
                $this->pull_max_number[$channel_id] = 0;
                // An empty page ends the loop in sync().
                return empty($result['count']) ? false : $result;
            }

            // No 'count' key: presumably an error payload (errcode/errmsg).
            // This used to fall off the end and return null, which sync()
            // treated as a successful finish.
            $this->err_message = '获取渠道关注用户失败!Response:'.json_encode($result);
            $this->notePullFailure($channel_id);
            $this->reportError($this->err_message);
            return true;
        } catch (\Exception $e) {
            $this->err_message = $e->getMessage();
            $this->reportError("调用微信接口异常,Error:".$e->getMessage());
            $this->notePullFailure($channel_id);
            return true; // retry after an exception
        }
    }

    /**
     * Builds the EasyWeChat official-account application for a channel.
     *
     * @param mixed $channel_id channel (admin) id
     * @return \EasyWeChat\OpenPlatform\Authorizer\OfficialAccount\Application|false|null
     *         false when the channel has no configuration, null when building
     *         the application threw
     */
    private function getChannelWechat($channel_id){
        try {
            $adminConfig = model('AdminConfig')->getAdminInfoAll($channel_id);
            $this->releaseDb(model('AdminConfig'));
            if (!$adminConfig) {
                $this->consoleOut->error('获取渠道配置信息失败');
                return false;
            }
            $wechat = new WeChatObject($adminConfig);
            return $wechat->getOfficialAccount();
        } catch (\Exception $e) {
            $this->consoleOut->error('获取OfficialAccount失败 ChannelID:'.$channel_id);
            Log::error('SyncUser->Info: 获取OfficialAccount失败 ChannelID:'.$channel_id.' Error:'.$e->getMessage());
            return null;
        }
    }

    /**
     * Reports current memory usage.
     *
     * @return string usage in megabytes rounded to two decimals with an 'MB'
     *                suffix, or '0' when memory_get_usage() is unavailable
     */
    protected function memory_usage() {
        if (!function_exists('memory_get_usage')) {
            return '0';
        }
        return round(memory_get_usage()/1024/1024, 2).'MB';
    }

    /**
     * Ensures a follower has both a user id and a user record, creating whichever
     * is missing while holding a short-lived Redis lock on the channel/openid pair.
     *
     * @param mixed  $channel_id channel (admin) id
     * @param string $open_id    follower openid
     * @param \EasyWeChat\OpenPlatform\Authorizer\OfficialAccount\Application $weChat EasyWeChat application
     * @return bool true when the user already existed or was created; false when
     *              another worker holds the lock or creation failed
     */
    public function weChatInsertUser($channel_id,$open_id,\EasyWeChat\OpenPlatform\Authorizer\OfficialAccount\Application $weChat){
        $redis = null;
        $nxKey = 'L:'.$channel_id.':'.$open_id;
        $lockHeld = false;

        try {
            $user_id = model('openid')->getUserId($channel_id, $open_id);
            $this->releaseDb(model('openid'));

            $user_info = null;
            if ($user_id) {
                $user_info = model('user')->setConnect($user_id)->getUserInfo($user_id);
                $this->releaseDb(model('user')->setConnect($user_id));
            }

            if ($user_id && $user_info) {
                $this->bumpChannelCounter($channel_id, 'sync_skip_number');
                $this->reportInfo('syncUser->createUser: ChannelID:'.$channel_id.' OpenID:'.$open_id.' UserID:'.$user_id.' Exists Skip Next User');
                // Return without touching the lock. The original continued to the
                // final del() and removed a key this call never set, which could
                // release another worker's lock.
                return true;
            }

            $time = time();
            // Lock the channel/openid pair so two workers do not create the same user.
            $redis = Redis::instanceCache();
            if (!$redis->setnx($nxKey, 0)) {
                return false; // another worker is already creating this user
            }
            $lockHeld = true;
            $redis->expire($nxKey, 20); // expires after 20 seconds

            if (!$user_id) {
                $user_id = $this->createUserID($channel_id, $open_id, $time);
                if (!$user_id) {
                    $this->bumpChannelCounter($channel_id, 'sync_fail_number');
                    $this->reportError('syncUser->createUserID: ChannelID:'.$channel_id.' OpenID:'.$open_id.' Create Fail');
                    return false;
                }
                $this->reportInfo('syncUser->createUserID: ChannelID:'.$channel_id.' OpenID:'.$open_id.' UserID:'.$user_id.' Create Success');
            }

            // $user_info is always empty here: the only path with a loaded record
            // returned above. So the record is always created at this point.
            $info_id = $this->createUserInfo($weChat, $channel_id, $open_id, $user_id, $time);
            if (!$info_id) {
                $this->bumpChannelCounter($channel_id, 'sync_fail_number');
                $this->reportError('syncUser->createUserInfo: ChannelID:'.$channel_id.' OpenID:'.$open_id.' UserID:'.$user_id.' Create Fail');
                return false;
            }
            $this->bumpChannelCounter($channel_id, 'sync_success_number');
            $this->reportInfo('syncUser->createUserInfo: ChannelID:'.$channel_id.' OpenID:'.$open_id.' UserID:'.$user_id.' Create Success');
            return true;
        } catch (\Exception $e) {
            $this->bumpChannelCounter($channel_id, 'sync_fail_number');
            Log::error('weChatInsertUser Error:'.$e->getMessage());
            return false;
        } finally {
            // Release the lock on every exit path, including exceptions, but only
            // when this call acquired it.
            if ($lockHeld) {
                try {
                    $redis->del($nxKey);
                } catch (\Exception $e) {
                    Log::error('weChatInsertUser Error:'.$e->getMessage());
                }
            }
        }
    }

    /**
     * Allocates a new user id from the Redis 'UID' counter and stores the
     * channel/openid to user-id mapping.
     *
     * @param mixed  $channel_id channel (admin) id
     * @param string $open_id    WeChat openid
     * @param int    $time       creation timestamp
     * @return int|null the new user id, or null on failure
     */
    public function createUserID($channel_id,$open_id,$time){
        try {
            // Redis increment returns the new user id.
            $userId = Redis::instanceAuto()->incr('UID');
            if (!$userId) {
                // Do not write a mapping row that points at user id 0/false.
                Log::error('syncUser->createUserID: 创建用户ID失败,Redis自增失败');
                return null;
            }

            $data = [
                'channel_openid' => $channel_id . '_' . $open_id,
                'user_id' => $userId,
                'createtime' => $time,
                'updatetime' => $time,
            ];
            $autoId = model('openid')->setConnect($channel_id, $open_id)->insertGetId($data);
            $this->releaseDb(model('openid')->setConnect($channel_id, $open_id));

            if ($autoId) {
                return $userId;
            }
            Log::error('syncUser->createUserID: 创建用户ID失败,OPENID表写入失败 Data:'.json_encode($data));
        } catch (\Exception $e) {
            Log::error('syncUser->createUserID: 创建用户ID失败,Error:'.$e->getMessage());
        }
        return null;
    }

    /**
     * Fetches the follower's profile from WeChat and inserts the user record;
     * for configured channels it then grants the registration bonus.
     *
     * @param mixed  $weChat     EasyWeChat application
     * @param mixed  $channel_id channel (admin) id
     * @param string $open_id    WeChat openid
     * @param mixed  $user_id    user id to store the record under
     * @param int    $time       creation timestamp
     * @return mixed|null the value returned by insertGetId(), or null on failure
     */
    public function createUserInfo($weChat,$channel_id,$open_id,$user_id,$time){
        try {
            $weChatUser = $weChat->user->get($open_id);
            if (empty($weChatUser) || isset($weChatUser['errcode'])) {
                // This case used to return null without any trace.
                Log::error('syncUser->createUserInfo: 创建用户基本信息失败,微信用户信息获取失败 OpenID:' . $open_id . ' Response:' . json_encode($weChatUser));
                return null;
            }

            $data = [
                'id' => $user_id,
                'openid' => $open_id,
                'nickname' => $weChatUser['nickname'] ?? '书友',
                'sex' => $weChatUser['sex'] ?? 0,
                'first_cancel_pay' => 2,
                'avatar' => $weChatUser['headimgurl'] ?? cdnurl('/assets/img/frontend/icon/nav_icon_4.png'),
                'is_subscribe' => $weChatUser['subscribe'] ?? 0,
                'subscribe_time' => $weChatUser['subscribe_time'] ?? 0,
                'channel_id' => $channel_id,
                'country' => $weChatUser['country'] ?? '',
                'province' => $weChatUser['province'] ?? '',
                'city' => $weChatUser['city'] ?? '',
                'createtime' => $time,
                'updatetime' => $time,
            ];

            $user_info_id = model('User')->setConnect($data['id'])->insertGetId($data);
            $this->releaseDb(model('User')->setConnect($data['id']));

            if (!$user_info_id) {
                Log::error('syncUser->createUserInfo: 创建用户基本信息失败,USER表写入失败 Data:' . json_encode($data));
                return null;
            }

            $this->giveRegistrationBonus($channel_id, $user_info_id, $time);
            return $user_info_id;
        } catch (\Exception $e) {
            Log::error('syncUser->createUserInfo: 创建用户基本信息失败,Error:' . $e->getMessage());
        }
        return null;
    }

    /**
     * Sends a text message through WeChat Work to the configured sync party.
     *
     * @param string $content message text
     * @return mixed false when $content is empty, otherwise the send() result
     */
    public function sendWorkChatMessage($content)
    {
        if (empty($content)) {
            return false;
        }

        $wechat = Config::get('wechat');
        $work = $wechat['work'];
        $wechat['http']['base_uri'] = $work['base_uri'];
        $wechat['http']['timeout'] = 20;
        $wechat['corp_id'] = $work['corp_id'];
        $wechat['secret'] = $work['sync_secret'];

        $app = Factory::work($wechat);
        $app['cache'] = new RedisCache(Redis::instanceCache());

        return $app->message
            ->message($content)
            ->ofAgent($work['sync_agent_id'])
            ->toParty($work['sync_party_id'])
            ->send();
    }

    /** Frees the statement handle and closes the connection behind a model. */
    private function releaseDb($model){
        $connection = $model->getConnection();
        $connection->free();
        $connection->close();
    }

    /** Writes an error line to both the log and the console. */
    private function reportError($message){
        Log::error($message);
        $this->consoleOut->error($message);
    }

    /** Writes an info line to both the log and the console. */
    private function reportInfo($message){
        Log::info($message);
        $this->consoleOut->info($message);
    }

    /** Logs a fatal sync problem, sends the WeChat Work alert and returns false for the caller to return. */
    private function abortSync($channel_id, $reason, $next_open_id){
        $this->reportError('SyncUser->Info: ChannelID:'.$channel_id.' '.$reason);
        $this->sendWorkMsg($channel_id, $reason, $next_open_id);
        return false;
    }

    /** Increments one counter column on the channel's SyncUser row, then releases the DB handles. */
    private function bumpChannelCounter($channel_id, $field){
        $adminConfig = model('AdminConfig')->getAdminInfoAll($channel_id);
        $this->releaseDb(model('AdminConfig'));
        $appid = $adminConfig['appid'] ?? null;
        if (!$appid) {
            return; // the progress row is keyed by appid; without one there is nothing to update
        }
        model('SyncUser')->where(['admin_id' => $channel_id, 'appid' => $appid])->setInc($field);
        $this->releaseDb(model('SyncUser'));
    }

    /** Counts one failed pull in memory (total and consecutive) and, best effort, in the SyncUser row. */
    private function notePullFailure($channel_id){
        $this->pull_fail_number[$channel_id] = ($this->pull_fail_number[$channel_id] ?? 0) + 1;
        $this->pull_max_number[$channel_id] = ($this->pull_max_number[$channel_id] ?? 0) + 1;
        try {
            $this->bumpChannelCounter($channel_id, 'pull_fail_number');
        } catch (\Exception $e) {
            // The in-memory counters already bound the retries; a DB problem
            // here must not abort the pull loop.
            Log::error('SyncUser->Info: ChannelID:'.$channel_id.' pull_fail_number 更新失败,Error:'.$e->getMessage());
        }
    }

    /** Grants the new-user bonus when the channel is listed in site.give_shubi_channel_id; failures are only logged. */
    private function giveRegistrationBonus($channel_id, $user_info_id, $time){
        try {
            // Compare as trimmed strings so "12, 34" style config values still match.
            $ids = array_map('trim', explode(',', (string) Config::get('site.give_shubi_channel_id')));
            if (!in_array((string) $channel_id, $ids, true)) {
                return;
            }

            $kandian_reg = 3000;
            $endTime = intval(Config::get('site.kandian_free_day')) * 86400;
            $rdata = [
                'free_kandian' => $kandian_reg,
                'remain_free_kandian' => $kandian_reg,
                'type' => 3,
                'user_id' => $user_info_id,
                'free_endtime' => $endTime + $time,
                'notes' => '新用户注册赠送',
                'createtime' => $time,
                'updatetime' => $time,
            ];
            FinancialService::instance()->getRechargeModel()->setConnect($user_info_id)->insertGetId($rdata);
            Redis::instance()->del('ZR:' . $user_info_id);
        } catch (\Exception $e) {
            Log::error('赠送书币失败!!' . ' Error:' . $e->getMessage());
        }
    }
}