分布式消息通信 - RabbitMQ

RabbitMQ

AMQP,即 Advanced Message Queuing Protocol,高级消息队列协议是应用层协议的一个开放标准,为面向消息的中间件设计。消息中间件主要用于组件之间的解耦,消息的发送者无需知道消息使用者的存在,反之亦然。AMQP 的主要特征是面向消息、队列和路由,可靠且安全。RabbitMQ 是一个开源的 AMQP 实现,服务器端用 Erlang 语言编写,支持多种客户端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP 等,支持 Ajax。用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。

RabbitMQ相关概念

概念 描述
VHost Virtual Host只是起到一个命名空间的作用
Broker 消息队列服务器的实体,用于存储转发消息,可以把它看成MQ的Server端
Exchange 接收消息并将其路由到一个或多个队列,路由算法决定消息按什么规则、路由到哪个队列
Queue 用来存储消息,是消息的容器,它只受主机内存和磁盘的限制,是消息弹出之前的最终目的地
Binding 把 Exchange 和 Queue 按照路由算法绑定起来
RoutingKey 路由规则,Exchange 根据这个规则进行消息路由
Producter 消息生产者,产生消息的程序
Consumer 消息消费者,消费消息的程序
Channel Channel 是进行消息读写的通道,在客户端的每个连接里可建立多个 Channel,每个 Channel 代表一个会话

Exchange: Takes a message and routes it to one or more queues. Routing algorithms decides where to send the message from the exchange. Routing algorithms depends on the exchange type and rules called “bindings”.

Exchange Type Routing Algorithms Purpose
Direct It routes messages with a routing key equal to the routing key declared by the binding queue This is a Default exchange type. It is used when a message needs to send to a queue
Fanout It routes messages to all the queues from the bound exchange. If routing key is provided then it will be ignored Useful for broadcast feature using publish subscribe pattern
Topic It routes messages to queues based on either full or a portion of routing key matches Useful for broadcast to specific queues based on some criteria

RabbitMQ消息处理流程

RabbitMQ安装

RabbitMQ 是基于 Erlang 语言开发的,所以首先必须安装 Erlang 运行时环境。

  1. 下载安装 Erlang

  2. 下载安装 RabbitMQ

  3. 修改 RabbitMQ 配置文件

    1
    2
    $ cd ${RABBITMQ_HOME}/etc
    $ copy rabbitmq.config.example rabbitmq.config
  4. 启用相关插件

    1
    $ rabbitmq-plugins enable rabbitmq_management rabbitmq_web_stomp rabbitmq_stomp
  5. 启动 RabbitMQ 服务

  6. 通过访问 http://127.0.0.1:15672 用 guest/guest 登录验证是否安装成功

默认情况下,RabbitMQ的默认的guest用户只允许本机访问,如果想让guest用户能够远程访问的话,只需要将配置文件中的loopback_users列表置为空即可{loopback_users, []};另外关于新添加的用户,直接就可以从远程访问的,如果想让新添加的用户只能本地访问,可以将用户名添加到上面的列表,如只允许admin用户本机访问{loopback_users, ["admin"]}

五种消息模型

基本消息模型

一个生产者,一个消费者,生产者生产的消息直接被消费者消费。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
public class SimpleTest {

private static final String QUEUE_NAME = "simple_queue";

private Connection connection;

@Before
public void setUp() throws Exception {
ConnectionFactory factory = new ConnectionFactory();
//factory.setHost(ConnectionFactory.DEFAULT_HOST);
//factory.setPort(ConnectionFactory.DEFAULT_AMQP_PORT);
//factory.setVirtualHost(ConnectionFactory.DEFAULT_VHOST);
//factory.setUsername(ConnectionFactory.DEFAULT_USER);
//factory.setPassword(ConnectionFactory.DEFAULT_PASS);
connection = factory.newConnection();
}

@After
public void tearDown() throws Exception {
if (connection != null) {
connection.close();
}
}

@Test
public void testSend() throws Exception {
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
String message = "Hello RabbitMQ!";
channel.basicPublish("", QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8));
System.out.println(" [x] Sent '" + message + "'");
channel.close();
}

@Test
public void testRecv() throws Exception {
CountDownLatch latch = new CountDownLatch(1);
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
System.out.println(" [*] Waiting for messages...");
channel.basicConsume(QUEUE_NAME, false, new DefaultConsumer(channel) { // autoAck为true时则消息一收到立马会确认
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, StandardCharsets.UTF_8);
System.out.println(" [x] Received '" + message + "'");
channel.basicAck(envelope.getDeliveryTag(), false); // 手动ACK
latch.countDown();
}
});
latch.await();
}

}

Work消息模型

在基本消息模型中,当消息处理比较耗时的时候,生产者生产消息的速度会远远快于消费者消费的速度,那就可能出现消息的堆积。此时就可以使用Work模型:让多个消费者绑定到一个队列,共同消费队列中的消息。队列中的消息一旦消费就会消失,因此任务是不会被重复执行的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
public class WorkTest {

private static final String QUEUE_NAME = "work_queue";

private Connection connection;

@Before
public void setUp() throws Exception {
ConnectionFactory factory = new ConnectionFactory();
connection = factory.newConnection();
}

@After
public void tearDown() throws Exception {
if (connection != null) {
connection.close();
}
}

@Test
public void testSend() throws Exception {
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
for (int i = 1; i <= 50; i++) {
String message = "task_" + i;
channel.basicPublish("", QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8));
System.out.println(" [x] Sent '" + message + "'");
}
channel.close();
}

@Test
public void testRecv() throws Exception {
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
channel.basicQos(1);
System.out.println(" [*] Waiting for messages...");
channel.basicConsume(QUEUE_NAME, false, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, StandardCharsets.UTF_8);
System.out.println(" [x] Received '" + message + "'");
channel.basicAck(envelope.getDeliveryTag(), false);
}
});
Thread.currentThread().join();
}

}

订阅模型 - Fanout(广播)

生产者发送的消息,没有直接发送到队列,而是发送到了交换机,交换机把消息发送给绑定过的所有队列,队列的消费者都能拿到消息,生产者发送的消息经过交换机到达队列,实现一条消息被多个消费者消费。需要注意的是,如果将消息发送到一个没有队列绑定的Exchange上面,那么该消息将会丢失,这是因为在RabbitMQ中Exchange只负责转发消息不具备存储消息的能力,只有队列具备存储消息的能力。

Fanout 完全不关心key,直接采取广播的方式进行消息投递,任何发送到Fanout Exchange的消息都会被转发到与该Exchange绑定(Binding)的所有Queue上。

在基本消息模型和Work消息模型中,一条消息只能被一个消息者消费;在Fanout模型中,一条消息会被所有订阅的队列消费(一个生产者多个消费者)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
public class FanoutTest {

private static final String EXCHANGE_NAME = "fanout_exchange";
private static final String QUEUE_NAME = "fanout_exchange_queue";

private Connection connection;

@Before
public void setUp() throws Exception {
ConnectionFactory factory = new ConnectionFactory();
connection = factory.newConnection();
}

@After
public void tearDown() throws Exception {
if (connection != null) {
connection.close();
}
}

@Test
public void testSend() throws Exception {
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
String message = "Hello RabbitMQ!";
channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes(StandardCharsets.UTF_8));
System.out.println(" [x] Sent '" + message + "'");
channel.close();
}

@Test
public void testRecv() throws Exception {
CountDownLatch latch = new CountDownLatch(1);
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ""); // 绑定Queue到Exchange
System.out.println(" [*] Waiting for messages...");
channel.basicConsume(QUEUE_NAME, false, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, StandardCharsets.UTF_8);
System.out.println(" [x] Received '" + message + "'");
channel.basicAck(envelope.getDeliveryTag(), false);
latch.countDown();
}
});
latch.await();
}

}

订阅模型 - Direct(定向)

在Fanout模型中,一条消息会被所有订阅的队列都消费,但是,在某些场景下,我们希望不同的消息被不同的队列消费,这时就要用到Direct Exchange模型:

  • Queue与Exchange的绑定,不能是任意绑定了,而是要指定一个RoutingKey;
  • 消息的发送方在向Exchange发送消息时,也必须指定消费的RoutingKey;
  • Exchange不再把消息投递给每一个绑定的队列,而是根据消息的RoutingKey来进行判断,只有队列的RoutingKey与消息的RoutingKey完全一致,才会接收到消息。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
public class DirectTest {

private static final String EXCHANGE_NAME = "direct_exchange";
private static final String QUEUE_NAME = "direct_exchange_queue";

private Connection connection;

@Before
public void setUp() throws Exception {
ConnectionFactory factory = new ConnectionFactory();
connection = factory.newConnection();
}

@After
public void tearDown() throws Exception {
if (connection != null) {
connection.close();
}
}

@Test
public void testSend() throws Exception {
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
String message = "Hello RabbitMQ!";
channel.basicPublish(EXCHANGE_NAME, "order.create", null, message.getBytes(StandardCharsets.UTF_8));
System.out.println(" [x] Sent '" + message + "'");
channel.close();
}

@Test
public void testRecv() throws Exception {
CountDownLatch latch = new CountDownLatch(1);
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "order.create"); // 使用routingKey绑定queue与exchange
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "order.update");
System.out.println(" [*] Waiting for messages...");
channel.basicConsume(QUEUE_NAME, false, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, StandardCharsets.UTF_8);
System.out.println(" [x] Received '" + message + "'");
channel.basicAck(envelope.getDeliveryTag(), false);
latch.countDown();
}
});
latch.await();
}

}

订阅模型 - Topic(通配符)

Topic与Direct非常相似,只不过Topic Exchange可以让队列在绑定RoutingKey时使用通配符。符号 # 表示匹配一个或多个词,符号 * 表示仅匹配一个词。

RoutingKey一般是由一个或多个单词组成,多个单词之间以.分割。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
public class TopicTest {

private static final String EXCHANGE_NAME = "topic_exchange";
private static final String QUEUE_NAME = "topic_exchange_queue";

private Connection connection;

@Before
public void setUp() throws Exception {
ConnectionFactory factory = new ConnectionFactory();
connection = factory.newConnection();
}

@After
public void tearDown() throws Exception {
if (connection != null) {
connection.close();
}
}

@Test
public void testSend() throws Exception {
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
String message = "Hello RabbitMQ!";
channel.basicPublish(EXCHANGE_NAME, "order.create", null, message.getBytes(StandardCharsets.UTF_8));
System.out.println(" [x] Sent '" + message + "'");
channel.close();
}

@Test
public void testRecv() throws Exception {
CountDownLatch latch = new CountDownLatch(1);
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "order.#");
System.out.println(" [*] Waiting for messages...");
channel.basicConsume(QUEUE_NAME, false, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, StandardCharsets.UTF_8);
System.out.println(" [x] Received '" + message + "'");
channel.basicAck(envelope.getDeliveryTag(), false);
latch.countDown();
}
});
latch.await();
}

}

持久化

交换机持久化

1
2
3
Channel channel = connection.createChannel();
// 第三个参数durable设置为true
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT, true);

队列持久化

1
2
3
Channel channel = connection.createChannel();
// 第二个参数durable设置为true
channel.queueDeclare(QUEUE_NAME, true, false, false, null);

消息持久化

1
2
3
String message = "Hello World!";
// 第三个参数BasicProperties设置为MessageProperties.PERSISTENT_TEXT_PLAIN
channel.basicPublish(EXCHANGE_NAME, "order.create", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes(StandardCharsets.UTF_8));

高级特性

生产者确认

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
public class PublisherCallbackTest {

private Connection connection;

@Before
public void setUp() throws Exception {
ConnectionFactory factory = new ConnectionFactory();
connection = factory.newConnection();
}

@After
public void tearDown() throws Exception {
if (connection != null) {
connection.close();
}
}

/**
* Confirm模式只能保证消息到达Exchange却不能保证消息准确投递到目标Queue中
*/
@Test
public void testConfirmCallback() throws Exception {
CountDownLatch latch = new CountDownLatch(1);
Channel channel = connection.createChannel();
channel.confirmSelect(); // 启用生产者确认
channel.addConfirmListener(new ConfirmListener() {
@Override
public void handleAck(long deliveryTag, boolean multiple) {
log.debug("=== Ack ===");
latch.countDown();
}

@Override
public void handleNack(long deliveryTag, boolean multiple) {
log.debug("=== Nack ===");
latch.countDown();
}
});
channel.exchangeDeclare("order.exch", DIRECT);
String msg = "Test confirm callback";
channel.basicPublish("order.exch", "order.create", null, msg.getBytes());
latch.await();
channel.close();
}

/**
* Return模式用于处理一些不可路由的消息, 配合mandatory使用(值为true表示接收路由不可达的消息, 为false表示broker自动删除不可路由的消息而不会触发ReturnCallback)
*/
@Test
public void testReturnCallback() throws Exception {
CountDownLatch latch = new CountDownLatch(1);
Channel channel = connection.createChannel();
channel.addReturnListener((replyCode, replyText, exchange, routingKey, properties, body) -> {
log.error("消息无法从交换机路由到队列, 原因: {}", replyText);
latch.countDown();
});
channel.exchangeDeclare("order.exch", DIRECT);
String msg = "Test confirm callback";
channel.basicPublish("order.exch", "order.create.err", true, null, msg.getBytes());
latch.await();
channel.close();
}

}

消费端限流策略

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
public class QosTest {

private static final String EXCHANGE_NAME = "order.exch";
private static final String ROUTING_KEY = "order.create";
private static final String QUEUE_NAME = "order-create-queue";

private Connection connection;

@Before
public void setUp() throws Exception {
ConnectionFactory factory = new ConnectionFactory();
connection = factory.newConnection();
}

@After
public void tearDown() throws Exception {
if (connection != null) {
connection.close();
}
}

@Test
public void testQos() throws Exception {
CountDownLatch latch = new CountDownLatch(5);
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, DIRECT);
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY);
for (int i = 1; i <= 5; i++) {
String msg = "Hello RabbitMQ " + i;
channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, null, msg.getBytes());
}
channel.basicQos(0, 1, false); // 手动确认模式下限流才生效
channel.basicConsume(QUEUE_NAME, false, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
log.debug(" [x] Received '{}'", new String(body));
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException ignored) {
}
channel.basicAck(envelope.getDeliveryTag(), false);
latch.countDown();
}
});
latch.await();
channel.close();
}
}

消费端ACK与重回队列

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
public class RequeueTest {

private static final String EXCHANGE_NAME = "order.exch";
private static final String ROUTING_KEY = "order.create";
private static final String QUEUE_NAME = "order-create-queue";

private Connection connection;

@Before
public void setUp() throws Exception {
ConnectionFactory factory = new ConnectionFactory();
connection = factory.newConnection();
}

@After
public void tearDown() throws Exception {
if (connection != null) {
connection.close();
}
}

@Test
public void testRequeue() throws Exception {
CountDownLatch latch = new CountDownLatch(5);
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, DIRECT);
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY);
for (int i = 1; i <= 5; i++) {
String msg = "Hello RabbitMQ " + i;
AMQP.BasicProperties prop = new AMQP.BasicProperties().builder()
.headers(Collections.singletonMap("x-counter", i))
.build();
channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, prop, msg.getBytes());
}
channel.basicConsume(QUEUE_NAME, false, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
log.debug(" [x] Received '{}'", new String(body));
Integer count = (Integer) properties.getHeaders().get("x-counter");
if (count == 1) {
channel.basicNack(envelope.getDeliveryTag(), false, true);
} else {
channel.basicAck(envelope.getDeliveryTag(), false);
latch.countDown();
}
}
});
latch.await();
channel.close();
}

}

TTL消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
public class TTLTest {

private static final String EXCHANGE_NAME = "order.exch";
private static final String ROUTING_KEY = "order.create";
private static final String QUEUE_NAME = "order-create-queue";

private Connection connection;

@Before
public void setUp() throws Exception {
ConnectionFactory factory = new ConnectionFactory();
connection = factory.newConnection();
}

@After
public void tearDown() throws Exception {
if (connection != null) {
connection.close();
}
}

@Test
public void testTTL() throws Exception {
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, DIRECT);
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY);
String msg = "Hello RabbitMQ";
AMQP.BasicProperties prop = new AMQP.BasicProperties().builder()
.expiration("5000")
.build();
channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, prop, msg.getBytes());
channel.close();
}

}

死信队列

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
public class DLXTest {

private static final String DLX_EXCHANGE_NAME = "dlx.exch";
private static final String DLX_ROUTING_KEY = "#";
private static final String DLX_QUEUE_NAME = "dlx-queue";
private static final String EXCHANGE_NAME = "order.exch";
private static final String ROUTING_KEY = "order.create";
private static final String QUEUE_NAME = "order-create-queue";

private Connection connection;

@Before
public void setUp() throws Exception {
ConnectionFactory factory = new ConnectionFactory();
connection = factory.newConnection();
}

@After
public void tearDown() throws Exception {
if (connection != null) {
connection.close();
}
}

@Test
public void testDLX() throws Exception {
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, DIRECT);
channel.queueDeclare(QUEUE_NAME, false, false, false,
Collections.singletonMap("x-dead-letter-exchange", DLX_EXCHANGE_NAME));
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY);
// 死信队列声明
channel.exchangeDeclare(DLX_EXCHANGE_NAME, TOPIC);
channel.queueDeclare(DLX_QUEUE_NAME, false, false, false, null);
channel.queueBind(DLX_QUEUE_NAME, DLX_EXCHANGE_NAME, DLX_ROUTING_KEY);
// 消息变成死信的情况: (1)TTL过期 (2)消息被拒绝(basicReject/basicNack)且requeue为false (3)队列达到最大值
String msg = "Hello RabbitMQ";
AMQP.BasicProperties prop = new AMQP.BasicProperties().builder()
.expiration("5000")
.build();
channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, prop, msg.getBytes());
channel.close();
}
}

延迟队列

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
public class DelayedPluginTest {

private Connection connection;

@Before
public void setUp() throws Exception {
ConnectionFactory factory = new ConnectionFactory();
connection = factory.newConnection();
}

@After
public void tearDown() throws Exception {
if (connection != null) {
connection.close();
}
}

@Test
public void testDelayedMessage() throws Exception {
CountDownLatch latch = new CountDownLatch(1);
Channel channel = connection.createChannel();
channel.exchangeDeclare("order-exchange", "x-delayed-message", true, false,
Collections.singletonMap("x-delayed-type", DIRECT.getType()));
channel.queueDeclare("order-create-queue", false, false, false, null);
channel.queueBind("order-create-queue", "order-exchange", "order.create");
AMQP.BasicProperties prop = new AMQP.BasicProperties().builder()
.headers(Collections.singletonMap("x-delay", 5000))
.build();
String msg = "Test delayed message";
log.debug(" [x] Send '{}'", msg);
channel.basicPublish("order-exchange", "order.create", prop, msg.getBytes());
channel.basicConsume("order-create-queue", false, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
log.debug(" [x] Received '{}'", new String(body));
channel.basicAck(envelope.getDeliveryTag(), false);
latch.countDown();
}
});
latch.await();
}

}

幂等性

所谓幂等性,数学上定义为f(n)=f(1),表示n被函数f作用一次和作用多次的结果是一样的。在软件系统中,表示某个接口使用相同参数调用一次或者多次其造成的后果是一样的。

消息表

在数据库里面,添加一张消息消费记录表,表字段加上唯一约束条件,消费完之后就往表里写入一条数据,因为加了唯一约束条件,第二次保存时,数据库就会报错回滚事务,这样通过数据库唯一索引就可以防止重复消费。

乐观锁

乐观锁,大多是基于数据版本(version)记录机制实现。何谓数据版本?即为数据增加一个版本标识,在基于数据库表的版本解决方案中,一般是通过为数据库表增加一个 version 字段来实现。读取出数据时,将此版本号一同读出,之后更新时,对此版本号加一。此时,将提交数据的版本数据与数据库表对应记录的当前版本信息进行比对,如果提交的数据版本号大于数据库表当前版本号,则予以更新,否则认为是过期数据。

分布式锁

如果你的数据库将来不会分库分表,那么可以在业务表字段加上唯一约束,这样相同的数据就不会保存多份;如果你的数据库做了分库分表,那么可以使用 Redis 或 Zookeeper 对消息id加锁来防止消息被重复消费。

可靠性投递方案

生产者:

  • 保证消息成功发出
  • 保证Broker成功接收
  • 生产者收到Broker确认应答
  • 完善的消息补偿机制

消费者:

  • 手动确认

生产者:生产者向Broker发送消息,但由于网络波动,Broker可能会没收到该条消息,所以当Broker收到消息后需要向生产者发送回执(如果是失败的回执,生产者需要进行重发)。需要注意的是,生产者确认只能保证消息到达Exchange却不能保证消息准确投递到目标Queue中,即如果RabbitMQ找不到任何需要投递的Queue队列,那么RabbitMQ依然后发ack给生产者,此时生产者可以认为消息已经正确投递,而不用关心消息没有Queue接收的问题(这是RabbitMQ和消息的接收方需要考虑的事情),生产者只需要保证消息能够发送到Exchange即可。在实际生产中,很难保障前三点的完全可靠,在某些极端的情况下,比如生产者向Broker发送消息过程中失败了,或者在Broker在返回确认应答过程中出现网络闪断等现象,所以要保障消息可靠性投递还需要有完善的消息补偿机制。

消费者:在RabbitMQ中,消息默认是自动ack的,即消息到达消费端立即ack而不管消费端业务是否处理成功,在自动ack模式下,如果业务处理失败或者出现突然宕机现象就会导致消息丢失,因此可以开启手动确认模式由消费端自行决定何时ack。

可靠性投递方案:

  1. 消息落库

消息可靠性投递方案之消息落库

  1. 延迟投递做二次确认

消息可靠性投递方案之延迟投递

Spring整合RabbitMQ

整合配置

XML配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:task="http://www.springframework.org/schema/task"
xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context.xsd
http://www.springframework.org/schema/task
http://www.springframework.org/schema/task/spring-task.xsd
http://www.springframework.org/schema/rabbit
http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">

<context:property-placeholder location="classpath:application.properties"/>

<!-- ==================== RabbitMQ Common Config ==================== -->

<task:executor id="taskExecutor"
pool-size="8"
keep-alive="120"
queue-capacity="100"
rejection-policy="CALLER_RUNS"/>

<task:scheduler id="taskScheduler" pool-size="8"/>

<rabbit:connection-factory id="connectionFactory"
host="${rabbitmq.host:localhost}"
port="${rabbitmq.port:5672}"
virtual-host="${rabbitmq.vhost:/}"
username="${rabbitmq.username:guest}"
password="${rabbitmq.password:guest}"
publisher-confirms="true"
executor="taskExecutor"/>

<!-- The default exchange and routingKey are empty. -->
<rabbit:template id="amqpTemplate" connection-factory="connectionFactory"/>

<rabbit:admin connection-factory="connectionFactory"/>

<!-- ==================== Queues ==================== -->

<!-- 声明Queue -->
<rabbit:queue id="directExchangeQueue" name="direct_exchange_queue"/>

<!-- ==================== Bindings ==================== -->

<!-- 绑定Queue到Exchange -->
<rabbit:direct-exchange name="direct_exchange">
<rabbit:bindings>
<rabbit:binding queue="directExchangeQueue" key="order.create"/>
<rabbit:binding queue="directExchangeQueue" key="order.update"/>
</rabbit:bindings>
</rabbit:direct-exchange>

<!-- ==================== Listener ==================== -->

<!-- 绑定Queue和Listener -->
<rabbit:listener-container type="direct" connection-factory="connectionFactory"
task-executor="taskExecutor" task-scheduler="taskScheduler">
<rabbit:listener ref="messageListener" queues="directExchangeQueue"/>
</rabbit:listener-container>

<bean id="messageListener" class="com.gavin.ssm.msg.core.mq.MessageHandler"/>

</beans>
1
2
3
4
5
6
7
public class MessageHandler implements MessageListener {

public void onMessage(Message message) {
System.err.println(new String(message.getBody(), StandardCharsets.UTF_8));
}

}

注解配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:task="http://www.springframework.org/schema/task"
xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context.xsd
http://www.springframework.org/schema/task
http://www.springframework.org/schema/task/spring-task.xsd
http://www.springframework.org/schema/rabbit
http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">

<context:property-placeholder location="classpath:application.properties"/>

<!-- ==================== RabbitMQ Common Config ==================== -->

<task:executor id="taskExecutor"
pool-size="8"
keep-alive="120"
queue-capacity="100"
rejection-policy="CALLER_RUNS"/>

<task:scheduler id="taskScheduler" pool-size="8"/>

<rabbit:connection-factory id="connectionFactory"
host="${rabbitmq.host:localhost}"
port="${rabbitmq.port:5672}"
virtual-host="${rabbitmq.vhost:/}"
username="${rabbitmq.username:guest}"
password="${rabbitmq.password:guest}"
publisher-confirms="true"
executor="taskExecutor"/>

<!-- The default exchange and routingKey are empty. -->
<rabbit:template id="amqpTemplate" connection-factory="connectionFactory"/>

<rabbit:admin connection-factory="connectionFactory"/>

<!-- ==================== @RabbitListener Support ==================== -->

<!-- 使用@RabbitListener注解方式若不指定containerFactory则默认为rabbitListenerContainerFactory -->
<rabbit:annotation-driven container-factory="rabbitListenerContainerFactory"/>

<bean id="rabbitListenerContainerFactory"
class="org.springframework.amqp.rabbit.config.DirectRabbitListenerContainerFactory">
<property name="connectionFactory" ref="connectionFactory"/>
<property name="taskExecutor" ref="taskExecutor"/>
<property name="taskScheduler" ref="taskScheduler"/>
<!-- The container acknowledge the message automatically unless MessageListener throws an exception -->
<property name="acknowledgeMode" value="AUTO"/>
<property name="prefetchCount" value="1"/>
</bean>

</beans>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
@Slf4j
@Component
public class MessageListener {

@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = "directExchangeQueue", durable = "true"),
exchange = @Exchange(value = "direct_exchange", type = ExchangeTypes.DIRECT),
key = "order.create"
))
public void onMessage(String message) {
log.info("====== {} ======", message);
}

}

整合测试

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration("classpath:spring-rabbitmq.xml")
public class SpringRabbitmqTest {

private static final String EXCHANGE_NAME = "order.exch";
private static final String ROUTING_KEY = "order.create";
private static final String QUEUE_NAME = "order-queue";

@Autowired
private RabbitAdmin rabbitAdmin;

@Autowired
private AmqpTemplate amqpTemplate;

@Before
public void setUp() {
Assert.assertNotNull(rabbitAdmin);
Assert.assertNotNull(amqpTemplate);
}

@Test
public void testManage() {
rabbitAdmin.declareExchange(new DirectExchange(EXCHANGE_NAME));
rabbitAdmin.declareQueue(new Queue(QUEUE_NAME));
rabbitAdmin.declareBinding(new Binding(QUEUE_NAME, DestinationType.QUEUE, EXCHANGE_NAME, ROUTING_KEY, null));
}

@Test
public void testSend() {
amqpTemplate.convertAndSend(EXCHANGE_NAME, ROUTING_KEY, "订单创建");
}

}

参考链接