0
点赞
收藏
分享

微信扫一扫

分布式事务seata使用

钟罗敏 2022-04-25 阅读 57

一、引言

        微服务的环境下,数据库隔离,数据不一致的情况越来越多,要么建立补偿机制要么手动刷数据,这时候就需要分布式事务,有问题上下游一起回滚,但是数据的可见性、链路时长还有其他很多原因说明链路不适宜拉的过长。

二、使用

1、服务端

        作者使用的是db模式

        在server使用的数据库建立了global_table、branch_table、lock_table,这些是服务端用于记录全局事务并进行锁表控制的记录表。

CREATE TABLE `global_table` (
  `xid` varchar(128) NOT NULL,
  `transaction_id` bigint(20) DEFAULT NULL,
  `status` tinyint(4) NOT NULL,
  `application_id` varchar(32) DEFAULT NULL,
  `transaction_service_group` varchar(32) DEFAULT NULL,
  `transaction_name` varchar(128) DEFAULT NULL,
  `timeout` int(11) DEFAULT NULL,
  `begin_time` bigint(20) DEFAULT NULL,
  `application_data` varchar(2000) DEFAULT NULL,
  `gmt_create` datetime DEFAULT NULL,
  `gmt_modified` datetime DEFAULT NULL,
  PRIMARY KEY (`xid`),
  KEY `idx_gmt_modified_status` (`gmt_modified`,`status`),
  KEY `idx_transaction_id` (`transaction_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

CREATE TABLE `branch_table` (
  `branch_id` bigint(20) NOT NULL,
  `xid` varchar(128) NOT NULL,
  `transaction_id` bigint(20) DEFAULT NULL,
  `resource_group_id` varchar(32) DEFAULT NULL,
  `resource_id` varchar(256) DEFAULT NULL,
  `branch_type` varchar(8) DEFAULT NULL,
  `status` tinyint(4) DEFAULT NULL,
  `client_id` varchar(64) DEFAULT NULL,
  `application_data` varchar(2000) DEFAULT NULL,
  `gmt_create` datetime(6) DEFAULT NULL,
  `gmt_modified` datetime(6) DEFAULT NULL,
  PRIMARY KEY (`branch_id`),
  KEY `idx_xid` (`xid`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

CREATE TABLE `lock_table` (
  `row_key` varchar(128) NOT NULL,
  `xid` varchar(96) DEFAULT NULL,
  `transaction_id` bigint(20) DEFAULT NULL,
  `branch_id` bigint(20) NOT NULL,
  `resource_id` varchar(256) DEFAULT NULL,
  `table_name` varchar(32) DEFAULT NULL,
  `pk` varchar(36) DEFAULT NULL,
  `gmt_create` datetime DEFAULT NULL,
  `gmt_modified` datetime DEFAULT NULL,
  PRIMARY KEY (`row_key`),
  KEY `idx_branch_id` (`branch_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

         修改file.conf

## transaction log store, only used in seata-server
store {
  ## store mode: file、db、redis
  mode = "db"
  ## rsa decryption public key
  publicKey = ""
  ## file store property
  file {
    ## store location dir
    dir = "sessionStore"
    # branch session size , if exceeded first try compress lockkey, still exceeded throws exceptions
    maxBranchSessionSize = 16384
    # globe session size , if exceeded throws exceptions
    maxGlobalSessionSize = 512
    # file buffer size , if exceeded allocate new buffer
    fileWriteBufferCacheSize = 16384
    # when recover batch read size
    sessionReloadReadSize = 100
    # async, sync
    flushDiskMode = async
  }

  ## database store property
  db {
    ## the implement of javax.sql.DataSource, such as DruidDataSource(druid)/BasicDataSource(dbcp)/HikariDataSource(hikari) etc.
    datasource = "druid"
    ## mysql/oracle/postgresql/h2/oceanbase etc.
    dbType = "mysql"
    driverClassName = "com.mysql.jdbc.Driver"
    ## if using mysql to store the data, recommend add rewriteBatchedStatements=true in jdbc connection param
    //选择db模式将此处修改为global_table等表所在的库
    url = "jdbc:mysql://**?zeroDateTimeBehavior=convertToNull&autoReconnect=true&useUnicode=true&characterEncoding=utf-8&allowMultiQueries=true"
    user = "**"
    password = "**"
    minConn = 5
    maxConn = 100
    globalTable = "global_table"
    branchTable = "branch_table"
    lockTable = "lock_table"
    queryLimit = 100
    maxWait = 5000
  }

  ## redis store property
  redis {
    ## redis mode: single、sentinel
    mode = "single"
    ## single mode property
    single {
      host = "127.0.0.1"
      port = "6379"
    }
    ## sentinel mode property
    sentinel {
      masterName = ""
      ## such as "10.28.235.65:26379,10.28.235.65:26380,10.28.235.65:26381"
      sentinelHosts = ""
    }
    password = ""
    database = "0"
    minConn = 1
    maxConn = 10
    maxTotal = 100
    queryLimit = 100
  }
}

        服务端需要加入到注册中心,目的是使客户端找到seata.registry配置的application,客户端会与服务端建立连接

        作者使用的是eureka,修改registry.conf

registry {
  # file 、nacos 、eureka、redis、zk、consul、etcd3、sofa
  type = "eureka"

  nacos {
    application = "seata-server"
    serverAddr = "127.0.0.1:8848"
    group = "SEATA_GROUP"
    namespace = ""
    cluster = "default"
    username = ""
    password = ""
  }
  eureka {
    //修改为客户端向eureka注册的路径
    serviceUrl = "***"
    //定义seata服务端的名称,客户端要配置相同名称
    application = "seata-server"
    weight = "1"
  }
  redis {
    serverAddr = "localhost:6379"
    db = 0
    password = ""
    cluster = "default"
    timeout = 0
  }
  zk {
    cluster = "default"
    serverAddr = "127.0.0.1:2181"
    sessionTimeout = 6000
    connectTimeout = 2000
    username = ""
    password = ""
  }
  consul {
    cluster = "default"
    serverAddr = "127.0.0.1:8500"
    aclToken = ""
  }
  etcd3 {
    cluster = "default"
    serverAddr = "http://localhost:2379"
  }
  sofa {
    serverAddr = "127.0.0.1:9603"
    application = "default"
    region = "DEFAULT_ZONE"
    datacenter = "DefaultDataCenter"
    cluster = "default"
    group = "SEATA_GROUP"
    addressWaitTime = "3000"
  }
  file {
    name = "file.conf"
  }
}

config {
  # file、nacos 、apollo、zk、consul、etcd3
  type = "file"

  nacos {
    serverAddr = "127.0.0.1:8848"
    namespace = ""
    group = "SEATA_GROUP"
    username = ""
    password = ""
    dataId = "seataServer.properties"
  }
  consul {
    serverAddr = "127.0.0.1:8500"
    aclToken = ""
  }
  apollo {
    appId = "seata-server"
    ## apolloConfigService will cover apolloMeta
    apolloMeta = "http://192.168.1.204:8801"
    apolloConfigService = "http://192.168.1.204:8080"
    namespace = "application"
    apolloAccesskeySecret = ""
    cluster = "seata"
  }
  zk {
    serverAddr = "127.0.0.1:2181"
    sessionTimeout = 6000
    connectTimeout = 2000
    username = ""
    password = ""
    nodePath = "/seata/seata.properties"
  }
  etcd3 {
    serverAddr = "http://localhost:2379"
  }
  file {
    name = "file.conf"
  }
}

2、客户端

        客户端服务建立undo_log,存储的是服务协议回滚的信息。

CREATE TABLE `undo_log` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT,
  `branch_id` bigint(20) NOT NULL,
  `xid` varchar(100) NOT NULL,
  `context` varchar(128) NOT NULL,
  `rollback_info` longblob NOT NULL,
  `log_status` int(11) NOT NULL,
  `log_created` datetime NOT NULL,
  `log_modified` datetime NOT NULL,
  `ext` varchar(100) DEFAULT NULL,
  PRIMARY KEY (`id`),
  UNIQUE KEY `ux_undo_log` (`xid`,`branch_id`)
) ENGINE=InnoDB AUTO_INCREMENT=83 DEFAULT CHARSET=utf8mb4;

        客户端一开始选用的是最新的1.4.2,但是这个有坑在,具体看后面

        <!--seata-->
        <dependency>
            <groupId>io.seata</groupId>
            <artifactId>seata-spring-boot-starter</artifactId>
            <version>1.4.2</version>
        </dependency>

         客户端需要对事务分组、服务名等进行配置

#事务分组,与seata-server配置的分组对应
seata.tx-service-group = my_test_tx_group
#配置TC服务名
seata.service.vgroupMapping.my_test_tx_group = seata-server
#注册中心,与TC一致
seata.registry.type = eureka
seata.registry.eureka.service-url = ***
//这里的名称要和服务端配置的注册中心一致
seata.registry.eureka.application = seata-server
seata.registry.eureka.weight = 1

        XA模式使用加@GlobalTransactional就可以,的确很方便

    @Override
    @GlobalTransactional
    public void testSeata(MacUnserviceDayEntity macUnserviceDayEntity) {
        String xid = RootContext.getXID();
        this.save(macUnserviceDayEntity);
        bitClient.departmentPage();
        int i  = 1/0;
    }

         但是测试之后下游服务没有回滚,应该是seata的全局事务xid没有传到下游

         作者尝试引入spring-cloud-starter-alibaba-seata,很遗憾客户端无法启动

         跟seata的创作者聊了一下,他们认为是博主公司父pom的boot版本太低了,这里博主也要吐槽一下父pom的boot才1.5.22,很多人反映过这个boot版本低有影响,但是负责人就是不改,没办法只能找其他办法

3、原因分析

        既然是xid没有传递到下级服务,看看能不能自己传吧

        上游服务定义配置类把xid加载到feign的请求头

@Configuration
public class FeignConfigSeata implements RequestInterceptor {
    @Override
    public void apply(RequestTemplate requestTemplate) {
        //从seata的RootContext中取出xid
        String xid = RootContext.getXID();
        if (StringUtils.isNotBlank(xid)) {
            System.out.println("feign 获得分布式事务xid:"+xid);
        }
        //把xid放入feign的请求头里面
        requestTemplate.header("Fescar-Xid", xid);
    }
}

        下游服务从feign请求头取出xid进行seata的RootContext绑定

@Configuration
public class FescarXidFilter extends OncePerRequestFilter {
    protected Logger logger = LoggerFactory.getLogger(FescarXidFilter.class);
    @Override
    protected void doFilterInternal(HttpServletRequest request, HttpServletResponse response, FilterChain filterChain) throws ServletException, IOException {
        String xid = RootContext.getXID();
        //获取请求头中的xid
        String restXid = request.getHeader("Fescar-Xid");
        boolean bind = false;
        if(StringUtils.isBlank(xid)&&StringUtils.isNotBlank(restXid)){
            //使用seata的RootContext进行全局事务id的绑定
            RootContext.bind(restXid);
            bind = true;
            if (logger.isDebugEnabled()) {

                logger.debug("bind[" + restXid + "] to RootContext");

            }

        }
        try{

            filterChain.doFilter(request, response);

        } finally {
            if (bind) {
                String unbindXid = RootContext.unbind();
                if (logger.isDebugEnabled()) {

                    logger.debug("unbind[" + unbindXid + "] from RootContext");
                }

                if (!restXid.equalsIgnoreCase(unbindXid)) {
                    logger.warn("xid in change during http rest from " + restXid + " to " + unbindXid);

                    if (unbindXid != null) {
                        RootContext.bind(unbindXid);
                        logger.warn("bind [" + unbindXid + "] back to RootContext");

                    }
                }
            }
        }
    }
}

         再试一下,然后还是没传过来,打断点看了一下,从上游的拦截开始就没有从RootContext中获取到,但是在测试方法是有的

        想了一会,hystrix是有线程池的,使用线程池中的线程进行请求调用再把数据加载到原始线程,这样原始线程的ThreadLocal就没带过来啊

        没辙了,hystrix关掉试试feign.hystrix.enabled = false,最终成功回滚,在undo_log中科院看到seata的回滚日志。 

        

四、总结

        这是一次seata的初体验,在无法使用官方包传递xid的情况下,需要服务自己实现,但是不能开启hystrix,这其实不能接受,所以作者后续还要尝试其他的方式。

举报

相关推荐

0 条评论