123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685 |
- <?php
- namespace app\admin\command;
- use app\common\library\Rabbitmq;
- use app\common\library\Redis;
- use app\common\library\WeChatObject;
- use app\main\service\FinancialService;
- use EasyWeChat\Factory;
- use PhpAmqpLib\Message\AMQPMessage;
- use Symfony\Component\Cache\Simple\RedisCache;
- use think\Config;
- use think\console\input\Option;
- use think\console\Command;
- use think\console\Input;
- use think\console\Output;
- use app\common\model\Config as dbconfig;
- use think\Exception;
- use think\Log;
- use think\Request;
- use think\Exception as thinkException;
- class SyncUser extends BaseCommand {
- const PULL_MAX_FAIL_NUMBER = 5;
- const SYNC_MQ_QUEUE_NAME = 'Q_SYNC_USER';
- const SYNC_MQ_EXCHANGE_NAME = 'E_SYNC_USER';
- const SYNC_SCRIPT_MQ_QUEUE_NAME = 'Q_SYNC_USER_SCRIPT';
- const SYNC_SCRIPT_EXCHANGE_QUEUE_NAME = 'E_SYNC_USER_SCRIPT';
- protected $consoleOut;
- protected $consoleIn;
- protected $pull_max_number;
- protected $pull_fail_number;
- protected $err_message;
- protected $err_mq_msg;
/**
 * Declare the console command: its name, its options and its description.
 *
 * Options:
 *  - type (-t, value required, default "mq"): run mode, one of cmd / mq / handle.
 *  - channel_id (-c, optional): channel ID, used by the command-line mode.
 *  - open_id (-o, optional): OpenID to start pulling from.
 */
protected function configure(){
    $this->setName('SyncUser');
    $this->addOption(
        'type',
        't',
        Option::VALUE_REQUIRED,
        '操作类型: cmd:命令行模式,mq:RabbitMQ模式,handle:RabbitMQ处理拉取的微信关注数据',
        'mq'
    );
    $this->addOption('channel_id', 'c', Option::VALUE_OPTIONAL, '命令行模式请传递渠道ID');
    $this->addOption('open_id', 'o', Option::VALUE_OPTIONAL, '开始的OpenID');
    $this->setDescription('同步微信服务号关注用户到CPS系统');
}
/**
 * Command entry point.
 *
 * Stores the console input/output handles on the instance, forces the
 * request module to "admin", then dispatches on the --type option:
 *  - cmd    => executeCMD()
 *  - mq     => executeMQ()
 *  - handle => syncHandler()
 * Any other value is reported as an error on the console.
 *
 * @param Input  $input
 * @param Output $output
 * @return void
 */
protected function execute(Input $input, Output $output){
    // Keep the console handles so the other methods can report progress.
    $this->consoleOut = $output;
    $this->consoleIn = $input;
    // The current module cannot be resolved in CLI mode, so set it by hand.
    Request::instance()->module('admin');
    $type = $input->getOption('type');
    switch ($type) {
        case 'cmd':
            $this->executeCMD();
            break;
        case 'mq':
            $this->executeMQ();
            // Fix: this break was missing, so the "mq" mode fell through
            // into the "handle" consumer once executeMQ() returned.
            break;
        case 'handle':
            $this->syncHandler();
            break;
        default:
            $output->error("无法识别的参数:".$type);
            break;
    }
}
/**
 * Command-line mode: validate --channel_id and run one synchronous pull.
 *
 * Reports an error on the console and returns without syncing when the
 * channel ID is missing or is not a plain non-negative integer.
 *
 * @return void
 */
protected function executeCMD(){
    $channel_id = $this->consoleIn->getOption('channel_id');
    if(!$channel_id){
        $this->consoleOut->error('获取渠道ID失败,请传入渠道ID');
        return;
    }
    // Fix: is_numeric() also accepted values such as "1.5" or "1e3", which
    // contradicts the "must be an integer" error message below.
    if(!ctype_digit((string)$channel_id)){
        $this->consoleOut->error('获取渠道ID失败,渠道ID为整数类型');
        return;
    }
    $open_id = $this->consoleIn->getOption('open_id');
    // Start the pull for this channel, optionally resuming from $open_id.
    $this->sync($channel_id,$open_id);
}
/**
 * RabbitMQ mode: consume pull requests from SYNC_MQ_QUEUE_NAME.
 *
 * Each message body is JSON and must carry a non-empty "channel_id".
 *  - sync() returns true         => the message is acked.
 *  - sync() returns false/throws => the message is nacked WITHOUT requeue
 *                                   (i.e. discarded) and the error is logged.
 *  - no channel_id in the body   => logged and nacked without requeue.
 *
 * If setting up or running the consumer throws an \Exception, it is logged
 * and the process exits with status 0.
 *
 * @return void
 */
protected function executeMQ(){
    try {
        $mq = new Rabbitmq();
        $mq->receive(self::SYNC_MQ_QUEUE_NAME, function (AMQPMessage $msg){
            $this->err_mq_msg = $msg;
            $data = json_decode($msg->body, true);
            $channel = $msg->delivery_info['channel'];
            $delivery_tag = $msg->delivery_info['delivery_tag'];

            if(empty($data['channel_id'])){
                $line = 'SyncUser->Info: 渠道信息获取失败,丢弃任务:'.$msg->body;
                Log::error($line);
                $this->consoleOut->error($line);
                $channel->basic_nack($delivery_tag,false,false);
                return;
            }

            try{
                if($this->sync($data['channel_id'])){
                    $channel->basic_ack($delivery_tag);
                }else{
                    // Reject and drop the message (requeue = false).
                    $channel->basic_nack($delivery_tag,false,false);
                    // NOTE(review): the original comment said "retry in five
                    // seconds", but the message is not requeued; this sleep
                    // only delays consumption of the next message.
                    sleep(5);
                }
            }catch (\Throwable $e){
                // \Throwable covers \Exception, think\Exception and \Error,
                // which were four identical catch arms before.
                $line = 'SyncUser->Info: ChannelID:'.$data['channel_id'].' 添加拉取任务失败,丢弃任务:'.$msg->body.',Error:'.$e->getMessage();
                Log::error($line);
                $this->consoleOut->error($line);
                $channel->basic_nack($delivery_tag,false,false);
            }
        });
    } catch (\Exception $e) {
        Log::error("MQ 同步用户 触发异常!message:" . $e->getMessage());
        $this->consoleOut->writeln("MQ 同步用户 触发异常!message:" . $e->getMessage());
        exit(0);
    }
}
/**
 * Pull a channel's WeChat follower list and fan it out to the handler queue.
 *
 * Steps:
 *  1. Load the channel config; abort (console + log + work-chat alert) when
 *     it is missing, lacks appid/refresh_token, or the sync row cannot be
 *     written.
 *  2. Page through the follower list via getWeChatUsers(). For each page,
 *     store the reported total, split the openids into chunks of 1000 and
 *     publish every chunk to SYNC_SCRIPT_MQ_QUEUE_NAME, bumping queue_total
 *     and pull_success_number per chunk.
 *  3. After the loop, report failure when the per-channel failure counter
 *     reached PULL_MAX_FAIL_NUMBER.
 *
 * @param mixed  $channel_id   channel (admin) ID
 * @param string $next_open_id openid to resume from; empty starts from the beginning
 * @return bool true when the pull loop finished, false on abort or failure cap
 * @throws \Exception anything thrown by the models or the MQ client propagates
 */
protected function sync($channel_id,$next_open_id = ''){
    // Fix: a debugging print_r(select connection_id()) used to run here and
    // dump the connection id to stdout on every call; it has been removed.
    $adminConfig = model('AdminConfig')->getAdminInfoAll($channel_id);
    model('AdminConfig')->getConnection()->free();
    model('AdminConfig')->getConnection()->close();

    // Shared abort path: report on console, in the log and via work-chat.
    $abort = function ($reason) use ($channel_id, $next_open_id) {
        $line = 'SyncUser->Info: ChannelID:'.$channel_id.' '.$reason;
        $this->consoleOut->error($line);
        Log::error($line);
        $this->sendWorkMsg($channel_id,$reason,$next_open_id);
        return false;
    };

    if(!$adminConfig){
        return $abort('获取渠道信息失败,丢弃任务');
    }
    if(!$adminConfig['appid'] || !$adminConfig['refresh_token']){
        return $abort('获取渠道 appid or refresh_token 失败,丢弃任务');
    }
    if(!$this->insertSyncData($channel_id,$adminConfig['appid'])){
        return $abort('写同步数据信息失败,丢弃任务');
    }

    $where = ['admin_id'=>$channel_id,'appid'=>$adminConfig['appid']];
    $this->pull_max_number[$channel_id] = 0;
    $this->pull_fail_number[$channel_id] = 0;

    // getWeChatUsers() yields a page (array), true ("retry") or false ("stop").
    while($fans_list = $this->getWeChatUsers($channel_id,$next_open_id)){
        if(isset($fans_list['total'])){
            model('SyncUser')->where($where)->update(['fans_total'=>$fans_list['total']]);
            model('SyncUser')->getConnection()->free();
            model('SyncUser')->getConnection()->close();
        }
        if(isset($fans_list['data']['openid'])){
            $mq = new Rabbitmq();
            foreach(array_chunk($fans_list['data']['openid'],1000) as $val){
                // Each queued message carries one chunk and its own count.
                $fans_list['count'] = count($val);
                $fans_list['data']['openid'] = $val;
                model('SyncUser')->where($where)->setInc('queue_total');
                model('SyncUser')->where($where)->setInc('pull_success_number');
                model('SyncUser')->getConnection()->free();
                model('SyncUser')->getConnection()->close();
                $mq->send(['channel_id'=>$channel_id,'fans_list'=>$fans_list],self::SYNC_SCRIPT_MQ_QUEUE_NAME,self::SYNC_SCRIPT_EXCHANGE_QUEUE_NAME);
            }
            $next_open_id = $fans_list['next_openid'];
        }
    }

    if(isset($this->pull_max_number[$channel_id]) && $this->pull_max_number[$channel_id] >= self::PULL_MAX_FAIL_NUMBER){
        // Too many failed pulls: alert and report failure.
        $this->sendWorkMsg($channel_id,$this->err_message,$next_open_id);
        return false;
    }
    return true;
}
/**
 * Build and send a work-chat alert describing a failed sync for a channel.
 *
 * Collects the site theme, the channel's config, the channel account and
 * the account that created it, formats them into a multi-line message and
 * passes it to sendWorkChatMessage(). Any \Exception is logged and
 * swallowed: alerting is best-effort and must not break the sync itself.
 *
 * @param mixed  $channel_id   channel (admin) ID
 * @param string $errMsg       error text to include
 * @param string $next_open_id openid the pull stopped at
 * @return void
 */
public function sendWorkMsg($channel_id,$errMsg = '',$next_open_id =''){
    try{
        $configModel = new dbconfig();
        $siteconfig = $configModel->getConfigSiteArr();
        $theme = $siteconfig['theme'] ?? '';
        $adminConfig = model('AdminConfig')->getAdminInfoAll($channel_id);
        $extend_info = null;
        if($adminExtend = model('AdminExtend')->getInfo($channel_id)){
            $extend_info = model('Admin')->where('id',$adminExtend['create_by'])->find();
            model('Admin')->getConnection()->free();
            model('Admin')->getConnection()->close();
        }
        model('AdminConfig')->getConnection()->free();
        model('AdminConfig')->getConnection()->close();
        model('AdminExtend')->getConnection()->free();
        model('AdminExtend')->getConnection()->close();
        $configModel->getConnection()->free();
        $configModel->getConnection()->close();

        // Account that opened this channel, when known.
        $k_nickname = $extend_info['nickname'] ?? '';
        $k_username = $extend_info['username'] ?? '';
        $k_id = $adminExtend['create_by'] ?? '';

        $admin_info = model('AdminConfig')
            ->join('admin', 'admin.id=admin_config.admin_id','LEFT')
            ->field('admin_config.*,admin.id,admin.username,admin.nickname')
            ->where('admin.id','=',$channel_id)
            ->find();
        model('AdminConfig')->getConnection()->free();
        model('AdminConfig')->getConnection()->close();

        // Map the theme code to its display name; unknown codes give ''.
        switch($theme){
            case 'qy':
                $themeName = '袋鼠';
                break;
            case 'sf':
                $themeName = '沙发';
                break;
            case 'ms':
                $themeName = '美书';
                break;
            case '':
                $themeName = '西瓜';
                break;
            default:
                $themeName = '';
        }

        // Fix: the format was 'Y-m- H:i:s' (day of month missing).
        $msg = "发现异常!" . date('Y-m-d H:i:s');
        $msg .= "\n公众号:" . ($adminConfig['json']['authorizer_info']['nick_name'] ?? '');
        // Fix: '.' binds tighter than '??', so the unparenthesised form
        // applied '??' to the whole concatenated string and the fallback
        // could never be used. Parentheses restore the intended defaults.
        $msg .= "\nAPPID:" . ($adminConfig['appid'] ?? '');
        $msg .= "\n原始ID:" . ($adminConfig['json']['authorizer_info']['user_name'] ?? '');
        $msg .= "\n渠道ID:" . ($adminConfig['admin_id'] ?? $channel_id);
        $msg .= "\n渠道账号:" . ($admin_info['username'] ?? '');
        $msg .= "\n渠道昵称:" . ($admin_info['nickname'] ?? '');
        $msg .= "\n开户人ID:".$k_id;
        $msg .= "\n开户人账号:".$k_username;
        $msg .= "\n开户人昵称:".$k_nickname;
        $msg .= "\nnext_openid:".$next_open_id;
        $msg .= "\n模板:".$themeName;
        $msg .= "\n错误信息:".$errMsg;
        $this->sendWorkChatMessage($msg);
    }catch (\Exception $e){
        // Include the cause; previously it was dropped from the log line.
        Log::error('SyncUser->Info:发送企业消息失败,ChannelID:'.$channel_id.' Error:'.$e->getMessage());
    }
}
/**
 * Create or reset the SyncUser progress row for a channel/appid pair.
 *
 * When no row exists one is inserted with sync_status = 1. When a row
 * already exists all progress counters are reset to 0 and sync_status is
 * set back to 1.
 *
 * @param mixed  $channel_id channel (admin) ID
 * @param string $appid      WeChat appid of the channel
 * @return bool false only when inserting a new row fails
 */
protected function insertSyncData($channel_id,$appid){
    $where = ['admin_id'=>$channel_id,'appid'=>$appid];

    if(model('SyncUser')->where($where)->find()){
        // A row already exists: reset it for a fresh run.
        // Fix: the array used to list 'sync_status' twice (1, then 0); PHP
        // keeps the last one, so the reset silently wrote 0 while a freshly
        // inserted row gets 1. Both paths now write 1.
        // NOTE(review): confirm 1 is the intended "in progress" value.
        model('SyncUser')->where($where)->update([
            'sync_status' => 1,
            'fans_total' => 0,
            'pull_success_number' => 0,
            'pull_fail_number' => 0,
            'queue_total' => 0,
            'queue_run' => 0,
            'sync_success_number' => 0,
            'sync_fail_number' => 0,
            'sync_skip_number' => 0,
            'updatetime' => time()
        ]);
        model('SyncUser')->getConnection()->free();
        model('SyncUser')->getConnection()->close();
        return true;
    }

    $now = time();
    $ins_id = model('SyncUser')->insertGetId([
        'admin_id' => $channel_id,
        'appid' => $appid,
        'sync_status' => 1,
        'createtime' => $now,
        'updatetime' => $now
    ]);
    model('SyncUser')->getConnection()->free();
    model('SyncUser')->getConnection()->close();

    if(!$ins_id){
        $this->consoleOut->error('syncUser->writeMysql: ChannelID:'.$channel_id.' appid:'.$appid.' Write Mysql Fail');
        Log::error('syncUser->writeMysql: ChannelID:'.$channel_id.' appid:'.$appid.' Write Mysql Fail');
        return false;
    }
    $this->consoleOut->info('syncUser->writeMysql: ChannelID:'.$channel_id.' appid:'.$appid.' Write Mysql Success');
    Log::info('syncUser->writeMysql: ChannelID:'.$channel_id.' appid:'.$appid.' Write Mysql Success');
    return true;
}
/**
 * Handler mode: consume one openid chunk from SYNC_SCRIPT_MQ_QUEUE_NAME and
 * create the users it lists.
 *
 * The message body is JSON with "channel_id" and "fans_list" (which holds
 * data.openid, total and count). For every openid the channel's WeChat
 * client is resolved and weChatInsertUser() is called. The message is then
 * acked, queue_run is incremented, and the row is marked sync_status = 2
 * once queue_run has caught up with queue_total.
 *
 * Every message is acked exactly once, whether it was processed, was
 * unusable, or failed with an error; failures are logged and not retried.
 *
 * The callback ends with exit(0), so the process handles one message.
 * NOTE(review): presumably a supervisor restarts the worker - confirm.
 *
 * @return void
 */
protected function syncHandler(){
    try {
        $mq = new Rabbitmq();
        $mq->receive(self::SYNC_SCRIPT_MQ_QUEUE_NAME, function (AMQPMessage $msg){
            $this->err_mq_msg = $msg;
            $channel = $msg->delivery_info['channel'];
            $delivery_tag = $msg->delivery_info['delivery_tag'];
            $data = json_decode($msg->body, true);
            $channel_id = isset($data['channel_id']) ? $data['channel_id'] : null;
            $fans_list = isset($data['fans_list']) ? $data['fans_list'] : [];
            $this->consoleOut->info('SyncUser->Info:接收到消费信息 Channel_id:'.$channel_id);
            Log::info('SyncUser->Info:接收到消费信息 Channel_id:'.$channel_id);

            // Tracks whether this delivery was already acked, so the error
            // path never acks the same delivery tag a second time.
            $acked = false;

            if($channel_id && $fans_list && isset($fans_list['data']['openid'])){
                try{
                    $adminConfig = model('AdminConfig')->getAdminInfoAll($channel_id);
                    model('AdminConfig')->getConnection()->free();
                    model('AdminConfig')->getConnection()->close();
                    if($adminConfig['appid'] && $adminConfig['refresh_token']){
                        foreach ($fans_list['data']['openid'] as $key => $open_id){
                            $progress = 'SyncUser->Info: Total:'.($fans_list['total'] ?? '').' Count:'.($fans_list['count'] ?? '').' Current:'.$key;
                            Log::info($progress);
                            $this->consoleOut->info($progress);
                            if($wechat = $this->getChannelWechat($channel_id)){
                                $this->weChatInsertUser($channel_id,$open_id,$wechat);
                            }else{
                                Log::error('SyncUser->Info: 获取渠道信息失败,丢弃消息 ChannelID:'.$channel_id.' Fans:'.json_encode($fans_list));
                                $this->consoleOut->error('SyncUser->Info: 获取渠道信息失败,丢弃消息 ChannelID:'.$channel_id.' Fans:'.json_encode($fans_list));
                            }
                        }
                        $channel->basic_ack($delivery_tag);
                        $acked = true;

                        $where = ['admin_id'=>$channel_id,'appid'=>$adminConfig['appid']];
                        model('SyncUser')->where($where)->setInc('queue_run');
                        $queue_total = model('SyncUser')->where($where)->value('queue_total');
                        $queue_run = model('SyncUser')->where($where)->value('queue_run');
                        // Fix: the test was `$queue_total >= $queue_run`,
                        // which holds after the very first chunk and marked
                        // the sync finished far too early.
                        // NOTE(review): if the puller is still enqueuing,
                        // run can momentarily equal total - confirm whether
                        // that window matters.
                        if($queue_run >= $queue_total){
                            model('SyncUser')->where($where)->update(['sync_status'=>2]);
                        }
                        model('SyncUser')->getConnection()->free();
                        model('SyncUser')->getConnection()->close();
                    }else{
                        Log::error('SyncUser->Info: 获取 appid or refresh_token 失败 ChannelID:'.$channel_id.' 丢弃任务:'.$msg->body);
                        $this->consoleOut->error('SyncUser->Info: 获取 appid or refresh_token 失败 ChannelID:'.$channel_id.' 丢弃任务:'.$msg->body);
                        $channel->basic_ack($delivery_tag);
                        $acked = true;
                    }
                }catch (\Throwable $e){
                    // One arm replaces four identical ones (\Exception,
                    // think\Exception, \Error, \Throwable).
                    Log::error('SyncUser->Info: ChannelID:'.$channel_id.' 丢弃任务:'.$msg->body.' Handle异常:'.$e->getMessage());
                    if(!$acked){
                        $channel->basic_ack($delivery_tag);
                        $acked = true;
                    }
                }
            }else{
                // Fix: an unusable payload used to be left unacked, so the
                // broker redelivered it after exit(0), again and again.
                Log::error('SyncUser->Info: 渠道信息获取失败,丢弃任务:'.$msg->body);
                $this->consoleOut->error('SyncUser->Info: 渠道信息获取失败,丢弃任务:'.$msg->body);
                $channel->basic_ack($delivery_tag);
                $acked = true;
            }

            $this->consoleOut->info('SyncUser->Info:消费信息处理完毕 Channel_id:'.$channel_id);
            Log::info('SyncUser->Info:消费信息处理完毕 Channel_id:'.$channel_id);
            exit(0);
        });
    } catch (\Exception $e) {
        Log::error("MQ 同步用户数据处理异常 触发异常!message:" . $e->getMessage());
        $this->consoleOut->writeln("MQ 同步用户数据处理异常 触发异常!message:" . $e->getMessage());
        exit(0);
    }
}
/**
 * Fetch one page of the channel's follower list from WeChat.
 *
 * The return value drives the `while` loop in sync():
 *  - array : a page with a non-empty "count"; the loop processes it.
 *  - true  : this attempt failed; the loop calls again (retry).
 *  - false : stop - the list is exhausted (count is empty) or the number of
 *            consecutive failures reached PULL_MAX_FAIL_NUMBER.
 *
 * Every failed attempt increments pull_fail_number (in memory and in the
 * SyncUser row) and the consecutive-failure counter pull_max_number, and
 * records the reason in $this->err_message. A successful response resets
 * pull_max_number to 0.
 *
 * @param mixed  $channel_id   channel (admin) ID
 * @param string $next_open_id openid to continue from
 * @return bool|mixed
 */
protected function getWeChatUsers($channel_id,$next_open_id){
    // Shared failure bookkeeping; always returns true ("retry").
    // Fix: only the exception path used to bump pull_max_number, so the
    // other failure kinds made sync() retry forever.
    $fail = function ($message) use ($channel_id) {
        $this->pull_fail_number[$channel_id] = ($this->pull_fail_number[$channel_id] ?? 0) + 1;
        $this->pull_max_number[$channel_id] = ($this->pull_max_number[$channel_id] ?? 0) + 1;
        $adminConfig = model('AdminConfig')->getAdminInfoAll($channel_id);
        model('SyncUser')->where(['admin_id'=>$channel_id,'appid'=>$adminConfig['appid'] ?? null])->setInc('pull_fail_number');
        model('AdminConfig')->getConnection()->free();
        model('AdminConfig')->getConnection()->close();
        model('SyncUser')->getConnection()->free();
        model('SyncUser')->getConnection()->close();
        $this->err_message = $message;
        $this->consoleOut->error($message);
        Log::error($message);
        return true;
    };

    try{
        if(isset($this->pull_max_number[$channel_id]) && $this->pull_max_number[$channel_id] >= self::PULL_MAX_FAIL_NUMBER){
            return false; // failure cap reached: stop retrying
        }
        if(!$officialAccount = $this->getChannelWechat($channel_id)){
            return $fail('获取渠道微信配置失败!');
        }
        if(!$result = $officialAccount->user->list($next_open_id)){
            return $fail('获取渠道关注用户失败!');
        }
        if(!isset($result['count'])){
            // Fix: a response without "count" used to fall off the end of
            // the function (null), which sync() read as "list finished".
            // NOTE(review): presumably this is a WeChat error payload with
            // errcode/errmsg - confirm against the API response.
            return $fail('获取渠道关注用户失败!'.json_encode($result));
        }
        $this->pull_max_number[$channel_id] = 0;
        if(empty($result['count'])){
            return false; // nothing left to process: end the loop
        }
        return $result;
    }catch (\Exception $e){
        return $fail("调用微信接口异常,Error:".$e->getMessage());
    }
}
/**
 * Build the WeChat official-account client for a channel.
 *
 * Loads the channel config, releases the DB connection, and wraps the
 * config in a WeChatObject to obtain its official-account client.
 *
 * @param mixed $channel_id channel (admin) ID
 * @return \EasyWeChat\OpenPlatform\Authorizer\OfficialAccount\Application|null
 *         null when the config is missing or building the client throws
 */
private function getChannelWechat($channel_id){
    try{
        // Fix: a debugging print_r(select connection_id()) used to run here
        // on every call (once per openid in the handler); it is removed.
        $adminConfig = model('AdminConfig')->getAdminInfoAll($channel_id);
        model('AdminConfig')->getConnection()->free();
        model('AdminConfig')->getConnection()->close();
        if(!$adminConfig){
            $this->consoleOut->error('获取渠道配置信息失败');
            // Was `false`; null matches the catch path. Callers only test
            // truthiness.
            return null;
        }
        $wechat = new WeChatObject($adminConfig);
        return $wechat->getOfficialAccount();
    }catch (\Exception $e){
        $this->consoleOut->error('获取OfficialAccount失败 ChannelID:'.$channel_id);
        Log::error('SyncUser->Info: 获取OfficialAccount失败 ChannelID:'.$channel_id);
        return null;
    }
}
/**
 * Report the script's current memory usage.
 *
 * @return string usage in megabytes rounded to two decimals with an "MB"
 *                suffix, or '0' when memory_get_usage() is unavailable
 */
protected function memory_usage() {
    if (!function_exists('memory_get_usage')) {
        return '0';
    }
    $megabytes = memory_get_usage() / 1024 / 1024;
    return round($megabytes, 2).'MB';
}
/**
 * Ensure a local user exists for one WeChat follower.
 *
 * Flow:
 *  1. Look up the user ID for (channel, openid) and, if found, the user row.
 *  2. If both exist, bump sync_skip_number and return true.
 *  3. Otherwise take a Redis lock "L:<channel>:<openid>" (setnx + 20s
 *     expire); return false if it is already held.
 *  4. Create the user ID if missing, then the user row. Failures bump
 *     sync_fail_number and return false; success bumps sync_success_number.
 *  5. The lock is released on every exit path once it has been taken.
 *
 * @param mixed  $channel_id channel (admin) ID
 * @param string $open_id    follower openid
 * @param \EasyWeChat\OpenPlatform\Authorizer\OfficialAccount\Application $weChat EasyWeChat client
 * @return bool true when the user already existed or was created
 */
public function weChatInsertUser($channel_id,$open_id,\EasyWeChat\OpenPlatform\Authorizer\OfficialAccount\Application $weChat){
    $redis = null;
    $nxKey = 'L:'.$channel_id.':'.$open_id;
    $locked = false;

    // Increment one counter column on the channel's SyncUser row.
    $bump = function ($column) use ($channel_id) {
        $adminConfig = model('AdminConfig')->getAdminInfoAll($channel_id);
        model('SyncUser')->where(['admin_id'=>$channel_id,'appid'=>$adminConfig['appid']])->setInc($column);
        model('SyncUser')->getConnection()->free();
        model('SyncUser')->getConnection()->close();
        model('AdminConfig')->getConnection()->free();
        model('AdminConfig')->getConnection()->close();
    };

    try{
        // Look up what already exists for this follower.
        $user_id = model('openid')->getUserId($channel_id,$open_id);
        model('openid')->getConnection()->free();
        model('openid')->getConnection()->close();
        $user_info = null;
        if($user_id){
            $user_info = model('user')->setConnect($user_id)->getUserInfo($user_id);
            model('user')->setConnect($user_id)->getConnection()->free();
            model('user')->setConnect($user_id)->getConnection()->close();
        }

        if($user_id && $user_info){
            // Already synced: count it as skipped.
            // Fix: this path used to fall through and delete the Redis lock
            // key without having taken it, which could release a lock held
            // by another worker.
            $bump('sync_skip_number');
            $this->consoleOut->info('syncUser->createUser: ChannelID:'.$channel_id.' OpenID:'.$open_id.' UserID:'.$user_id.' Exists Skip Next User');
            Log::info('syncUser->createUser: ChannelID:'.$channel_id.' OpenID:'.$open_id.' UserID:'.$user_id.' Exists Skip Next User');
            return true;
        }

        $time = time();
        // Take the per-follower lock.
        // NOTE(review): setnx and expire are two separate calls; a crash
        // between them would leave a key with no TTL.
        $redis = Redis::instanceCache();
        if(!$redis->setnx($nxKey,0)){
            return false; // another worker is already handling this follower
        }
        $locked = true;
        $redis->expire($nxKey,20); // expires after 20 seconds

        // Create the user ID when it does not exist yet.
        if(!$user_id){
            if(!$user_id = $this->createUserID($channel_id,$open_id,$time)){
                $bump('sync_fail_number');
                $this->consoleOut->error('syncUser->createUserID: ChannelID:'.$channel_id.' OpenID:'.$open_id.' Create Fail');
                Log::error('syncUser->createUserID: ChannelID:'.$channel_id.' OpenID:'.$open_id.' Create Fail');
                return false;
            }
            $this->consoleOut->info('syncUser->createUserID: ChannelID:'.$channel_id.' OpenID:'.$open_id.' UserID:'.$user_id.' Create Success');
            Log::info('syncUser->createUserID: ChannelID:'.$channel_id.' OpenID:'.$open_id.' UserID:'.$user_id.' Create Success');
        }

        // The user row is always missing here (the "both exist" case has
        // returned above), so the old `if(!$user_info)` guard was redundant.
        if(!$this->createUserInfo($weChat,$channel_id,$open_id,$user_id,$time)){
            $bump('sync_fail_number');
            $this->consoleOut->error('syncUser->createUserInfo: ChannelID:'.$channel_id.' OpenID:'.$open_id.' UserID:'.$user_id.' Create Fail');
            Log::error('syncUser->createUserInfo: ChannelID:'.$channel_id.' OpenID:'.$open_id.' UserID:'.$user_id.' Create Fail');
            return false;
        }
        $bump('sync_success_number');
        $this->consoleOut->info('syncUser->createUserInfo: ChannelID:'.$channel_id.' OpenID:'.$open_id.' UserID:'.$user_id.' Create Success');
        Log::info('syncUser->createUserInfo: ChannelID:'.$channel_id.' OpenID:'.$open_id.' UserID:'.$user_id.' Create Success');
        return true;
    }catch (\Exception $e){
        $bump('sync_fail_number');
        Log::error('weChatInsertUser Error:'.$e->getMessage());
        return false;
    }finally{
        // Fix: release the lock on every path, including exceptions, which
        // previously left it in place until the 20s TTL ran out.
        if($locked){
            try{
                $redis->del($nxKey);
            }catch (\Exception $releaseError){
                Log::error('weChatInsertUser Error:'.$releaseError->getMessage());
            }
        }
    }
}
/**
 * Allocate a new user ID and store its openid mapping.
 *
 * The ID comes from incrementing the Redis key "UID"; a row linking
 * "<channel_id>_<open_id>" to that ID is then inserted through the openid
 * model. Failures are logged, never thrown.
 *
 * @param mixed  $channel_id channel ID
 * @param string $open_id    WeChat openid
 * @param int    $time       creation timestamp
 * @return int|null the new user ID, or null when the insert fails or throws
 */
public function createUserID($channel_id,$open_id,$time){
    try{
        // The Redis counter hands out the next user ID.
        $newUserId = Redis::instanceAuto()->incr('UID');
        $row = [
            'channel_openid' => $channel_id . '_' . $open_id,
            'user_id' => $newUserId,
            'createtime' => $time,
            'updatetime' => $time,
        ];
        $insertedId = model('openid')->setConnect($channel_id, $open_id)->insertGetId($row);
        // Release the connection whatever the outcome.
        model('openid')->setConnect($channel_id, $open_id)->getConnection()->free();
        model('openid')->setConnect($channel_id, $open_id)->getConnection()->close();
        if($insertedId){
            return $newUserId;
        }
        Log::error('syncUser->createUserID: 创建用户ID失败,OPENID表写入失败 Data:'.json_encode($row));
    }catch (\Exception $e){
        Log::error('syncUser->createUserID: 创建用户ID失败,Error:'.$e->getMessage());
    }
    return null;
}
/**
 * Create the user row for a follower from their WeChat profile.
 *
 * Fetches the profile with $weChat->user->get(), inserts the user row, and,
 * when the channel is listed in site.give_shubi_channel_id, also inserts a
 * 3000-point sign-up gift that expires after site.kandian_free_day days.
 * A failure while granting the gift is logged but does not fail the call.
 *
 * @param mixed  $weChat     EasyWeChat client
 * @param mixed  $channel_id channel ID
 * @param string $open_id    WeChat openid
 * @param mixed  $user_id    user ID to create the row for
 * @param int    $time       creation timestamp
 * @return mixed the value returned by insertGetId(), or null on failure
 */
public function createUserInfo($weChat,$channel_id,$open_id,$user_id,$time){
    try{
        $weChatUser = $weChat->user->get($open_id);
        if(empty($weChatUser) || isset($weChatUser['errcode'])){
            // Fix: an empty or error response used to return null with no
            // trace in the log.
            Log::error('syncUser->createUserInfo: 创建用户基本信息失败,Error:' . json_encode($weChatUser));
            return null;
        }

        $data = [
            'id' => $user_id,
            'openid' => $open_id,
            'nickname' => $weChatUser['nickname'] ?? '书友',
            'sex' => $weChatUser['sex'] ?? 0,
            'first_cancel_pay' => 2,
            'avatar' => $weChatUser['headimgurl'] ?? cdnurl('/assets/img/frontend/icon/nav_icon_4.png'),
            'is_subscribe' => $weChatUser['subscribe'] ?? 0,
            'subscribe_time' => $weChatUser['subscribe_time'] ?? 0,
            'channel_id' => $channel_id,
            'country' => $weChatUser['country'] ?? '',
            'province' => $weChatUser['province'] ?? '',
            'city' => $weChatUser['city'] ?? '',
            'createtime' => $time,
            'updatetime' => $time
        ];
        $user_info_id = model('User')->setConnect($data['id'])->insertGetId($data);
        model('User')->setConnect($data['id'])->getConnection()->free();
        model('User')->setConnect($data['id'])->getConnection()->close();
        if(!$user_info_id){
            Log::error('syncUser->createUserInfo: 创建用户基本信息失败,USER表写入失败 Data:' . json_encode($data));
            return null;
        }

        // Sign-up gift, only for the configured channels. Best-effort.
        try {
            // The cast guards explode() against a missing (null) config value.
            $ids = explode(',', (string)Config::get('site.give_shubi_channel_id'));
            // Loose comparison on purpose: $ids are strings while
            // $channel_id may arrive as an int.
            if (in_array($channel_id, $ids)) {
                $kandian_reg = 3000;
                $endTime = intval(Config::get('site.kandian_free_day')) * 86400;
                $rdata = [
                    'free_kandian' => $kandian_reg,
                    'remain_free_kandian' => $kandian_reg,
                    'type' => 3,
                    'user_id' => $user_info_id,
                    'free_endtime' => $endTime + $time,
                    'notes' => '新用户注册赠送',
                    'createtime' => $time,
                    'updatetime' => $time,
                ];
                FinancialService::instance()->getRechargeModel()->setConnect($user_info_id)->insertGetId($rdata);
                Redis::instance()->del('ZR:' . $user_info_id);
            }
        } catch (\Exception $e) {
            // Now carries the cause; previously only the bare text was logged.
            Log::error('赠送书币失败!!' . ' Error:' . $e->getMessage());
        }
        return $user_info_id;
    }catch (\Exception $e){
        Log::error('syncUser->createUserInfo: 创建用户基本信息失败,Error:' . $e->getMessage());
    }
    return null;
}
/**
 * Send a text message through the WeChat Work (enterprise WeChat) app.
 *
 * Reads the "wechat" config, points it at the work base URI, corp ID and
 * sync secret, then sends $content as the configured agent to the
 * configured party, with a Redis-backed cache on the EasyWeChat app.
 *
 * @param string $content message text
 * @return mixed false when $content is empty, otherwise the result of send()
 */
public function sendWorkChatMessage($content)
{
    if (empty($content)) {
        return false;
    }

    $settings = Config::get('wechat');
    $work = $settings['work'];
    $settings['http']['base_uri'] = $work['base_uri'];
    $settings['http']['timeout'] = 20;
    $settings['corp_id'] = $work['corp_id'];
    $settings['secret'] = $work['sync_secret'];

    $workApp = Factory::work($settings);
    $workApp['cache'] = new RedisCache(Redis::instanceCache());

    $pending = $workApp->message->message($content);
    $pending = $pending->ofAgent($work['sync_agent_id']);
    $pending = $pending->toParty($work['sync_party_id']);

    return $pending->send();
}
- }
|