分布式事务解决方案之 Alibaba Seata

Posted by 彭超 on 2020-07-20
Estimated Reading Time 10 Minutes
Words 2.4k In Total
Viewed Times

关于事务的几点常识

本地事务

该类事务需要满足四大特性:ACID(原子性、一致性、隔离性、持久性),仅限于对单一数据库资源的访问控制。

  • 原子性(Atomicity):指事务作为整体来执行,要么全部执行,要么全部不执行。
  • 一致性(Consistency):指事务应确保数据从一个一致的状态转变为另一个一致状态。
  • 隔离性(Isolation):指多个事务并发时,一个事务的执行不应影响其它事务的执行。
  • 持久性(Durability):指已提交的事务修改数据会被持久保存。

柔性事务

如果将实现了 ACID 的四大事务特性的事务称为刚性事务的话,那么基于 BASE 事务要素的事务则称为柔性事务。

BASE 是基本可用、柔性状态和最终一致性这三个特性的缩写。

  • 基本可用(Basically Available):允许分布式事务参与方不一定要同时在线。
  • 柔性状态(Soft state):则允许系统状态更新有一定的延时。
  • 最终一致性(Eventually consistent):通常是通过消息传递的方式保证系统的 最终一致性

ACID 事务中对隔离性的要求很高,在事务执行过程中,必须将所有的资源锁定。而柔性事务的理念则是通过业务逻辑将互斥锁操作从资源层面移至业务层面。通过放宽对 强一致性 的要求,来换取系统吞吐量的提升。

分布式事务

什么是分布式事务

我们可以把一个分布式事务理解成一个包含了 若干分支事务的全局事务,全局事务的职责是协调其下管辖的分支事务达成一致,要么一起成功提交,要么一起失败回滚。此外,通常分支事务本身就是一个满足 ACID 的本地事务。这是我们对分布式事务结构的基本认识,与 XA 是一致的。

产生的原因

传统的单体应用中,一个业务操作可能需要调用三个模块完成,此时数据的一致性有本地事务来保证。

随着业务需求的变化,单体应用被拆分成微服务应用,业务操作需要调用三个服务来完成,原来的三个模块被拆分成三个独立的应用,分别使用独立的数据源。此时每个服务内部的数据一致性由本地事务来保证,但是全局的数据一致性问题无法保证。

在微服务分布式架构中由于全局数据一致性没法保证所产生的问题就是分布式事务问题。简单来说,一次业务操作需要操作多个数据源或需要进行远程调用,就会产生分布式事务问题。

制造一个分布式事务问题

这里我们会创建三个服务,分别是订单服务、库存服务、账户服务。当用户下单时,会在 订单服务 中创建一个订单,然后通过远程调用 库存服务 扣减当前商品的库存,再通过远程调用 账户服务 来扣减用户账户里面的余额。该业务操作通过两次远程调用,跨越三个数据库,明显存在分布式事务问题。

Alibaba Seata 简介

概述

Seata 是一款开源的分布式事务解决方案,提供高性能和简单易用的分布式事务服务,提供了 ATSAGAXA 事务模式。

组件

  • TC 事务协调者:维护全局和分支事务的状态,驱动全局事务提交或回滚。
  • TM 事务管理器:定义全局事务的范围,从开始全局事务 > 提交或回滚事务。
  • RM 资源管理器:管理分支事务处理的资源,与 TC 合作以注册分支事务和报告分支事务的状态,驱动分支事务提交或回滚。

接入 Seata 分布式事务

安装配置 Seata Server

先从官网下载 seata server,下载地址:https://github.com/seata/seata/releases/tag/v0.9.0

解压安装包到指定目录,修改 conf 目录下的 file.conf 配置文件。主要修改自定义事务名称、事务日志存储模式为 db 以及数据库连接信息。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
service {
#vgroup->rgroup
vgroup_mapping.fsp_tx_group = "default" #修改事务组名称为:fsp_tx_group,和客户端自定义的名称对应
#only support single node
default.grouplist = "127.0.0.1:8091"
#degrade current not support
enableDegrade = false
#disable
disable = false
#unit ms,s,m,h,d represents milliseconds, seconds, minutes, hours, days, default permanent
max.commit.retry.timeout = "-1"
max.rollback.retry.timeout = "-1"
}

## transaction log store
store {
## store mode: file、db
mode = "db" #修改此处将事务信息存储到数据库中

## database store
db {
## the implement of javax.sql.DataSource, such as DruidDataSource(druid)/BasicDataSource(dbcp) etc.
datasource = "dbcp"
## mysql/oracle/h2/oceanbase etc.
db-type = "mysql"
driver-class-name = "com.mysql.jdbc.Driver"
url = "jdbc:mysql://localhost:3306/seat-server" #修改数据库连接地址
user = "root" #修改数据库用户名
password = "123456" #修改数据库密码
min-conn = 1
max-conn = 3
global.table = "global_table"
branch.table = "branch_table"
lock-table = "lock_table"
query-limit = 100
}
}

由于使用了 db 模式的存储事务日志,所以我们需要创建一个 seata server 数据库,运行在 seata server 安装包中的 /conf/db_store.sql 文件。

然后修改 conf 目录下的 registry.conf 配置文件,指明配置中心为 nacos,并配置 nacos 连接信息。

nacos 的安装及使用可以参考:使用 Spring Cloud Alibaba Nacos Discovery 实现服务注册与发现

最后依此启动 nacos serverseata server 安装包中的 /bin/seata-server.bat

创建数据库

  • seata-order:存储订单的数据库。
  • seata-storage:存储库存的数据库。
  • seata-count:存储账户信息的数据库。

初始化业务表

order 表

1
2
3
4
5
6
7
8
9
10
CREATE TABLE `order` (
`id` bigint(11) NOT NULL AUTO_INCREMENT,
`user_id` bigint(11) DEFAULT NULL COMMENT '用户id',
`product_id` bigint(11) DEFAULT NULL COMMENT '产品id',
`count` int(11) DEFAULT NULL COMMENT '数量',
`money` decimal(11,0) DEFAULT NULL COMMENT '金额',
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=7 DEFAULT CHARSET=utf8;

ALTER TABLE `order` ADD COLUMN `status` int(1) DEFAULT NULL COMMENT '订单状态:0:创建中;1:已完结' AFTER `money` ;

storage 表

1
2
3
4
5
6
7
8
9
10
CREATE TABLE `storage` (
`id` bigint(11) NOT NULL AUTO_INCREMENT,
`product_id` bigint(11) DEFAULT NULL COMMENT '产品id',
`total` int(11) DEFAULT NULL COMMENT '总库存',
`used` int(11) DEFAULT NULL COMMENT '已用库存',
`residue` int(11) DEFAULT NULL COMMENT '剩余库存',
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8;

INSERT INTO `storage` (`id`, `product_id`, `total`, `used`, `residue`) VALUES ('1', '1', '100', '0', '100');

account 表

1
2
3
4
5
6
7
8
9
10
CREATE TABLE `account` (
`id` bigint(11) NOT NULL AUTO_INCREMENT COMMENT 'id',
`user_id` bigint(11) DEFAULT NULL COMMENT '用户id',
`total` decimal(10,0) DEFAULT NULL COMMENT '总额度',
`used` decimal(10,0) DEFAULT NULL COMMENT '已用余额',
`residue` decimal(10,0) DEFAULT '0' COMMENT '剩余可用额度',
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8;

INSERT INTO `account` (`id`, `user_id`, `total`, `used`, `residue`) VALUES ('1', '1', '1000', '0', '1000');

最后还需要在每个数据库的表中创建创建事务日志表,运行在 seata server 安装包中的 /conf/db_undo_log.sql 文件。

完成后所有数据库表如图所示:

相关配置

seata-order-serviceseata-storage-serviceseata-account-service 三个服务进行配置大致相同,以 seata-account-service 为例。

  • application.yml 文件中主要加入以下配置:

    1
    2
    3
    4
    5
    spring:
    cloud:
    alibaba:
    seata:
    tx-service-group: fsp_tx_group #自定义事务组名称需要与 seata-server 中的对应

    文件完整内容如下

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    server:
    port: 8081

    spring:
    application:
    name: seata-account-service
    cloud:
    alibaba:
    seata:
    tx-service-group: fsp_tx_group
    nacos:
    discovery:
    server-addr: localhost:8848
    datasource:
    driver-class-name: com.mysql.jdbc.Driver
    url: jdbc:mysql://localhost:3306/seata-account
    username: root
    password: 123456

    mybatis:
    mapperLocations: classpath:mapper/*.xml

    logging:
    level:
    io:
    seata: info
  • 创建 file.conf 文件,主要修改自定义事务名称:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    service {
    #vgroup->rgroup
    vgroup_mapping.fsp_tx_group = "default" #修改自定义事务组名称
    #only support single node
    default.grouplist = "127.0.0.1:8091"
    #degrade current not support
    enableDegrade = false
    #disable
    disable = false
    #unit ms,s,m,h,d represents milliseconds, seconds, minutes, hours, days, default permanent
    max.commit.retry.timeout = "-1"
    max.rollback.retry.timeout = "-1"
    disableGlobalTransaction = false
    }
  • 创建 registry.conf 配置文件,主要指明 nacos 注册中心:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    registry {
    # file 、nacos 、eureka、redis、zk
    type = "nacos" #修改为nacos

    nacos {
    serverAddr = "localhost:8848" #修改为nacos的连接地址
    namespace = ""
    cluster = "default"
    }
    }
  • 在启动类中取消自动创建数据源

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    @SpringBootApplication(exclude = DataSourceAutoConfiguration.class)
    @EnableDiscoveryClient
    @EnableFeignClients
    public class SeataOrderServiceApplication {

    public static void main(String[] args) {
    SpringApplication.run(SeataOrderServiceApplication.class, args);
    }

    }
  • 创建 DataSourceProxyConfig 配置文件使用 Seata 对数据源进行代理

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    /**
    * 使用 Seata 对数据源进行代理
    */
    @Configuration
    public class DataSourceProxyConfig {

    @Value("${mybatis.mapperLocations}")
    private String mapperLocations;

    @Bean
    @ConfigurationProperties(prefix = "spring.datasource")
    public DataSource druidDataSource(){
    return new DruidDataSource();
    }

    @Bean
    public DataSourceProxy dataSourceProxy(DataSource dataSource) {
    return new DataSourceProxy(dataSource);
    }

    @Bean
    public SqlSessionFactory sqlSessionFactoryBean(DataSourceProxy dataSourceProxy) throws Exception {
    SqlSessionFactoryBean sqlSessionFactoryBean = new SqlSessionFactoryBean();
    sqlSessionFactoryBean.setDataSource(dataSourceProxy);
    sqlSessionFactoryBean.setMapperLocations(new PathMatchingResourcePatternResolver()
    .getResources(mapperLocations));
    sqlSessionFactoryBean.setTransactionFactory(new SpringManagedTransactionFactory());
    return sqlSessionFactoryBean.getObject();
    }

    }

开启事务

在业务实现类中使用 @GlobalTransaction 注解开启分布式事务。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
/**
* 订单业务实现类
*/
@Service
public class OrderServiceImpl implements OrderService {

private static final Logger LOGGER = LoggerFactory.getLogger(OrderServiceImpl.class);

@Autowired
private OrderDao orderDao;
@Autowired
private StorageService storageService;
@Autowired
private AccountService accountService;

/**
* 创建订单->调用库存服务扣减库存->调用账户服务扣减账户余额->修改订单状态
*/
@Override
@GlobalTransactional(name = "fsp-create-order",rollbackFor = Exception.class)
public void create(Order order) {

LOGGER.info("开始下单");
//当前服务创建订单
orderDao.create(order);

//远程调用库存服务扣减库存
LOGGER.info("order-service 中扣减库存开始");
storageService.decrease(order.getProductId(),order.getCount());
LOGGER.info("order-service 中扣减库存结束");

//远程调用账户服务扣减余额
LOGGER.info("order-service 中扣减余额开始");
accountService.decrease(order.getUserId(),order.getMoney());
LOGGER.info("order-service 中扣减余额结束");

//修改订单状态为已完成
LOGGER.info("order-service 中修改订单状态开始");
orderDao.update(order.getUserId(),0);
LOGGER.info("order-service 中修改订单状态结束");

LOGGER.info("下单结束");
}
}

If you like this blog or find it useful for you, you are welcome to comment on it. You are also welcome to share this blog, so that more people can participate in it. If the images used in the blog infringe your copyright, please contact the author to delete them. Thank you !