MqPayCancel.php 3.5 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798
  1. <?php
  2. /*
  3. * 消息队里 取消充值客服消息延迟队列
  4. */
  5. namespace app\admin\command;
  6. use app\common\library\Rabbitmq;
  7. use app\common\library\Redis;
  8. use app\common\library\WeChatObject;
  9. use EasyWeChat\Factory;
  10. use EasyWeChat\Kernel\Messages\Text;
  11. use PhpAmqpLib\Message\AMQPMessage;
  12. use Symfony\Component\Cache\Simple\RedisCache;
  13. use think\Config;
  14. use think\console\Command;
  15. use think\console\Input;
  16. use think\console\Output;
  17. use app\common\model\AdminConfig;
  18. use Think\Exception;
  19. use think\Log;
  20. use app\common\model\Config as dbconfig;
  21. use think\Request;
  22. class MqPayCancel extends Command
  23. {
  24. protected function configure()
  25. {
  26. $this->setName('MqPayCancel')->setDescription('取消充值延时客服消息队列任务');
  27. }
  28. protected function execute(Input $input, Output $output)
  29. {
  30. Request::instance()->module('admin'); //cli模式下无法获取到当前的项目模块,手动指定一下
  31. try {
  32. $mq = new Rabbitmq();
  33. $mq->receive('Q_PayCancelReceive', function (AMQPMessage $msg) use ($input, $output) {
  34. $data = json_decode($msg->body, true);
  35. Log::info("MQ 取消充值 处理 用户ID:{$data['user_id']} OPENID:{$data['openid']} 商品ID:{$data['goods_id']}");
  36. $output->writeln("MQ 取消充值 处理 用户ID:{$data['user_id']} OPENID:{$data['openid']} 商品ID:{$data['goods_id']}");
  37. $this->sendMessage($data, $input, $output); // 发送客服消息
  38. $channel = $msg->delivery_info['channel'];
  39. $channel->basic_ack($msg->delivery_info['delivery_tag']);
  40. });
  41. } catch (Exception $e) {
  42. Log::error("MQ 取消充值 触发异常!message:" . $e->getMessage());
  43. $output->writeln("MQ 取消充值 触发异常!message:" . $e->getMessage());
  44. }
  45. }
  46. /**
  47. * 发送客服消息
  48. *
  49. * @param $data
  50. * @return bool true发送 false取消
  51. */
  52. private function sendMessage($data, Input $input, Output $output)
  53. {
  54. try {
  55. //查看标记状态
  56. $redis = Redis::instance();
  57. $payCancelKey = "O-C:{$data['user_id']}:{$data['goods_id']}";
  58. $status = $redis->get($payCancelKey);
  59. if ($status == 2) {
  60. Log::info("MQ 取消充值 处理 已充值!自动跳过!");
  61. $output->writeln("MQ 取消充值 处理 已充值!自动跳过!");
  62. //支付完成不推消息并清除标记
  63. $redis->del($payCancelKey);
  64. return false;
  65. } else {
  66. //发送消息
  67. $adminConfig = new AdminConfig();
  68. $info = $adminConfig->getAdminInfoAll($data['channel_id']);
  69. $wechat = new WeChatObject($info);
  70. $officialAccount = $wechat->getOfficialAccount();
  71. $officialAccount
  72. ->customer_service
  73. ->message(new Text($data['msg']))
  74. ->to($data['openid'])
  75. ->send();
  76. $adminConfig->getConnection()->free();
  77. $adminConfig->getConnection()->close();
  78. unset($adminConfig);
  79. return true;
  80. }
  81. } catch (\Exception $exception) {
  82. Log::error("MQ 取消充值 微信客服消息触发异常!message:" . $exception->getMessage());
  83. $output->writeln("MQ 取消充值 微信客服消息触发异常!message:" . $exception->getMessage());
  84. return false;
  85. }
  86. }
  87. }