Rabbitmq.php 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333
  1. <?php
  2. namespace app\common\library;
  3. use PhpAmqpLib\Channel\AMQPChannel;
  4. use PhpAmqpLib\Connection\AMQPStreamConnection;
  5. use PhpAmqpLib\Message\AMQPMessage;
  6. use PhpAmqpLib\Wire\AMQPTable;
  7. use think\Cache;
  8. use think\Config;
  9. use think\Log;
  10. class Rabbitmq
  11. {
  12. /**
  13. * 配置
  14. *
  15. * @var array
  16. */
  17. public $config;
  18. /**
  19. * @var AMQPChannel
  20. */
  21. public $channel;
  22. /**
  23. * initialized
  24. *
  25. * @var AMQPStreamConnection
  26. */
  27. public $connect;
  28. public function __construct(array $config = [])
  29. {
  30. $this->config = array_merge(Config::get('rabbitmq'), $config);
  31. }
  32. /**
  33. * 处理消息格式 A Message for use with the Channnel.basic_* methods.
  34. *
  35. * @param $msg
  36. * @param $message_durable
  37. * @return AMQPMessage
  38. */
  39. public function message($msg, $message_durable)
  40. {
  41. if (is_array($msg)) {
  42. $msg = json_encode($msg);
  43. }
  44. if (!is_object($msg) && $message_durable === true) {
  45. return new AMQPMessage($msg, ['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]);
  46. }
  47. return new AMQPMessage($msg);
  48. }
  49. /**
  50. * acknowledge one or more messages
  51. *
  52. * @param string $delivery_tag
  53. */
  54. public function basicAck($delivery_tag)
  55. {
  56. $this->channel->basic_ack($delivery_tag);
  57. }
  58. /**
  59. * @param $data
  60. * @param $exchange_name
  61. * @param string $routing_key
  62. * @param string $type
  63. * @param bool $queue_durable
  64. * @param bool $message_durable
  65. */
  66. public function transferExchange(
  67. $data, $exchange_name, $routing_key = '', $type = 'topic', $queue_durable = false, $message_durable = false
  68. )
  69. {
  70. Log::info('MQ:data::' . print_r(json_encode($data), true));
  71. Log::info('MQ:data::' . print_r(json_encode($this->config), true));
  72. Log::info('MQ:data:: $exchange_name->' . $exchange_name . '::$routing_key->' . $routing_key . '::$type->' . $type);
  73. $this->connect = new AMQPStreamConnection(
  74. $this->config['host'],
  75. $this->config['port'],
  76. $this->config['login'],
  77. $this->config['password'],
  78. $this->config['vhost']
  79. );
  80. $this->channel = $this->connect->channel();
  81. //声明交换机
  82. $this->channel->exchange_declare($exchange_name, $type, false, $message_durable, false, false, false, [], null);
  83. //处理压入消息格式
  84. $msg = $this->message($data, $message_durable);
  85. //压入消息到交换机
  86. $this->channel->basic_publish($msg, $exchange_name, $routing_key, false, false, null);
  87. //关闭
  88. $this->channel->close(); // 关闭信道
  89. $this->connect->close(); // 关闭链接
  90. }
  91. /**
  92. * 发送延时消息
  93. * @param array $data
  94. * @param $routing_key
  95. * @param $timeout
  96. * @param string $ex_name
  97. * @param string $ex_type
  98. */
  99. public function transferDelayExchange(
  100. array $data,
  101. $routing_key,
  102. $timeout,
  103. $ex_name='cps.ex.delay.topic',
  104. $ex_type = 'topic'
  105. ){
  106. Log::info("Delay:{$timeout} MQ:data::".print_r($data, true));
  107. $this->connect = new AMQPStreamConnection(
  108. $this->config['host'],
  109. $this->config['port'],
  110. $this->config['login'],
  111. $this->config['password'],
  112. $this->config['vhost']
  113. );
  114. $this->channel = $this->connect->channel();
  115. //声明交换机
  116. $args = new AMQPTable(["x-delayed-type" => $ex_type]);
  117. $this->channel->exchange_declare($ex_name, 'x-delayed-message', false, false, false, false, false, $args, null);
  118. //处理压入消息格式
  119. $msg = new AMQPMessage(json_encode($data));
  120. if($timeout){
  121. $msg->set('application_headers', new AMQPTable(["x-delay" => $timeout]));
  122. }
  123. //压入消息到交换机
  124. $this->channel->basic_publish($msg, $ex_name, $routing_key, false, false, null);
  125. //关闭
  126. $this->channel->close(); // 关闭信道
  127. $this->connect->close(); // 关闭链接
  128. }
  129. /**
  130. * 生产者 生产实时队列 只生产一个队列
  131. *
  132. * @param $data
  133. * @param $queue
  134. * @param $exchange_name
  135. * @param string $routing_key
  136. * @param string $type
  137. * @param bool $queue_durable
  138. * @param bool $message_durable
  139. */
  140. public function send(
  141. $data, $queue, $exchange_name, $routing_key = '', $type = 'direct', $queue_durable = false, $message_durable = false
  142. ) {
  143. $this->connect = new AMQPStreamConnection(
  144. $this->config['host'],
  145. $this->config['port'],
  146. $this->config['login'],
  147. $this->config['password'],
  148. $this->config['vhost']
  149. );
  150. $this->channel = $this->connect->channel();
  151. //声明交换机
  152. $this->channel->exchange_declare($exchange_name, $type, false, $message_durable, false, false, false, [], null);
  153. //声明队列
  154. $this->channel->queue_declare($queue, false, $queue_durable, false, false, false, [], null);
  155. //绑定交换机与队列
  156. $this->channel->queue_bind($queue, $exchange_name, $routing_key, false, [], null);
  157. //处理压入消息格式
  158. $msg = $this->message($data, $message_durable);
  159. //压入消息到交换机
  160. $this->channel->basic_publish($msg, $exchange_name, $routing_key, false, false, null);
  161. //关闭
  162. $this->channel->close(); // 关闭信道
  163. $this->connect->close(); // 关闭链接
  164. }
  165. /**
  166. * 延时生产者 生产延时队列 生产两个队列(1个延时死信队列,1个实时消费队列)
  167. *
  168. * @param array|string $data 消息内容
  169. * @param string $delay_queue 延时队列
  170. * @param string $delay_exchange_name 延时交换机
  171. * @param string $delay_routing_key 延时队列routing_key
  172. * @param string $delay_type 延时交换机类型
  173. * @param int $delay_expire 延时队列声明周期(过期会触发死信规则)
  174. * @param bool $delay_queue_durable 延时队列持久化
  175. * @param bool $delay_message_durable 延时消息持久化
  176. * @param string $receive_queue 接收队列(接收死信规则)
  177. * @param string $receive_exchange_name 接收交换机
  178. * @param string $receive_routing_key 接收队列routing_key
  179. * @param string $receive_type 接收交换机类型
  180. * @param bool $receive_queue_durable 接收队列持久化
  181. * @param bool $receive_message_durable 接收消息持久化
  182. */
  183. public function sendDelay(
  184. $data,
  185. $delay_queue,
  186. $delay_exchange_name,
  187. $delay_routing_key = '',
  188. $delay_type = 'direct',
  189. $delay_expire = 0,
  190. $delay_queue_durable = false,
  191. $delay_message_durable = false,
  192. $receive_queue,
  193. $receive_exchange_name,
  194. $receive_routing_key = '',
  195. $receive_type = 'direct',
  196. $receive_queue_durable = false,
  197. $receive_message_durable = false
  198. ) {
  199. $this->connect = new AMQPStreamConnection(
  200. $this->config['host'],
  201. $this->config['port'],
  202. $this->config['login'],
  203. $this->config['password'],
  204. $this->config['vhost']
  205. );
  206. $this->channel = $this->connect->channel();
  207. // 声明主队列 <-- 关联主消费交换机(接收) <-- 数据压入
  208. // | 300 /
  209. // | /
  210. // 关联延时交换机 -> 关联消费队列
  211. //
  212. // 延时交换机(弹出数据) -> 消费
  213. //声明死信规则
  214. $tale = new AMQPTable();
  215. $tale->set('x-dead-letter-exchange', $receive_exchange_name);
  216. $tale->set('x-dead-letter-routing-key', $receive_routing_key);
  217. $tale->set('x-message-ttl', $delay_expire);
  218. //声明延时交换机 与 接收交换机
  219. $this->channel->exchange_declare($delay_exchange_name, $delay_type, false, $delay_message_durable, false, false, false, [], null);
  220. $this->channel->exchange_declare($receive_exchange_name, $receive_type, false, $receive_message_durable, false, false, false, [], null);
  221. //声明延时队列 与 接收队列
  222. $this->channel->queue_declare($delay_queue, false, $delay_queue_durable, false, false, false, $tale, null);
  223. $this->channel->queue_declare($receive_queue, false, $receive_queue_durable, false, false, false, [], null);
  224. //绑定延时队列到延时交换机 与 绑定接收队列到接收交换机
  225. $this->channel->queue_bind($delay_queue, $delay_exchange_name, $delay_routing_key, false, [], null);
  226. $this->channel->queue_bind($receive_queue, $receive_exchange_name, $receive_routing_key, false, [], null);
  227. //处理压入消息格式
  228. $msg = $this->message($data, $delay_message_durable);
  229. //压入消息到交换机
  230. $this->channel->basic_publish($msg, $delay_exchange_name, $delay_routing_key, false, false, null);
  231. //关闭
  232. $this->channel->close(); // 关闭信道
  233. $this->connect->close(); // 关闭链接
  234. }
  235. /**
  236. * 消费者 只消费一个队列
  237. *
  238. * @param string $queue
  239. * @param string $consumer_tag
  240. * @param bool $no_local
  241. * @param bool $no_ack
  242. * @param bool $exclusive
  243. * @param bool $nowait
  244. * @param null $ticket
  245. * @param array $arguments
  246. */
  247. function receive(
  248. $queue = '', // 队列名
  249. $callback = null, // 回调函数
  250. $queue_durable = false, //持久化
  251. $consumer_tag = '',
  252. $no_local = false,
  253. $no_ack = false,
  254. $exclusive = false, //队列是否可以被其他队列访问
  255. $nowait = false,
  256. $ticket = null,
  257. $arguments = array()
  258. ) {
  259. $this->connect = new AMQPStreamConnection(
  260. $this->config['host'],
  261. $this->config['port'],
  262. $this->config['login'],
  263. $this->config['password'],
  264. $this->config['vhost']
  265. );
  266. $this->channel = $this->connect->channel();
  267. //一次只消费一个
  268. $this->channel->basic_qos(0,1,false);
  269. //声明队列
  270. $this->channel->queue_declare($queue, false, $queue_durable, false, false, false, [], null);
  271. //订阅消费 callback仅绑定并不立即执行
  272. $this->channel->basic_consume($queue, $consumer_tag, $no_local, $no_ack, $exclusive, $nowait, $callback,
  273. $ticket, $arguments);
  274. //轮训等待并触发basic_consume绑定的callback
  275. while (count($this->channel->callbacks)) {
  276. $this->channel->wait();
  277. }
  278. //关闭
  279. $this->channel->close(); // 关闭信道
  280. $this->connect->close(); // 关闭链接
  281. }
  282. // $callback = function ($msg) {
  283. // $rabbit = new \app\admin\command\Rabbitmq();
  284. // $rabbit->sendMessage($msg->body);
  285. //// $recharge = new Recharge();
  286. //// $recharge->sendMessage($msg->body);
  287. // $this->basicAck($msg->delivery_info['delivery_tag']);
  288. // };
  289. }