一、前言

目前分布式事务解决的方案主要有对业务无入侵和有入侵的方案。

无入侵方案主要有基于数据库 XA 协议的两段式提交(2PC)方案,它的优点是对业务代码无入侵,但是它的缺点也是很明显:必须要求数据库对 XA 协议的支持,且由于 XA 协议自身的特点,它会造成事务资源长时间得不到释放,锁定周期长,而且在应用层上面无法干预,因此它性能很差,它的存在相当于七伤拳那样“伤人七分,损己三分”,因此在互联网项目中并不是很流行这种解决方案。

入侵业务的方案:为了这个弥补这种方案带来性能低的问题,大佬们又想出了很多种方案来解决,但这无一例外都需要通过在应用层做手脚,即入侵业务的方式,比如很出名的 TCC 方案,基于 TCC 也有很多成熟的框架,如 ByteTCC、tcc-transaction 等。以及基于可靠消息的最终一致性来实现,如 RocketMQ 的事务消息。

入侵代码的方案是基于现有情形“迫不得已”才推出的解决方案,实际上它们实现起来非常不优雅,一个事务的调用通常伴随而来的是对该事务接口增加一系列的反向操作,比如 TCC 三段式提交,提交逻辑必然伴随着回滚的逻辑,这样的代码会使得项目非常臃肿,维护成本高。

什么是侵入性

当你的代码引入了一个组件,导致其它代码或者设计,要做相应的 更改以适应新组件。这样的情况我们就认为这个新组件具有侵入性。

如何定义一个分布式事务

首先,很自然的,我们可以把一个分布式事务理解成一个包含了若干 分支事务全局事务

全局事务 的职责是协调其下管辖的 分支事务 达成一致,要么一起成功提交,要么一起失败回滚。此外,通常 分支事务 本身就是一个满足 ACID 的 本地事务。这是我们对分布式事务结构的基本认识,与 XA 是一致的。

二、Seata 介绍

seata 是阿里开源的一个分布式事务框架,能够让大家在操作分布式事务时,像操作本地事务一样简单。一个注解搞定分布式事务。

解决分布式事务问题,有两个设计初衷

  • 对业务无侵入:即减少技术架构上的微服务化所带来的分布式事务问题对业务的侵入
  • 高性能:减少分布式事务解决方案所带来的性能消耗

seata 中有两种分布式事务实现方案:AT 及 TCC

  • AT 模式主要关注多 DB 访问的数据一致性,当然也包括多服务下的多 DB 数据访问一致性问题 2PC-改进
  • TCC 模式主要关注业务拆分,在按照业务横向扩展资源时,解决微服务间调用的一致性问题

Seata 的各个模块之间的关系

Seata 的设计思路是将一个分布式事务可以理解成一个全局事务,下面挂了若干个分支事务,而一个分支事务是一个满足 ACID 的本地事务,因此我们可以操作分布式事务像操作本地事务一样。

1、AT 模式

它使得应用代码可以像使用本地事务一样使用分布式事务,完全屏蔽了底层细节。

AT 模式下,把每个数据库被当做是一个 Resource,Seata 里称为 DataSource Resource。

业务通过 JDBC 标准接口访问数据库资源时,Seata 框架会对所有请求进行拦截,做一些操作。每个本地事务提交时,Seata RM(Resource Manager,资源管理器) 都会向 TC(Transaction Coordinator,事务协调器) 注册一个分支事务。当请求链路调用完成后,发起方通知 TC 提交或回滚分布式事务,进入二阶段调用流程。此时,TC 会根据之前注册的分支事务回调到对应参与者去执行对应资源的第二阶段。

TC 是怎么找到分支事务与资源的对应关系呢?每个资源都有一个全局唯一的资源 ID,并且在初始化时用该 ID 向 TC 注册资源。在运行时,每个分支事务的注册都会带上其资源 ID。这样 TC 就能在二阶段调用时正确找到对应的资源。

解释:

  • Transaction Coordinator (TC) :事务协调器,维护全局事务的运行状态,负责协调并决定全局事务的提交或回滚
  • Transaction Manager(TM):控制全局事务的边界,负责开启一个全局事务,并最终发起全局提交或全局回滚的决
  • Resource Manager (RM): 资源管理器,负责本地事务的注册,本地事务状态的汇报(投票),并且负责本地事务的提交和回滚。

XID:一个全局事务的唯一标识

其中,TM 是一个分布式事务的发起者和终结者,TC 负责维护分布式事务的运行状态,而 RM 则负责本地事务的运行。如下图所示:

下面是一个分布式事务在 Seata 中的执行流程:

  1. TM 向 TC 申请开启一个全局事务,全局事务创建成功并生成一个全局唯一的 XID
  2. XID 在微服务调用链路的上下文中传播。
  3. RM 向 TC 注册分支事务,接着执行这个分支事务并提交(重点:RM 在第一阶段就已经执行了本地事务的提交/回滚),最后将执行结果汇报给 TC
  4. TM 根据 TC 中所有的分支事务的执行情况,发起全局提交或回滚决议。
  5. TC 调度 XID 下管辖的全部分支事务完成提交或回滚请求。

Seata 中有三大模块,分别是 TM、RM 和 TC。

其中 TM 和 RM 是作为 Seata 的客户端与业务系统集成在一起,TC 作为 Seata 的服务端独立部署。

2、MT 模式

Seata 还支持 MT 模式。MT 模式本质上是一种 TCC 方案,业务逻辑需要被拆分为 Prepare/Commit/Rollback 3 部分,形成一个 MT 分支,加入全局事务。如图所示:

MT 模式一方面是 AT 模式的补充。另外,更重要的价值在于,通过 MT 模式可以把众多非事务性资源纳入全局事务的管理中。

3、相关概念讲解

XID:全局事务的唯一标识,由 ip:port:sequence 组成;
Transaction Coordinator (TC):事务协调器,维护全局事务的运行状态,负责协调并驱动全局事务的提交或回滚;
Transaction Manager (TM ):控制全局事务的边界,负责开启一个全局事务,并最终发起全局提交或全局回滚的决议;
Resource Manager (RM):控制分支事务,负责分支注册、状态汇报,并接收事务协调器的指令,驱动分支(本地)事务的提交和回滚;

Fescar 使用 XID 表示一个分布式事务,XID 需要在一次分布式事务请求所涉的系统中进行传递,从而向 feacar-server 发送分支事务的处理情况,以及接收 feacar-server 的 commit、rollback 指令。

4、配置讲解

fescar 的配置入口文件是 registry.conf, 查看代码 ConfigurationFactory 得知目前还不能指定该配置文件,所以配置文件名称只能为 registry.conf。

registry 中可以指定具体配置的形式,默认使用 file 类型,在 file.conf 中有 3 部分配置内容:

transport transport :用于定义 Netty 相关的参数,TM、RM 与 fescar-server 之间使用 Netty 进行通信。

三、Spring Cloud 快速集成 Seata

Seata 参考地址:https://github.com/seata/seata-samples/blob/master/doc/quick-integration-with-spring-cloud.md

相关概念讲解

XID:全局事务的唯一标识,由 ip:port:sequence 组成;
Transaction Coordinator (TC):事务协调器,维护全局事务的运行状态,负责协调并驱动全局事务的提交或回滚;
Transaction Manager (TM ):控制全局事务的边界,负责开启一个全局事务,并最终发起全局提交或全局回滚的决议;
Resource Manager (RM):控制分支事务,负责分支注册、状态汇报,并接收事务协调器的指令,驱动分支(本地)事务的提交和回滚;

Fescar 使用 XID 表示一个分布式事务,XID 需要在一次分布式事务请求所涉的系统中进行传递,从而向 feacar-server 发送分支事务的处理情况,以及接收 feacar-server 的 commit、rollback 指令。

1. 添加依赖

添加 Seata 依赖

1
2
3
4
5
6
<!--Seata依赖-->
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-alibaba-seata</artifactId>
<version>2.1.0.RELEASE</version>
</dependency>

2. 添加 Seata 配置文件

①registry.conf

fescar 的配置入口文件是 registry.conf,该配置用于指定 TC 的注册中心和配置文件,默认都是 file; 如果使用其他的注册中心,要求 Seata-Server 也注册到该配置中心上

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
registry {
# file 、nacos 、eureka、redis、zk、consul、etcd3、sofa
type = "file"

nacos {
serverAddr = "localhost"
namespace = "public"
cluster = "default"
}
eureka {
serviceUrl = "http://localhost:8761/eureka"
application = "default"
weight = "1"
}
redis {
serverAddr = "localhost:6379"
db = "0"
}
zk {
cluster = "default"
serverAddr = "127.0.0.1:2181"
session.timeout = 6000
connect.timeout = 2000
}
consul {
cluster = "default"
serverAddr = "127.0.0.1:8500"
}
etcd3 {
cluster = "default"
serverAddr = "http://localhost:2379"
}
sofa {
serverAddr = "127.0.0.1:9603"
application = "default"
region = "DEFAULT_ZONE"
datacenter = "DefaultDataCenter"
cluster = "default"
group = "SEATA_GROUP"
addressWaitTime = "3000"
}
file {
name = "file.conf"
}
}

config {
# file、nacos 、apollo、zk、consul、etcd3
type = "file"

nacos {
serverAddr = "localhost"
namespace = "public"
cluster = "default"
}
consul {
serverAddr = "127.0.0.1:8500"
}
apollo {
app.id = "seata-server"
apollo.meta = "http://192.168.1.204:8801"
}
zk {
serverAddr = "127.0.0.1:2181"
session.timeout = 6000
connect.timeout = 2000
}
etcd3 {
serverAddr = "http://localhost:2379"
}
file {
name = "file.conf"
}
}

②file.conf

各大微服务的 RM 和 TC 之间的通信配置,该配置用于指定 TC 的相关属性;如果使用注册中心也可以将配置添加到配置中心

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
transport {
# tcp udt unix-domain-socket
type = "TCP"
#NIO NATIVE
server = "NIO"
#enable heartbeat
heartbeat = true
#thread factory for netty
thread-factory {
boss-thread-prefix = "NettyBoss"
worker-thread-prefix = "NettyServerNIOWorker"
server-executor-thread-prefix = "NettyServerBizHandler"
share-boss-worker = false
client-selector-thread-prefix = "NettyClientSelector"
client-selector-thread-size = 1
client-worker-thread-prefix = "NettyClientWorkerThread"
# netty boss thread size,will not be used for UDT
boss-thread-size = 1
#auto default pin or 8
worker-thread-size = 8
}
shutdown {
# when destroy server, wait seconds
wait = 3
}
serialization = "seata"
compressor = "none"
}
service {
#vgroup->rgroup
vgroup_mapping.my_test_tx_group = "default"
#only support single node
default.grouplist = "127.0.0.1:8091"
#degrade current not support
enableDegrade = false
#disable
disable = false
#unit ms,s,m,h,d represents milliseconds, seconds, minutes, hours, days, default permanent
max.commit.retry.timeout = "-1"
max.rollback.retry.timeout = "-1"
}

client {
async.commit.buffer.limit = 10000
lock {
retry.internal = 10
retry.times = 30
}
report.retry.count = 5
}

## transaction log store
store {
## store mode: file、db
mode = "file"

## file store
file {
dir = "sessionStore"

# branch session size , if exceeded first try compress lockkey, still exceeded throws exceptions
max-branch-session-size = 16384
# globe session size , if exceeded throws exceptions
max-global-session-size = 512
# file buffer size , if exceeded allocate new buffer
file-write-buffer-cache-size = 16384
# when recover batch read size
session.reload.read_size = 100
# async, sync
flush-disk-mode = async
}

## database store
db {
## the implement of javax.sql.DataSource, such as DruidDataSource(druid)/BasicDataSource(dbcp) etc.
datasource = "dbcp"
## mysql/oracle/h2/oceanbase etc.
db-type = "mysql"
url = "jdbc:mysql://127.0.0.1:3306/seata"
user = "mysql"
password = "mysql"
min-conn = 1
max-conn = 3
global.table = "global_table"
branch.table = "branch_table"
lock-table = "lock_table"
query-limit = 100
}
}
lock {
## the lock store mode: local、remote
mode = "remote"

local {
## store locks in user's database
}

remote {
## store locks in the seata's server
}
}
recovery {
committing-retry-delay = 30
asyn-committing-retry-delay = 30
rollbacking-retry-delay = 30
timeout-retry-delay = 30
}

transaction {
undo.data.validation = true
undo.log.serialization = "jackson"
}

## metrics settings
metrics {
enabled = false
registry-type = "compact"
# multi exporters use comma divided
exporter-list = "prometheus"
exporter-prometheus-port = 9898
}

需要注意的是 service.vgroup_mapping这个配置,在 Spring Cloud 中默认是${spring.application.name}-fescar-service-group,可以通过指定application.propertiesspring.cloud.alibaba.seata.tx-service-group这个属性覆盖,但是必须要和 file.conf 中的一致,否则会提示 no available server to connect

3. 注入数据源

Seata 通过代理数据源的方式实现分支事务;MyBatis 和 JPA 都需要注入 io.seata.rm.datasource.DataSourceProxy, 不同的是,MyBatis 还需要额外注入 org.apache.ibatis.session.SqlSessionFactory

使用 DataSourceProxy 的目的是为了引入 ConnectionProxy ,fescar 无侵入的一方面就体现在 ConnectionProxy 的实现上,即分支事务加入全局事务的切入点是在本地事务的 commit 阶段,这样设计可以保证业务数据与 undo_log 是在一个本地事务中。

MyBatis

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
@Configuration
public class DataSourceProxyConfig {

@Bean
@ConfigurationProperties(prefix = "spring.datasource")
public DataSource dataSource() {
return new DruidDataSource();
}

@Bean
public DataSourceProxy dataSourceProxy(DataSource dataSource) {
return new DataSourceProxy(dataSource);
}

@Bean
public SqlSessionFactory sqlSessionFactoryBean(DataSourceProxy dataSourceProxy) throws Exception {
SqlSessionFactoryBean sqlSessionFactoryBean = new SqlSessionFactoryBean();
sqlSessionFactoryBean.setDataSource(dataSourceProxy);
return sqlSessionFactoryBean.getObject();
}
}

4. 添加 undo_log 表

undo_log 是需要在业务库上创建的一个表,fescar 依赖该表记录每笔分支事务的状态及二阶段 rollback 的回放数据。不用担心该表的数据量过大形成单点问题,在全局事务 commit 的场景下事务对应的 undo_log 会异步删除。

所以在每个微服务对应的数据库中需要创建一张 undo_log 表,用于保存需要回滚的数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
CREATE TABLE `undo_log`
(
`id` BIGINT(20) NOT NULL AUTO_INCREMENT,
`branch_id` BIGINT(20) NOT NULL,
`xid` VARCHAR(100) NOT NULL,
`context` VARCHAR(128) NOT NULL,
`rollback_info` LONGBLOB NOT NULL,
`log_status` INT(11) NOT NULL,
`log_created` DATETIME NOT NULL,
`log_modified` DATETIME NOT NULL,
`ext` VARCHAR(100) DEFAULT NULL,
PRIMARY KEY (`id`),
UNIQUE KEY `ux_undo_log` (`xid`, `branch_id`)
) ENGINE = InnoDB
AUTO_INCREMENT = 1
DEFAULT CHARSET = utf8

5. 启动 Seata-Server

下载地址: https://github.com/seata/seata/releases

下载相应版本的 Seata-Server,修改 registry.conf为相应的配置(如果使用 file 则不需要修改),解压并通过以下命令启动:

1
sh ./bin/seata-server.sh

6. 使用@GlobalTransactional开启事务

在业务的发起方的方法上使用@GlobalTransactional开启全局事务,Seata 会将事务的 xid 通过拦截器添加到调用其他服务的请求中,实现分布式事务

7、修改各大微服务的 application.yml 配置

配置通信指定的组名( my_test_tx_groupfile.conf 中有):

1
2
3
4
5
spring:
cloud:
alibaba:
seata:
tx-service-group: my_test_tx_group