java-RabbitMQ详解

MQ全称为Message Queue,即消息队列。“消息队列”是在消息的传输过程中保存消息的容器。它是典型的:生产者、消费者模型。这里来学习一种常用的消息队列RabbitMQ

1. 认识RabbitMQ

在介绍RabbitMQ之前,我们先来看下面一个电商项目的场景:

  1. 商品的原始数据保存在数据库中,增删改查都在数据库中完成。
  2. 搜索服务数据来源是索引库(Elasticsearch),如果数据库商品发生变化,索引库数据不能及时更新。
  3. 商品详情做了页面静态化处理,静态页面数据也不会随着数据库商品更新而变化。

如果我们在后台修改了商品的价格,搜索页面和商品详情页显示的依然是旧的价格,这样显然不对。该如何解决?

我们可能会想到这么做:

方案1:每当后台对商品做增删改操作,同时修改索引库数据及更新静态页面。

方案2:搜索服务和商品页面静态化服务对外提供操作接口,后台在商品增删改后,调用接口。 

这两种方案都有个严重的问题:就是代码耦合,后台服务中需要嵌入搜索和商品页面服务,违背了微服务的独立原则。

这时,我们就会采用另外一种解决办法,那就是消息队列!

​ 商品服务对商品增删改以后,无需去操作索引库和静态页面,只需向MQ发送一条消息(比如包含商品id的消息),也不关心消息被谁接收。 搜索服务和静态页面服务监听MQ,接收消息,然后分别去处理索引库和静态页面(根据商品id去更新索引库和商品详情静态页面)。

1.1 什么是消息队列

MQ全称为Message Queue,即消息队列。“消息队列”是在消息的传输过程中保存消息的容器。它是典型的:生产者、消费者模型。生产者不断向消息队列中生产消息,消费者不断的从队列中获取消息。因为消息的生产和消费都是异步的,而且只关心消息的发送和接收,没有业务逻辑的侵入,这样就实现了生产者和消费者的解耦。

1、任务异步处理:

高并发环境下,由于来不及同步处理,请求往往会发生堵塞,比如说,大量的insert,update之类的请求同时到达MySQL,直接导致无数的行锁表锁,甚至最后请求会堆积过多,从而触发too many connections错误。通过使用消息队列,我们可以异步处理请求,从而缓解系统的压力。将不需要同步处理的并且耗时长的操作由消息队列通知消息接收方进行异步处理。减少了应用程序的响应时间。

2、应用程序解耦合:

MQ相当于一个中介,生产方通过MQ与消费方交互,它将应用程序进行解耦合。

1.2 AMQP和JMS

MQ是消息通信的模型,并发具体实现。现在实现MQ的有两种主流方式:AMQP、JMS。

两者间的区别和联系:

JMS是定义了统一的接口,来对消息操作进行统一;AMQP是通过规定协议来统一数据交互的格式

JMS限定了必须使用Java语言;AMQP只是协议,不规定实现方式,因此是跨语言的。

JMS规定了两种消息模型;而AMQP的消息模型更加丰富

1.3 常见MQ产品

ActiveMQ:基于JMS

RabbitMQ:基于AMQP协议,erlang语言开发,稳定性好

RocketMQ:基于JMS,阿里巴巴产品,目前交由Apache基金会

Kafka:分布式消息系统,高吞吐量

1.4 RabbitMQ的工作原理

下图是RabbitMQ的基本结构:

img

组成部分说明:

Broker:消息队列服务进程,此进程包括两个部分:Exchange和Queue
Exchange:消息队列交换机,按一定的规则将消息路由转发到某个队列,对消息进行过虑。
Queue:消息队列,存储消息的队列,消息到达队列并转发给指定的
Producer:消息生产者,即生产方客户端,生产方客户端将消息发送
Consumer:消息消费者,即消费方客户端,接收MQ转发的消息。

生产者发送消息流程:

1、生产者和Broker建立TCP连接。

2、生产者和Broker建立通道。

3、生产者通过通道消息发送给Broker,由Exchange将消息进行转发。

4、Exchange将消息转发到指定的Queue(队列)

消费者接收消息流程:

1、消费者和Broker建立TCP连接

2、消费者和Broker建立通道

3、消费者监听指定的Queue(队列)

4、当有消息到达Queue时Broker默认将消息推送给消费者。

5、消费者接收到消息。

6、ack回复

2. RabbitMq 的搭建流程

2.1 准备工作—安装Erlang

配置Erlang环境(c++)

1
yum -y install make gcc gcc-c++ kernel-devel m4 ncurses ncurses-devel openssl-devel perl

从Erlang Solution安装(推荐)到官网 http://erlang.org/download/ 下载最新包,这里使用otp_src_21.1.tar.gz

1
2
3
mkdir /data/erlang
cd /data/erlang/
wget http://erlang.org/download/otp_src_21.1.tar.gz

下载速度比较慢.,建议先下载到本地,再上传到服务器

下载完后解压

1
tar -vxf otp_src_21.1.tar.gz

安装编译

1
2
cd otp_src_21.1 
./configure --prefix=/data/erlang/erlang; make; make install

安装好后

修改/etc/profile文件: vi /etc/profile

增加环境变量

1
2
3
4
5
#set erlang environment
ERLANG_HOME=/data/erlang/erlang
PATH=ERLANG_HOME/bin:PATH
export ERLANG_HOME
export PATH

刷新配置: source /etc/profile

测试:执行erl,看是否能打开eshell,用“halt().”退出,注意后面的“.”点号,那是erlang的结束符

img

2.2 安装 RabbitMq及过程错误解决

先建立一个工作目录,并下载安装包,

1
2
mkdir /data/rabbitMq
cd /data/rabbitMq/

官网最新安装包下载链接: http://www.rabbitmq.com/releases/rabbitmq-server/

github 下载链接:https://github.com/rabbitmq/rabbitmq-server/releases

网速好的可通过如下命令行下载

wget https://github.com/rabbitmq/rabbitmq-server/releases/download/v3.7.8/rabbitmq-server-generic-unix-3.7.8.tar.xz

下载完成后,开始解压

1
2
xz -d rabbitmq-server-generic-unix-3.7.8.tar.xz
tar -xvf rabbitmq-server-generic-unix-3.7.8.tar

img

接下来配置 rabbitmq 的环境变量(这个跟上面的erlang配置以及java的环境变量差不多)

vi /etc/profile

设置环境变量,一般新增的,直接放在文档最后面就行,方便查找修改

1
2
3
RABBITMQ_BIN=/data/rabbitMq/rabbitmq_server-3.7.8/sbin
PATH=PATH:PATH:RABBITMQ_BIN
export PATH

/etc/profile 效果如下图:

img

执行以下命令,使得PATH路径更新,rabbitMQ安装成功。

source /etc/profile

开启管理界面及配置

1
2
3
4
rabbitmq-plugins enable rabbitmq_management  #启动管理界面
vim /data/rabbitMq/rabbitmq_server-3.7.8/ebin/rabbit.app
# 比如修改密码,配置等,例如looppack_users中的《“guest”》,只保留guest
rabbitmq-server -detached #后台运行rabbitmq

rabbitmq 基本操作命令,更多命令可参考 RabbitMQ基础操作命令

1
2
3
4
rabbitmq-server on                  # 添加开机启动RabbitMQ服务
rabbitmq-server start # 启动服务
rabbitmq-server status # 查看服务状态
rabbitmq-server stop # 停止服务

访问链接: http://127.0.0.1:15672/ 说明启动成功。

img

此时,如果通过外部访问,需要考虑防火墙设置。

本篇是Centos 7.X 所以要设置 firewalld , 如果是Centos 6.X 要设置 iptables

两种操作,第一种:开放端口供外界访问(推荐,因接近实际搭建涉及的问题)

1
2
3
4
# firewall-cmd --zone=public --add-port=4369/tcp --permanent    #erlang发现端口4369
# firewall-cmd --zone=public --add-port=5672/tcp --permanent #默认client端通信端口5672
# firewall-cmd --zone=public --add-port=15672/tcp --permanent #默认server管理端口15672
# firewall-cmd --zone=public --add-port=25672/tcp --permanent #默认server间内部通信口25672

端口配置好后,重启防火墙,使其生效

systemctl restart firewalld

方法二:直接关闭防火墙。

systemctl stop firewalld

更多防火墙问题 linux-Centos-7-64位:3、 firewalld 配置

由于guest用户被限制,只能通过localhost访问,因此我们需要新建一个用户,并授予管理员权限。

新建一个用户名为admin,密码为admin的用户,并授予管理员(administrator)权限。

方法一:命令行创建

rabbitmqctl add_user admin admin123
rabbitmqctl set_user_tags admin administrator

方法二:登录控制台创建 http://127.0.0.1:15672/

img

搭建完成。

记得,此时创建的 admin用户只有登录管理界面权限,没有操作权限:“No access”,如需开启权限有两种方式:

1,界面操作

img

设置权限

img

保存即可。

2、命令行形式

通过命令授权

语法: set_permissions [-p ]

执行: rabbitmqctl set permissions -p / admin . . .* 完成对admin的授权,然后启动项目就正常了

正常权限界面如下:

img

3. 基本模式

img

在上图的模型中,有以下概念:

P:生产者,也就是要发送消息的程序

C:消费者:消息的接受者,会一直等待消息到来。

queue:消息队列,图中红色部分。可以缓存消息;生产者向其中投递消息,消费者从其中取出消息。 

3.1 生产者

新建一个maven工程,添加amqp-client依赖

1
2
3
4
5
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.7.1</version>
</dependency>

连接工具类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public class ConnectionUtil {
/**
* 建立与RabbitMQ的连接
* @return
* @throws Exception
*/
public static Connection getConnection() throws Exception {
//定义连接工厂
ConnectionFactory factory = new ConnectionFactory();
//设置服务地址
factory.setHost("192.168.1.103");
//端口
factory.setPort(5672);
//设置账号信息,用户名、密码、vhost
factory.setVirtualHost("/kavito");//设置虚拟机,一个mq服务可以设置多个虚拟机,每个虚拟机就相当于一个独立的mq
factory.setUsername("kavito");
factory.setPassword("123456");
// 通过工厂获取连接
Connection connection = factory.newConnection();
return connection;
}
}

生产者发送消息:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
public class Send {

private final static String QUEUE_NAME = "simple_queue";

public static void main(String[] argv) throws Exception {
// 1、获取到连接
Connection connection = ConnectionUtil.getConnection();
// 2、从连接中创建通道,使用通道才能完成消息相关的操作
Channel channel = connection.createChannel();
// 3、声明(创建)队列
//参数:String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
/**
* 参数明细
* 1、queue 队列名称
* 2、durable 是否持久化,如果持久化,mq重启后队列还在
* 3、exclusive 是否独占连接,队列只允许在该连接中访问,如果connection连接关闭队列则自动删除,如果将此参数设置true可用于临时队列的创建
* 4、autoDelete 自动删除,队列不再使用时是否自动删除此队列,如果将此参数和exclusive参数设置为true就可以实现临时队列(队列不用了就自动删除)
* 5、arguments 参数,可以设置一个队列的扩展参数,比如:可设置存活时间
*/
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 4、消息内容
String message = "Hello World!";
// 向指定的队列中发送消息
//参数:String exchange, String routingKey, BasicProperties props, byte[] body
/**
* 参数明细:
* 1、exchange,交换机,如果不指定将使用mq的默认交换机(设置为"")
* 2、routingKey,路由key,交换机根据路由key来将消息转发到指定的队列,如果使用默认交换机,routingKey设置为队列的名称
* 3、props,消息的属性
* 4、body,消息内容
*/
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");

//关闭通道和连接(资源关闭最好用try-catch-finally语句处理)
channel.close();
connection.close();
}
}

控制台:

img

web管理页面:服务器地址/端口号 (本地:127.0.0.1:15672,默认用户及密码:guest guest)

img

点击队列名称,进入详情页,可以查看消息:

3.2 消费者接收消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
public class Recv {
private final static String QUEUE_NAME = "simple_queue";

public static void main(String[] argv) throws Exception {
// 获取到连接
Connection connection = ConnectionUtil.getConnection();
//创建会话通道,生产者和mq服务所有通信都在channel通道中完成
Channel channel = connection.createChannel();
// 声明队列
//参数:String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
/**
* 参数明细
* 1、queue 队列名称
* 2、durable 是否持久化,如果持久化,mq重启后队列还在
* 3、exclusive 是否独占连接,队列只允许在该连接中访问,如果connection连接关闭队列则自动删除,如果将此参数设置true可用于临时队列的创建
* 4、autoDelete 自动删除,队列不再使用时是否自动删除此队列,如果将此参数和exclusive参数设置为true就可以实现临时队列(队列不用了就自动删除)
* 5、arguments 参数,可以设置一个队列的扩展参数,比如:可设置存活时间
*/
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//实现消费方法
DefaultConsumer consumer = new DefaultConsumer(channel){
// 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用
/**
* 当接收到消息后此方法将被调用
* @param consumerTag 消费者标签,用来标识消费者的,在监听队列时设置channel.basicConsume
* @param envelope 信封,通过envelope
* @param properties 消息属性
* @param body 消息内容
* @throws IOException
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//交换机
String exchange = envelope.getExchange();
//消息id,mq在channel中用来标识消息的id,可用于确认消息已接收
long deliveryTag = envelope.getDeliveryTag();
// body 即消息体
String msg = new String(body,"utf-8");
System.out.println(" [x] received : " + msg + "!");
}
};

// 监听队列,第二个参数:是否自动进行消息确认。
//参数:String queue, boolean autoAck, Consumer callback
/**
* 参数明细:
* 1、queue 队列名称
* 2、autoAck 自动回复,当消费者接收到消息后要告诉mq消息已接收,如果将此参数设置为tru表示会自动回复mq,如果设置为false要通过编程实现回复
* 3、callback,消费方法,当消费者接收到消息要执行的方法
*/
channel.basicConsume(QUEUE_NAME, true, consumer);
}
}

控制台打印:

img

再看看队列的消息,已经被消费了

img

消费者已经获取了消息,但是程序没有停止,一直在监听队列中是否有新的消息。一旦有新的消息进入队列,就会立即打印.

4. WorkQueues模式

工作队列或者竞争消费者模式

img

work queues与入门程序相比,多了一个消费端,两个消费端共同消费同一个队列中的消息,但是一个消息只能被一个消费者获取。

这个消息模型在Web应用程序中特别有用,可以处理短的HTTP请求窗口中无法处理复杂的任务。

接下来我们来模拟这个流程:

P:生产者:任务的发布者

C1:消费者1:领取任务并且完成任务,假设完成速度较慢(模拟耗时)

C2:消费者2:领取任务并且完成任务,假设完成速度较快

4.1 生产者

生产者循环发送50条消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public class Send {
private final static String QUEUE_NAME = "test_work_queue";

public static void main(String[] argv) throws Exception {
// 获取到连接
Connection connection = ConnectionUtil.getConnection();
// 获取通道
Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 循环发布任务
for (int i = 0; i < 50; i++) {
// 消息内容
String message = "task .. " + i;
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");

Thread.sleep(i * 2);
}
// 关闭通道和连接
channel.close();
connection.close();
}
}

4.2 消费者1

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public class Recv {
private final static String QUEUE_NAME = "test_work_queue";

public static void main(String[] argv) throws Exception {
// 获取到连接
Connection connection = ConnectionUtil.getConnection();
//创建会话通道,生产者和mq服务所有通信都在channel通道中完成
Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//实现消费方法
DefaultConsumer consumer = new DefaultConsumer(channel){
// 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
// body 即消息体
String msg = new String(body,"utf-8");
System.out.println(" [消费者1] received : " + msg + "!");
//模拟任务耗时1s
try { TimeUnit.SECONDS.sleep(1); } catch (Exception e) { e.printStackTrace(); }
}
};
// 监听队列,第二个参数:是否自动进行消息确认。
channel.basicConsume(QUEUE_NAME, true, consumer);
}
}

4.3 消费者2

代码与消费者1基本类似,只是消费者2没有设置消费耗时时间。

接下来,两个消费者一同启动,然后发送50条消息:

1

无标题

可以发现,两个消费者各自消费了不同25条消息,这就实现了任务的分发。

刚才的实现有何问题

  1. 消费者1比消费者2的效率要低,一次任务的耗时较长

  2. 然而两人最终消费的消息数量是一样的

  3. 消费者2大量时间处于空闲状态,消费者1一直忙碌

现在的状态属于是把任务平均分配,正确的做法应该是消费越快的人,消费的越多。

怎么实现呢?

通过 BasicQos 方法设置prefetchCount = 1。这样RabbitMQ就会使得每个Consumer在同一个时间点最多处理1个Message。换句话说,在接收到该Consumer的ack前,他它

不会将新的Message分发给它。相反,它会将其分派给不是仍然忙碌的下一个Consumer。

值得注意的是:prefetchCount在手动ack的情况下才生效,自动ack不生效。
无标题

5. 订阅模式

说明下:

  1. 一个生产者多个消费者

  2. 每个消费者都有一个自己的队列

  3. 生产者没有将消息直接发送给队列,而是发送给exchange(交换机、转发器)

  4. 每个队列都需要绑定到交换机上

  5. 生产者发送的消息,经过交换机到达队列,实现一个消息被多个消费者消费

    例子:注册->发邮件、发短信

X(Exchanges):交换机一方面:接收生产者发送的消息。另一方面:知道如何处理消息,例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange的类型。

Exchange类型有以下几种:

Fanout:广播,将消息交给所有绑定到交换机的队列

Direct:定向,把消息交给符合指定routing key 的队列

Topic:通配符,把消息交给符合routing pattern(路由模式) 的队列

Header:header模式与routing不同的地方在于,header模式取消routingkey,使用header中的 key/value(键值对)匹配队列。

Header模式不展开了,感兴趣可以参考这篇文章https://blog.csdn.net/zhu_tianwei/article/details/40923131

Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失!

5.1 Publish/subscribe发布与订阅模型

(交换机类型:Fanout,也称为广播

模型示意图 :

img

5.1.1 生产者

和前面两种模式不同:

  • 1) 声明Exchange,不再声明Queue
  • 2) 发送消息到Exchange,不再发送到Queue
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public class Send {

private final static String EXCHANGE_NAME = "test_fanout_exchange";

public static void main(String[] argv) throws Exception {
// 获取到连接
Connection connection = ConnectionUtil.getConnection();
// 获取通道
Channel channel = connection.createChannel();
// 声明exchange,指定类型为fanout
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");

// 消息内容
String message = "注册成功!!";
// 发布消息到Exchange
channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
System.out.println(" [生产者] Sent '" + message + "'");

channel.close();
connection.close();
}
}

5.1.2 消费者1 (注册成功发给短信服务)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public class Recv {
private final static String QUEUE_NAME = "fanout_exchange_queue_sms";//短信队列

private final static String EXCHANGE_NAME = "test_fanout_exchange";

public static void main(String[] argv) throws Exception {
// 获取到连接
Connection connection = ConnectionUtil.getConnection();
// 获取通道
Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);

// 绑定队列到交换机
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");

// 定义队列的消费者
DefaultConsumer consumer = new DefaultConsumer(channel) {
// 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
byte[] body) throws IOException {
// body 即消息体
String msg = new String(body);
System.out.println(" [短信服务] received : " + msg + "!");
}
};
// 监听队列,自动返回完成
channel.basicConsume(QUEUE_NAME, true, consumer);
}
}

5.1.3 消费者2(注册成功发给邮件服务)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public class Recv2 {
private final static String QUEUE_NAME = "fanout_exchange_queue_email";//邮件队列

private final static String EXCHANGE_NAME = "test_fanout_exchange";

public static void main(String[] argv) throws Exception {
// 获取到连接
Connection connection = ConnectionUtil.getConnection();
// 获取通道
Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);

// 绑定队列到交换机
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");

// 定义队列的消费者
DefaultConsumer consumer = new DefaultConsumer(channel) {
// 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
byte[] body) throws IOException {
// body 即消息体
String msg = new String(body);
System.out.println(" [邮件服务] received : " + msg + "!");
}
};
// 监听队列,自动返回完成
channel.basicConsume(QUEUE_NAME, true, consumer);
}
}

我们运行两个消费者,然后发送1条消息:

img

img

思考:

1、publish/subscribe与work queues有什么区别。

区别:

1)work queues不用定义交换机,而publish/subscribe需要定义交换机。

2)publish/subscribe的生产方是面向交换机发送消息,work queues的生产方是面向队列发送消息(底层使用默认交换机)。

3)publish/subscribe需要设置队列和交换机的绑定,work queues不需要设置,实际上work queues会将队列绑定到默认的交换机 。

相同点:

所以两者实现的发布/订阅的效果是一样的,多个消费端监听同一个队列不会重复消费消息。

2、实际工作用 publish/subscribe还是work queues。

建议使用 publish/subscribe,发布订阅模式比工作队列模式更强大(也可以做到同一队列竞争),并且发布订阅模式可以指定自己专用的交换机。

5.2 Routing 路由模型

Routing模型示意图:

img

P:生产者,向Exchange发送消息,发送消息时,会指定一个routing key。

X:Exchange(交换机),接收生产者的消息,然后把消息递交给 与routing key完全匹配的队列

C1:消费者,其所在队列指定了需要routing key 为 error 的消息

C2:消费者,其所在队列指定了需要routing key 为 info、error、warning 的消息

接下来看代码:

5.2.1 生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public class Send {
private final static String EXCHANGE_NAME = "test_direct_exchange";

public static void main(String[] argv) throws Exception {
// 获取到连接
Connection connection = ConnectionUtil.getConnection();
// 获取通道
Channel channel = connection.createChannel();
// 声明exchange,指定类型为direct
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
// 消息内容,
String message = "注册成功!请短信回复[T]退订";
// 发送消息,并且指定routing key 为:sms,只有短信服务能接收到消息
channel.basicPublish(EXCHANGE_NAME, "sms", null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");

channel.close();
connection.close();
}
}

5.2.2 消费者1

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public class Recv {
private final static String QUEUE_NAME = "direct_exchange_queue_sms";//短信队列
private final static String EXCHANGE_NAME = "test_direct_exchange";

public static void main(String[] argv) throws Exception {
// 获取到连接
Connection connection = ConnectionUtil.getConnection();
// 获取通道
Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);

// 绑定队列到交换机,同时指定需要订阅的routing key。可以指定多个
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "sms");//指定接收发送方指定routing key为sms的消息
//channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "email");

// 定义队列的消费者
DefaultConsumer consumer = new DefaultConsumer(channel) {
// 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
byte[] body) throws IOException {
// body 即消息体
String msg = new String(body);
System.out.println(" [短信服务] received : " + msg + "!");
}
};
// 监听队列,自动ACK
channel.basicConsume(QUEUE_NAME, true, consumer);
}
}

5.2.3 消费者2

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public class Recv2 {
private final static String QUEUE_NAME = "direct_exchange_queue_email";//邮件队列
private final static String EXCHANGE_NAME = "test_direct_exchange";

public static void main(String[] argv) throws Exception {
// 获取到连接
Connection connection = ConnectionUtil.getConnection();
// 获取通道
Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);

// 绑定队列到交换机,同时指定需要订阅的routing key。可以指定多个
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "email");//指定接收发送方指定routing key为email的消息

// 定义队列的消费者
DefaultConsumer consumer = new DefaultConsumer(channel) {
// 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
byte[] body) throws IOException {
// body 即消息体
String msg = new String(body);
System.out.println(" [邮件服务] received : " + msg + "!");
}
};
// 监听队列,自动ACK
channel.basicConsume(QUEUE_NAME, true, consumer);
}
}

我们发送sms的RoutingKey,发现结果:只有指定短信的消费者1收到消息了

img

5.3 Topics 通配符模式

Topics模型示意图:

img

每个消费者监听自己的队列,并且设置带统配符的routingkey,生产者将消息发给broker,由交换机根据routingkey来转发消息到指定的队列。

Routingkey一般都是有一个或者多个单词组成,多个单词之间以“.”分割,例如:inform.sms

通配符规则:

:匹配一个或多个词

*:匹配不多不少恰好1个词

举例:

audit.#:能够匹配audit.irs.corporate 或者 audit.irs

audit.*:只能匹配audit.irs

从示意图可知,我们将发送所有描述动物的消息。消息将使用由三个字(两个点)组成的Routing key发送。路由关键字中的第一个单词将描述速度,第二个颜色和第三个种类:“..”。

我们创建了三个绑定:Q1绑定了“.orange.”,Q2绑定了“...rabbit”和“lazy.#”。

Q1匹配所有的橙色动物。

Q2匹配关于兔子以及懒惰动物的消息。

下面做个小练习,假如生产者发送如下消息,会进入哪个队列:

quick.orange.rabbit Q1 Q2 routingKey=”quick.orange.rabbit”的消息会同时路由到Q1与Q2

lazy.orange.elephant Q1 Q2

quick.orange.fox Q1

lazy.pink.rabbit Q2 (值得注意的是,虽然这个routingKey与Q2的两个bindingKey都匹配,但是只会投递Q2一次)

quick.brown.fox 不匹配任意队列,被丢弃

quick.orange.male.rabbit 不匹配任意队列,被丢弃

orange 不匹配任意队列,被丢弃

下面我们以指定Routing key=”quick.orange.rabbit”为例,验证上面的答案

5.3.1 生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public class Send {
private final static String EXCHANGE_NAME = "test_topic_exchange";

public static void main(String[] argv) throws Exception {
// 获取到连接
Connection connection = ConnectionUtil.getConnection();
// 获取通道
Channel channel = connection.createChannel();
// 声明exchange,指定类型为topic
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
// 消息内容
String message = "这是一只行动迅速的橙色的兔子";
// 发送消息,并且指定routing key为:quick.orange.rabbit
channel.basicPublish(EXCHANGE_NAME, "quick.orange.rabbit", null, message.getBytes());
System.out.println(" [动物描述:] Sent '" + message + "'");

channel.close();
connection.close();
}
}

5.3.2 消费者1

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public class Recv {
private final static String QUEUE_NAME = "topic_exchange_queue_Q1";
private final static String EXCHANGE_NAME = "test_topic_exchange";

public static void main(String[] argv) throws Exception {
// 获取到连接
Connection connection = ConnectionUtil.getConnection();
// 获取通道
Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);

// 绑定队列到交换机,同时指定需要订阅的routing key。订阅所有的橙色动物
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "*.orange.*");

// 定义队列的消费者
DefaultConsumer consumer = new DefaultConsumer(channel) {
// 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
byte[] body) throws IOException {
// body 即消息体
String msg = new String(body);
System.out.println(" [消费者1] received : " + msg + "!");
}
};
// 监听队列,自动ACK
channel.basicConsume(QUEUE_NAME, true, consumer);
}
}

5.3.3 消费者2

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public class Recv2 {
private final static String QUEUE_NAME = "topic_exchange_queue_Q2";
private final static String EXCHANGE_NAME = "test_topic_exchange";

public static void main(String[] argv) throws Exception {
// 获取到连接
Connection connection = ConnectionUtil.getConnection();
// 获取通道
Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);

// 绑定队列到交换机,同时指定需要订阅的routing key。订阅关于兔子以及懒惰动物的消息
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "*.*.rabbit");
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "lazy.#");

// 定义队列的消费者
DefaultConsumer consumer = new DefaultConsumer(channel) {
// 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
byte[] body) throws IOException {
// body 即消息体
String msg = new String(body);
System.out.println(" [消费者2] received : " + msg + "!");
}
};
// 监听队列,自动ACK
channel.basicConsume(QUEUE_NAME, true, consumer);
}
}

结果C1、C2是都接收到消息了:

img

img

6. Spring整合RabbitMQ

6.1 生产者

  1. 创建消息生产者项目:spring_rabbitmq_producer

  2. 添加依赖 

    1
    2
    3
    4
    5
    <dependency>
      <groupId>org.springframework.amqp</groupId>
      <artifactId>spring-rabbit</artifactId>
      <version>2.0.1.RELEASE</version>
    </dependency>
  3. 编写spring整合rabbitmq配置文件:spring-rabbitmq-producer.xml

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    <!-- 1. 配置连接 -->
    <rabbit:connection-factory
      id="connectionFactory"
       host="127.0.0.1"
      port="5672"
      username="pomelo"
      password="pomelo"
      virtual-host="/pomelo"
    />
    <!-- 2. 配置队列 -->
    <rabbit:queue name="myQueue"/>
    <!-- 3.配置rabbitAdmin -->
    <rabbit:admin connection-factory="connectionFactory"/>
    <!-- 4. 配置topic类型exchange;队列绑定到交换机 -->
    <rabbit:topic-exchange name="myExchange">
      <rabbit:bindings>
        <rabbit:binding queue="myQueue" pattern="msg.#"></rabbit:binding>
      </rabbit:bindings>
    </rabbit:topic-exchange>
    <!-- 5. 配置消息对象json转换类 -->
    <bean id="jsonMessageConverter" class="org.springframework.amqp.support.converter.Jackson2JsonMessageConverter" />
    <!-- 6. 配置RabbitTemplate -->
    <rabbit:template
      id="rabbitTemplate"
      connection-factory="connectionFactory"
      exchange="myExchange"
       message-converter="jsonMessageConverter"
    />
  4. 发消息测试

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    package com.pomelo.producer;


    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.context.support.ClassPathXmlApplicationContext;

    import java.util.HashMap;
    import java.util.Map;

    public class Send {

      public static void main(String[] args) {
        //创建spring容器
        ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("spring-rabbitmq-producer.xml");
        //从容器中获取对象
        RabbitTemplate template = context.getBean(RabbitTemplate.class);
        // 发送消息
        Map<String,String> map = new HashMap<>();
        map.put("email","550731230@qq.com");
        template.convertAndSend("msg.email",map);
        context.close();
      }
    }

6.2 消费者

  1. 创建消费者项目:spring_rabbitmq_consumer

  2. 添加依赖

    1
    2
    3
    4
    5
    6
    7
    <dependencies>
      <dependency>
        <groupId>org.springframework.amqp</groupId>
        <artifactId>spring-rabbit</artifactId>
        <version>2.0.1.RELEASE</version>
      </dependency>
    </dependencies>
  3. 编写消息监听器

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    package com.pomelo.listener;

    import com.fasterxml.jackson.databind.JsonNode;
    import com.fasterxml.jackson.databind.ObjectMapper;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.core.MessageListener;

    import java.io.IOException;
    @Component
    public class EmailMessageListener implements MessageListener {

      private static final ObjectMapper MAPPER = new ObjectMapper();

      @Override
      public void onMessage(Message message) {

        try {
          JsonNode jsonNode = MAPPER.readTree(message.getBody());
          String email = jsonNode.get("email").asText();
          System.out.println("获取队列中消息:" + email);
        } catch (IOException e) {
          e.printStackTrace();
        }
      }
    }
  4. 编写spring整合rabbitmq配置文件:spring-rabbitmq-consumer.xml

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    <!-- 1. springIOC注解扫描-->
    <context:component-scan base-package="com.pomelo.listener"/>
    <!-- 2. 配置连接工厂 -->
    <rabbit:connection-factory
      id="connectionFactory"
      host="127.0.0.1"
      port="5672"
      username="pomelo"
      password="pomelo"
      virtual-host="/pomelo"
    />
    <!-- 3. 配置队列名 -->
    <rabbit:queue name="myQueue"/>
    <!-- 4.配置rabbitAdmin -->
    <rabbit:admin connection-factory="connectionFactory"/>
    <!-- 5.配置监听 -->
    <rabbit:listener-container connection-factory="connectionFactory">
      <rabbit:listener ref="emailMessageListener" queue-names="myQueue" />
    </rabbit:listener-container>
  5. 运行项目

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    package com.pomelo.consumer;

    import org.springframework.context.support.ClassPathXmlApplicationContext;

    import java.io.IOException;

    public class Consumer {
      public static void main(String[] args) throws IOException {
        ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("spring-rabbitmq-consumer.xml");
        System.in.read();
      }
    }

7. SpringBoot整合RabbitMQ

搭建SpringBoot环境

同样创建两个工程 mq-rabbitmq-producer和mq-rabbitmq-consumer

  1. 添加AMQP的启动器:

    1
    2
    3
    4
    5
    6
    7
    8
    <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring‐boot‐starter‐test</artifactId>
    </dependency>
  2. 在application.yml中添加RabbitMQ的配置:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    server:
    port: 10086
    spring:
    application:
    name: mq-rabbitmq-producer
    rabbitmq:
    host: 192.168.1.103
    port: 5672
    username: kavito
    password: 123456
    virtualHost: /kavito
    template:
    retry:
    enabled: true
    initial-interval: 10000ms
    max-interval: 300000ms
    multiplier: 2
    exchange: topic.exchange
    publisher-confirms: true

    属性说明:

    template:有关AmqpTemplate的配置

    retry:失败重试

    enabled:开启失败重试

    initial-interval:第一次重试的间隔时长

    max-interval:最长重试间隔,超过这个间隔将不再重试

    multiplier:下次重试间隔的倍数,此处是2即下次重试间隔是上次的2倍

    exchange:缺省的交换机名称,此处配置后,发送消息如果不指定交换机就会使用这个

    publisher-confirms:生产者确认机制,确保消息会正确发送,如果发送失败会有错误回执,从而触发重试

    当然如果consumer只是接收消息而不发送,就不用配置template相关内容。

  3. 定义RabbitConfig配置类,配置Exchange、Queue、及绑定交换机。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    @Configuration
    public class RabbitmqConfig {
    public static final String QUEUE_EMAIL = "queue_email";//email队列
    public static final String QUEUE_SMS = "queue_sms";//sms队列
    public static final String EXCHANGE_NAME="topic.exchange";//topics类型交换机
    public static final String ROUTINGKEY_EMAIL="topic.#.email.#";
    public static final String ROUTINGKEY_SMS="topic.#.sms.#";

    //声明交换机
    @Bean(EXCHANGE_NAME)
    public Exchange exchange(){
    //durable(true) 持久化,mq重启之后交换机还在
    return ExchangeBuilder.topicExchange(EXCHANGE_NAME).durable(true).build();
    }

    //声明email队列
    /*
    * new Queue(QUEUE_EMAIL,true,false,false)
    * durable="true" 持久化 rabbitmq重启的时候不需要创建新的队列
    * auto-delete 表示消息队列没有在使用时将被自动删除 默认是false
    * exclusive 表示该消息队列是否只在当前connection生效,默认是false
    */
    @Bean(QUEUE_EMAIL)
    public Queue emailQueue(){
    return new Queue(QUEUE_EMAIL);
    }
    //声明sms队列
    @Bean(QUEUE_SMS)
    public Queue smsQueue(){
    return new Queue(QUEUE_SMS);
    }

    //ROUTINGKEY_EMAIL队列绑定交换机,指定routingKey
    @Bean
    public Binding bindingEmail(@Qualifier(QUEUE_EMAIL) Queue queue,
    @Qualifier(EXCHANGE_NAME) Exchange exchange){
    return BindingBuilder.bind(queue).to(exchange).with(ROUTINGKEY_EMAIL).noargs();
    }
    //ROUTINGKEY_SMS队列绑定交换机,指定routingKey
    @Bean
    public Binding bindingSMS(@Qualifier(QUEUE_SMS) Queue queue,
    @Qualifier(EXCHANGE_NAME) Exchange exchange){
    return BindingBuilder.bind(queue).to(exchange).with(ROUTINGKEY_SMS).noargs();
    }

    }

7.1 生产者(mq-rabbitmq-producer)

为了方便测试,我直接把生产者代码放工程测试类:发送routing key是”topic.sms.email”的消息,那么mq-rabbitmq-consumer下那些监听的(与交换机(topic.exchange)绑定,并且订阅的routingkey中匹配了”topic.sms.email”规则的) 队列就会收到消息。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
@SpringBootTest
@RunWith(SpringRunner.class)
public class Send {

@Autowired
RabbitTemplate rabbitTemplate;

@Test
public void sendMsgByTopics(){

/**
* 参数:
* 1、交换机名称
* 2、routingKey
* 3、消息内容
*/
for (int i=0;i<5;i++){
String message = "恭喜您,注册成功!userid="+i;
rabbitTemplate.convertAndSend(RabbitmqConfig.EXCHANGE_NAME,"topic.sms.email",message);
System.out.println(" [x] Sent '" + message + "'");
}

}
}

运行测试类发送5条消息:

img

web管理界面: 可以看到已经创建了交换机以及queue_email、queue_sms 2个队列,并且向这两个队列分别发送了5条消息

img

img

7.2 消费者(mq-rabbitmq-consumer)

编写一个监听器组件,通过注解配置消费者队列,以及队列与交换机之间绑定关系。(也可以像生产者那样通过配置类配置)

在SpringAmqp中,对消息的消费者进行了封装和抽象。一个JavaBean的方法,只要添加@RabbitListener注解,就可以成为了一个消费者。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
@Component
public class ReceiveHandler {

//监听邮件队列
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = "queue_email", durable = "true"),
exchange = @Exchange(
value = "topic.exchange",
ignoreDeclarationExceptions = "true",
type = ExchangeTypes.TOPIC
),
key = {"topic.#.email.#","email.*"}))
public void rece_email(String msg){
System.out.println(" [邮件服务] received : " + msg + "!");
}

//监听短信队列
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = "queue_sms", durable = "true"),
exchange = @Exchange(
value = "topic.exchange",
ignoreDeclarationExceptions = "true",
type = ExchangeTypes.TOPIC
),
key = {"topic.#.sms.#"}))
public void rece_sms(String msg){
System.out.println(" [短信服务] received : " + msg + "!");
}
}

属性说明:

@Componet:类上的注解,注册到Spring容器
@RabbitListener:方法上的注解,声明这个方法是一个消费者方法,需要指定下面的属性:

bindings:指定绑定关系,可以有多个。值是@QueueBinding的数组。@QueueBinding包含下面属性:

value:这个消费者关联的队列。值是@Queue,代表一个队列

exchange:队列所绑定的交换机,值是@Exchange类型

key:队列和交换机绑定的RoutingKey,可指定多个

启动mq-rabbitmq-comsumer项目:

img

8. 高级特性

8.1 投递消息机制

8.1.1 Confirm确认消息

  • 消息确认,是指生产者消息投递后,如果Broker收到消息,则会给生产者一个应答

  • 生产者进行接收应答,用来确定这条消息是否正常的发送到Broker,这种方式也是消息的可靠性投递的核心保障

    在这里插入图片描述

  • 生成者发送消息与监听confirm是异步的。

如何实现Confirm确认消息?

  1. 在channel上开启确认模式:channel.confirmSelect()
  2. 在channel上添加监听:addConfirmListener,监听成功和失败的返回结果,根据具体返回结果对消息进行重发或日志记录等

生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
public class Producer {

public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
Connection connection = ConnectionUtil.getConn();
//1. 通过connection创建一个Channel
Channel channel = connection.createChannel();
//2.指定消息确认模式
channel.confirmSelect();
String exchangeName = "test_confirm_exchange";
String routingKey = "confirm.save";

//3. 通过Channel发送数据
String message = "Hello from Producer";
channel.basicPublish(exchangeName,routingKey,null,message.getBytes());

//4. 添加一个确认监听
channel.addConfirmListener(new ConfirmListener() {
@Override
public void handleAck(long deliveryTag, boolean multiple) throws IOException {
//成功的情况 deliveryTag:消息的唯一标签;
System.out.println("——get ack——");
}

@Override
public void handleNack(long deliveryTag, boolean multiple) throws IOException {
//失败的情况
System.out.println("——have no ack——");
}
});

// 关闭掉就没confirm了
// CloseTool.closeElegantly(channel,connection);

}
}

消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public class Consumer {

public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConn();

//1. 通过connection创建一个Channel
Channel channel = connection.createChannel();

String exchangeName = "test_confirm_exchange";
String routingKey = "confirm.save";
String queueName = "test_confirm_queue";
//2. 声明一个exchange
channel.exchangeDeclare(exchangeName,"topic",true);
//3. 声明一个队列
channel.queueDeclare(queueName,true,false,false,null);
//4. 绑定
channel.queueBind(queueName,exchangeName,routingKey);
//5. 创建消费者
QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
//6. 设置Channel
channel.basicConsume(queueName,true,queueingConsumer);
//7. 获取消息
while (true) {
//nextDelivery 会阻塞直到有消息过来
QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println("收到:" + message);
}
}
}

8.1.2 return返回消息

  • Return Listener用于处理一些不可路由消息
  • 生产者指定Exchange和RoutingKey,将消息投递到某个队列,然后消费者监听队列,进行消息处理
  • 但在某些情况下,在发送消息时,若当前的exchange不存在或指定的路由key路由失败,这时,如果需要监听这种不可达的消息,则要使用return listener

return 消息机制

在基础API中有一个关键的配置项:

  • Mandatory : 若为true,则监听器会接收到路由不可达的消息,然后进行后粗处理 ;若为false,则broker端自动删除该消息

在这里插入图片描述

发送端发送了一条消息,但是没有发现Exchange,则可通过return listener监听这些消息

生产者:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
public class Producer {
public static final String MQ_HOST = "192.168.222.101";
public static final String MQ_VHOST = "/";
public static final int MQ_PORT = 5672;

public static void main(String[] args) throws IOException, TimeoutException {
//1. 创建一个ConnectionFactory
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(MQ_HOST);//配置host
connectionFactory.setPort(MQ_PORT);//配置port
connectionFactory.setVirtualHost(MQ_VHOST);//配置vHost

//2. 通过连接工厂创建连接
Connection connection = connectionFactory.newConnection();
//3. 通过connection创建一个Channel
Channel channel = connection.createChannel();
String exchange = "test_return_exchange";
String routingKey = "return.save";
String routingKeyError = "abc.save";


//4. 通过Channel发送数据
String message = "Hello Return Message";

channel.addReturnListener((replyCode, replyText, exchange1, routingKey1, properties, body) -> {
System.out.println("——handle return——");
System.out.println("replyCode:" + replyCode);
System.out.println("replyText:" + replyText);
System.out.println("exchange1:" + exchange1);
System.out.println("routingKey1:" + routingKey1);
System.out.println("properties:" + properties);
System.out.println("body:" + new String(body));
});

//mandatory : true
//channel.basicPublish(exchange,routingKey,true,null,message.getBytes());
channel.basicPublish(exchange,routingKeyError,true,null,message.getBytes());


}
}

消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34

public class Consumer {

public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConn();

//1. 通过connection创建一个Channel
Channel channel = connection.createChannel();

String exchange = "test_return_exchange";
String routingKey = "return.#";
String queueName = "test_return_queue";

//2. 声明一个exchange
channel.exchangeDeclare(exchange,"topic",true,false,null);
//3. 声明一个队列
channel.queueDeclare(queueName,true,false,false,null);
//4. 绑定
channel.queueBind(queueName,exchange,routingKey);
//5. 创建消费者
QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
//6. 设置Channel
channel.basicConsume(queueName,true,queueingConsumer);
//7. 获取消息
while (true) {
//nextDelivery 会阻塞直到有消息过来
QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println("收到:" + message);
}


}
}

当生产端执行channel.basicPublish(exchange,routingKey,true,null,message.getBytes());消息能发送成功,也可以从消费端看到打印

当执行channel.basicPublish(exchange,routingKeyError,true,null,message.getBytes());消息发送失败,因为路由失败了嘛,生产端能看到如下打印:

1
2
3
4
5
6
7
——handle return——
replyCode:312
replyText:NO_ROUTE
exchange1:test_return_exchange
routingKey1:abc.save
properties:#contentHeader<basic>(content-type=null, content-encoding=null, headers=null, delivery-mode=null, priority=null, correlation-id=null, reply-to=null, expiration=null, message-id=null, timestamp=null, type=null, user-id=null, app-id=null, cluster-id=null)
body:Hello Return Message

若生产端将mandatory设为false,则ReturnListener不会进行回调

8.2 consumer ACK

在RabbitMQ中指代的是消费者收到消息后确认的一种行为,关注点在于消费者能否实际接收到MQ发送的消息。

其提供了三种确认方式:

  1. 自动确认acknowledge=”none”:当消费者接收到消息的时候,就会自动给到RabbitMQ一个回执,告诉MQ我已经收到消息了,不在乎消费者接收到消息之后业务处理的成功与否。
  2. 手动确认acknowledge=”manual”:当消费者收到消息后,不会立刻告诉RabbitMQ已经收到消息了,而是等待业务处理成功后,通过调用代码的方式手动向MQ确认消息已经收到。当业务处理失败,就可以做一些重试机制,甚至让MQ重新向消费者发送消息都是可以的。
  3. 根据异常情况确认acknowledge=”auto”:该方式是通过抛出异常的类型,来做响应的处理(如重发、确认等布拉不拉布拉)。这种方式比较麻烦。

当消息一旦被消费者接收到,会立刻自动向MQ确认接收,并将响应的message从RabbitMQ消息缓存中移除,但是在实际的业务处理中,会出现消息收到了,但是业务处理出现异常的情况,在自动确认的模式下,该条业务处理失败的message就相当于被丢弃了。如果设置了手动确认,则需要在业务处理完成之后,手动调用channel.basicAck(),手动的签收,如果业务处理失败,则手动调用channel.basicNack()方法拒收,并让MQ重新发送该消息。

如果不做任何关于acknowledge的配置,默认就是自动确认签收的。
生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
    /*
* 功能描述: <br>
* 〈测试Consumer Ack〉
* @Param: []
* @Return: void
* @Author: LeoLee
* @Date: 2020/11/7 21:12
*/
@Test
public void testAck() {

// //消费者接收到该消息,解析到true,就模拟调用channel.basicAck确认签收消息
// rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME, "boot.test", "test msg send [true]");

//消费者接收到该消息,解析到false,就模拟调用channel.basicNack,拒收消息,让MQ重发
rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME, "boot.test", "test msg send [false]");
}

消费者端打开Ack模式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
server:
port: 2002

spring:
rabbitmq:
host: 127.0.0.1
port: 5672
username: xxxxxxxxx
password: xxxxxxxxx
virtual-host: /LeoLee
listener:
simple:
acknowledge-mode: manual #消费者端确认模式:none自动确认 manual手动确认 auto通过抛出异常的类型,来做响应的处理
concurrency: 1 #当前监听的数量
max-concurrency: 5 #最大监听数量
retry:
enabled: true #是否支持重试
max-attempts: 4 #最大重试次数,默认为3

消费者端创建一个listener并实现ChannelAwareMessageListener接口(其实也可以不实现该接口,只要@RabbitListener标记的方法,或者@RabbitListener标记的类+@RabbitHandler标记的方法的参数列表有[com.rabbitmq.client.Channel]和[org.springframework.amqp.core.Message]两个参数,都可以)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
package com.leolee.rabbitmq.MsgListener;

import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.stereotype.Component;

import java.io.IOException;

/**
* @ClassName AckListener
* @Description: Consumer Ack
* 1.设置手动确认签收:acknowledge-mode: manual, retry.enabled: true #是否支持重试
* 2.实现ChannelAwareMessageListener接口,ChannelAwareMessageListener是MessageListener的子接口
* 3.如果消息接收并处理完成,调用channel.basicAck()向MQ确认签收
* 4.如果消息接收但是业务处理失败,调用channel.basicNack()拒收,要求MQ重新发送
* @Author LeoLee
* @Date 2020/11/7
* @Version V1.0
**/
@Component
public class AckListener implements ChannelAwareMessageListener {


@RabbitListener(queues = "boot_queue")
@Override
public void onMessage(Message message, Channel channel) throws Exception {

Thread.sleep(1000);
boolean tag = new String(message.getBody()).contains("true");
System.out.println("接收到msg:" + new String(message.getBody()));
//获取mes deliveryTag
long deliveryTag = message.getMessageProperties().getDeliveryTag();

try {
if (tag) {
System.out.println("业务处理成功");
//手动签收
/*
* deliveryTag:the tag from the received {@link com.rabbitmq.client.AMQP.Basic.GetOk} or {@link com.rabbitmq.client.AMQP.Basic.Deliver}
* multiple: ture确认本条消息以及之前没有确认的消息,false仅确认本条消息
*/
channel.basicAck(deliveryTag, false);
} else {
//模拟业务处理失败抛出异常
System.out.println("业务处理失败");
throw new IOException("业务处理失败");
}
} catch (IOException e) {
e.printStackTrace();
/*
* deliveryTag:the tag from the received {@link com.rabbitmq.client.AMQP.Basic.GetOk} or {@link com.rabbitmq.client.AMQP.Basic.Deliver}
* multiple: ture确认本条消息以及之前没有确认的消息,false仅确认本条消息
* requeue: true该条消息重新返回MQ queue,MQ broker将会重新发送该条消息
*/
channel.basicNack(deliveryTag, false, true);
//也可以使用channel.basicReject(deliveryTag, requeue),它只能拒收单条消息
//channel.basicReject(deliveryTag, true);
}

}
}

用生产者测试类发送不同的消息给MQ

成功接收并手动确认后,MQ队列就删除了该消息的缓存:

img

被拒绝的消息会一直发送:

img

8.3 消费端限流

RabbitMQ限流机制

RabbitMQ提供了QOS限流功能,在非自动确认消息的前提下,如果有一定数目的未被确认前,不消费新的消息。

在消费端开启限流机制

  1. channel.basicConsume(...)方法的autoAck参数改为false
  2. 设置Qos
1
channel.basicQos(0,1,false);
1
2
3
4
5
6
/**
* @param prefetchSize 消息的大小,设置为0不做任何限制
* @param prefetchCount 每次消费消息数
* @param global 为true channel级别的限流,false 消费端限流
*/
void basicQos(int prefetchSize, int prefetchCount, boolean global) throws IOException;

生产端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public static void main(String[] args) throws Exception{
ConnectionFactory connectionFactory=new ConnectionFactory();
connectionFactory.setHost("127.0.0.1");
connectionFactory.setPort(5672);
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
//设置虚拟主机
connectionFactory.setVirtualHost("/");

//创建一个链接
Connection connection = connectionFactory.newConnection();

//创建channel
Channel channel = connection.createChannel();


String exchangeName="test_qos_exchange";
String routeKey="qos.test";
String msg="RabbitMQ QOS message QOS test!";
for (int i=0;i<=4;i++){
channel.basicPublish(exchangeName,routeKey,null,msg.getBytes());
}
}

消费端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
public static void main(String[] args) throws  Exception{

ConnectionFactory connectionFactory=new ConnectionFactory();
connectionFactory.setHost("127.0.0.1");
connectionFactory.setPort(5672);
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
//设置虚拟主机
connectionFactory.setVirtualHost("/");
//创建链接
Connection connection = connectionFactory.newConnection();

//创建channel
Channel channel = connection.createChannel();
String exchangeName="test_qos_exchange";
String exchangeType="topic";
//声明Exchange
channel.exchangeDeclare(exchangeName,exchangeType,true,false,false,null);
String queueName="test_qos_queue";
//声明队列
channel.queueDeclare(queueName,true,false,false,null);
String routeKey="qos.#";
//绑定队列和交换机
channel.queueBind(queueName,exchangeName,routeKey);

//设置限流
channel.basicQos(0,1,false);

/**
* 把autoAck设置为false,限流才生效
*/
channel.basicConsume(queueName, false, new DefaultConsumer(channel) {

@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("接收到消息::"+new String(body));
//不批量签收
channel.basicAck(envelope.getDeliveryTag(),false);
}
});
}

如果想看限流效果可先把消费端的这行代码channel.basicAck(envelope.getDeliveryTag(),false);先注释。
在RabbitMQ控制台可以看到,有4条消息未被消费,一条消息等待签收确认。只有当前消息签收后,才消费剩余的消息

在这里插入图片描述

8.4 TTL

TTL队列/消息

TTL是Time To Live的缩写, 也就是生存时间
RabbitMQ支持消息的过期时间, 在消息发送时可以进行指定
RabbitMQ支持队列的过期时间, 从消息入队列开始计算, 只要超过了队列的超时时间配置, 那么消息会自动清除

代码演示

消费者中设置队列超时时间为10秒, 启动之后关闭消费者
生产者发送两条消息, 一条消息不设置超时时间, 一条消息设置5秒后超时
启动生产者之后, 监控RabbitMQ控制台, 发现5秒后设置了消息超时时间的消息先超时清除, 然后10秒后另外一条消息也超时清除

配置文件

image-20220111221928714

生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
package com.qiyexue.api.ttl;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.util.HashMap;
import java.util.Map;

public class Producer {

public static void main(String[] args) throws Exception {
// 1. 创建连接工厂, 设置属性
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.72.138");
factory.setPort(5672);
factory.setVirtualHost("/");

// 2. 创建连接
Connection connection = factory.newConnection();

// 3. 使用connection创建channel
Channel channel = connection.createChannel();

// 4. 通过channel发送消息
String msg = "hello rabbitmq!";
AMQP.BasicProperties properties = new AMQP.BasicProperties();
Map<String,Object> headers = new HashMap<String, Object>();
headers.put("name", "七夜雪");
properties = properties.builder()
// 设置编码为UTF8
.contentEncoding("UTF-8")
// 设置自定义Header
.headers(headers)
// 设置消息失效时间
.expiration("5000").build();

// 设置了消息超时时间为5秒, 5秒后消息自动删除
channel.basicPublish("", "test_ttl_queue", properties, msg.getBytes());
// 没有设置消息存活时间,消息存活时间根据队列来决定
channel.basicPublish("", "test_ttl_queue", null, msg.getBytes());

// 5. 关闭连接
channel.close();
connection.close();
}

}

消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
package com.qiyexue.api.ttl;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.QueueingConsumer.Delivery;

import java.util.HashMap;
import java.util.Map;

public class Consumer {

public static void main(String[] args) throws Exception {
// 1. 创建连接工厂, 设置属性
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.72.138");
factory.setPort(5672);
factory.setVirtualHost("/");

// 2. 创建连接
Connection connection = factory.newConnection();

// 3. 使用connection创建channel
Channel channel = connection.createChannel();

// 4. 声明(创建)一个队列
String queueName = "test_ttl_queue";

Map<String, Object> arguments = new HashMap<>();
// 设置队列超时时间为10秒
arguments.put("x-message-ttl", 10000);
channel.queueDeclare(queueName,true, false, false, arguments);

// 5. 创建消费者
QueueingConsumer consumer = new QueueingConsumer(channel);

// 6. 设置channel
channel.basicConsume(queueName, true, consumer);
while (true) {
// 7. 获取消息
Delivery delivery = consumer.nextDelivery();
System.out.println(new String(delivery.getBody()));
// 获取head中内容
System.out.println(delivery.getProperties().getHeaders().get("name"));
}

}

}

8.5 死信队列介绍

  • 死信队列:DLX,dead-letter-exchange
  • 利用DLX,当消息在一个队列中变成死信 (dead message) 之后,它能被重新publish到另一个Exchange,这个Exchange就是DLX

消息变成死信有以下几种情况:

  • 消息被拒绝(basic.reject / basic.nack),并且requeue = false
  • 消息TTL过期
  • 队列达到最大长度

死信处理过程:

  • DLX也是一个正常的Exchange,和一般的Exchange没有区别,它能在任何的队列上被指定,实际上就是设置某个队列的属性。
  • 当这个队列中有死信时,RabbitMQ就会自动的将这个消息重新发布到设置的Exchange上去,进而被路由到另一个队列。
  • 可以监听这个队列中的消息做相应的处理。
死信队列设置
  1. 首先需要设置死信队列的exchange和queue,然后进行绑定:

    1
    2
    3
    4
    Exchange: dlx.exchange
    Queue: dlx.queue
    RoutingKey: #
    #表示只要有消息到达了Exchange,那么都会路由到这个queue上
  2. 然后需要有一个监听,去监听这个队列进行处理

  3. 然后我们进行正常声明交换机、队列、绑定,只不过我们需要在队列加上一个参数即可:arguments.put(" x-dead-letter-exchange","dlx.exchange");,这样消息在过期、requeue、 队列在达到最大长度时,消息就可以直接路由到死信队列!

生产端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public class Producer {

public static void main(String[] args) throws Exception {
//1 创建ConnectionFactory
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.43.157");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
//2 获取Connection
Connection connection = connectionFactory.newConnection();
//3 通过Connection创建一个新的Channel
Channel channel = connection.createChannel();

String exchange = "test_dlx_exchange";
String routingKey = "dlx.save";

String msg = "Hello RabbitMQ DLX Message";

AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
.deliveryMode(2)
.contentEncoding("UTF-8")
.expiration("10000")
.build();
//发送消息
channel.basicPublish(exchange, routingKey, true, properties, msg.getBytes());
}
}

自定义消费端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public class MyConsumer extends DefaultConsumer {

public MyConsumer(Channel channel) {
super(channel);
}

@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.err.println("-----------consume message----------");
System.err.println("consumerTag: " + consumerTag);
System.err.println("envelope: " + envelope);
System.err.println("properties: " + properties);
System.err.println("body: " + new String(body));
}
}
消费端
  • 声明正常处理消息的交换机、队列及绑定规则
  • 在正常交换机上指定死信发送的Exchange
  • 声明死信交换机、队列及绑定规则
  • 监听死信队列,进行后续处理,这里省略
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public class Consumer {

public static void main(String[] args) throws Exception {

ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.43.157");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");

Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();

// 声明一个普通的交换机 和 队列 以及路由
String exchangeName = "test_dlx_exchange";
String routingKey = "dlx.#";
String queueName = "test_dlx_queue";

channel.exchangeDeclare(exchangeName, "topic", true, false, null);
//指定死信发送的Exchange
Map<String, Object> agruments = new HashMap<String, Object>();
agruments.put("x-dead-letter-exchange", "dlx.exchange");
//这个agruments属性,要设置到声明队列上
channel.queueDeclare(queueName, true, false, false, agruments);
channel.queueBind(queueName, exchangeName, routingKey);

//要进行死信队列的声明
channel.exchangeDeclare("dlx.exchange", "topic", true, false, null);
channel.queueDeclare("dlx.queue", true, false, false, null);
channel.queueBind("dlx.queue", "dlx.exchange", "#");

channel.basicConsume(queueName, true, new MyConsumer(channel));
}
}
运行说明

启动消费端,此时查看管控台,新增了两个Exchange,两个Queue。在test_dlx_queue上我们设置了DLX,也就代表死信消息会发送到指定的Exchange上,最终其实会路由到dlx.queue上。

img

此时关闭消费端,然后启动生产端,查看管控台队列的消息情况,test_dlx_queue的值为1,而dlx_queue的值为0。
10s后的队列结果如图,由于生产端发送消息时指定了消息的过期时间为10s,而此时没有消费端进行消费,消息便被路由到死信队列中。

img

8.6 延迟队列

延迟队列存储的对象肯定是对应的延迟消息,所谓”延迟消息”是指当消息被发送以后,并不想让消费者立即拿到消息,而是等待指定时间后,消费者才拿到这个消息进行消费。

场景一:在订单系统中,一个用户下单之后通常有30分钟的时间进行支付,如果30分钟之内没有支付成功,那么这个订单将进行一场处理。这是就可以使用延迟队列将订单信息发送到延迟队列。

场景二:用户希望通过手机远程遥控家里的智能设备在指定的时间进行工作。这时候就可以将用户指令发送到延迟队列,当指令设定的时间到了再将指令推送到只能设备。
RabbitMQ怎么实现延迟队列

AMQP协议,以及RabbitMQ本身没有直接支持延迟队列的功能,但是可以通过TTL和DLX模拟出延迟队列的功能。

TTL(Time To Live)

RabbitMQ可以针对Queue和Message设置 x-message-tt,来控制消息的生存时间,如果超时,则消息变为dead letter

RabbitMQ针对队列中的消息过期时间有两种方法可以设置。

  • A: 通过队列属性设置,队列中所有消息都有相同的过期时间。
  • B: 对消息进行单独设置,每条消息TTL可以不同。

如果同时使用,则消息的过期时间以两者之间TTL较小的那个数值为准。消息在队列的生存时间一旦超过设置的TTL值,就成为dead letter

DLX (Dead-Letter-Exchange)

RabbitMQ的Queue可以配置x-dead-letter-exchange 和x-dead-letter-routing-key(可选)两个参数,如果队列内出现了dead letter,则按照这两个参数重新路由。

  • x-dead-letter-exchange:出现dead letter之后将dead letter重新发送到指定exchange
  • x-dead-letter-routing-key:指定routing-key发送

队列出现dead letter的情况有:

  • 消息或者队列的TTL过期
  • 队列达到最大长度
  • 消息被消费端拒绝(basic.reject or basic.nack)并且requeue=false

利用DLX,当消息在一个队列中变成死信后,它能被重新publish到另一个Exchange。这时候消息就可以重新被消费。

详细可以参考: RabbitMQ之死信队列

实例

首先建立2个exchange和2个queue:

exchange_delay_begin:这个是producer端发送时调用的exchange, 将消息发送至queue_dealy_begin中。
queue_delay_begin: 通过routingKey="delay"绑定exchang_delay_begin, 同时配置DLX=exchange_delay_done, 当消息变成死信时,发往exchange_delay_done中。
exchange_delay_done: 死信的exchange, 如果不配置x-dead-letter-routing-key则采用原有默认的routingKey,即queue_delay_begin绑定exchang_delay_beghin采用的“delay”。
queue_delay_done:消息在TTL到期之后,最终通过exchang_delay_done发送值此queue,消费端通过消费此queue的消息,即可以达到延迟的效果。

建立exchange和queue的代码(当然这里可以通过RabbitMQ的管理界面来实现,无需code相关代码):

1
2
3
4
5
6
7
8
9
10
channel.exchangeDeclare("exchange_delay_begin", "direct", true);
channel.exchangeDeclare("exchange_delay_done", "direct", true);

Map<String, Object> args = new HashMap<String, Object>();
args.put("x-dead-letter-exchange", "exchange_delay_done");
channel.queueDeclare("queue_delay_begin", true, false, false, args);
channel.queueDeclare("queue_delay_done", true, false, false, null);

channel.queueBind("queue_delay_begin", "exchange_delay_begin", "delay");
channel.queueBind("queue_delay_done", "exchange_delay_done", "delay");

消费端

1
2
3
4
5
6
7
8
9
QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume("queue_delay_done", false, consumer);

while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String msg = new String(delivery.getBody());
System.out.println("receive msg time:" + new Date() + ", msg body:" + msg);
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}

生产端

1
2
3
4
5
6
7
AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();
builder.expiration("60000");//设置消息TTL
builder.deliveryMode(2);//设置消息持久化
AMQP.BasicProperties properties = builder.build();

String message = String.valueOf(new Date());
channel.basicPublish("exchange_delay_begin","delay",properties,message.getBytes());

在创建完exchange和queue之后,首先执行consumer端的代码,之后执行producer端的代码,待producer发送完毕之后,查看consumer端的输出:

1
receive msg time:Tue Feb 14 21:06:19 CST 2017, msg body:Tue Feb 14 21:05:19 CST 2017