RabbitMQ-从入门到应用

基本原理

MQ全称为Message Queue, 是一种分布式应用程序的的通信方法,它是消费-生产者模型的一个典型的代表,producer往消息队列中不断写入消息,而另一端consumer则可以读取或者订阅队列中的消息。RabbitMQ是MQ产品的典型代表,是一款基于AMQP协议可复用的企业消息系统。业务上,可以实现服务提供者和消费者之间的数据解耦,提供高可用性的消息传输机制,在实际生产中应用相当广泛。本文意在介绍Rabbitmq的基本原理,包括rabbitmq基本框架,概念,通信过程等。

系统架构

Rabbitmq系统最核心的组件是Exchange和Queue,下图是系统简单的示意图。Exchange和Queue是在rabbitmq server(又叫做broker)端,producer和consumer在应用端。

producer&Consumer

producer指的是消息生产者,consumer消息的消费者。

Queue

消息队列,提供了FIFO的处理机制,具有缓存消息的能力。rabbitmq中,队列消息可以设置为持久化,临时或者自动删除。

  1. 设置为持久化的队列,queue中的消息会在server本地硬盘存储一份,防止系统crash,数据丢失

  2. 设置为临时队列,queue中的数据在系统重启之后就会丢失

  3. 设置为自动删除的队列,当不存在用户连接到server,队列中的数据会被自动删除

Exchange

Exchange类似于数据通信网络中的交换机,提供消息路由策略。rabbitmq中,producer不是通过信道直接将消息发送给queue,而是先发送给Exchange。一个Exchange可以和多个Queue进行绑定,producer在传递消息的时候,会传递一个ROUTING_KEY,Exchange会根据这个ROUTING_KEY按照特定的路由算法,将消息路由给指定的queue。和Queue一样,Exchange也可设置为持久化,临时或者自动删除。

Exchange有4种类型:direct(默认),fanout, topic, 和headers,不同类型的Exchange转发消息的策略有所区别:

  1. Direct 直接交换器,工作方式类似于单播,Exchange会将消息发送完全匹配ROUTING_KEY的Queue

  2. fanout 广播是式交换器,不管消息的ROUTING_KEY设置为什么,Exchange都会将消息转发给所有绑定的Queue。

  3. topic 主题交换器,工作方式类似于组播,Exchange会将消息转发和ROUTING_KEY匹配模式相同的所有队列,比如,ROUTING_KEY为user.stock的Message会转发给绑定匹配模式为 * .stock,user.stock, * . * 和#.user.stock.#的队列。( * 表是匹配一个任意词组,#表示匹配0个或多个词组)

  4. headers 消息体的header匹配(ignore)

Binding

所谓绑定就是将一个特定的 Exchange 和一个特定的 Queue 绑定起来。Exchange 和Queue的绑定可以是多对多的关系。

virtual host

在rabbitmq server上可以创建多个虚拟的message broker,又叫做virtual hosts (vhosts)。每一个vhost本质上是一个mini-rabbitmq server,分别管理各自的exchange,和bindings。vhost相当于物理的server,可以为不同app提供边界隔离,使得应用安全的运行在不同的vhost实例上,相互之间不会干扰。producer和consumer连接rabbit server需要指定一个vhost。

通信过程

假设P1和C1注册了相同的Broker,Exchange和Queue。P1发送的消息最终会被C1消费。基本的通信流程大概如下所示:

  1. P1生产消息,发送给服务器端的Exchange

  2. Exchange收到消息,根据ROUTINKEY,将消息转发给匹配的Queue1

  3. Queue1收到消息,将消息发送给订阅者C1

  4. C1收到消息,发送ACK给队列确认收到消息

  5. Queue1收到ACK,删除队列中缓存的此条消息

Consumer收到消息时需要显式的向rabbit broker发送basic.ack消息或者consumer订阅消息时设置auto_ack参数为true。在通信过程中,队列对ACK的处理有以下几种情况:

  1. 如果consumer接收了消息,发送ack,rabbitmq会删除队列中这个消息,发送另一条消息给consumer。

  2. 如果cosumer接受了消息, 但在发送ack之前断开连接,rabbitmq会认为这条消息没有被deliver,在consumer在次连接的时候,这条消息会被redeliver。

  3. 如果consumer接受了消息,但是程序中有bug,忘记了ack,rabbitmq不会重复发送消息。

  4. rabbitmq2.0.0和之后的版本支持consumer reject某条(类)消息,可以通过设置requeue参数中的reject为true达到目地,那么rabbitmq将会把消息发送给下一个注册的consumer。

安装(参考2)

安装erlang

https://blog.csdn.net/ws_kfxd/article/details/85858736

环境

yum -y install make gcc gcc-c++ kernel-devel m4 ncurses-devel openssl-devel

规则,安装

./configure --prefix=/usr/local/erlang --with-ssl --enable-threads --enable-smp-support --enable-kernel-poll --enable-hipe --without-javac

make && make install

设置path

vim /etc/profile

#set erlang environment
ERL_PATH=/usr/local/erlang/bin
PATH=$ERL_PATH:$PATH

source /etc/profile

查看是否安装 及版本

# erl

Erlang/OTP 23 [erts-11.0] [source] [64-bit] [smp:1:1] [ds:1:1:10] [async-threads:1] [hipe]

Eshell V11.0  (abort with ^G)

失败的安装安装

wget http://erlang.org/download/otp_src_21.2.tar.gz
tar -zxvf otp_src_21.2.tar.gz
cd otp_src_21.2
./configure --prefix=/usr/local/servers/erlang --without-javac --with-ssl=/usr/local/ssl/ 
make && make install 
erl   查看是否成功,还有版本号

安装MQ

安装地址

https://www.rabbitmq.com/install-rpm.html

新建文件 /etc/yum.repos.d/rabbitmq.repo

内容, 选centos的配置 :

[bintray-rabbitmq-server]
name=bintray-rabbitmq-rpm
baseurl=https://dl.bintray.com/rabbitmq/rpm/rabbitmq-server/v3.8.x/el/7/
gpgcheck=0
repo_gpgcheck=0
enabled=1

查看以下, 最下面有一个 rabbitmq-server.noarch 3.8.5-1.el7

yum list |grep rabbit

安装 , centos7要用 3.8.5-1.el7

rpm --import https://github.com/rabbitmq/signing-keys/releases/download/2.0/rabbitmq-release-signing-key.asc
# This example assumes the CentOS 8 version of the package.
# For CentOS 7, replace "el8" with "el7".

yum install rabbitmq-server-3.8.5-1.el8.noarch.rpm

安装2

erlang

https://github.com/rabbitmq/erlang-rpm

# In /etc/yum.repos.d/rabbitmq_erlang.repo
[rabbitmq_erlang]
name=rabbitmq_erlang
baseurl=https://packagecloud.io/rabbitmq/erlang/el/7/$basearch
repo_gpgcheck=1
gpgcheck=1
enabled=1
# PackageCloud's repository key and RabbitMQ package signing key
gpgkey=https://packagecloud.io/rabbitmq/erlang/gpgkey
       https://dl.bintray.com/rabbitmq/Keys/rabbitmq-release-signing-key.asc
sslverify=1
sslcacert=/etc/pki/tls/certs/ca-bundle.crt
metadata_expire=300

[rabbitmq_erlang-source]
name=rabbitmq_erlang-source
baseurl=https://packagecloud.io/rabbitmq/erlang/el/7/SRPMS
repo_gpgcheck=1
gpgcheck=0
enabled=1
# PackageCloud's repository key and RabbitMQ package signing key
gpgkey=https://packagecloud.io/rabbitmq/erlang/gpgkey
       https://dl.bintray.com/rabbitmq/Keys/rabbitmq-release-signing-key.asc
sslverify=1
sslcacert=/etc/pki/tls/certs/ca-bundle.crt
metadata_expire=300

更新yum

yum clean all
yum makecache

yun list | grep erlang  #最新, erlang.x86_64    23.0.2-1.el7   rabbitmq_erlang

安装

yum -y install erlang

验证, 完成

root@iabc:~/download/rabbit# erl
Erlang/OTP 23 [erts-11.0.2] [source] [64-bit] [smp:2:2] [ds:2:2:10] [async-threads:1] [hipe]

Rabbitmq

下载, Download the server

https://www.rabbitmq.com/install-rpm.html

安装

yum install rabbitmq-server-3.8.5-1.el7.noarch.rpm

默认安装目录:

/usr/lib/rabbitmq

使用&配置

添加管理员

./rabbitmqctl add_user qadmin qadmin_pwd

#这样才可以访问web页面提供的管理页面
./rabbitmqctl set_user_tags qadmin administrator 

#给用户qxadmin加上有访问虚拟空间/的权限
./rabbitmqctl set_permissions -p / qadmin '.*' '.*' '.*'  

启动服务

./rabbitmq-server -detached (-detached为可选参数,表示后台开启)  

开启管理插件( http://localhost:15672 )

./rabbitmq-plugins enable rabbitmq_management

关闭mq服务

./rabbitmqctl stop

查看

./rabbitmqctl status

创建虚拟机

Vh: testvh

创建用户

user: test_user_01 / pwd: 121212

还没配置权限

创建交换机 exchange

Virtual host: testvh

name : test_exc

type: fabout ( direat, fabout , headers, topic)

Durability: durable ( durable:持久化, transient: )

Auto delete: No

Internal: NO

Arguments:

添加队列 queues

Virtual host: testvh

Type: ( Classic / Quorum )

Name: test_que

Durability: Durable

Auto delete: No

Arguments

Binding

选中并点击交换机名. 或者队列名, 进图详情页面,. 下方都有绑定操作

AMQP

https://www.jianshu.com/p/cbe3e82e888f

windows安装php_amqp

下载ampq扩展(https://pecl.php.net/package/amqp), 找到对应的版本, 下载DLL, 解压 ,

复制 php_amqp.dll 到 php/ext/

复制 rabbitmq.4.dll 到 php/ ( 网上说法: 32位复制rabbitmq.4.dll 到windows/system32 ,64位则windows/SysWOW64/ )

修改php.ini , extension=php_amqp.dll

重启Nginx

查看, phpinfo

centos

安装rabbitmq-c-0.8.

wget https://github.com/alanxz/rabbitmq-c/releases/download/v0.8.0/rabbitmq-c-0.8.0.tar.gz
tar -zxvf rabbitmq-c-0.8.0.tar.gz 
cd rabbitmq-c-0.8.0
./configure --prefix=/usr/local/rabbitmq-c
make
make install

安装 amqp-1.9.3

wget https://pecl.php.net/get/amqp-1.9.3.tgz
tar -zxvf amqp-1.9.3.tgz 
cd amqp-1.9.3
phpize 
./configure --with-php-config=/usr/local/php/bin/php-config --with-amqp --with-librabbitmq-dir=/usr/local/rabbitmq-c/
make
make install

#修改 php.ini
extension=amqp.so

重启php-fpm

php-amqplib

官网文档: http://www.rabbitmq.com/tutorials/tutorial-one-php.html

php版本要求: ^7.1

composer直接安装

composer require php-amqplib/php-amqplib

遇到的坑:

服务端有多个版本php,默认读到5.x的版本, 一直提示php版本低

解决:

进到项目根目录, 执行已下…根目录会多一个 composer.phar

/www/server/php/71/bin/php -r "copy('https://getcomposer.org/installer', 'composer-setup.php');"

/www/server/php/71/bin/php -r "if (hash_file('sha384', 'composer-setup.php') === 'e0012edf3e80b6978849f5eff0d4b4e4c79ff1609dd1e613307e16318854d24ae64f26d17af3ef0bf7cfb710ca74755a') { echo 'Installer verified'; } else { echo 'Installer corrupt'; unlink('composer-setup.php'); } echo PHP_EOL;"

#下载 composer.phar
/www/server/php/71/bin/php composer-setup.php

#删
/www/server/php/71/bin/php -r "unlink('composer-setup.php');"

#指定php7.1 安装composer包
/www/server/php/71/bin/php composer.phar require php-amqplib/php-amqplib 2.7.*

Mark

理解消息队列: https://www.cnblogs.com/williamjie/p/9481780.html

什么是MQ: https://www.jianshu.com/p/78847c203b76

基础概念: https://www.cnblogs.com/williamjie/p/9481774.html

入门教程, 比较全: https://blog.csdn.net/php_lzr/article/details/99635346

AMQP协议, AMQP与rabbitmq的关系 https://www.cnblogs.com/wutianqi/p/10043011.html