setName('MqPayCancel')->setDescription('取消充值延时客服消息队列任务'); } protected function execute(Input $input, Output $output) { Request::instance()->module('admin'); //cli模式下无法获取到当前的项目模块,手动指定一下 try { $mq = new Rabbitmq(); $mq->receive('Q_PayCancelReceive', function (AMQPMessage $msg) use ($input, $output) { $data = json_decode($msg->body, true); Log::info("MQ 取消充值 处理 用户ID:{$data['user_id']} OPENID:{$data['openid']} 商品ID:{$data['goods_id']}"); $output->writeln("MQ 取消充值 处理 用户ID:{$data['user_id']} OPENID:{$data['openid']} 商品ID:{$data['goods_id']}"); $this->sendMessage($data, $input, $output); // 发送客服消息 $channel = $msg->delivery_info['channel']; $channel->basic_ack($msg->delivery_info['delivery_tag']); }); } catch (Exception $e) { Log::error("MQ 取消充值 触发异常!message:" . $e->getMessage()); $output->writeln("MQ 取消充值 触发异常!message:" . $e->getMessage()); } } /** * 发送客服消息 * * @param $data * @return bool true发送 false取消 */ private function sendMessage($data, Input $input, Output $output) { try { //查看标记状态 $redis = Redis::instance(); $payCancelKey = "O-C:{$data['user_id']}:{$data['goods_id']}"; $status = $redis->get($payCancelKey); if ($status == 2) { Log::info("MQ 取消充值 处理 已充值!自动跳过!"); $output->writeln("MQ 取消充值 处理 已充值!自动跳过!"); //支付完成不推消息并清除标记 $redis->del($payCancelKey); return false; } else { //发送消息 $adminConfig = new AdminConfig(); $info = $adminConfig->getAdminInfoAll($data['channel_id']); $wechat = new WeChatObject($info); $officialAccount = $wechat->getOfficialAccount(); $officialAccount ->customer_service ->message(new Text($data['msg'])) ->to($data['openid']) ->send(); $adminConfig->getConnection()->free(); $adminConfig->getConnection()->close(); unset($adminConfig); return true; } } catch (\Exception $exception) { Log::error("MQ 取消充值 微信客服消息触发异常!message:" . $exception->getMessage()); $output->writeln("MQ 取消充值 微信客服消息触发异常!message:" . $exception->getMessage()); return false; } } }