123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117 |
- <?php
- /*
- * 消息队里 取消充值客服消息延迟队列
- */
- namespace app\admin\command;
- use app\common\library\Rabbitmq;
- use app\common\library\Redis;
- use app\common\library\Ssdb;
- use app\common\library\WeChatObject;
- use app\common\model\Activity;
- use EasyWeChat\Factory;
- use EasyWeChat\Kernel\Messages\Text;
- use EasyWeChat\Kernel\Messages\News;
- use EasyWeChat\Kernel\Messages\NewsItem;
- use PhpAmqpLib\Message\AMQPMessage;
- use Symfony\Component\Cache\Simple\RedisCache;
- use think\Config;
- use think\console\Command;
- use think\console\Input;
- use think\console\Output;
- use app\common\model\AdminConfig;
- use Think\Exception;
- use think\Log;
- use app\common\model\Config as dbconfig;
- use think\Request;
- use app\common\constants\User;
- class MqNewPayCancel extends Command
- {
- protected function configure()
- {
- $this->setName('MqNewPayCancel')->setDescription('新用户取消充值延时图文消息队列任务');
- }
- protected function execute(Input $input, Output $output)
- {
- Request::instance()->module('admin'); //cli模式下无法获取到当前的项目模块,手动指定一下
- try {
- $mq = new Rabbitmq();
- $mq->receive('Q_NewPayCancelReceive', function (AMQPMessage $msg) use ($input, $output) {
- $data = json_decode($msg->body, true);
- Log::info("MQ 新用户取消充值 处理 用户ID:{$data['user_id']} OPENID:{$data['openid']} 商品ID:{$data['goods_id']}");
- $output->writeln("MQ 新用户取消充值 处理 用户ID:{$data['user_id']} OPENID:{$data['openid']} 商品ID:{$data['goods_id']}");
- $this->sendMessage($data, $input, $output); // 发送图文
- $channel = $msg->delivery_info['channel'];
- $channel->basic_ack($msg->delivery_info['delivery_tag']);
- });
- } catch (Exception $e) {
- Log::error("MQ 新用户取消充值 触发异常!message:" . $e->getMessage());
- $output->writeln("MQ 新用户取消充值 触发异常!message:" . $e->getMessage());
- }
- }
- /**
- * 发送图文消息
- *
- * @param $data
- * @return bool true发送 false取消
- */
- private function sendMessage($data, Input $input, Output $output)
- {
- try {
- //查看标记状态
- $userInfo = model('User')->getUserInfo($data['user_id']);
- $ispay = $userInfo['first_cancel_pay'] ?? 0;
- //查看活动状态
- $activity = new Activity();
- $activityinfo = $activity->get(100);
- if (!$ispay) {
- Log::info("MQ 新用户取消充值 处理 已充值!自动跳过!");
- $output->writeln("MQ 新用户取消充值 处理 已充值!自动跳过!");
- $activity->getConnection()->free();
- $activity->getConnection()->close();
- unset($activity);
- return false;
- } else if($activityinfo['status']!=1){
- Log::info("MQ 新用户取消充值 活动结束!自动跳过!");
- $output->writeln("MQ 新用户取消充值 活动结束!自动跳过!");
- $activity->getConnection()->free();
- $activity->getConnection()->close();
- unset($activity);
- return false;
- } else{
- //发送消息
- $adminConfig = new AdminConfig();
- $info = $adminConfig->getAdminInfoAll($data['channel_id']);
- $wechat = new WeChatObject($info);
- $officialAccount = $wechat->getOfficialAccount();
- $officialAccount
- ->customer_service
- ->message(new News([new NewsItem($data['msg'])]))
- ->to($data['openid'])
- ->send();
- $adminConfig->getConnection()->free();
- $adminConfig->getConnection()->close();
- $activity->getConnection()->free();
- $activity->getConnection()->close();
- unset($adminConfig);
- unset($activity);
- return true;
- }
- } catch (\Exception $exception) {
- Log::error("MQ 新用户取消充值 图文消息触发异常!message:" . $exception->getMessage());
- $output->writeln("MQ 新用户取消充值 图文消息触发异常!message:" . $exception->getMessage());
- return false;
- }
- }
- }
|