简介
消息队列中间件是服务架构中常见的组件,可用于异步处理、应用解耦、流量削锋、消息通讯等场景,是大型分布式系统不可缺少的中间件。ActiveMQ是一个用Java编写的、完全支持JMS1.1规范的开源消息中间件,支持REST、AMQP、STOMP、WS-Notification、MQTT、XMPP、OpenWire协议。
消息队列组成
- 生产者(Producer):负责产生消息
- 消息代理(Message Broker):负责存储和转发消息
- 消费者(Consumer):负责消费消息
Broker负责把消息从发送端传送到接收端,可以把它看成是MQ的服务端。
转发分为推和拉两种:拉是指Consumer主动从Message Broker获取消息,推是指Message Broker主动将Consumer感兴趣的消息推送给Consumer
1 | /** |
可靠性机制
默认情况下,生产者发送的消息是持久化的(同步发送),消息发送到broker后,Producer会等待broker对这条消息的处理情况反馈。同步发送持久消息能够提供更好的可靠性,但这潜在地影响了程序的响应速度,因为在接受到 broker 的确认消息之前应用程序或线程会被阻塞。如果应用程序能够容忍一些消息的丢失,那么可以使用异步发送。
异步发送不会在收到 broker 的确认之前一直阻塞 MessageProducer.send(message) 方法。如果想启动异步传送可以在 brokerURL 中配置 jms.useAsyncSend=true 选项,如:tcp://127.0.0.1:61616?jms.useAsyncSend=true
。但即使是异步发送消息,生产者也是在收到 broker 的确认应答后才把下一条消息传送给broker。当使用异步传送的时候,可以通过设置 jms.producerWindowSize(单位为字节)属性来控制发送端无节制地向 broker 发送消息,如: tcp://127.0.0.1:61616?jms.useAsyncSend=true&jms.producerWindowSize=1024000
,设置后,当达到了 producerWindowSize 上限,即使是异步调用也会被阻塞。
非持久化消息默认就是异步发送的,且 producerWindowSize 设置只对异步发送有意义。
消息确认模式
在事务性会话中,acknowledgeMode
被JMS服务器自动设置为SESSION_TRANSACTED
模式。在该模式下,发送端只有commit后消息才能发送出去,接收端在commit后会自动签收消息。而在非事务会话中,消息何时被确认取决于创建会话时的应答模式。
应答模式 | 描述 |
---|---|
AUTO_ACKNOWLEDGE | 自动确认模式,当接收端成功从MessageConsumer.receive()或MessageListener.onMessage(message)方法返回后,会话会自动确认该消息 |
CLIENT_ACKNOWLEDGE | 客户端手动确认,接收端通过调用Message.acknowledge()方法手动确认消息 |
DUPS_OK_ACKNOWLEDGE | 延迟确认模式 |
说明:这三种应答模式只在接收端设置有效。
消息传送模型
点对点模型
1 | public class ActivemqTest { |
发布/订阅模型
1 | public class ActivemqTest { |
Spring整合
配置文件
1 |
|
1 | # ActiveMQ |
整合测试
1 | .class) (SpringJUnit4ClassRunner |
持久化
为了避免意外宕机以后丢失信息,需要做到重启后可以恢复消息队列,消息系统一般都会采用持久化机制。ActiveMQ的消息持久化机制有KahaDB、LevelDB、JDBC和Memory。
1 | <beans> |
高性能
如果有broker1和broker2通过networkConnector连接,consumer1连接到broker1,consumer2连接到broker2。程序往broker1上面发送10条消息,然后当consumer2连接到broker2消费了5条消息时,突然broker2挂掉了。但是还剩5条消息在broker2上,除非broker2重启,然后有消费者连接到broker2上消费消息,否则,这些消息就好像消失了。遇到这样的情况,就可以通过设置replayWhenNoConsumers
这个选项来使得broker2上有需要转发的消息但是没有消费者时,把消息回流到它原来的broker1上。
对于broker1来讲,broker2也是消费者,所以当consumer2连接到broker2进行消费时,broker1上的消息会被broker2消费掉,而broker2上的消息又会被consumer2消费。
1 | <beans> |
容错的链接:Failover协议实现了自动重新链接的逻辑,当一个Broker链接失败时,那么会链接到其他的Broker上。
1 | private static final String BROKER_URL = "failover:(tcp://192.168.8.88:61616,tcp://192.168.8.129:61616)?randomize=false"; |
高可用
1 | <persistenceAdapter> |
监控
Download the hawtio-app-{VERSION}.jar
Execute with command
java -jar hawtio-app-{VERSION}.jar