分布式消息通信 - ActiveMQ

简介

消息队列中间件是服务架构中常见的组件,可用于异步处理、应用解耦、流量削锋、消息通讯等场景,是大型分布式系统不可缺少的中间件。ActiveMQ是一个用Java编写的、完全支持JMS1.1规范的开源消息中间件,支持REST、AMQP、STOMP、WS-Notification、MQTT、XMPP、OpenWire协议。

消息队列组成

消息队列组成

  • 生产者(Producer):负责产生消息
  • 消息代理(Message Broker):负责存储和转发消息
  • 消费者(Consumer):负责消费消息

Broker负责把消息从发送端传送到接收端,可以把它看成是MQ的服务端。
转发分为推和拉两种:拉是指Consumer主动从Message Broker获取消息,推是指Message Broker主动将Consumer感兴趣的消息推送给Consumer

1
2
3
4
5
6
7
8
9
10
11
12
13
/**
* 自定义broker
*/
public class BrokerServer {

public static void main(String[] args) throws Exception {
BrokerService brokerServer = new BrokerService();
brokerServer.setUseJmx(Boolean.TRUE);
brokerServer.addConnector("tcp://127.0.0.1:61616");
brokerServer.start();
}

}

可靠性机制

默认情况下,生产者发送的消息是持久化的(同步发送),消息发送到broker后,Producer会等待broker对这条消息的处理情况反馈。同步发送持久消息能够提供更好的可靠性,但这潜在地影响了程序的响应速度,因为在接受到 broker 的确认消息之前应用程序或线程会被阻塞。如果应用程序能够容忍一些消息的丢失,那么可以使用异步发送。

异步发送不会在收到 broker 的确认之前一直阻塞 MessageProducer.send(message) 方法。如果想启动异步传送可以在 brokerURL 中配置 jms.useAsyncSend=true 选项,如:tcp://127.0.0.1:61616?jms.useAsyncSend=true。但即使是异步发送消息,生产者也是在收到 broker 的确认应答后才把下一条消息传送给broker。当使用异步传送的时候,可以通过设置 jms.producerWindowSize(单位为字节)属性来控制发送端无节制地向 broker 发送消息,如: tcp://127.0.0.1:61616?jms.useAsyncSend=true&jms.producerWindowSize=1024000,设置后,当达到了 producerWindowSize 上限,即使是异步调用也会被阻塞。

非持久化消息默认就是异步发送的,且 producerWindowSize 设置只对异步发送有意义。

消息确认模式

在事务性会话中,acknowledgeMode被JMS服务器自动设置为SESSION_TRANSACTED模式。在该模式下,发送端只有commit后消息才能发送出去,接收端在commit后会自动签收消息。而在非事务会话中,消息何时被确认取决于创建会话时的应答模式。

应答模式 描述
AUTO_ACKNOWLEDGE 自动确认模式,当接收端成功从MessageConsumer.receive()或MessageListener.onMessage(message)方法返回后,会话会自动确认该消息
CLIENT_ACKNOWLEDGE 客户端手动确认,接收端通过调用Message.acknowledge()方法手动确认消息
DUPS_OK_ACKNOWLEDGE 延迟确认模式

说明:这三种应答模式只在接收端设置有效。

消息传送模型

点对点模型

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
public class ActivemqTest {

private static final String BROKER_URL = "tcp://127.0.0.1:61616";

private static final String QUEUE_NAME = "my-queue";

private Connection connection;

@Before
public void setUp() throws Exception {
ConnectionFactory factory = new ActiveMQConnectionFactory(BROKER_URL);
connection = factory.createConnection();
// Starts (or restarts) a connection's delivery of incoming messages.
connection.start();
}

@After
public void tearDown() throws Exception {
if (connection != null) {
connection.close();
}
}

@Test
public void testQueueSender() throws Exception {
Session session = connection.createSession(Boolean.TRUE, Session.SESSION_TRANSACTED);
Destination queue = session.createQueue(QUEUE_NAME);
MessageProducer producer = session.createProducer(queue);
Message message = session.createTextMessage("Hello ActiveMQ");
producer.send(message);
session.commit();
session.close();
}

@Test
public void testQueueReceiver() throws Exception {
Session session = connection.createSession(Boolean.TRUE, Session.SESSION_TRANSACTED);
Destination queue = session.createQueue(QUEUE_NAME);
MessageConsumer consumer = session.createConsumer(queue);
// This call blocks indefinitely until a message is produced or until this message consumer is closed.
TextMessage message = (TextMessage) consumer.receive();
System.out.println(message.getText());
session.commit();
session.close();
}

}

发布/订阅模型

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
public class ActivemqTest {

private static final String BROKER_URL = "tcp://127.0.0.1:61616";

private static final String TOPIC_NAME = "my-topic";

private Connection connection;

@Before
public void setUp() throws Exception {
ConnectionFactory factory = new ActiveMQConnectionFactory(BROKER_URL);
connection = factory.createConnection();
// Starts (or restarts) a connection's delivery of incoming messages.
connection.start();
}

@After
public void tearDown() throws Exception {
if (connection != null) {
connection.close();
}
}

@Test
public void testTopicPublisher() throws Exception {
Session session = connection.createSession(Boolean.TRUE, Session.SESSION_TRANSACTED);
Destination topic = session.createTopic(TOPIC_NAME);
MessageProducer producer = session.createProducer(topic);
Message message = session.createTextMessage("Hello ActiveMQ");
producer.send(message);
session.commit();
session.close();
}

@Test
public void testTopicSubscriber() throws Exception {
Session session = connection.createSession(Boolean.TRUE, Session.SESSION_TRANSACTED);
Topic topic = session.createTopic(TOPIC_NAME);
MessageConsumer consumer = session.createConsumer(topic);
// This call blocks indefinitely until a message is produced or until this message consumer is closed.
TextMessage message = (TextMessage) consumer.receive();
System.out.println(message.getText());
session.commit();
session.close();
}

@Test
public void testDurableSubscriber() throws Exception {
final String clientID = "my-durable-news";
Connection connection = new ActiveMQConnectionFactory(BROKER_URL).createConnection();
connection.setClientID(clientID);
connection.start();
Session session = connection.createSession(Boolean.TRUE, Session.SESSION_TRANSACTED);
Topic topic = session.createTopic(TOPIC_NAME);
MessageConsumer consumer = session.createDurableSubscriber(topic, clientID);
TextMessage message = (TextMessage) consumer.receive();
System.out.println(message.getText());
session.commit();
session.close();
}

}

Spring整合

配置文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context.xsd">

<context:property-placeholder location="classpath:application.properties"/>

<bean id="activeMQConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="${activemq.brokerURL}"/>
<property name="userName" value="${activemq.user}"/>
<property name="password" value="${activemq.password}"/>
</bean>

<bean id="pooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory">
<property name="connectionFactory" ref="activeMQConnectionFactory"/>
<property name="maxConnections" value="${activemq.pool.maxConnections}"/>
</bean>

<bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
<property name="targetConnectionFactory" ref="pooledConnectionFactory"/>
</bean>

<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<property name="connectionFactory" ref="connectionFactory"/>
<!-- 在程序中根据具体业务设置 -->
<!--<property name="defaultDestinationName" value="${jms.template.defaultDestination}"/>-->
</bean>

</beans>
1
2
3
4
5
6
7
# ActiveMQ
activemq.brokerURL=failover:(tcp://192.168.8.88:61616,tcp://192.168.8.129:61616)?randomize=false
activemq.user=admin
activemq.password=admin
activemq.pool.maxConnections=50
# JMS
jms.template.defaultDestination=order-notify

整合测试

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration("classpath:spring-activemq.xml")
public class SpringActivemqTest {

private static final String QUEUE_NAME = "my-queue";
private static final String TOPIC_NAME = "my-topic";

@Autowired
private JmsTemplate jmsTemplate;

@Before
public void setUp() throws Exception {
Assert.assertNotNull(jmsTemplate);
}

@Test
public void testQueueSender() throws Exception {
jmsTemplate.setPubSubDomain(Boolean.FALSE);
jmsTemplate.setDefaultDestinationName(QUEUE_NAME);
jmsTemplate.send(new MessageCreator() {
@Override
public Message createMessage(Session session) throws JMSException {
return session.createTextMessage("Hello ActiveMQ");
}
});
}

@Test
public void testQueueReceiver() throws Exception {
jmsTemplate.setPubSubDomain(Boolean.FALSE);
jmsTemplate.setDefaultDestinationName(QUEUE_NAME);
String message = (String) jmsTemplate.receiveAndConvert();
System.out.println(message);
}

@Test
public void testTopicPublisher() throws Exception {
jmsTemplate.setPubSubDomain(Boolean.TRUE);
jmsTemplate.setDefaultDestinationName(TOPIC_NAME);
jmsTemplate.send(new MessageCreator() {
@Override
public Message createMessage(Session session) throws JMSException {
return session.createTextMessage("Hello ActiveMQ");
}
});
}

@Test
public void testTopicSubscriber() throws Exception {
jmsTemplate.setPubSubDomain(Boolean.TRUE);
jmsTemplate.setDefaultDestinationName(TOPIC_NAME);
String message = (String) jmsTemplate.receiveAndConvert();
System.out.println(message);
}

}

持久化

为了避免意外宕机以后丢失信息,需要做到重启后可以恢复消息队列,消息系统一般都会采用持久化机制。ActiveMQ的消息持久化机制有KahaDB、LevelDB、JDBC和Memory。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
<beans>

<broker>
<persistenceAdapter>
<!-- 基于文件存储(默认)-->
<!-- <kahaDB directory="${activemq.data}/kahadb"/> -->
<!-- 基于文件存储 -->
<!-- <levelDB directory="${activemq.data}/leveldb"/> -->
<jdbcPersistenceAdapter dataSource="#dataSource" createTablesOnStartup="true"/>
<!-- <memoryPersistenceAdapter/> -->
</persistenceAdapter>
</broker>

<bean id="dataSource" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close">
<property name="url" value="jdbc:mysql://127.0.0.1:3306/test"/>
<property name="driverClassName" value="com.mysql.jdbc.Driver"/>
<property name="username" value="root"/>
<property name="password" value="root"/>
</bean>

</beans>

高性能

如果有broker1和broker2通过networkConnector连接,consumer1连接到broker1,consumer2连接到broker2。程序往broker1上面发送10条消息,然后当consumer2连接到broker2消费了5条消息时,突然broker2挂掉了。但是还剩5条消息在broker2上,除非broker2重启,然后有消费者连接到broker2上消费消息,否则,这些消息就好像消失了。遇到这样的情况,就可以通过设置replayWhenNoConsumers这个选项来使得broker2上有需要转发的消息但是没有消费者时,把消息回流到它原来的broker1上。

对于broker1来讲,broker2也是消费者,所以当consumer2连接到broker2进行消费时,broker1上的消息会被broker2消费掉,而broker2上的消息又会被consumer2消费。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
<beans>
<broker>
<destinationPolicy>
<policyMap>
<policyEntries>
<!-- 把enableAudit设置为false以防止消息回流后被当做重复消息而不被分发 -->
<policyEntry queue=">" enableAudit="false">
<networkBridgeFilterFactory>
<conditionalNetworkBridgeFilterFactory replayWhenNoConsumers="true"/>
</networkBridgeFilterFactory>
</policyEntry>
</policyEntries>
</policyMap>
</destinationPolicy>
<networkConnectors>
<!-- 静态 Network Connectors -->
<networkConnector uri="static://(tcp://192.168.8.88:61616,tcp://192.168.8.129:61616)"/>
</networkConnectors>
</broker>
</beans>

容错的链接:Failover协议实现了自动重新链接的逻辑,当一个Broker链接失败时,那么会链接到其他的Broker上。

1
2
private static final String BROKER_URL = "failover:(tcp://192.168.8.88:61616,tcp://192.168.8.129:61616)?randomize=false";
ConnectionFactory factory = new ActiveMQConnectionFactory(BROKER_URL);

高可用

replicated-leveldb-store

1
2
3
4
5
6
7
8
9
10
11
12
13
14
<persistenceAdapter>
<!--
replicas: 集群中的节点数,集群中至少有 (replicas/2)+1 个节点是存活的以避免服务中断
bind: 当该节点成为master后它将绑定已配置的地址和端口来为复制协议提供服务,只需使用tcp://0.0.0.0:0进行配置即可
-->
<replicatedLevelDB directory="${activemq.data}/leveldb" replicas="3" bind="tcp://0.0.0.0:0" hostname="192.168.8.88"
zkAddress="192.168.8.88:2181,192.168.8.129:2181,192.168.8.131:2181" zkPath="/activemq/leveldb"/>
<!--
<replicatedLevelDB directory="${activemq.data}/leveldb" replicas="3" bind="tcp://0.0.0.0:0" hostname="192.168.8.129"
zkAddress="192.168.8.88:2181,192.168.8.129:2181,192.168.8.131:2181" zkPath="/activemq/leveldb"/>
<replicatedLevelDB directory="${activemq.data}/leveldb" replicas="3" bind="tcp://0.0.0.0:0" hostname="192.168.8.131"
zkAddress="192.168.8.88:2181,192.168.8.129:2181,192.168.8.131:2181" zkPath="/activemq/leveldb"/>
-->
</persistenceAdapter>

监控

  1. Download the hawtio-app-{VERSION}.jar

  2. Execute with command java -jar hawtio-app-{VERSION}.jar