MQ全称为Message Queue,即消息队列。“消息队列”是在消息的传输过程中保存消息的容器。它是典型的:生产者、消费者模型。这里来学习一种常用的消息队列RabbitMQ
1. 认识RabbitMQ
在介绍RabbitMQ之前,我们先来看下面一个电商项目的场景:
- 商品的原始数据保存在数据库中,增删改查都在数据库中完成。
- 搜索服务数据来源是索引库(Elasticsearch),如果数据库商品发生变化,索引库数据不能及时更新。
- 商品详情做了页面静态化处理,静态页面数据也不会随着数据库商品更新而变化。
如果我们在后台修改了商品的价格,搜索页面和商品详情页显示的依然是旧的价格,这样显然不对。该如何解决?
我们可能会想到这么做:
方案1:每当后台对商品做增删改操作,同时修改索引库数据及更新静态页面。
方案2:搜索服务和商品页面静态化服务对外提供操作接口,后台在商品增删改后,调用接口。
这两种方案都有个严重的问题:就是代码耦合,后台服务中需要嵌入搜索和商品页面服务,违背了微服务的独立原则。
这时,我们就会采用另外一种解决办法,那就是消息队列!
商品服务对商品增删改以后,无需去操作索引库和静态页面,只需向MQ发送一条消息(比如包含商品id的消息),也不关心消息被谁接收。 搜索服务和静态页面服务监听MQ,接收消息,然后分别去处理索引库和静态页面(根据商品id去更新索引库和商品详情静态页面)。
1.1 什么是消息队列
MQ全称为Message Queue,即消息队列。“消息队列”是在消息的传输过程中保存消息的容器。它是典型的:生产者、消费者模型。生产者不断向消息队列中生产消息,消费者不断的从队列中获取消息。因为消息的生产和消费都是异步的,而且只关心消息的发送和接收,没有业务逻辑的侵入,这样就实现了生产者和消费者的解耦。
1、任务异步处理:
高并发环境下,由于来不及同步处理,请求往往会发生堵塞,比如说,大量的insert,update之类的请求同时到达MySQL,直接导致无数的行锁表锁,甚至最后请求会堆积过多,从而触发too many connections错误。通过使用消息队列,我们可以异步处理请求,从而缓解系统的压力。将不需要同步处理的并且耗时长的操作由消息队列通知消息接收方进行异步处理。减少了应用程序的响应时间。
2、应用程序解耦合:
MQ相当于一个中介,生产方通过MQ与消费方交互,它将应用程序进行解耦合。
1.2 AMQP和JMS
MQ是消息通信的模型,并发具体实现。现在实现MQ的有两种主流方式:AMQP、JMS。
两者间的区别和联系:
JMS是定义了统一的接口,来对消息操作进行统一;AMQP是通过规定协议来统一数据交互的格式
JMS限定了必须使用Java语言;AMQP只是协议,不规定实现方式,因此是跨语言的。
JMS规定了两种消息模型;而AMQP的消息模型更加丰富
1.3 常见MQ产品
ActiveMQ:基于JMS
RabbitMQ:基于AMQP协议,erlang语言开发,稳定性好
RocketMQ:基于JMS,阿里巴巴产品,目前交由Apache基金会
Kafka:分布式消息系统,高吞吐量
1.4 RabbitMQ的工作原理
下图是RabbitMQ的基本结构:
组成部分说明:
Broker:消息队列服务进程,此进程包括两个部分:Exchange和Queue
Exchange:消息队列交换机,按一定的规则将消息路由转发到某个队列,对消息进行过虑。
Queue:消息队列,存储消息的队列,消息到达队列并转发给指定的
Producer:消息生产者,即生产方客户端,生产方客户端将消息发送
Consumer:消息消费者,即消费方客户端,接收MQ转发的消息。
生产者发送消息流程:
1、生产者和Broker建立TCP连接。
2、生产者和Broker建立通道。
3、生产者通过通道消息发送给Broker,由Exchange将消息进行转发。
4、Exchange将消息转发到指定的Queue(队列)
消费者接收消息流程:
1、消费者和Broker建立TCP连接
2、消费者和Broker建立通道
3、消费者监听指定的Queue(队列)
4、当有消息到达Queue时Broker默认将消息推送给消费者。
5、消费者接收到消息。
6、ack回复
2. RabbitMq 的搭建流程
2.1 准备工作—安装Erlang
配置Erlang环境(c++)
1 | yum -y install make gcc gcc-c++ kernel-devel m4 ncurses ncurses-devel openssl-devel perl |
从Erlang Solution安装(推荐)到官网 http://erlang.org/download/ 下载最新包,这里使用otp_src_21.1.tar.gz
1 | mkdir /data/erlang |
下载速度比较慢.,建议先下载到本地,再上传到服务器
下载完后解压
1 | tar -vxf otp_src_21.1.tar.gz |
安装编译
1 | cd otp_src_21.1 |
安装好后
修改/etc/profile文件: vi /etc/profile
增加环境变量
1 | #set erlang environment |
刷新配置: source /etc/profile
测试:执行erl,看是否能打开eshell,用“halt().”退出,注意后面的“.”点号,那是erlang的结束符
2.2 安装 RabbitMq及过程错误解决
先建立一个工作目录,并下载安装包,
1
2 mkdir /data/rabbitMq
cd /data/rabbitMq/
官网最新安装包下载链接: http://www.rabbitmq.com/releases/rabbitmq-server/
github 下载链接:https://github.com/rabbitmq/rabbitmq-server/releases
网速好的可通过如下命令行下载
wget https://github.com/rabbitmq/rabbitmq-server/releases/download/v3.7.8/rabbitmq-server-generic-unix-3.7.8.tar.xz
下载完成后,开始解压
1
2 xz -d rabbitmq-server-generic-unix-3.7.8.tar.xz
tar -xvf rabbitmq-server-generic-unix-3.7.8.tar接下来配置 rabbitmq 的环境变量(这个跟上面的erlang配置以及java的环境变量差不多)
vi /etc/profile
设置环境变量,一般新增的,直接放在文档最后面就行,方便查找修改
1
2
3 RABBITMQ_BIN=/data/rabbitMq/rabbitmq_server-3.7.8/sbin
PATH=PATH:PATH:RABBITMQ_BIN
export PATH/etc/profile 效果如下图:
执行以下命令,使得PATH路径更新,rabbitMQ安装成功。
source /etc/profile
开启管理界面及配置
1
2
3
4 rabbitmq-plugins enable rabbitmq_management #启动管理界面
vim /data/rabbitMq/rabbitmq_server-3.7.8/ebin/rabbit.app
# 比如修改密码,配置等,例如looppack_users中的《“guest”》,只保留guest
rabbitmq-server -detached #后台运行rabbitmqrabbitmq 基本操作命令,更多命令可参考 RabbitMQ基础操作命令
1
2
3
4 rabbitmq-server on # 添加开机启动RabbitMQ服务
rabbitmq-server start # 启动服务
rabbitmq-server status # 查看服务状态
rabbitmq-server stop # 停止服务访问链接: http://127.0.0.1:15672/ 说明启动成功。
此时,如果通过外部访问,需要考虑防火墙设置。
本篇是Centos 7.X 所以要设置 firewalld , 如果是Centos 6.X 要设置 iptables
两种操作,第一种:开放端口供外界访问(推荐,因接近实际搭建涉及的问题)
1
2
3
4 # firewall-cmd --zone=public --add-port=4369/tcp --permanent #erlang发现端口4369
# firewall-cmd --zone=public --add-port=5672/tcp --permanent #默认client端通信端口5672
# firewall-cmd --zone=public --add-port=15672/tcp --permanent #默认server管理端口15672
# firewall-cmd --zone=public --add-port=25672/tcp --permanent #默认server间内部通信口25672端口配置好后,重启防火墙,使其生效
systemctl restart firewalld
方法二:直接关闭防火墙。
systemctl stop firewalld
更多防火墙问题 linux-Centos-7-64位:3、 firewalld 配置
由于guest用户被限制,只能通过localhost访问,因此我们需要新建一个用户,并授予管理员权限。
新建一个用户名为admin,密码为admin的用户,并授予管理员(administrator)权限。
方法一:命令行创建
rabbitmqctl add_user admin admin123 rabbitmqctl set_user_tags admin administrator
方法二:登录控制台创建 http://127.0.0.1:15672/
搭建完成。
记得,此时创建的 admin用户只有登录管理界面权限,没有操作权限:“No access”,如需开启权限有两种方式:
1,界面操作
设置权限
保存即可。
2、命令行形式
通过命令授权
语法: set_permissions [-p
] 执行: rabbitmqctl set permissions -p / admin . . .* 完成对admin的授权,然后启动项目就正常了
正常权限界面如下:
3. 基本模式
在上图的模型中,有以下概念:
P:生产者,也就是要发送消息的程序
C:消费者:消息的接受者,会一直等待消息到来。
queue:消息队列,图中红色部分。可以缓存消息;生产者向其中投递消息,消费者从其中取出消息。
3.1 生产者
新建一个maven工程,添加amqp-client依赖
1 | <dependency> |
连接工具类:
1 | public class ConnectionUtil { |
生产者发送消息:
1 | public class Send { |
控制台:
web管理页面:服务器地址/端口号 (本地:127.0.0.1:15672,默认用户及密码:guest guest)
点击队列名称,进入详情页,可以查看消息:
3.2 消费者接收消息
1 | public class Recv { |
控制台打印:
再看看队列的消息,已经被消费了
消费者已经获取了消息,但是程序没有停止,一直在监听队列中是否有新的消息。一旦有新的消息进入队列,就会立即打印.
4. WorkQueues模式
工作队列或者竞争消费者模式
work queues与入门程序相比,多了一个消费端,两个消费端共同消费同一个队列中的消息,但是一个消息只能被一个消费者获取。
这个消息模型在Web应用程序中特别有用,可以处理短的HTTP请求窗口中无法处理复杂的任务。
接下来我们来模拟这个流程:
P:生产者:任务的发布者
C1:消费者1:领取任务并且完成任务,假设完成速度较慢(模拟耗时)
C2:消费者2:领取任务并且完成任务,假设完成速度较快
4.1 生产者
生产者循环发送50条消息
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24 public class Send {
private final static String QUEUE_NAME = "test_work_queue";
public static void main(String[] argv) throws Exception {
// 获取到连接
Connection connection = ConnectionUtil.getConnection();
// 获取通道
Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 循环发布任务
for (int i = 0; i < 50; i++) {
// 消息内容
String message = "task .. " + i;
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");
Thread.sleep(i * 2);
}
// 关闭通道和连接
channel.close();
connection.close();
}
}
4.2 消费者1
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26 public class Recv {
private final static String QUEUE_NAME = "test_work_queue";
public static void main(String[] argv) throws Exception {
// 获取到连接
Connection connection = ConnectionUtil.getConnection();
//创建会话通道,生产者和mq服务所有通信都在channel通道中完成
Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//实现消费方法
DefaultConsumer consumer = new DefaultConsumer(channel){
// 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
// body 即消息体
String msg = new String(body,"utf-8");
System.out.println(" [消费者1] received : " + msg + "!");
//模拟任务耗时1s
try { TimeUnit.SECONDS.sleep(1); } catch (Exception e) { e.printStackTrace(); }
}
};
// 监听队列,第二个参数:是否自动进行消息确认。
channel.basicConsume(QUEUE_NAME, true, consumer);
}
}
4.3 消费者2
代码与消费者1基本类似,只是消费者2没有设置消费耗时时间。
接下来,两个消费者一同启动,然后发送50条消息:
可以发现,两个消费者各自消费了不同25条消息,这就实现了任务的分发。
刚才的实现有何问题
消费者1比消费者2的效率要低,一次任务的耗时较长
然而两人最终消费的消息数量是一样的
消费者2大量时间处于空闲状态,消费者1一直忙碌
现在的状态属于是把任务平均分配,正确的做法应该是消费越快的人,消费的越多。
怎么实现呢?
通过 BasicQos 方法设置prefetchCount = 1。这样RabbitMQ就会使得每个Consumer在同一个时间点最多处理1个Message。换句话说,在接收到该Consumer的ack前,他它
不会将新的Message分发给它。相反,它会将其分派给不是仍然忙碌的下一个Consumer。
值得注意的是:prefetchCount在手动ack的情况下才生效,自动ack不生效。
5. 订阅模式
说明下:
一个生产者多个消费者
每个消费者都有一个自己的队列
生产者没有将消息直接发送给队列,而是发送给exchange(交换机、转发器)
每个队列都需要绑定到交换机上
生产者发送的消息,经过交换机到达队列,实现一个消息被多个消费者消费
例子:注册->发邮件、发短信
X(Exchanges):交换机一方面:接收生产者发送的消息。另一方面:知道如何处理消息,例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange的类型。
Exchange类型有以下几种:
Fanout:广播,将消息交给所有绑定到交换机的队列
Direct:定向,把消息交给符合指定routing key 的队列
Topic:通配符,把消息交给符合routing pattern(路由模式) 的队列
Header:header模式与routing不同的地方在于,header模式取消routingkey,使用header中的 key/value(键值对)匹配队列。
Header模式不展开了,感兴趣可以参考这篇文章https://blog.csdn.net/zhu_tianwei/article/details/40923131
Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失!
5.1 Publish/subscribe发布与订阅模型
(交换机类型:Fanout,也称为广播 )
模型示意图 :
5.1.1 生产者
和前面两种模式不同:
- 1) 声明Exchange,不再声明Queue
- 2) 发送消息到Exchange,不再发送到Queue
1 | public class Send { |
5.1.2 消费者1 (注册成功发给短信服务)
1 | public class Recv { |
5.1.3 消费者2(注册成功发给邮件服务)
1 | public class Recv2 { |
我们运行两个消费者,然后发送1条消息:
思考:
1、publish/subscribe与work queues有什么区别。
区别:
1)work queues不用定义交换机,而publish/subscribe需要定义交换机。
2)publish/subscribe的生产方是面向交换机发送消息,work queues的生产方是面向队列发送消息(底层使用默认交换机)。
3)publish/subscribe需要设置队列和交换机的绑定,work queues不需要设置,实际上work queues会将队列绑定到默认的交换机 。
相同点:
所以两者实现的发布/订阅的效果是一样的,多个消费端监听同一个队列不会重复消费消息。
2、实际工作用 publish/subscribe还是work queues。
建议使用 publish/subscribe,发布订阅模式比工作队列模式更强大(也可以做到同一队列竞争),并且发布订阅模式可以指定自己专用的交换机。
5.2 Routing 路由模型
Routing模型示意图:
P:生产者,向Exchange发送消息,发送消息时,会指定一个routing key。
X:Exchange(交换机),接收生产者的消息,然后把消息递交给 与routing key完全匹配的队列
C1:消费者,其所在队列指定了需要routing key 为 error 的消息
C2:消费者,其所在队列指定了需要routing key 为 info、error、warning 的消息
接下来看代码:
5.2.1 生产者
1 | public class Send { |
5.2.2 消费者1
1 | public class Recv { |
5.2.3 消费者2
1 | public class Recv2 { |
我们发送sms的RoutingKey,发现结果:只有指定短信的消费者1收到消息了
5.3 Topics 通配符模式
Topics模型示意图:
每个消费者监听自己的队列,并且设置带统配符的routingkey,生产者将消息发给broker,由交换机根据routingkey来转发消息到指定的队列。
Routingkey一般都是有一个或者多个单词组成,多个单词之间以“.”分割,例如:inform.sms
通配符规则:
:匹配一个或多个词
*:匹配不多不少恰好1个词
举例:
audit.#:能够匹配audit.irs.corporate 或者 audit.irs
audit.*:只能匹配audit.irs
从示意图可知,我们将发送所有描述动物的消息。消息将使用由三个字(两个点)组成的Routing key发送。路由关键字中的第一个单词将描述速度,第二个颜色和第三个种类:“
我们创建了三个绑定:Q1绑定了“.orange.”,Q2绑定了“...rabbit”和“lazy.#”。
Q1匹配所有的橙色动物。
Q2匹配关于兔子以及懒惰动物的消息。
下面做个小练习,假如生产者发送如下消息,会进入哪个队列:
quick.orange.rabbit Q1 Q2 routingKey=”quick.orange.rabbit”的消息会同时路由到Q1与Q2
lazy.orange.elephant Q1 Q2
quick.orange.fox Q1
lazy.pink.rabbit Q2 (值得注意的是,虽然这个routingKey与Q2的两个bindingKey都匹配,但是只会投递Q2一次)
quick.brown.fox 不匹配任意队列,被丢弃
quick.orange.male.rabbit 不匹配任意队列,被丢弃
orange 不匹配任意队列,被丢弃
下面我们以指定Routing key=”quick.orange.rabbit”为例,验证上面的答案
5.3.1 生产者
1 | public class Send { |
5.3.2 消费者1
1 | public class Recv { |
5.3.3 消费者2
1 | public class Recv2 { |
结果C1、C2是都接收到消息了:
6. Spring整合RabbitMQ
6.1 生产者
创建消息生产者项目:spring_rabbitmq_producer
添加依赖
1
2
3
4
5<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
<version>2.0.1.RELEASE</version>
</dependency>编写spring整合rabbitmq配置文件:spring-rabbitmq-producer.xml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28<!-- 1. 配置连接 -->
<rabbit:connection-factory
id="connectionFactory"
host="127.0.0.1"
port="5672"
username="pomelo"
password="pomelo"
virtual-host="/pomelo"
/>
<!-- 2. 配置队列 -->
<rabbit:queue name="myQueue"/>
<!-- 3.配置rabbitAdmin -->
<rabbit:admin connection-factory="connectionFactory"/>
<!-- 4. 配置topic类型exchange;队列绑定到交换机 -->
<rabbit:topic-exchange name="myExchange">
<rabbit:bindings>
<rabbit:binding queue="myQueue" pattern="msg.#"></rabbit:binding>
</rabbit:bindings>
</rabbit:topic-exchange>
<!-- 5. 配置消息对象json转换类 -->
<bean id="jsonMessageConverter" class="org.springframework.amqp.support.converter.Jackson2JsonMessageConverter" />
<!-- 6. 配置RabbitTemplate -->
<rabbit:template
id="rabbitTemplate"
connection-factory="connectionFactory"
exchange="myExchange"
message-converter="jsonMessageConverter"
/>发消息测试
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23package com.pomelo.producer;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import java.util.HashMap;
import java.util.Map;
public class Send {
public static void main(String[] args) {
//创建spring容器
ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("spring-rabbitmq-producer.xml");
//从容器中获取对象
RabbitTemplate template = context.getBean(RabbitTemplate.class);
// 发送消息
Map<String,String> map = new HashMap<>();
map.put("email","550731230@qq.com");
template.convertAndSend("msg.email",map);
context.close();
}
}
6.2 消费者
创建消费者项目:spring_rabbitmq_consumer
添加依赖
1
2
3
4
5
6
7<dependencies>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
<version>2.0.1.RELEASE</version>
</dependency>
</dependencies>编写消息监听器
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25package com.pomelo.listener;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
import java.io.IOException;
public class EmailMessageListener implements MessageListener {
private static final ObjectMapper MAPPER = new ObjectMapper();
public void onMessage(Message message) {
try {
JsonNode jsonNode = MAPPER.readTree(message.getBody());
String email = jsonNode.get("email").asText();
System.out.println("获取队列中消息:" + email);
} catch (IOException e) {
e.printStackTrace();
}
}
}编写spring整合rabbitmq配置文件:spring-rabbitmq-consumer.xml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19<!-- 1. springIOC注解扫描-->
<context:component-scan base-package="com.pomelo.listener"/>
<!-- 2. 配置连接工厂 -->
<rabbit:connection-factory
id="connectionFactory"
host="127.0.0.1"
port="5672"
username="pomelo"
password="pomelo"
virtual-host="/pomelo"
/>
<!-- 3. 配置队列名 -->
<rabbit:queue name="myQueue"/>
<!-- 4.配置rabbitAdmin -->
<rabbit:admin connection-factory="connectionFactory"/>
<!-- 5.配置监听 -->
<rabbit:listener-container connection-factory="connectionFactory">
<rabbit:listener ref="emailMessageListener" queue-names="myQueue" />
</rabbit:listener-container>运行项目
1
2
3
4
5
6
7
8
9
10
11
12package com.pomelo.consumer;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import java.io.IOException;
public class Consumer {
public static void main(String[] args) throws IOException {
ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("spring-rabbitmq-consumer.xml");
System.in.read();
}
}
7. SpringBoot整合RabbitMQ
搭建SpringBoot环境
同样创建两个工程 mq-rabbitmq-producer和mq-rabbitmq-consumer
添加AMQP的启动器:
1
2
3
4
5
6
7
8<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring‐boot‐starter‐test</artifactId>
</dependency>在application.yml中添加RabbitMQ的配置:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19server:
port: 10086
spring:
application:
name: mq-rabbitmq-producer
rabbitmq:
host: 192.168.1.103
port: 5672
username: kavito
password: 123456
virtualHost: /kavito
template:
retry:
enabled: true
initial-interval: 10000ms
max-interval: 300000ms
multiplier: 2
exchange: topic.exchange
publisher-confirms: true属性说明:
template:有关
AmqpTemplate
的配置retry:失败重试
enabled:开启失败重试
initial-interval:第一次重试的间隔时长
max-interval:最长重试间隔,超过这个间隔将不再重试
multiplier:下次重试间隔的倍数,此处是2即下次重试间隔是上次的2倍
exchange:缺省的交换机名称,此处配置后,发送消息如果不指定交换机就会使用这个
publisher-confirms:生产者确认机制,确保消息会正确发送,如果发送失败会有错误回执,从而触发重试
当然如果consumer只是接收消息而不发送,就不用配置template相关内容。
定义RabbitConfig配置类,配置Exchange、Queue、及绑定交换机。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
public class RabbitmqConfig {
public static final String QUEUE_EMAIL = "queue_email";//email队列
public static final String QUEUE_SMS = "queue_sms";//sms队列
public static final String EXCHANGE_NAME="topic.exchange";//topics类型交换机
public static final String ROUTINGKEY_EMAIL="topic.#.email.#";
public static final String ROUTINGKEY_SMS="topic.#.sms.#";
//声明交换机
(EXCHANGE_NAME)
public Exchange exchange(){
//durable(true) 持久化,mq重启之后交换机还在
return ExchangeBuilder.topicExchange(EXCHANGE_NAME).durable(true).build();
}
//声明email队列
/*
* new Queue(QUEUE_EMAIL,true,false,false)
* durable="true" 持久化 rabbitmq重启的时候不需要创建新的队列
* auto-delete 表示消息队列没有在使用时将被自动删除 默认是false
* exclusive 表示该消息队列是否只在当前connection生效,默认是false
*/
(QUEUE_EMAIL)
public Queue emailQueue(){
return new Queue(QUEUE_EMAIL);
}
//声明sms队列
(QUEUE_SMS)
public Queue smsQueue(){
return new Queue(QUEUE_SMS);
}
//ROUTINGKEY_EMAIL队列绑定交换机,指定routingKey
public Binding bindingEmail(@Qualifier(QUEUE_EMAIL) Queue queue,
@Qualifier(EXCHANGE_NAME) Exchange exchange){
return BindingBuilder.bind(queue).to(exchange).with(ROUTINGKEY_EMAIL).noargs();
}
//ROUTINGKEY_SMS队列绑定交换机,指定routingKey
public Binding bindingSMS(@Qualifier(QUEUE_SMS) Queue queue,
@Qualifier(EXCHANGE_NAME) Exchange exchange){
return BindingBuilder.bind(queue).to(exchange).with(ROUTINGKEY_SMS).noargs();
}
}
7.1 生产者(mq-rabbitmq-producer)
为了方便测试,我直接把生产者代码放工程测试类:发送routing key是”topic.sms.email”的消息,那么mq-rabbitmq-consumer下那些监听的(与交换机(topic.exchange)绑定,并且订阅的routingkey中匹配了”topic.sms.email”规则的) 队列就会收到消息。
1 |
|
运行测试类发送5条消息:
web管理界面: 可以看到已经创建了交换机以及queue_email、queue_sms 2个队列,并且向这两个队列分别发送了5条消息
7.2 消费者(mq-rabbitmq-consumer)
编写一个监听器组件,通过注解配置消费者队列,以及队列与交换机之间绑定关系。(也可以像生产者那样通过配置类配置)
在SpringAmqp中,对消息的消费者进行了封装和抽象。一个JavaBean的方法,只要添加@RabbitListener注解,就可以成为了一个消费者。
1 |
|
属性说明:
@Componet:类上的注解,注册到Spring容器
@RabbitListener:方法上的注解,声明这个方法是一个消费者方法,需要指定下面的属性:
bindings:指定绑定关系,可以有多个。值是@QueueBinding的数组。@QueueBinding包含下面属性:
value:这个消费者关联的队列。值是@Queue,代表一个队列
exchange:队列所绑定的交换机,值是@Exchange类型
key:队列和交换机绑定的RoutingKey,可指定多个
启动mq-rabbitmq-comsumer项目:
8. 高级特性
8.1 投递消息机制
8.1.1 Confirm确认消息
消息确认,是指生产者消息投递后,如果Broker收到消息,则会给生产者一个应答
生产者进行接收应答,用来确定这条消息是否正常的发送到Broker,这种方式也是消息的可靠性投递的核心保障
生成者发送消息与监听confirm是异步的。
如何实现Confirm确认消息?
- 在channel上开启确认模式:channel.confirmSelect()
- 在channel上添加监听:addConfirmListener,监听成功和失败的返回结果,根据具体返回结果对消息进行重发或日志记录等
生产者
1 | public class Producer { |
消费者
1 | public class Consumer { |
8.1.2 return返回消息
- Return Listener用于处理一些不可路由消息
- 生产者指定Exchange和RoutingKey,将消息投递到某个队列,然后消费者监听队列,进行消息处理
- 但在某些情况下,在发送消息时,若当前的exchange不存在或指定的路由key路由失败,这时,如果需要监听这种不可达的消息,则要使用return listener
return 消息机制
在基础API中有一个关键的配置项:
Mandatory
: 若为true,则监听器会接收到路由不可达的消息,然后进行后粗处理 ;若为false,则broker端自动删除该消息
发送端发送了一条消息,但是没有发现Exchange,则可通过return listener监听这些消息
生产者:
1 | public class Producer { |
消费者
1 |
|
当生产端执行channel.basicPublish(exchange,routingKey,true,null,message.getBytes());消息能发送成功,也可以从消费端看到打印
当执行channel.basicPublish(exchange,routingKeyError,true,null,message.getBytes());消息发送失败,因为路由失败了嘛,生产端能看到如下打印:
1 | ——handle return—— |
若生产端将mandatory设为false,则ReturnListener不会进行回调
8.2 consumer ACK
在RabbitMQ中指代的是消费者收到消息后确认的一种行为,关注点在于消费者能否实际接收到MQ发送的消息。
其提供了三种确认方式:
- 自动确认acknowledge=”none”:当消费者接收到消息的时候,就会自动给到RabbitMQ一个回执,告诉MQ我已经收到消息了,不在乎消费者接收到消息之后业务处理的成功与否。
- 手动确认acknowledge=”manual”:当消费者收到消息后,不会立刻告诉RabbitMQ已经收到消息了,而是等待业务处理成功后,通过调用代码的方式手动向MQ确认消息已经收到。当业务处理失败,就可以做一些重试机制,甚至让MQ重新向消费者发送消息都是可以的。
- 根据异常情况确认acknowledge=”auto”:该方式是通过抛出异常的类型,来做响应的处理(如重发、确认等布拉不拉布拉)。这种方式比较麻烦。
当消息一旦被消费者接收到,会立刻自动向MQ确认接收,并将响应的message从RabbitMQ消息缓存中移除,但是在实际的业务处理中,会出现消息收到了,但是业务处理出现异常的情况,在自动确认的模式下,该条业务处理失败的message就相当于被丢弃了。如果设置了手动确认,则需要在业务处理完成之后,手动调用channel.basicAck(),手动的签收,如果业务处理失败,则手动调用channel.basicNack()方法拒收,并让MQ重新发送该消息。
如果不做任何关于acknowledge的配置,默认就是自动确认签收的。
生产者
1 | /* |
消费者端打开Ack模式
1 | server: |
消费者端创建一个listener并实现ChannelAwareMessageListener接口(其实也可以不实现该接口,只要@RabbitListener标记的方法,或者@RabbitListener标记的类+@RabbitHandler标记的方法的参数列表有[com.rabbitmq.client.Channel]和[org.springframework.amqp.core.Message]两个参数,都可以)
1 | package com.leolee.rabbitmq.MsgListener; |
用生产者测试类发送不同的消息给MQ
成功接收并手动确认后,MQ队列就删除了该消息的缓存:
被拒绝的消息会一直发送:
8.3 消费端限流
RabbitMQ限流机制
RabbitMQ提供了QOS限流功能,在非自动确认消息的前提下,如果有一定数目的未被确认前,不消费新的消息。
在消费端开启限流机制
- 把
channel.basicConsume(...)
方法的autoAck
参数改为false - 设置Qos
1 | channel.basicQos(0,1,false); |
1 | /** |
生产端
1 | public static void main(String[] args) throws Exception{ |
消费端
1 | public static void main(String[] args) throws Exception{ |
如果想看限流效果可先把消费端的这行代码channel.basicAck(envelope.getDeliveryTag(),false);
先注释。
在RabbitMQ控制台可以看到,有4条消息未被消费,一条消息等待签收确认。只有当前消息签收后,才消费剩余的消息
8.4 TTL
TTL队列/消息
TTL是Time To Live的缩写, 也就是生存时间
RabbitMQ支持消息的过期时间, 在消息发送时可以进行指定
RabbitMQ支持队列的过期时间, 从消息入队列开始计算, 只要超过了队列的超时时间配置, 那么消息会自动清除
代码演示
消费者中设置队列超时时间为10秒, 启动之后关闭消费者
生产者发送两条消息, 一条消息不设置超时时间, 一条消息设置5秒后超时
启动生产者之后, 监控RabbitMQ控制台, 发现5秒后设置了消息超时时间的消息先超时清除, 然后10秒后另外一条消息也超时清除
配置文件
生产者
1 | package com.qiyexue.api.ttl; |
消费者
1 | package com.qiyexue.api.ttl; |
8.5 死信队列介绍
- 死信队列:DLX,
dead-letter-exchange
- 利用DLX,当消息在一个队列中变成死信
(dead message)
之后,它能被重新publish到另一个Exchange,这个Exchange就是DLX
消息变成死信有以下几种情况:
- 消息被拒绝(basic.reject / basic.nack),并且requeue = false
- 消息TTL过期
- 队列达到最大长度
死信处理过程:
- DLX也是一个正常的Exchange,和一般的Exchange没有区别,它能在任何的队列上被指定,实际上就是设置某个队列的属性。
- 当这个队列中有死信时,RabbitMQ就会自动的将这个消息重新发布到设置的Exchange上去,进而被路由到另一个队列。
- 可以监听这个队列中的消息做相应的处理。
死信队列设置
首先需要设置死信队列的exchange和queue,然后进行绑定:
1
2
3
4Exchange: dlx.exchange
Queue: dlx.queue
RoutingKey: #
#表示只要有消息到达了Exchange,那么都会路由到这个queue上然后需要有一个监听,去监听这个队列进行处理
然后我们进行正常声明交换机、队列、绑定,只不过我们需要在队列加上一个参数即可:
arguments.put(" x-dead-letter-exchange","dlx.exchange");
,这样消息在过期、requeue、 队列在达到最大长度时,消息就可以直接路由到死信队列!
生产端
1 | public class Producer { |
自定义消费端
1 | public class MyConsumer extends DefaultConsumer { |
消费端
- 声明正常处理消息的交换机、队列及绑定规则
- 在正常交换机上指定死信发送的Exchange
- 声明死信交换机、队列及绑定规则
- 监听死信队列,进行后续处理,这里省略
1 | public class Consumer { |
运行说明
启动消费端,此时查看管控台,新增了两个Exchange,两个Queue。在test_dlx_queue
上我们设置了DLX,也就代表死信消息会发送到指定的Exchange上,最终其实会路由到dlx.queue
上。
此时关闭消费端,然后启动生产端,查看管控台队列的消息情况,test_dlx_queue
的值为1,而dlx_queue
的值为0。
10s后的队列结果如图,由于生产端发送消息时指定了消息的过期时间为10s,而此时没有消费端进行消费,消息便被路由到死信队列中。
8.6 延迟队列
延迟队列存储的对象肯定是对应的延迟消息,所谓”延迟消息”是指当消息被发送以后,并不想让消费者立即拿到消息,而是等待指定时间后,消费者才拿到这个消息进行消费。
场景一:在订单系统中,一个用户下单之后通常有30分钟的时间进行支付,如果30分钟之内没有支付成功,那么这个订单将进行一场处理。这是就可以使用延迟队列将订单信息发送到延迟队列。
场景二:用户希望通过手机远程遥控家里的智能设备在指定的时间进行工作。这时候就可以将用户指令发送到延迟队列,当指令设定的时间到了再将指令推送到只能设备。
RabbitMQ怎么实现延迟队列
AMQP协议,以及RabbitMQ本身没有直接支持延迟队列的功能,但是可以通过TTL和DLX模拟出延迟队列的功能。
TTL(Time To Live)
RabbitMQ可以针对Queue和Message设置 x-message-tt,来控制消息的生存时间,如果超时,则消息变为dead letter
RabbitMQ针对队列中的消息过期时间有两种方法可以设置。
- A: 通过队列属性设置,队列中所有消息都有相同的过期时间。
- B: 对消息进行单独设置,每条消息TTL可以不同。
如果同时使用,则消息的过期时间以两者之间TTL较小的那个数值为准。消息在队列的生存时间一旦超过设置的TTL值,就成为dead letter
DLX (Dead-Letter-Exchange)
RabbitMQ的Queue可以配置x-dead-letter-exchange 和x-dead-letter-routing-key(可选)两个参数,如果队列内出现了dead letter,则按照这两个参数重新路由。
- x-dead-letter-exchange:出现dead letter之后将dead letter重新发送到指定exchange
- x-dead-letter-routing-key:指定routing-key发送
队列出现dead letter的情况有:
- 消息或者队列的TTL过期
- 队列达到最大长度
- 消息被消费端拒绝(basic.reject or basic.nack)并且requeue=false
利用DLX,当消息在一个队列中变成死信后,它能被重新publish到另一个Exchange。这时候消息就可以重新被消费。
详细可以参考: RabbitMQ之死信队列
实例:
首先建立2个exchange和2个queue:
exchange_delay_begin:这个是producer端发送时调用的exchange, 将消息发送至queue_dealy_begin中。
queue_delay_begin: 通过routingKey="delay"绑定exchang_delay_begin, 同时配置DLX=exchange_delay_done, 当消息变成死信时,发往exchange_delay_done中。
exchange_delay_done: 死信的exchange, 如果不配置x-dead-letter-routing-key则采用原有默认的routingKey,即queue_delay_begin绑定exchang_delay_beghin采用的“delay”。
queue_delay_done:消息在TTL到期之后,最终通过exchang_delay_done发送值此queue,消费端通过消费此queue的消息,即可以达到延迟的效果。
建立exchange和queue的代码(当然这里可以通过RabbitMQ的管理界面来实现,无需code相关代码):
1 | channel.exchangeDeclare("exchange_delay_begin", "direct", true); |
消费端
1 | QueueingConsumer consumer = new QueueingConsumer(channel); |
生产端
1 | AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder(); |
在创建完exchange和queue之后,首先执行consumer端的代码,之后执行producer端的代码,待producer发送完毕之后,查看consumer端的输出:
1 | receive msg time:Tue Feb 14 21:06:19 CST 2017, msg body:Tue Feb 14 21:05:19 CST 2017 |