RabbitMQ
AMQP,即 Advanced Message Queuing Protocol,高级消息队列协议是应用层协议的一个开放标准,为面向消息的中间件设计。消息中间件主要用于组件之间的解耦,消息的发送者无需知道消息使用者的存在,反之亦然。AMQP 的主要特征是面向消息、队列和路由,可靠且安全。RabbitMQ 是一个开源的 AMQP 实现,服务器端用 Erlang 语言编写,支持多种客户端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP 等,支持 Ajax。用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。
RabbitMQ相关概念
概念 | 描述 |
---|---|
VHost | Virtual Host只是起到一个命名空间的作用 |
Broker | 消息队列服务器的实体,用于存储转发消息,可以把它看成MQ的Server端 |
Exchange | 接收消息并将其路由到一个或多个队列,路由算法决定消息按什么规则、路由到哪个队列 |
Queue | 用来存储消息,是消息的容器,它只受主机内存和磁盘的限制,是消息弹出之前的最终目的地 |
Binding | 把 Exchange 和 Queue 按照路由算法绑定起来 |
RoutingKey | 路由规则,Exchange 根据这个规则进行消息路由 |
Producter | 消息生产者,产生消息的程序 |
Consumer | 消息消费者,消费消息的程序 |
Channel | Channel 是进行消息读写的通道,在客户端的每个连接里可建立多个 Channel,每个 Channel 代表一个会话 |
Exchange: Takes a message and routes it to one or more queues. Routing algorithms decides where to send the message from the exchange. Routing algorithms depends on the exchange type and rules called “bindings”.
Exchange Type | Routing Algorithms | Purpose |
---|---|---|
Direct | It routes messages with a routing key equal to the routing key declared by the binding queue | This is a Default exchange type. It is used when a message needs to send to a queue |
Fanout | It routes messages to all the queues from the bound exchange. If routing key is provided then it will be ignored | Useful for broadcast feature using publish subscribe pattern |
Topic | It routes messages to queues based on either full or a portion of routing key matches | Useful for broadcast to specific queues based on some criteria |
RabbitMQ安装
RabbitMQ 是基于 Erlang 语言开发的,所以首先必须安装 Erlang 运行时环境。
下载安装 Erlang
下载安装 RabbitMQ
修改 RabbitMQ 配置文件
1
2$ cd ${RABBITMQ_HOME}/etc
$ copy rabbitmq.config.example rabbitmq.config启用相关插件
1
$ rabbitmq-plugins enable rabbitmq_management rabbitmq_web_stomp rabbitmq_stomp
启动 RabbitMQ 服务
通过访问 http://127.0.0.1:15672 用 guest/guest 登录验证是否安装成功
默认情况下,RabbitMQ的默认的guest用户只允许本机访问,如果想让guest用户能够远程访问的话,只需要将配置文件中的
loopback_users
列表置为空即可{loopback_users, []}
;另外关于新添加的用户,直接就可以从远程访问的,如果想让新添加的用户只能本地访问,可以将用户名添加到上面的列表,如只允许admin用户本机访问{loopback_users, ["admin"]}
五种消息模型
基本消息模型
一个生产者,一个消费者,生产者生产的消息直接被消费者消费。
1 | public class SimpleTest { |
Work消息模型
在基本消息模型中,当消息处理比较耗时的时候,生产者生产消息的速度会远远快于消费者消费的速度,那就可能出现消息的堆积。此时就可以使用Work模型:让多个消费者绑定到一个队列,共同消费队列中的消息。队列中的消息一旦消费就会消失,因此任务是不会被重复执行的。
1 | public class WorkTest { |
订阅模型 - Fanout(广播)
生产者发送的消息,没有直接发送到队列,而是发送到了交换机,交换机把消息发送给绑定过的所有队列,队列的消费者都能拿到消息,生产者发送的消息经过交换机到达队列,实现一条消息被多个消费者消费。需要注意的是,如果将消息发送到一个没有队列绑定的Exchange上面,那么该消息将会丢失,这是因为在RabbitMQ中Exchange只负责转发消息不具备存储消息的能力,只有队列具备存储消息的能力。
Fanout 完全不关心key,直接采取广播
的方式进行消息投递,任何发送到Fanout Exchange的消息都会被转发到与该Exchange绑定(Binding)的所有Queue上。
在基本消息模型和Work消息模型中,一条消息只能被一个消息者消费;在
Fanout
模型中,一条消息会被所有订阅的队列消费(一个生产者多个消费者)。
1 | public class FanoutTest { |
订阅模型 - Direct(定向)
在Fanout模型中,一条消息会被所有订阅的队列都消费,但是,在某些场景下,我们希望不同的消息被不同的队列消费,这时就要用到Direct Exchange模型:
- Queue与Exchange的绑定,不能是任意绑定了,而是要指定一个RoutingKey;
- 消息的发送方在向Exchange发送消息时,也必须指定消费的RoutingKey;
- Exchange不再把消息投递给每一个绑定的队列,而是根据消息的RoutingKey来进行判断,只有队列的RoutingKey与消息的RoutingKey完全一致,才会接收到消息。
1 | public class DirectTest { |
订阅模型 - Topic(通配符)
Topic与Direct非常相似,只不过Topic Exchange可以让队列在绑定RoutingKey时使用通配符。符号 #
表示匹配一个或多个词,符号 *
表示仅匹配一个词。
RoutingKey一般是由一个或多个单词组成,多个单词之间以
.
分割。
1 | public class TopicTest { |
持久化
交换机持久化
1 | Channel channel = connection.createChannel(); |
队列持久化
1 | Channel channel = connection.createChannel(); |
消息持久化
1 | String message = "Hello World!"; |
高级特性
生产者确认
1 | public class PublisherCallbackTest { |
消费端限流策略
1 | public class QosTest { |
消费端ACK与重回队列
1 | public class RequeueTest { |
TTL消息
1 | public class TTLTest { |
死信队列
1 | public class DLXTest { |
延迟队列
1 | public class DelayedPluginTest { |
幂等性
所谓幂等性,数学上定义为f(n)=f(1)
,表示n被函数f作用一次和作用多次的结果是一样的。在软件系统中,表示某个接口使用相同参数调用一次或者多次其造成的后果是一样的。
消息表
在数据库里面,添加一张消息消费记录表,表字段加上唯一约束条件,消费完之后就往表里写入一条数据,因为加了唯一约束条件,第二次保存时,数据库就会报错回滚事务,这样通过数据库唯一索引就可以防止重复消费。
乐观锁
乐观锁,大多是基于数据版本(version)记录机制实现。何谓数据版本?即为数据增加一个版本标识,在基于数据库表的版本解决方案中,一般是通过为数据库表增加一个 version
字段来实现。读取出数据时,将此版本号一同读出,之后更新时,对此版本号加一。此时,将提交数据的版本数据与数据库表对应记录的当前版本信息进行比对,如果提交的数据版本号大于数据库表当前版本号,则予以更新,否则认为是过期数据。
分布式锁
如果你的数据库将来不会分库分表,那么可以在业务表字段加上唯一约束,这样相同的数据就不会保存多份;如果你的数据库做了分库分表,那么可以使用 Redis 或 Zookeeper 对消息id加锁来防止消息被重复消费。
可靠性投递方案
生产者:
- 保证消息成功发出
- 保证Broker成功接收
- 生产者收到Broker确认应答
- 完善的消息补偿机制
消费者:
- 手动确认
生产者:生产者向Broker发送消息,但由于网络波动,Broker可能会没收到该条消息,所以当Broker收到消息后需要向生产者发送回执(如果是失败的回执,生产者需要进行重发)。需要注意的是,生产者确认只能保证消息到达Exchange却不能保证消息准确投递到目标Queue中,即如果RabbitMQ找不到任何需要投递的Queue队列,那么RabbitMQ依然后发ack给生产者,此时生产者可以认为消息已经正确投递,而不用关心消息没有Queue接收的问题(这是RabbitMQ和消息的接收方需要考虑的事情),生产者只需要保证消息能够发送到Exchange即可。在实际生产中,很难保障前三点的完全可靠,在某些极端的情况下,比如生产者向Broker发送消息过程中失败了,或者在Broker在返回确认应答过程中出现网络闪断等现象,所以要保障消息可靠性投递还需要有完善的消息补偿机制。
消费者:在RabbitMQ中,消息默认是自动ack的,即消息到达消费端立即ack而不管消费端业务是否处理成功,在自动ack模式下,如果业务处理失败或者出现突然宕机现象就会导致消息丢失,因此可以开启手动确认模式由消费端自行决定何时ack。
可靠性投递方案:
- 消息落库
- 延迟投递做二次确认
Spring整合RabbitMQ
整合配置
XML配置
1 |
|
1 | public class MessageHandler implements MessageListener { |
注解配置
1 |
|
1 | 4j |
整合测试
1 | .class) (SpringJUnit4ClassRunner |