config = array_merge(Config::get('rabbitmq'), $config); } /** * 处理消息格式 A Message for use with the Channnel.basic_* methods. * * @param $msg * @param $message_durable * @return AMQPMessage */ public function message($msg, $message_durable) { if (is_array($msg)) { $msg = json_encode($msg); } if (!is_object($msg) && $message_durable === true) { return new AMQPMessage($msg, ['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]); } return new AMQPMessage($msg); } /** * acknowledge one or more messages * * @param string $delivery_tag */ public function basicAck($delivery_tag) { $this->channel->basic_ack($delivery_tag); } /** * @param $data * @param $exchange_name * @param string $routing_key * @param string $type * @param bool $queue_durable * @param bool $message_durable */ public function transferExchange( $data, $exchange_name, $routing_key = '', $type = 'topic', $queue_durable = false, $message_durable = false ) { Log::info('MQ:data::' . print_r(json_encode($data), true)); Log::info('MQ:data::' . print_r(json_encode($this->config), true)); Log::info('MQ:data:: $exchange_name->' . $exchange_name . '::$routing_key->' . $routing_key . '::$type->' . $type); $this->connect = new AMQPStreamConnection( $this->config['host'], $this->config['port'], $this->config['login'], $this->config['password'], $this->config['vhost'] ); $this->channel = $this->connect->channel(); //声明交换机 $this->channel->exchange_declare($exchange_name, $type, false, $message_durable, false, false, false, [], null); //处理压入消息格式 $msg = $this->message($data, $message_durable); //压入消息到交换机 $this->channel->basic_publish($msg, $exchange_name, $routing_key, false, false, null); //关闭 $this->channel->close(); // 关闭信道 $this->connect->close(); // 关闭链接 } /** * 发送延时消息 * @param array $data * @param $routing_key * @param $timeout * @param string $ex_name * @param string $ex_type */ public function transferDelayExchange( array $data, $routing_key, $timeout, $ex_name='cps.ex.delay.topic', $ex_type = 'topic' ){ Log::info("Delay:{$timeout} MQ:data::".print_r($data, true)); $this->connect = new AMQPStreamConnection( $this->config['host'], $this->config['port'], $this->config['login'], $this->config['password'], $this->config['vhost'] ); $this->channel = $this->connect->channel(); //声明交换机 $args = new AMQPTable(["x-delayed-type" => $ex_type]); $this->channel->exchange_declare($ex_name, 'x-delayed-message', false, false, false, false, false, $args, null); //处理压入消息格式 $msg = new AMQPMessage(json_encode($data)); if($timeout){ $msg->set('application_headers', new AMQPTable(["x-delay" => $timeout])); } //压入消息到交换机 $this->channel->basic_publish($msg, $ex_name, $routing_key, false, false, null); //关闭 $this->channel->close(); // 关闭信道 $this->connect->close(); // 关闭链接 } /** * 生产者 生产实时队列 只生产一个队列 * * @param $data * @param $queue * @param $exchange_name * @param string $routing_key * @param string $type * @param bool $queue_durable * @param bool $message_durable */ public function send( $data, $queue, $exchange_name, $routing_key = '', $type = 'direct', $queue_durable = false, $message_durable = false ) { $this->connect = new AMQPStreamConnection( $this->config['host'], $this->config['port'], $this->config['login'], $this->config['password'], $this->config['vhost'] ); $this->channel = $this->connect->channel(); //声明交换机 $this->channel->exchange_declare($exchange_name, $type, false, $message_durable, false, false, false, [], null); //声明队列 $this->channel->queue_declare($queue, false, $queue_durable, false, false, false, [], null); //绑定交换机与队列 $this->channel->queue_bind($queue, $exchange_name, $routing_key, false, [], null); //处理压入消息格式 $msg = $this->message($data, $message_durable); //压入消息到交换机 $this->channel->basic_publish($msg, $exchange_name, $routing_key, false, false, null); //关闭 $this->channel->close(); // 关闭信道 $this->connect->close(); // 关闭链接 } /** * 延时生产者 生产延时队列 生产两个队列(1个延时死信队列,1个实时消费队列) * * @param array|string $data 消息内容 * @param string $delay_queue 延时队列 * @param string $delay_exchange_name 延时交换机 * @param string $delay_routing_key 延时队列routing_key * @param string $delay_type 延时交换机类型 * @param int $delay_expire 延时队列声明周期(过期会触发死信规则) * @param bool $delay_queue_durable 延时队列持久化 * @param bool $delay_message_durable 延时消息持久化 * @param string $receive_queue 接收队列(接收死信规则) * @param string $receive_exchange_name 接收交换机 * @param string $receive_routing_key 接收队列routing_key * @param string $receive_type 接收交换机类型 * @param bool $receive_queue_durable 接收队列持久化 * @param bool $receive_message_durable 接收消息持久化 */ public function sendDelay( $data, $delay_queue, $delay_exchange_name, $delay_routing_key = '', $delay_type = 'direct', $delay_expire = 0, $delay_queue_durable = false, $delay_message_durable = false, $receive_queue, $receive_exchange_name, $receive_routing_key = '', $receive_type = 'direct', $receive_queue_durable = false, $receive_message_durable = false ) { $this->connect = new AMQPStreamConnection( $this->config['host'], $this->config['port'], $this->config['login'], $this->config['password'], $this->config['vhost'] ); $this->channel = $this->connect->channel(); // 声明主队列 <-- 关联主消费交换机(接收) <-- 数据压入 // | 300 / // | / // 关联延时交换机 -> 关联消费队列 // // 延时交换机(弹出数据) -> 消费 //声明死信规则 $tale = new AMQPTable(); $tale->set('x-dead-letter-exchange', $receive_exchange_name); $tale->set('x-dead-letter-routing-key', $receive_routing_key); $tale->set('x-message-ttl', $delay_expire); //声明延时交换机 与 接收交换机 $this->channel->exchange_declare($delay_exchange_name, $delay_type, false, $delay_message_durable, false, false, false, [], null); $this->channel->exchange_declare($receive_exchange_name, $receive_type, false, $receive_message_durable, false, false, false, [], null); //声明延时队列 与 接收队列 $this->channel->queue_declare($delay_queue, false, $delay_queue_durable, false, false, false, $tale, null); $this->channel->queue_declare($receive_queue, false, $receive_queue_durable, false, false, false, [], null); //绑定延时队列到延时交换机 与 绑定接收队列到接收交换机 $this->channel->queue_bind($delay_queue, $delay_exchange_name, $delay_routing_key, false, [], null); $this->channel->queue_bind($receive_queue, $receive_exchange_name, $receive_routing_key, false, [], null); //处理压入消息格式 $msg = $this->message($data, $delay_message_durable); //压入消息到交换机 $this->channel->basic_publish($msg, $delay_exchange_name, $delay_routing_key, false, false, null); //关闭 $this->channel->close(); // 关闭信道 $this->connect->close(); // 关闭链接 } /** * 消费者 只消费一个队列 * * @param string $queue * @param string $consumer_tag * @param bool $no_local * @param bool $no_ack * @param bool $exclusive * @param bool $nowait * @param null $ticket * @param array $arguments */ function receive( $queue = '', // 队列名 $callback = null, // 回调函数 $queue_durable = false, //持久化 $consumer_tag = '', $no_local = false, $no_ack = false, $exclusive = false, //队列是否可以被其他队列访问 $nowait = false, $ticket = null, $arguments = array() ) { $this->connect = new AMQPStreamConnection( $this->config['host'], $this->config['port'], $this->config['login'], $this->config['password'], $this->config['vhost'] ); $this->channel = $this->connect->channel(); //一次只消费一个 $this->channel->basic_qos(0,1,false); //声明队列 $this->channel->queue_declare($queue, false, $queue_durable, false, false, false, [], null); //订阅消费 callback仅绑定并不立即执行 $this->channel->basic_consume($queue, $consumer_tag, $no_local, $no_ack, $exclusive, $nowait, $callback, $ticket, $arguments); //轮训等待并触发basic_consume绑定的callback while (count($this->channel->callbacks)) { $this->channel->wait(); } //关闭 $this->channel->close(); // 关闭信道 $this->connect->close(); // 关闭链接 } // $callback = function ($msg) { // $rabbit = new \app\admin\command\Rabbitmq(); // $rabbit->sendMessage($msg->body); //// $recharge = new Recharge(); //// $recharge->sendMessage($msg->body); // $this->basicAck($msg->delivery_info['delivery_tag']); // }; }