MqNewPayCancel.php 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117
  1. <?php
  2. /*
  3. * 消息队里 取消充值客服消息延迟队列
  4. */
  5. namespace app\admin\command;
  6. use app\common\library\Rabbitmq;
  7. use app\common\library\Redis;
  8. use app\common\library\Ssdb;
  9. use app\common\library\WeChatObject;
  10. use app\common\model\Activity;
  11. use EasyWeChat\Factory;
  12. use EasyWeChat\Kernel\Messages\Text;
  13. use EasyWeChat\Kernel\Messages\News;
  14. use EasyWeChat\Kernel\Messages\NewsItem;
  15. use PhpAmqpLib\Message\AMQPMessage;
  16. use Symfony\Component\Cache\Simple\RedisCache;
  17. use think\Config;
  18. use think\console\Command;
  19. use think\console\Input;
  20. use think\console\Output;
  21. use app\common\model\AdminConfig;
  22. use Think\Exception;
  23. use think\Log;
  24. use app\common\model\Config as dbconfig;
  25. use think\Request;
  26. use app\common\constants\User;
  27. class MqNewPayCancel extends Command
  28. {
  29. protected function configure()
  30. {
  31. $this->setName('MqNewPayCancel')->setDescription('新用户取消充值延时图文消息队列任务');
  32. }
  33. protected function execute(Input $input, Output $output)
  34. {
  35. Request::instance()->module('admin'); //cli模式下无法获取到当前的项目模块,手动指定一下
  36. try {
  37. $mq = new Rabbitmq();
  38. $mq->receive('Q_NewPayCancelReceive', function (AMQPMessage $msg) use ($input, $output) {
  39. $data = json_decode($msg->body, true);
  40. Log::info("MQ 新用户取消充值 处理 用户ID:{$data['user_id']} OPENID:{$data['openid']} 商品ID:{$data['goods_id']}");
  41. $output->writeln("MQ 新用户取消充值 处理 用户ID:{$data['user_id']} OPENID:{$data['openid']} 商品ID:{$data['goods_id']}");
  42. $this->sendMessage($data, $input, $output); // 发送图文
  43. $channel = $msg->delivery_info['channel'];
  44. $channel->basic_ack($msg->delivery_info['delivery_tag']);
  45. });
  46. } catch (Exception $e) {
  47. Log::error("MQ 新用户取消充值 触发异常!message:" . $e->getMessage());
  48. $output->writeln("MQ 新用户取消充值 触发异常!message:" . $e->getMessage());
  49. }
  50. }
  51. /**
  52. * 发送图文消息
  53. *
  54. * @param $data
  55. * @return bool true发送 false取消
  56. */
  57. private function sendMessage($data, Input $input, Output $output)
  58. {
  59. try {
  60. //查看标记状态
  61. $userInfo = model('User')->getUserInfo($data['user_id']);
  62. $ispay = $userInfo['first_cancel_pay'] ?? 0;
  63. //查看活动状态
  64. $activity = new Activity();
  65. $activityinfo = $activity->get(100);
  66. if (!$ispay) {
  67. Log::info("MQ 新用户取消充值 处理 已充值!自动跳过!");
  68. $output->writeln("MQ 新用户取消充值 处理 已充值!自动跳过!");
  69. $activity->getConnection()->free();
  70. $activity->getConnection()->close();
  71. unset($activity);
  72. return false;
  73. } else if($activityinfo['status']!=1){
  74. Log::info("MQ 新用户取消充值 活动结束!自动跳过!");
  75. $output->writeln("MQ 新用户取消充值 活动结束!自动跳过!");
  76. $activity->getConnection()->free();
  77. $activity->getConnection()->close();
  78. unset($activity);
  79. return false;
  80. } else{
  81. //发送消息
  82. $adminConfig = new AdminConfig();
  83. $info = $adminConfig->getAdminInfoAll($data['channel_id']);
  84. $wechat = new WeChatObject($info);
  85. $officialAccount = $wechat->getOfficialAccount();
  86. $officialAccount
  87. ->customer_service
  88. ->message(new News([new NewsItem($data['msg'])]))
  89. ->to($data['openid'])
  90. ->send();
  91. $adminConfig->getConnection()->free();
  92. $adminConfig->getConnection()->close();
  93. $activity->getConnection()->free();
  94. $activity->getConnection()->close();
  95. unset($adminConfig);
  96. unset($activity);
  97. return true;
  98. }
  99. } catch (\Exception $exception) {
  100. Log::error("MQ 新用户取消充值 图文消息触发异常!message:" . $exception->getMessage());
  101. $output->writeln("MQ 新用户取消充值 图文消息触发异常!message:" . $exception->getMessage());
  102. return false;
  103. }
  104. }
  105. }