分布式事务

概述

在微服务架构中,随着服务的逐步拆分,数据库私有已经成为共识,这也导致所面临的分布式事务问题成为微服务落地过程中一个非常难以逾越的障碍,但是目前尚没有一个完整通用的解决方案。

其实不仅仅是在微服务架构中,随着用户访问量的逐渐上涨,数据库甚至是服务的分片、分区、水平拆分、垂直拆分已经逐渐成为较为常用的提升瓶颈的解决方案,因此越来越多的原子操作变成了跨库甚至是跨服务的事务操作,最终结果是在对高性能、高扩展性、高可用性的追求的道路上,我们开始逐渐放松对一致性的追求,但是在很多场景下,尤其是账务、电商等业务中,不可避免的存在着一致性问题,使得我们不得不去探寻一种机制,用以在分布式环境中保证事务的一致性。

微服务使得单体架构扩展为分布式架构,在扩展的过程中,逐渐丧失了单体架构中数据源单一、可以直接依赖于数据库进行事务操作的能力。而关系型数据库中,提供了强大的事务处理能力,可以满足ACID特性,这种特性保证了数据操作的强一致性,这也是分布式环境中弱一致性以及最终一致性能够得以实现的基础。

在讨论分布式事务之前,我们得先弄清楚本地事务和全局事务的概念以及它们的使用场景。

本地事务

本地事务:在单个数据库的本地并且限制在单个进程内的事务(本地事务不涉及多个数据来源),事务由资源管理器(如DBMS)本地管理,而Spring为我们提供了非常方便的声明式事务管理,但是默认的Spring事务只支持单数据源,而实际上一个应用往往需要进行跨库甚至是跨服务的事务操作,这个时候就要依靠分布式事务。

  • 优点
    1. 支持严格的ACID属性
    2. 可靠
    3. 高效
    4. 状态可以只在资源管理器中维护
    5. 应用编程模型简单
  • 缺点
    1. 不具备分布式事务处理能力
    2. 隔离的最小单位由资源管理器决定,如数据库中的一条记录

全局事务

全局事务:由全局事务管理器管理和协调的事务,可以跨越多个资源(如数据库或JMS队列)和进程,全局事务管理器一般使用XA二阶段提交协议与数据库进行交互,事务由全局事务管理器全局管理。

  • 优点:严格的ACID
  • 缺点:效率非常低(微服务架构下已不太适用)
    1. 全局事务方式下,全局事务管理器(TM)通过XA接口使用二阶段提交协议(2PC)与资源层(如数据库)进行交互,使用全局事务,数据被Lock的时间跨整个事务,直到全局事务结束。
    2. 2PC是反可伸缩模式,在事务处理过程中,参与者需要一直持有资源直到整个分布式事务结束。这样,当业务规模越来越大的情况下,2PC的局限性就越来越明显,系统可伸缩性会变得很差。
    3. 与本地事务相比,XA协议的系统开销相当大,因而应当慎重考虑是否确实需要分布式事务,而且只有支持XA协议的资源才能参与分布式事务。

全局事务,作为一种标准的分布式事务解决方案,它解决了本地事务无法满足分布式场景中数据的ACID的要求,但由于其存在效率底下的致命缺点,在微服务架构下已不太适用。

在业内,主要用来解决分布式事务的方案是使用柔性事务,柔性事务包括几种类型:两阶段型、补偿型、异步确保型和最大努力通知型。

基于可靠消息最终一致性(异步确保型)

数据一致性分为三个种类型:强一致性、弱一致性以及最终一致性,数据库实现的就是强一致性,能够保证在写入一份新的数据库,立即使其可见。最终一致性是弱一致性的强化版,系统保证在没有后续更新的前提下,系统最终返回上一次更新操作的值。在最终一致性的实现过程中,最基本的操作就是保证事务参与者的幂等性,所谓的幂等性,就是业务方能够使用相关的手段,保证单个事务多次提交依然能够保证达到同样的目的。

在分布式系统中,服务之间通常会通过RPC来进行网络通讯,而远程调用最郁闷的地方就是结果有三种:成功、失败和超时(成功失败都有可能),从而导致了数据传输的不确定性,如何确保消息发送的一致性是可靠消息的前提保障。

  • RPC同步调用:RPC的同步调用确保请求送达对方并收到对方响应,若没有收到响应,则抛出Timeout异常,这种情况下调用方是无法确定调用是成功还是失败的,需要根据业务场景(是否可重入、幂等)选择重试和补偿策略。
  • 消息发送一致性:如果业务操作成功,那么由这个业务操作所产生的消息一定要成功投递出去,否则就会丢失消息。在设计范式里通常不允许消费下游业务失败,不然后面失败了前面也不好回滚。但消费端消费失败时该怎么办?消费失败了,重试,还一直失败怎么办?是不是要自动回滚整个流程?答案是人工介入。从工程实践角度讲,这种整个流程自动回滚的代价是非常巨大的,不但实现复杂,还会引入新的问题,比如自动回滚失败,又怎么处理?针对这种极低概率的情况,采取人工处理会比实现一个高复杂的自动化回滚系统更加可靠也更加简单。
  • 最终一致性:主要是用记录补偿的方式。在做所有的不确定的事情之前,先把事情记录下来,然后去做不确定的事情,结果可能是:成功、失败或是不确定,不确定(例如超时等)可以等价为失败。成功就可以把记录的东西清理掉了,对于失败和不确定,可以依靠定时任务等方式把所有失败的事情重新执行一遍,直到成功为止。

以购物场景为例,张三购买物品,账户扣款 100 元的同时,需要在下游的会员服务给该账户增加 100 积分,如何保证原子性?

一般的思路都是通过消息中间件来实现最终一致性:资金账户服务扣钱,然后发消息给中间件,会员积分服务接收此消息,进行增加积分。

但这里面有个问题:在资金账户服务中,是先更新DB后发送消息呢? 还是先发送消息后更新DB?

假设先更新DB成功,发送消息网络异常,重发又失败,怎么办?假设先发送消息成功,更新DB失败,消息已经发出去了,又不能撤回,怎么办?

当然,你可能已经想到了,我可以把发送消息这个网络调用和更新DB放在同一个本地事务中,如果发送消息失败,更新DB自动回滚,这样不就保证两个操作的原子性了吗?这个方案看似正确,但其实是错误的,原因有:

  1. 把网络调用放在DB事务里面,可能会因为网络的延时,导致DB长事务,严重地会Block住整个DB,风险很大。
  2. 网络的两军问题:发送消息失败,发送方并不知道消息中间件是真的没有收到消息还是消息已经收到了只是返回的时候失败了,如果是已经收到消息了,而发送端认为没有收到而执行事务回滚操作,从而无法保证数据的一致性。

所以,这里得出结论:只要发送消息和更新DB这两个操作不是原子的,无论谁先谁后都是有问题的。

事务消息的本质就是为了解决此类问题,即解决本地事务执行与消息发送的原子性问题。

基于可靠消息最终一致性的分布式事务解决方案

消息发送基本流程如下:

  1. 上游业务系统首先预发送(同步发送)消息到可靠消息服务
  2. 在预发送消息成功返回后执行本地事务(在返回过程中出现死机、超时等异常情况的消息将由消息状态确认子系统处理)
  3. 上游业务系统将本地事务执行结果发送(异步发送)到可靠消息服务(在发送过程中出现死机、超时等异常情况的消息将由消息状态确认子系统处理)
  4. 如果业务处理成功,可靠消息服务则更新消息状态并将消息投放到实时消息服务(消息中间件);如果业务处理失败,则将该消息进行删除

消息消费基本流程如下:

  1. 投放到实时消息服务的消息会被消息业务消费端监听并调用(同步)下游业务系统进行消费
  2. 如果下游业务系统业务处理成功,消息业务消费端则和实时消息服务、可靠消息服务进行确认(确认结果就是将消息删除);如果业务处理失败,则由消息恢复子系统进行处理
  • 优点
    1. 消息服务独立部署、独立维护、独立伸缩;
    2. 消息存储可以按需选择不同的数据库来集成实现;
    3. 消息服务可以被相同的使用场景共用,降低重复建设消息服务的成本;
    4. 从应用(分布式服务)设计开发的角度实现了消息数据的可靠性,消息数据的可靠性不依赖于MQ中间件,弱化了对MQ中间件特性的依赖;
    5. 降低了业务系统与消息系统间的耦合,有利于系统的扩展维护。
  • 弊端
    1. 一次消息发送需要两次请求;
    2. 上游业务系统需要提供一个事务状态查询接口供可靠消息服务调用;
    3. 当下游业务处理成功时,下游业务系统需要调用可靠消息服务相关接口进行确认。

相关说明:

  • 上游业务系统预发送消息操作为何是同步的?

    因为我们需要获取消息预发送成功后返回的信息(消息ID),否则后面就无法进行后续的消息更新(确认)或删除操作了。但消息预发送可能会有三种结果:成功、失败、超时(成功或失败都有可能)。然而,这个不要紧,因为消息必须要确认后才会进行投递。如果出现超时现象,我们尽可以把这个待确认的消息丢弃。此外,上游业务系统如果收到的不是消息预发送成功的反馈结果,就不会执行下一步业务处理操作,从而仍可以保证整个系统的一致性。

  • 上游业务系统发送业务处理结果为何是异步的?

    首先,上游系统和消息中间件之间采用异步通信是为了提高系统并发度。业务系统直接和用户打交道,用户体验尤为重要,因为这种异步通信方式能够极大程度地降低用户等待问题。此外,异步通信相对于同步通信,没有了长时间的阻塞等待,因此系统的并发性也大大增加。对于异步通信可能引起信息的丢失问题,可以由消息服务的超时询问机制来弥补。

  • 消息中间件投递消息失败后,为何是不断尝试重投而不是进行业务回滚?

    这就涉及到分布式事务系统的实现成本问题。我们知道,当上游业务系统向消息中间件发送业务处理结果后,并不获取任何反馈结果便直接去做别的事情了。如果此时消息投递失败,我们就可以进行重试(下游业务系统需要保证幂等)。如果不断重试(超过了最大重试次数)还是失败了,那么就可以发通知,然后人工介入处理了。而如果是进行业务回滚的话,则需要让上游业务系统事先提供回滚接口,这无疑增加了额外的开发成本,业务系统的复杂度也将提高,为了这种小概率事件而设计这个复杂的流程反而得不偿失。对于一个业务系统的设计目标是:在保证性能的前提下,最大限度地降低系统的复杂度,从而降低系统的运维成本。

  • 消息中间件和下游业务系统之间为什么要采用同步通信?

    上游业务系统发送完业务处理结果后,并不获取任何反馈结果便直接去做别的事情,接下来提交或回滚操作就完全交给消息中间件来完成,并且完全信任消息中间件,认为它一定能正确地完成事务的提交或回滚。然而,消息中间件向下游系统投递消息的过程是同步的,也就是消息中间件将消息投递给下游系统后,它会阻塞等待,等下游系统成功处理完任务返回确认应答后才取消阻塞等待,为什么这两者在设计上是不一致的呢?

    异步通信能提升系统性能,但随之会增加系统复杂度;而同步虽然降低系统并发度,但实现成本较低。因此,在对并发度要求不是很高或者服务器资源较为充裕的情况下,我们可以选择同步来降低系统的复杂度。我们知道,消息中间件是一个独立于业务系统的第三方中间件,它不和任何业务系统产生直接的耦合,它也不和用户产生直接的关联,它一般部署在独立的服务器集群上,具有良好的可扩展性,所以不必太过于担心它的性能,如果处理速度无法满足我们的要求,可以增加机器来解决。而且,即使消息中间件处理速度有一定的延迟也是可以接受的,因为我们追求的是最终一致性而非实时一致性,因此消息中间件产生的延时导致事务短暂的不一致是可以接受的。

可靠消息服务

对消息进行预存储和转发以确保消息不丢失。

尽管我们可以尽量地确保MQ可靠,让MQ可靠地持久化消息,但是网络是不可靠的,几乎没有办法确保网络可靠。所以,我们期望有一个可靠消息,能够避免任何问题,包括网络问题。如果消息不可靠,那么我们就需要采取其他的措施,比如本地消息表。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
CREATE TABLE IF NOT EXISTS tx_message (
id bigint(20) NOT NULL COMMENT '消息ID',
version int(8) NOT NULL DEFAULT 0 COMMENT '版本号',
msg_status varchar(16) NOT NULL COMMENT '消息状态:待确认、发送中',
msg_body text NOT NULL COMMENT '消息内容',
content_type varchar(64) NOT NULL DEFAULT 'JSON' COMMENT '消息数据类型',
dead_letter bit(1) NOT NULL DEFAULT 0 COMMENT '死信',
retry_times int(11) NOT NULL DEFAULT 0 COMMENT '重试次数(超过最大重试次数将成为死信)',
biz_id varchar(64) DEFAULT NULL COMMENT '业务ID',
queue varchar(64) NOT NULL COMMENT '消息队列',
callback varchar(64) DEFAULT NULL COMMENT '业务状态回查接口',
remark varchar(255) DEFAULT NULL COMMENT '备注',
created_at timestamp NOT NULL COMMENT '创建时间',
created_by varchar(20) NOT NULL COMMENT '创建者',
updated_at timestamp NULL DEFAULT NULL COMMENT '更新时间',
updated_by varchar(20) DEFAULT NULL COMMENT '更新者',
PRIMARY KEY (id)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='事务消息表';

消息状态确认子系统

用于定时从可靠消息服务中查询状态为待确认的消息,然后和上游业务系统进行确认,并根据确认结果进行处理:如果确认上游业务已处理成功,则更新消息状态并将消息投递到实时消息服务;如果确认上游业务处理失败,则将该消息删除;如果上游业务正在处理中,则继续等待(下次处理)。

消息恢复子系统

用于定时从可靠消息服务中查询状态为发送中的消息并进行重新投递到实时消息服务(消息中间件)。

由于可靠消息服务存在重投机制,所以下游业务系统必须保证相关业务接口的幂等性(即f(n)=f(1)),以确保消息不会被重复消费。

消息管理子系统

主要用于对已死亡的消息进行人工干预。

实时消息服务(消息中间件)

主要用于消息存储和转发。

RocketMQ事务消息

在RocketMQ中实现了分布式事务,实际上其实是对本地消息表的一个封装,将本地消息表移动到了MQ内部,其基本流程如下:

  1. 事务发起方首先发送prepare消息到MQ
  2. 在发送prepare消息成功后执行本地事务
  3. 根据本地事务执行结果返回commit或者rollback
  4. 如果是rollback消息,MQ将删除该prepare消息不进行投递;如果是commit消息,MQ将会把这个消息投递给consumer端
  5. 如果执行本地事务过程中,出现死机、超时等异常情况导致消息确认失败,那么MQ将会不停地询问其同组的其它producer来获取状态
  6. consumer端的消费成功机制由MQ保证

RocketMQ的事务消息主要是通过消息的异步处理,可以保证 本地事务消息发送 同时成功或失败,从而保证数据的最终一致性。

RocketMQ事务消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
@Slf4j
public class Producer {

public static void main(String[] args) {
TransactionMQProducer producer = new TransactionMQProducer("TransactionGroup");
producer.setNamesrvAddr("127.0.0.1:9876");
producer.setTransactionListener(new TransactionListenerImpl());
producer.setExecutorService(ForkJoinPool.commonPool());
try {
producer.start();
// 发送事务消息, 此消息不可见
TransactionSendResult sendResult = producer.sendMessageInTransaction(new Message(
"TransactionTopic",
"事务消息".getBytes(StandardCharsets.UTF_8)
), "tx");
log.debug("====== {} ======", sendResult);
} catch (MQClientException e) {
e.printStackTrace();
} finally {
producer.shutdown();
}
}

}

@Slf4j
public class TransactionListenerImpl implements TransactionListener {

// 存储当前线程对应的事务状态
private final Map<String, LocalTransactionState> localTrans = new ConcurrentHashMap<>();

/**
* 发送prepare消息成功后回调该方法用于执行本地事务
*
* @param msg 回传的消息
* @param arg 调用send方法时传递的参数
*/
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
String transactionId = msg.getTransactionId();
try {
localTrans.put(transactionId, UNKNOW);
log.debug("====== 开始执行本地事务: transactionId={} ======", transactionId);
TimeUnit.SECONDS.sleep(60);
log.debug("====== 执行本地事务成功 ======");
localTrans.put(transactionId, COMMIT_MESSAGE);
return COMMIT_MESSAGE;
} catch (Exception e) {
localTrans.put(transactionId, ROLLBACK_MESSAGE);
return ROLLBACK_MESSAGE;
}
}

/**
* 消息回查, 当本地事务超时, broker会做消息回查操作
*/
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
String transactionId = msg.getTransactionId();
LocalTransactionState transactionState = localTrans.get(transactionId);
log.debug("====== 执行消息回查: transactionId={}, transactionState={} ======", transactionId, transactionState);
return Optional.ofNullable(transactionState).orElse(UNKNOW);
}

}
1
2
3
4
5
====== 开始执行本地事务: transactionId=AC140A031F7C18B4AAC28A2C0D070000 ======
====== 执行消息回查: transactionId=AC140A0323D018B4AAC28A25C2F40000, transactionState=null ======
====== 执行消息回查: transactionId=AC140A031F7C18B4AAC28A2C0D070000, transactionState=UNKNOW ======
====== 执行本地事务成功 ======
====== SendResult [sendStatus=SEND_OK, msgId=AC140A031F7C18B4AAC28A2C0D070000, offsetMsgId=null, messageQueue=MessageQueue [topic=TransactionTopic, brokerName=Lenovo-PC, queueId=1], queueOffset=8] ======

Seata分布式事务解决方案