MQ全称为Message Queue, 是一种分布式应用程序的的通信方法,它是消费-生产者模型的一个典型的代表,producer往消息队列中不断写入消息,而另一端consumer则可以读取或者订阅队列中的消息。RabbitMQ是MQ产品的典型代表,是一款基于AMQP协议可复用的企业消息系统。业务上,可以实现服务提供者和消费者之间的数据解耦,提供高可用性的消息传输机制,在实际生产中应用相当广泛。本文意在介绍Rabbitmq的基本原理,包括rabbitmq基本框架,概念,通信过程等。
系统架构
Rabbitmq系统最核心的组件是Exchange和Queue,下图是系统简单的示意图。Exchange和Queue是在rabbitmq server(又叫做broker)端,producer和consumer在应用端。
producer&Consumer
producer指的是消息生产者,consumer消息的消费者。
Queue
消息队列,提供了FIFO的处理机制,具有缓存消息的能力。rabbitmq中,队列消息可以设置为持久化,临时或者自动删除。
-
设置为持久化的队列,queue中的消息会在server本地硬盘存储一份,防止系统crash,数据丢失
-
设置为临时队列,queue中的数据在系统重启之后就会丢失
-
设置为自动删除的队列,当不存在用户连接到server,队列中的数据会被自动删除
Exchange
Exchange类似于数据通信网络中的交换机,提供消息路由策略。rabbitmq中,producer不是通过信道直接将消息发送给queue,而是先发送给Exchange。一个Exchange可以和多个Queue进行绑定,producer在传递消息的时候,会传递一个ROUTING_KEY,Exchange会根据这个ROUTING_KEY按照特定的路由算法,将消息路由给指定的queue。和Queue一样,Exchange也可设置为持久化,临时或者自动删除。
Exchange有4种类型:direct(默认),fanout, topic, 和headers,不同类型的Exchange转发消息的策略有所区别:
-
Direct 直接交换器,工作方式类似于单播,Exchange会将消息发送完全匹配ROUTING_KEY的Queue
-
fanout 广播是式交换器,不管消息的ROUTING_KEY设置为什么,Exchange都会将消息转发给所有绑定的Queue。
-
topic 主题交换器,工作方式类似于组播,Exchange会将消息转发和ROUTING_KEY匹配模式相同的所有队列,比如,ROUTING_KEY为user.stock的Message会转发给绑定匹配模式为 * .stock,user.stock, * . * 和#.user.stock.#的队列。( * 表是匹配一个任意词组,#表示匹配0个或多个词组)
-
headers 消息体的header匹配(ignore)
Binding
所谓绑定就是将一个特定的 Exchange 和一个特定的 Queue 绑定起来。Exchange 和Queue的绑定可以是多对多的关系。
virtual host
在rabbitmq server上可以创建多个虚拟的message broker,又叫做virtual hosts (vhosts)。每一个vhost本质上是一个mini-rabbitmq server,分别管理各自的exchange,和bindings。vhost相当于物理的server,可以为不同app提供边界隔离,使得应用安全的运行在不同的vhost实例上,相互之间不会干扰。producer和consumer连接rabbit server需要指定一个vhost。
通信过程
假设P1和C1注册了相同的Broker,Exchange和Queue。P1发送的消息最终会被C1消费。基本的通信流程大概如下所示:
-
P1生产消息,发送给服务器端的Exchange
-
Exchange收到消息,根据ROUTINKEY,将消息转发给匹配的Queue1
-
Queue1收到消息,将消息发送给订阅者C1
-
C1收到消息,发送ACK给队列确认收到消息
-
Queue1收到ACK,删除队列中缓存的此条消息
Consumer收到消息时需要显式的向rabbit broker发送basic.ack消息或者consumer订阅消息时设置auto_ack参数为true。在通信过程中,队列对ACK的处理有以下几种情况:
-
如果consumer接收了消息,发送ack,rabbitmq会删除队列中这个消息,发送另一条消息给consumer。
-
如果cosumer接受了消息, 但在发送ack之前断开连接,rabbitmq会认为这条消息没有被deliver,在consumer在次连接的时候,这条消息会被redeliver。
-
如果consumer接受了消息,但是程序中有bug,忘记了ack,rabbitmq不会重复发送消息。
-
rabbitmq2.0.0和之后的版本支持consumer reject某条(类)消息,可以通过设置requeue参数中的reject为true达到目地,那么rabbitmq将会把消息发送给下一个注册的consumer。
安装(参考2)
安装erlang
https://blog.csdn.net/ws_kfxd/article/details/85858736
环境
yum -y install make gcc gcc-c++ kernel-devel m4 ncurses-devel openssl-devel
规则,安装
./configure --prefix=/usr/local/erlang --with-ssl --enable-threads --enable-smp-support --enable-kernel-poll --enable-hipe --without-javac make && make install
设置path
vim /etc/profile #set erlang environment ERL_PATH=/usr/local/erlang/bin PATH=$ERL_PATH:$PATH source /etc/profile
查看是否安装 及版本
# erl
Erlang/OTP 23 [erts-11.0] [source] [64-bit] [smp:1:1] [ds:1:1:10] [async-threads:1] [hipe]
Eshell V11.0 (abort with ^G)
失败的安装安装
wget http://erlang.org/download/otp_src_21.2.tar.gz
tar -zxvf otp_src_21.2.tar.gz
cd otp_src_21.2
./configure --prefix=/usr/local/servers/erlang --without-javac --with-ssl=/usr/local/ssl/
make && make install
erl 查看是否成功,还有版本号
安装MQ
安装地址
https://www.rabbitmq.com/install-rpm.html
新建文件 /etc/yum.repos.d/rabbitmq.repo
内容, 选centos的配置 :
[bintray-rabbitmq-server] name=bintray-rabbitmq-rpm baseurl=https://dl.bintray.com/rabbitmq/rpm/rabbitmq-server/v3.8.x/el/7/ gpgcheck=0 repo_gpgcheck=0 enabled=1
查看以下, 最下面有一个 rabbitmq-server.noarch 3.8.5-1.el7
yum list |grep rabbit
安装 , centos7要用 3.8.5-1.el7
rpm --import https://github.com/rabbitmq/signing-keys/releases/download/2.0/rabbitmq-release-signing-key.asc # This example assumes the CentOS 8 version of the package. # For CentOS 7, replace "el8" with "el7". yum install rabbitmq-server-3.8.5-1.el8.noarch.rpm
安装2
erlang
https://github.com/rabbitmq/erlang-rpm
# In /etc/yum.repos.d/rabbitmq_erlang.repo
[rabbitmq_erlang]
name=rabbitmq_erlang
baseurl=https://packagecloud.io/rabbitmq/erlang/el/7/$basearch
repo_gpgcheck=1
gpgcheck=1
enabled=1
# PackageCloud's repository key and RabbitMQ package signing key
gpgkey=https://packagecloud.io/rabbitmq/erlang/gpgkey
https://dl.bintray.com/rabbitmq/Keys/rabbitmq-release-signing-key.asc
sslverify=1
sslcacert=/etc/pki/tls/certs/ca-bundle.crt
metadata_expire=300
[rabbitmq_erlang-source]
name=rabbitmq_erlang-source
baseurl=https://packagecloud.io/rabbitmq/erlang/el/7/SRPMS
repo_gpgcheck=1
gpgcheck=0
enabled=1
# PackageCloud's repository key and RabbitMQ package signing key
gpgkey=https://packagecloud.io/rabbitmq/erlang/gpgkey
https://dl.bintray.com/rabbitmq/Keys/rabbitmq-release-signing-key.asc
sslverify=1
sslcacert=/etc/pki/tls/certs/ca-bundle.crt
metadata_expire=300
更新yum
yum clean all yum makecache yun list | grep erlang #最新, erlang.x86_64 23.0.2-1.el7 rabbitmq_erlang
安装
yum -y install erlang
验证, 完成
root@iabc:~/download/rabbit# erl Erlang/OTP 23 [erts-11.0.2] [source] [64-bit] [smp:2:2] [ds:2:2:10] [async-threads:1] [hipe]
Rabbitmq
下载, Download the server
https://www.rabbitmq.com/install-rpm.html
安装
yum install rabbitmq-server-3.8.5-1.el7.noarch.rpm
默认安装目录:
/usr/lib/rabbitmq
使用&配置
添加管理员
./rabbitmqctl add_user qadmin qadmin_pwd #这样才可以访问web页面提供的管理页面 ./rabbitmqctl set_user_tags qadmin administrator #给用户qxadmin加上有访问虚拟空间/的权限 ./rabbitmqctl set_permissions -p / qadmin '.*' '.*' '.*'
启动服务
./rabbitmq-server -detached (-detached为可选参数,表示后台开启)
开启管理插件( http://localhost:15672 )
./rabbitmq-plugins enable rabbitmq_management
关闭mq服务
./rabbitmqctl stop
查看
./rabbitmqctl status
创建虚拟机
Vh: testvh
创建用户
user: test_user_01 / pwd: 121212
还没配置权限
创建交换机 exchange
Virtual host: testvh
name : test_exc
type: fabout ( direat, fabout , headers, topic)
Durability: durable ( durable:持久化, transient: )
Auto delete: No
Internal: NO
Arguments:
添加队列 queues
Virtual host: testvh
Type: ( Classic / Quorum )
Name: test_que
Durability: Durable
Auto delete: No
Arguments
Binding
选中并点击交换机名. 或者队列名, 进图详情页面,. 下方都有绑定操作
AMQP
https://www.jianshu.com/p/cbe3e82e888f
windows安装php_amqp
下载ampq扩展(https://pecl.php.net/package/amqp), 找到对应的版本, 下载DLL, 解压 ,
复制 php_amqp.dll 到 php/ext/
复制 rabbitmq.4.dll 到 php/ ( 网上说法: 32位复制rabbitmq.4.dll 到windows/system32 ,64位则windows/SysWOW64/ )
修改php.ini , extension=php_amqp.dll
重启Nginx
查看, phpinfo
centos
安装rabbitmq-c-0.8.
-
rabbitmq-c是一个用于C语言的,与AMQP server进行交互的client库。
-
AMQP扩展能够支持rabbitMQ,AMQP是rabbitMQ的协议(Advanced Message Queue Protocol 高级消息队列协议)
-
下载地址:https://github.com/alanxz/rabbitmq-c/releases/download/v0.8.0/rabbitmq-c-0.8.0.tar.gz
wget https://github.com/alanxz/rabbitmq-c/releases/download/v0.8.0/rabbitmq-c-0.8.0.tar.gz
tar -zxvf rabbitmq-c-0.8.0.tar.gz
cd rabbitmq-c-0.8.0
./configure --prefix=/usr/local/rabbitmq-c
make
make install
安装 amqp-1.9.3
wget https://pecl.php.net/get/amqp-1.9.3.tgz tar -zxvf amqp-1.9.3.tgz cd amqp-1.9.3 phpize ./configure --with-php-config=/usr/local/php/bin/php-config --with-amqp --with-librabbitmq-dir=/usr/local/rabbitmq-c/ make make install #修改 php.ini extension=amqp.so
重启php-fpm
php-amqplib
官网文档: http://www.rabbitmq.com/tutorials/tutorial-one-php.html
php版本要求: ^7.1
composer直接安装
composer require php-amqplib/php-amqplib
遇到的坑:
服务端有多个版本php,默认读到5.x的版本, 一直提示php版本低
解决:
进到项目根目录, 执行已下…根目录会多一个 composer.phar
/www/server/php/71/bin/php -r "copy('https://getcomposer.org/installer', 'composer-setup.php');" /www/server/php/71/bin/php -r "if (hash_file('sha384', 'composer-setup.php') === 'e0012edf3e80b6978849f5eff0d4b4e4c79ff1609dd1e613307e16318854d24ae64f26d17af3ef0bf7cfb710ca74755a') { echo 'Installer verified'; } else { echo 'Installer corrupt'; unlink('composer-setup.php'); } echo PHP_EOL;" #下载 composer.phar /www/server/php/71/bin/php composer-setup.php #删 /www/server/php/71/bin/php -r "unlink('composer-setup.php');" #指定php7.1 安装composer包 /www/server/php/71/bin/php composer.phar require php-amqplib/php-amqplib 2.7.*
Mark
理解消息队列: https://www.cnblogs.com/williamjie/p/9481780.html
什么是MQ: https://www.jianshu.com/p/78847c203b76
基础概念: https://www.cnblogs.com/williamjie/p/9481774.html
入门教程, 比较全: https://blog.csdn.net/php_lzr/article/details/99635346
AMQP协议, AMQP与rabbitmq的关系 https://www.cnblogs.com/wutianqi/p/10043011.html