123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333 |
- <?php
- namespace app\common\library;
- use PhpAmqpLib\Channel\AMQPChannel;
- use PhpAmqpLib\Connection\AMQPStreamConnection;
- use PhpAmqpLib\Message\AMQPMessage;
- use PhpAmqpLib\Wire\AMQPTable;
- use think\Cache;
- use think\Config;
- use think\Log;
- class Rabbitmq
- {
- /**
- * 配置
- *
- * @var array
- */
- public $config;
- /**
- * @var AMQPChannel
- */
- public $channel;
- /**
- * initialized
- *
- * @var AMQPStreamConnection
- */
- public $connect;
- public function __construct(array $config = [])
- {
- $this->config = array_merge(Config::get('rabbitmq'), $config);
- }
- /**
- * 处理消息格式 A Message for use with the Channnel.basic_* methods.
- *
- * @param $msg
- * @param $message_durable
- * @return AMQPMessage
- */
- public function message($msg, $message_durable)
- {
- if (is_array($msg)) {
- $msg = json_encode($msg);
- }
- if (!is_object($msg) && $message_durable === true) {
- return new AMQPMessage($msg, ['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]);
- }
- return new AMQPMessage($msg);
- }
- /**
- * acknowledge one or more messages
- *
- * @param string $delivery_tag
- */
- public function basicAck($delivery_tag)
- {
- $this->channel->basic_ack($delivery_tag);
- }
- /**
- * @param $data
- * @param $exchange_name
- * @param string $routing_key
- * @param string $type
- * @param bool $queue_durable
- * @param bool $message_durable
- */
- public function transferExchange(
- $data, $exchange_name, $routing_key = '', $type = 'topic', $queue_durable = false, $message_durable = false
- )
- {
- Log::info('MQ:data::' . print_r(json_encode($data), true));
- Log::info('MQ:data::' . print_r(json_encode($this->config), true));
- Log::info('MQ:data:: $exchange_name->' . $exchange_name . '::$routing_key->' . $routing_key . '::$type->' . $type);
- $this->connect = new AMQPStreamConnection(
- $this->config['host'],
- $this->config['port'],
- $this->config['login'],
- $this->config['password'],
- $this->config['vhost']
- );
- $this->channel = $this->connect->channel();
- //声明交换机
- $this->channel->exchange_declare($exchange_name, $type, false, $message_durable, false, false, false, [], null);
- //处理压入消息格式
- $msg = $this->message($data, $message_durable);
- //压入消息到交换机
- $this->channel->basic_publish($msg, $exchange_name, $routing_key, false, false, null);
- //关闭
- $this->channel->close(); // 关闭信道
- $this->connect->close(); // 关闭链接
- }
- /**
- * 发送延时消息
- * @param array $data
- * @param $routing_key
- * @param $timeout
- * @param string $ex_name
- * @param string $ex_type
- */
- public function transferDelayExchange(
- array $data,
- $routing_key,
- $timeout,
- $ex_name='cps.ex.delay.topic',
- $ex_type = 'topic'
- ){
- Log::info("Delay:{$timeout} MQ:data::".print_r($data, true));
- $this->connect = new AMQPStreamConnection(
- $this->config['host'],
- $this->config['port'],
- $this->config['login'],
- $this->config['password'],
- $this->config['vhost']
- );
- $this->channel = $this->connect->channel();
- //声明交换机
- $args = new AMQPTable(["x-delayed-type" => $ex_type]);
- $this->channel->exchange_declare($ex_name, 'x-delayed-message', false, false, false, false, false, $args, null);
- //处理压入消息格式
- $msg = new AMQPMessage(json_encode($data));
- if($timeout){
- $msg->set('application_headers', new AMQPTable(["x-delay" => $timeout]));
- }
- //压入消息到交换机
- $this->channel->basic_publish($msg, $ex_name, $routing_key, false, false, null);
- //关闭
- $this->channel->close(); // 关闭信道
- $this->connect->close(); // 关闭链接
- }
- /**
- * 生产者 生产实时队列 只生产一个队列
- *
- * @param $data
- * @param $queue
- * @param $exchange_name
- * @param string $routing_key
- * @param string $type
- * @param bool $queue_durable
- * @param bool $message_durable
- */
- public function send(
- $data, $queue, $exchange_name, $routing_key = '', $type = 'direct', $queue_durable = false, $message_durable = false
- ) {
- $this->connect = new AMQPStreamConnection(
- $this->config['host'],
- $this->config['port'],
- $this->config['login'],
- $this->config['password'],
- $this->config['vhost']
- );
- $this->channel = $this->connect->channel();
- //声明交换机
- $this->channel->exchange_declare($exchange_name, $type, false, $message_durable, false, false, false, [], null);
- //声明队列
- $this->channel->queue_declare($queue, false, $queue_durable, false, false, false, [], null);
- //绑定交换机与队列
- $this->channel->queue_bind($queue, $exchange_name, $routing_key, false, [], null);
- //处理压入消息格式
- $msg = $this->message($data, $message_durable);
- //压入消息到交换机
- $this->channel->basic_publish($msg, $exchange_name, $routing_key, false, false, null);
- //关闭
- $this->channel->close(); // 关闭信道
- $this->connect->close(); // 关闭链接
- }
- /**
- * 延时生产者 生产延时队列 生产两个队列(1个延时死信队列,1个实时消费队列)
- *
- * @param array|string $data 消息内容
- * @param string $delay_queue 延时队列
- * @param string $delay_exchange_name 延时交换机
- * @param string $delay_routing_key 延时队列routing_key
- * @param string $delay_type 延时交换机类型
- * @param int $delay_expire 延时队列声明周期(过期会触发死信规则)
- * @param bool $delay_queue_durable 延时队列持久化
- * @param bool $delay_message_durable 延时消息持久化
- * @param string $receive_queue 接收队列(接收死信规则)
- * @param string $receive_exchange_name 接收交换机
- * @param string $receive_routing_key 接收队列routing_key
- * @param string $receive_type 接收交换机类型
- * @param bool $receive_queue_durable 接收队列持久化
- * @param bool $receive_message_durable 接收消息持久化
- */
- public function sendDelay(
- $data,
- $delay_queue,
- $delay_exchange_name,
- $delay_routing_key = '',
- $delay_type = 'direct',
- $delay_expire = 0,
- $delay_queue_durable = false,
- $delay_message_durable = false,
- $receive_queue,
- $receive_exchange_name,
- $receive_routing_key = '',
- $receive_type = 'direct',
- $receive_queue_durable = false,
- $receive_message_durable = false
- ) {
- $this->connect = new AMQPStreamConnection(
- $this->config['host'],
- $this->config['port'],
- $this->config['login'],
- $this->config['password'],
- $this->config['vhost']
- );
- $this->channel = $this->connect->channel();
- // 声明主队列 <-- 关联主消费交换机(接收) <-- 数据压入
- // | 300 /
- // | /
- // 关联延时交换机 -> 关联消费队列
- //
- // 延时交换机(弹出数据) -> 消费
- //声明死信规则
- $tale = new AMQPTable();
- $tale->set('x-dead-letter-exchange', $receive_exchange_name);
- $tale->set('x-dead-letter-routing-key', $receive_routing_key);
- $tale->set('x-message-ttl', $delay_expire);
- //声明延时交换机 与 接收交换机
- $this->channel->exchange_declare($delay_exchange_name, $delay_type, false, $delay_message_durable, false, false, false, [], null);
- $this->channel->exchange_declare($receive_exchange_name, $receive_type, false, $receive_message_durable, false, false, false, [], null);
- //声明延时队列 与 接收队列
- $this->channel->queue_declare($delay_queue, false, $delay_queue_durable, false, false, false, $tale, null);
- $this->channel->queue_declare($receive_queue, false, $receive_queue_durable, false, false, false, [], null);
- //绑定延时队列到延时交换机 与 绑定接收队列到接收交换机
- $this->channel->queue_bind($delay_queue, $delay_exchange_name, $delay_routing_key, false, [], null);
- $this->channel->queue_bind($receive_queue, $receive_exchange_name, $receive_routing_key, false, [], null);
- //处理压入消息格式
- $msg = $this->message($data, $delay_message_durable);
- //压入消息到交换机
- $this->channel->basic_publish($msg, $delay_exchange_name, $delay_routing_key, false, false, null);
- //关闭
- $this->channel->close(); // 关闭信道
- $this->connect->close(); // 关闭链接
- }
- /**
- * 消费者 只消费一个队列
- *
- * @param string $queue
- * @param string $consumer_tag
- * @param bool $no_local
- * @param bool $no_ack
- * @param bool $exclusive
- * @param bool $nowait
- * @param null $ticket
- * @param array $arguments
- */
- function receive(
- $queue = '', // 队列名
- $callback = null, // 回调函数
- $queue_durable = false, //持久化
- $consumer_tag = '',
- $no_local = false,
- $no_ack = false,
- $exclusive = false, //队列是否可以被其他队列访问
- $nowait = false,
- $ticket = null,
- $arguments = array()
- ) {
- $this->connect = new AMQPStreamConnection(
- $this->config['host'],
- $this->config['port'],
- $this->config['login'],
- $this->config['password'],
- $this->config['vhost']
- );
- $this->channel = $this->connect->channel();
- //一次只消费一个
- $this->channel->basic_qos(0,1,false);
- //声明队列
- $this->channel->queue_declare($queue, false, $queue_durable, false, false, false, [], null);
- //订阅消费 callback仅绑定并不立即执行
- $this->channel->basic_consume($queue, $consumer_tag, $no_local, $no_ack, $exclusive, $nowait, $callback,
- $ticket, $arguments);
- //轮训等待并触发basic_consume绑定的callback
- while (count($this->channel->callbacks)) {
- $this->channel->wait();
- }
- //关闭
- $this->channel->close(); // 关闭信道
- $this->connect->close(); // 关闭链接
- }
- // $callback = function ($msg) {
- // $rabbit = new \app\admin\command\Rabbitmq();
- // $rabbit->sendMessage($msg->body);
- //// $recharge = new Recharge();
- //// $recharge->sendMessage($msg->body);
- // $this->basicAck($msg->delivery_info['delivery_tag']);
- // };
- }
|