一、引言
微服务的环境下,数据库隔离,数据不一致的情况越来越多,要么建立补偿机制要么手动刷数据,这时候就需要分布式事务,有问题上下游一起回滚,但是数据的可见性、链路时长还有其他很多原因说明链路不适宜拉的过长。
二、使用
1、服务端
作者使用的是db模式
在server使用的数据库建立了global_table、branch_table、lock_table,这些是服务端用于记录全局事务并进行锁表控制的记录表。
CREATE TABLE `global_table` (
`xid` varchar(128) NOT NULL,
`transaction_id` bigint(20) DEFAULT NULL,
`status` tinyint(4) NOT NULL,
`application_id` varchar(32) DEFAULT NULL,
`transaction_service_group` varchar(32) DEFAULT NULL,
`transaction_name` varchar(128) DEFAULT NULL,
`timeout` int(11) DEFAULT NULL,
`begin_time` bigint(20) DEFAULT NULL,
`application_data` varchar(2000) DEFAULT NULL,
`gmt_create` datetime DEFAULT NULL,
`gmt_modified` datetime DEFAULT NULL,
PRIMARY KEY (`xid`),
KEY `idx_gmt_modified_status` (`gmt_modified`,`status`),
KEY `idx_transaction_id` (`transaction_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
CREATE TABLE `branch_table` (
`branch_id` bigint(20) NOT NULL,
`xid` varchar(128) NOT NULL,
`transaction_id` bigint(20) DEFAULT NULL,
`resource_group_id` varchar(32) DEFAULT NULL,
`resource_id` varchar(256) DEFAULT NULL,
`branch_type` varchar(8) DEFAULT NULL,
`status` tinyint(4) DEFAULT NULL,
`client_id` varchar(64) DEFAULT NULL,
`application_data` varchar(2000) DEFAULT NULL,
`gmt_create` datetime(6) DEFAULT NULL,
`gmt_modified` datetime(6) DEFAULT NULL,
PRIMARY KEY (`branch_id`),
KEY `idx_xid` (`xid`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
CREATE TABLE `lock_table` (
`row_key` varchar(128) NOT NULL,
`xid` varchar(96) DEFAULT NULL,
`transaction_id` bigint(20) DEFAULT NULL,
`branch_id` bigint(20) NOT NULL,
`resource_id` varchar(256) DEFAULT NULL,
`table_name` varchar(32) DEFAULT NULL,
`pk` varchar(36) DEFAULT NULL,
`gmt_create` datetime DEFAULT NULL,
`gmt_modified` datetime DEFAULT NULL,
PRIMARY KEY (`row_key`),
KEY `idx_branch_id` (`branch_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
修改file.conf
## transaction log store, only used in seata-server
store {
## store mode: file、db、redis
mode = "db"
## rsa decryption public key
publicKey = ""
## file store property
file {
## store location dir
dir = "sessionStore"
# branch session size , if exceeded first try compress lockkey, still exceeded throws exceptions
maxBranchSessionSize = 16384
# globe session size , if exceeded throws exceptions
maxGlobalSessionSize = 512
# file buffer size , if exceeded allocate new buffer
fileWriteBufferCacheSize = 16384
# when recover batch read size
sessionReloadReadSize = 100
# async, sync
flushDiskMode = async
}
## database store property
db {
## the implement of javax.sql.DataSource, such as DruidDataSource(druid)/BasicDataSource(dbcp)/HikariDataSource(hikari) etc.
datasource = "druid"
## mysql/oracle/postgresql/h2/oceanbase etc.
dbType = "mysql"
driverClassName = "com.mysql.jdbc.Driver"
## if using mysql to store the data, recommend add rewriteBatchedStatements=true in jdbc connection param
//选择db模式将此处修改为global_table等表所在的库
url = "jdbc:mysql://**?zeroDateTimeBehavior=convertToNull&autoReconnect=true&useUnicode=true&characterEncoding=utf-8&allowMultiQueries=true"
user = "**"
password = "**"
minConn = 5
maxConn = 100
globalTable = "global_table"
branchTable = "branch_table"
lockTable = "lock_table"
queryLimit = 100
maxWait = 5000
}
## redis store property
redis {
## redis mode: single、sentinel
mode = "single"
## single mode property
single {
host = "127.0.0.1"
port = "6379"
}
## sentinel mode property
sentinel {
masterName = ""
## such as "10.28.235.65:26379,10.28.235.65:26380,10.28.235.65:26381"
sentinelHosts = ""
}
password = ""
database = "0"
minConn = 1
maxConn = 10
maxTotal = 100
queryLimit = 100
}
}
服务端需要加入到注册中心,目的是使客户端找到seata.registry配置的application,客户端会与服务端建立连接
作者使用的是eureka,修改registry.conf
registry {
# file 、nacos 、eureka、redis、zk、consul、etcd3、sofa
type = "eureka"
nacos {
application = "seata-server"
serverAddr = "127.0.0.1:8848"
group = "SEATA_GROUP"
namespace = ""
cluster = "default"
username = ""
password = ""
}
eureka {
//修改为客户端向eureka注册的路径
serviceUrl = "***"
//定义seata服务端的名称,客户端要配置相同名称
application = "seata-server"
weight = "1"
}
redis {
serverAddr = "localhost:6379"
db = 0
password = ""
cluster = "default"
timeout = 0
}
zk {
cluster = "default"
serverAddr = "127.0.0.1:2181"
sessionTimeout = 6000
connectTimeout = 2000
username = ""
password = ""
}
consul {
cluster = "default"
serverAddr = "127.0.0.1:8500"
aclToken = ""
}
etcd3 {
cluster = "default"
serverAddr = "http://localhost:2379"
}
sofa {
serverAddr = "127.0.0.1:9603"
application = "default"
region = "DEFAULT_ZONE"
datacenter = "DefaultDataCenter"
cluster = "default"
group = "SEATA_GROUP"
addressWaitTime = "3000"
}
file {
name = "file.conf"
}
}
config {
# file、nacos 、apollo、zk、consul、etcd3
type = "file"
nacos {
serverAddr = "127.0.0.1:8848"
namespace = ""
group = "SEATA_GROUP"
username = ""
password = ""
dataId = "seataServer.properties"
}
consul {
serverAddr = "127.0.0.1:8500"
aclToken = ""
}
apollo {
appId = "seata-server"
## apolloConfigService will cover apolloMeta
apolloMeta = "http://192.168.1.204:8801"
apolloConfigService = "http://192.168.1.204:8080"
namespace = "application"
apolloAccesskeySecret = ""
cluster = "seata"
}
zk {
serverAddr = "127.0.0.1:2181"
sessionTimeout = 6000
connectTimeout = 2000
username = ""
password = ""
nodePath = "/seata/seata.properties"
}
etcd3 {
serverAddr = "http://localhost:2379"
}
file {
name = "file.conf"
}
}
2、客户端
客户端服务建立undo_log,存储的是服务协议回滚的信息。
CREATE TABLE `undo_log` (
`id` bigint(20) NOT NULL AUTO_INCREMENT,
`branch_id` bigint(20) NOT NULL,
`xid` varchar(100) NOT NULL,
`context` varchar(128) NOT NULL,
`rollback_info` longblob NOT NULL,
`log_status` int(11) NOT NULL,
`log_created` datetime NOT NULL,
`log_modified` datetime NOT NULL,
`ext` varchar(100) DEFAULT NULL,
PRIMARY KEY (`id`),
UNIQUE KEY `ux_undo_log` (`xid`,`branch_id`)
) ENGINE=InnoDB AUTO_INCREMENT=83 DEFAULT CHARSET=utf8mb4;
客户端一开始选用的是最新的1.4.2,但是这个有坑在,具体看后面
<!--seata-->
<dependency>
<groupId>io.seata</groupId>
<artifactId>seata-spring-boot-starter</artifactId>
<version>1.4.2</version>
</dependency>
客户端需要对事务分组、服务名等进行配置
#事务分组,与seata-server配置的分组对应
seata.tx-service-group = my_test_tx_group
#配置TC服务名
seata.service.vgroupMapping.my_test_tx_group = seata-server
#注册中心,与TC一致
seata.registry.type = eureka
seata.registry.eureka.service-url = ***
//这里的名称要和服务端配置的注册中心一致
seata.registry.eureka.application = seata-server
seata.registry.eureka.weight = 1
XA模式使用加@GlobalTransactional就可以,的确很方便
@Override
@GlobalTransactional
public void testSeata(MacUnserviceDayEntity macUnserviceDayEntity) {
String xid = RootContext.getXID();
this.save(macUnserviceDayEntity);
bitClient.departmentPage();
int i = 1/0;
}
但是测试之后下游服务没有回滚,应该是seata的全局事务xid没有传到下游
作者尝试引入spring-cloud-starter-alibaba-seata,很遗憾客户端无法启动
跟seata的创作者聊了一下,他们认为是博主公司父pom的boot版本太低了,这里博主也要吐槽一下父pom的boot才1.5.22,很多人反映过这个boot版本低有影响,但是负责人就是不改,没办法只能找其他办法
3、原因分析
既然是xid没有传递到下级服务,看看能不能自己传吧
上游服务定义配置类把xid加载到feign的请求头
@Configuration
public class FeignConfigSeata implements RequestInterceptor {
@Override
public void apply(RequestTemplate requestTemplate) {
//从seata的RootContext中取出xid
String xid = RootContext.getXID();
if (StringUtils.isNotBlank(xid)) {
System.out.println("feign 获得分布式事务xid:"+xid);
}
//把xid放入feign的请求头里面
requestTemplate.header("Fescar-Xid", xid);
}
}
下游服务从feign请求头取出xid进行seata的RootContext绑定
@Configuration
public class FescarXidFilter extends OncePerRequestFilter {
protected Logger logger = LoggerFactory.getLogger(FescarXidFilter.class);
@Override
protected void doFilterInternal(HttpServletRequest request, HttpServletResponse response, FilterChain filterChain) throws ServletException, IOException {
String xid = RootContext.getXID();
//获取请求头中的xid
String restXid = request.getHeader("Fescar-Xid");
boolean bind = false;
if(StringUtils.isBlank(xid)&&StringUtils.isNotBlank(restXid)){
//使用seata的RootContext进行全局事务id的绑定
RootContext.bind(restXid);
bind = true;
if (logger.isDebugEnabled()) {
logger.debug("bind[" + restXid + "] to RootContext");
}
}
try{
filterChain.doFilter(request, response);
} finally {
if (bind) {
String unbindXid = RootContext.unbind();
if (logger.isDebugEnabled()) {
logger.debug("unbind[" + unbindXid + "] from RootContext");
}
if (!restXid.equalsIgnoreCase(unbindXid)) {
logger.warn("xid in change during http rest from " + restXid + " to " + unbindXid);
if (unbindXid != null) {
RootContext.bind(unbindXid);
logger.warn("bind [" + unbindXid + "] back to RootContext");
}
}
}
}
}
}
再试一下,然后还是没传过来,打断点看了一下,从上游的拦截开始就没有从RootContext中获取到,但是在测试方法是有的
想了一会,hystrix是有线程池的,使用线程池中的线程进行请求调用再把数据加载到原始线程,这样原始线程的ThreadLocal就没带过来啊
没辙了,hystrix关掉试试feign.hystrix.enabled = false,最终成功回滚,在undo_log中科院看到seata的回滚日志。
四、总结
这是一次seata的初体验,在无法使用官方包传递xid的情况下,需要服务自己实现,但是不能开启hystrix,这其实不能接受,所以作者后续还要尝试其他的方式。