目录
3.0 .NET CORE 结合CAP实现最终一致性分布式事务
引言:
结合前三期 .NET CORE 分布式事务(一) DTM实现二阶段提交(.NET CORE 分布式事务(一) DTM实现二阶段提交_net 分布式事务-CSDN博客);.NET CORE 分布式事务(二) DTM实现TCC(.NET CORE 分布式事务(二) DTM实现TCC_.net core 分布式事物-CSDN博客);.NET CORE 分布式事务(三) DTM实现Saga及高并发下的解决方案(.NET CORE 分布式事务(三) DTM实现Saga及高并发下的解决方案-CSDN博客)。
1.0 最终一致性介绍
在分布式系统中,事务管理是确保数据一致性和系统稳定性的关键技术。传统的集中式事务处理模型(如两阶段提交协议)在分布式环境中面临诸多挑战,包括性能瓶颈、单点故障和扩展性问题。为了解决这些问题,最终一致性模型被提出并广泛应用于分布式事务处理中。
最终一致性(Eventual Consistency)是一种分布式系统的数据一致性模型,它不要求数据在每个时刻都保持一致,而是允许数据在短时间内不一致,但保证在一定时间后,所有节点上的数据会达到一致状态。这种模型降低了对即时一致性的要求,从而提高了系统的可用性和扩展性。
在介绍最终一致性之前,我们需要了解CAP定理,它是分布式计算领域的一个重要理论。CAP定理指出,在分布式系统中,一致性(Consistency)、可用性(Availability)和分区容错性(Partition Tolerance)三者不可兼得。换句话说,一个分布式系统最多只能满足这三个属性中的两个。最终一致性模型通常在可用性和分区容错性之间做出权衡,以实现高可用和可扩展的分布式系统。
最终一致性的实现方式有很多,以下是一些常见的方法:
-  
异步复制:在分布式系统中,数据更新操作首先在一个节点上完成,然后通过异步方式将更新复制到其他节点。这种方式可以降低响应延迟,提高系统性能,但可能导致短时间的数据不一致。
 -  
消息队列:通过消息队列来实现节点间的通信和数据同步。当一个节点完成数据更新后,它会将更新操作发送到消息队列,其他节点从队列中获取更新操作并执行。这种方式可以解耦节点间的依赖关系,提高系统的可扩展性。
 -  
版本向量:每个数据项都有一个版本向量,用于记录数据在不同节点上的更新顺序。当一个节点完成数据更新后,它会更新版本向量并将更新操作发送到其他节点。其他节点根据版本向量判断是否需要执行更新操作,从而避免冲突和数据不一致。
 -  
补偿事务:在某些场景下,无法立即完成数据一致更新。这时可以采用补偿事务的方式,即在后续操作中修复数据不一致的问题。例如,用户在购物车中添加商品后,系统可能会因为网络延迟导致库存数据不一致。这时可以在用户提交订单时进行库存校验,如果发现库存不足,则提示用户重新选择商品或等待库存恢复。
 
最终一致性模型在很多分布式系统中得到了广泛应用,如Amazon的Dynamo、Google的Bigtable和Megastore等。这些系统通过最终一致性实现了高可用、高性能和可扩展的分布式存储和计算服务。
然而,最终一致性也存在一定的局限性。首先,它不能保证实时数据一致性,可能导致短时间内的数据不一致和冲突。其次,最终一致性的实现通常需要复杂的逻辑和额外的资源消耗,如消息队列、版本向量等。此外,在某些关键业务场景下,如金融交易、实时监控等,最终一致性可能无法满足业务需求,需要采用更严格的一致性模型。
总之,最终一致性作为一种分布式事务管理模型,通过降低即时一致性要求,提高了分布式系统的可用性和扩展性。然而,它也存在一定局限性,需要根据具体业务场景和需求进行权衡和选择。在未来的分布式系统设计中,最终一致性仍将是一种重要的数据一致性模型,与其他一致性模型共同支撑着分布式事务处理的发展。
2.0 CAP

CAP 是一个基于 .NET Standard 的 C# 库,它是一种处理分布式事务的解决方案,同样具有 EventBus 的功能,它具有轻量级、易使用、高性能等特点。
在我们构建 SOA 或者 微服务系统的过程中,我们通常需要使用事件来对各个服务进行集成,在这过程中简单的使用消息队列并不能保证数据的最终一致性, CAP 采用的是和当前数据库集成的本地消息表的方案来解决在分布式系统互相调用的各个环节可能出现的异常,它能够保证任何情况下事件消息都是不会丢失的。
你同样可以把 CAP 当做 EventBus 来使用,CAP提供了一种更加简单的方式来实现事件消息的发布和订阅,在订阅以及发布的过程中,你不需要继承或实现任何接口。
2.0 架构预览

根据上图我们可以看到 Micro-serviceA 微服务A 通过Cap 把数据提交给 Message Queue 也就是消息队列,另一个微服务B Micro-serviceB 通过Cap 连接 Message Queue 获取信息,然后执行。
CAP 支持主流的消息队列作为传输器,你可以按需选择下面的包进行安装:
PM> Install-Package DotNetCore.CAP.Kafka
PM> Install-Package DotNetCore.CAP.RabbitMQ
PM> Install-Package DotNetCore.CAP.AzureServiceBus
PM> Install-Package DotNetCore.CAP.AmazonSQS
PM> Install-Package DotNetCore.CAP.NATS
PM> Install-Package DotNetCore.CAP.RedisStreams
PM> Install-Package DotNetCore.CAP.Pulsar 
CAP 提供了主流数据库作为存储,你可以按需选择下面的包进行安装:
PM> Install-Package DotNetCore.CAP.SqlServer
PM> Install-Package DotNetCore.CAP.MySql
PM> Install-Package DotNetCore.CAP.PostgreSql
PM> Install-Package DotNetCore.CAP.MongoDB 
3.0 .NET CORE 结合CAP实现最终一致性分布式事务
案例就不换了,还是以跨行转账作为例子,给大家详解这种架构。
3.1 准备工作(数据库,本文使用的是MySql)
3.1.1 数据模型
//模型
public class UserMoney
{
    public int id { get; set; }
    public int money { get; set; }
    public int trading_balance { get; set; }
    public int balance { get; set; }
    public int trymoney { get; set; }
    public string guid { get; set; }
} 
3.1.2 DbContext
 public class DtmDbContext : DbContext
 {
     public DtmDbContext() { }
     public DtmDbContext(DbContextOptions<DtmDbContext> options) : base(options) { }
 
     public virtual DbSet<UserMoney> UserMoney { get; set; }
 
     protected override void OnConfiguring(DbContextOptionsBuilder optionsBuilder)
     {
         optionsBuilder
             .UseMySql("server=localhost;port=3307;user id=root;password=123;database=DTM_Test", ServerVersion.Parse("8.0.23-mysql"))
             .UseLoggerFactory(LoggerFactory.Create(option =>
             {
                 option.AddConsole();
             }));
     }
 
     protected override void OnModelCreating(ModelBuilder modelBuilder)
     {
         modelBuilder
             .UseCollation("utf8_general_ci")
             .HasCharSet("utf8");
 
         modelBuilder.Entity<UserMoney>(entity =>
         {
             entity.ToTable("UserMoney");
         });
     }
 } 
3.1.3 数据库最终生成

数据库不仅会生成我们的数据模型,还会多生成两张表,一张是事务发布表,一张是事务接收表。
3.2 Nuget引入
  <ItemGroup>
    <PackageReference Include="DotNetCore.CAP" Version="5.1.0" />
    <PackageReference Include="DotNetCore.CAP.MySql" Version="5.1.0" />
    <PackageReference Include="DotNetCore.CAP.Dashboard" Version="5.1.0" />
    <PackageReference Include="DotNetCore.CAP.RabbitMQ" Version="5.1.0" />
  </ItemGroup> 
3.3 appsettings.json
{
  "Logging": {
    "LogLevel": {
      "Default": "Information",
      "Microsoft.AspNetCore": "Warning"
    }
  },
  "AllowedHosts": "*",
  "RabbitMQ": {
    "HostName": "192.168.157.157",
    "UserName": "123",
    "Password": "123",
    "Port": "5672"
  },
  "MysqlConn": "server=localhost;port=3307;user id=root;password=123;database=DTM_Test"
}
 
3.4 docker启动一个RabbitMQ
docker run -d --name rabbitmq -e RABBITMQ_DEFAULT_USER=123 -e RABBITMQ_DEFAULT_PASS=123 -p 15672:15672 -p 5672:5672 rabbitmq:3-management
 
账号密码都是 123。之前的博文详细介绍过RabbitMQ 。(.NET CORE消息队列RabbitMQ-CSDN博客)
3.5 Program.cs
namespace Ncc_Cap_Service
{
    public class Program
    {
        public static void Main(string[] args)
        {
            var builder = WebApplication.CreateBuilder(args);
            builder.Services.AddControllers();
            builder.Services.AddEndpointsApiExplorer();
            builder.Services.AddSwaggerGen();
            #region NCC_CAP
            var mysqlConn = builder.Configuration.GetValue<string>("MysqlConn")!;
            var rabbitMQHostName = builder.Configuration.GetValue<string>("RabbitMQ:HostName")!;
            var rabbitMQUserName = builder.Configuration.GetValue<string>("RabbitMQ:UserName")!;
            var rabbitMQPassword = builder.Configuration.GetValue<string>("RabbitMQ:Password")!;
            var rabbitMQPort = builder.Configuration.GetValue<int>("RabbitMQ:Port")!;
            builder.Services.AddCap(x =>
            {
                x.UseMySql(mysqlConn);
                x.UseRabbitMQ(x =>
                {
                    x.HostName = rabbitMQHostName;
                    x.UserName = rabbitMQUserName;
                    x.Password = rabbitMQPassword;
                    x.Port = rabbitMQPort;
                });
                //重试次数
                x.FailedRetryCount = 10;
                //失败的重试间隔
                x.FailedRetryInterval = 60;
            });
            #region   注册DbContext
            builder.Services.AddDbContext<DtmDbContext>(options =>
            {
                options.UseMySql(mysqlConn, ServerVersion.Parse("8.0.23-mysql"));
            });
            #endregion
            #endregion
            var app = builder.Build();
            if (app.Environment.IsDevelopment())
            {
                app.UseSwagger();
                app.UseSwaggerUI();
            }
            app.UseAuthorization();
            app.MapControllers();
            app.Run();
        }
    }
}
 
3.6 用户1 API控制器
using DotNetCore.CAP;
using DTM_EF;
using DTM_EF.Model;
using Microsoft.AspNetCore.Mvc;
namespace Ncc_Cap_Service.Controllers
{
    [ApiController]
    [Route("[controller]")]
    public class CapUserAController : ControllerBase
    {
        private readonly ILogger<CapUserAController> _logger;
        private readonly ICapPublisher _iCapPublisher;
        private readonly DtmDbContext _dtmDbContext;
        const string PublishName = "NCC_CAP.CapUserAController.CAP";
      
        public CapUserAController(ILogger<CapUserAController> logger,
            ICapPublisher iCapPublisher,
            DtmDbContext dtmDbContext)
        {
            _logger = logger;
            _iCapPublisher = iCapPublisher;
            _dtmDbContext = dtmDbContext;
        }
        /// <summary>
        /// 普通的Cap
        /// </summary>
        /// <param name="trymoney"></param>
        /// <returns></returns>
        /// <exception cref="Exception"></exception>
        [Route("/Cap")]
        [HttpPost]
        public async Task<IActionResult> CapTransaction(int trymoney)
        {
            //获取用户账户信息
            var UserMoney = _dtmDbContext.Set<UserMoney>().Where(c => c.id == 1).FirstOrDefault();
            if (UserMoney is null) throw new Exception($"用户{1}--不存在");
            if (UserMoney.money + trymoney < 0) throw new Exception($"用户{1}--金额不足");
            //前序判断都通过,修改信息准备提交   
            UserMoney!.money += trymoney;
            _dtmDbContext.SaveChanges();
            await _iCapPublisher.PublishAsync(PublishName, -trymoney);
            return Ok();
        }
    }
}
 
3.7 用户2 API控制器
using DotNetCore.CAP;
using DTM_EF;
using Microsoft.AspNetCore.Mvc;
using DTM_EF.Model;
using Microsoft.EntityFrameworkCore;
namespace Ncc_Cap_Service.Controllers
{
    public class CAPUserBController
    {
        private readonly ILogger<CAPUserBController> _logger;
        private readonly DtmDbContext _dtmDbContext;
        public CAPUserBController(ILogger<CAPUserBController> logger,
            DtmDbContext dtmDbContext)
        {
            _logger = logger;
            _dtmDbContext = dtmDbContext;
        }
        [NonAction]
        [CapSubscribe("NCC_CAP.CapUserAController.CAP")]
        public async Task CapTransaction(int trymoney)
        {
            //获取用户账户信息
            var UserMoney = await _dtmDbContext.Set<UserMoney>().Where(c => c.id == 2).FirstOrDefaultAsync();
            if (UserMoney is null) throw new Exception($"用户{1}--不存在");
            if (UserMoney.money + trymoney < 0) throw new Exception($"用户{1}--金额不足");
            //前序判断都通过,修改信息准备提交   
            UserMoney!.money += trymoney;
            _dtmDbContext.SaveChanges();
        }
    }
}
 
 
4.0 测试运行
用户1和2都先初始化金额1000元。

转100元:

 
5.0 消息附加头信息的CAP
CAP可以使用一个 Dictionary<string, string> 字典类型的作为下游事务的附加消息头。
5.1 上游事务代码
        /// <summary>
        /// Cap--消息附加的头信息
        /// </summary>
        /// <param name="trymoney"></param>
        /// <returns></returns>
        /// <exception cref="Exception"></exception>
        [Route("/Cap/Header")]
        [HttpPost]
        public async Task<IActionResult> CapHeaderTransaction(int trymoney)
        {
            //获取用户账户信息
            var UserMoney = _dtmDbContext.Set<UserMoney>().Where(c => c.id == 1).FirstOrDefault();
            if (UserMoney is null) throw new Exception($"用户{1}--不存在");
            if (UserMoney.money + trymoney < 0) throw new Exception($"用户{1}--金额不足");
            //前序判断都通过,修改信息准备提交   
            UserMoney!.money += trymoney;
            _dtmDbContext.SaveChanges();
            IDictionary<string, string> header = new Dictionary<string, string>();
            header.Add("转账发起人", "1");
            header.Add("转账接受人", "2");
            header.Add("转账金额", trymoney.ToString());
            await _iCapPublisher.PublishAsync(PublishHeaderName, -trymoney, header);
            return Ok();
        } 
5.2 下游事务代码
  [NonAction]
  [CapSubscribe("NCC_CAP.CapUserAController.CapHeader")]
  public async Task CapHeaderTransaction(int trymoney, [FromCap] CapHeader header)
  {
      #region header信息
      _logger.LogInformation($"转账发起人:{header["转账发起人"]}");
      _logger.LogInformation($"转账接受人:{header["转账接受人"]}");
      _logger.LogInformation($"转账金额:{header["转账金额"]}");
      #endregion
      //获取用户账户信息
      var UserMoney = await _dtmDbContext.Set<UserMoney>().Where(c => c.id == 2).FirstOrDefaultAsync();
      if (UserMoney is null) throw new Exception($"用户{1}--不存在");
      if (UserMoney.money + trymoney < 0) throw new Exception($"用户{1}--金额不足");
      //前序判断都通过,修改信息准备提交   
      UserMoney!.money += trymoney;
      _dtmDbContext.SaveChanges();
  } 
5.3 测试运行
我们在下游事务中,接受上游事务的头信息,并输出。

6.0 使用消息队列中不同的交换机和队列
服务实例在启动时会自动向RabbitMQ注册一个名为 cap.default.router 的交换机。这是最普通的交换机,如果您想设置其他种类的交换机请参考(.NET CORE消息队列RabbitMQ-CSDN博客),可以先建立一个交换机或队列。然后代码指向这个交换机或队列。
  builder.Services.AddCap(x =>
  {
      //设置队列名称
      x.DefaultGroupName = "queue.name";
      x.UseMySql(mysqlConn);
      x.UseRabbitMQ(x =>
      {
          x.HostName = rabbitMQHostName;
          x.UserName = rabbitMQUserName;
          x.Password = rabbitMQPassword;
          x.Port = rabbitMQPort;
          //设置交换机名称--就是可以先设置交换机类型--扇形模式,发布订阅模式,主题模式,路由模式。
          x.ExchangeName = "Exchange";
      });
      //重试次数
      x.FailedRetryCount = 10;
      //失败的重试间隔
      x.FailedRetryInterval = 60;
  }); 
如果设置了队列名称,下游服务怎么区分是哪个队列的事务呢?我们可以在代码中指向对应的队列名字。
        [NonAction]
        [CapSubscribe("NCC_CAP.CapUserAController.CapQueue", Group = "queue.name")]
        public async Task CapQueueTransaction(int trymoney)
        {
            //获取用户账户信息
            var UserMoney = await _dtmDbContext.Set<UserMoney>().Where(c => c.id == 2).FirstOrDefaultAsync();
            if (UserMoney is null) throw new Exception($"用户{1}--不存在");
            if (UserMoney.money + trymoney < 0) throw new Exception($"用户{1}--金额不足");
            //前序判断都通过,修改信息准备提交   
            UserMoney!.money += trymoney;
            _dtmDbContext.SaveChanges();
        } 
通过Group = "queue.name" 直接指向了队列名称。
7.0 重试失败回调
正常情况下,分布式事务正常执行完毕之后会在数据库中进行标识。

但是当出现网络延迟、服务宕机时。CAP就会根据设置时间,进行指定次数的重试。全部重试均失败之后,会执行一个回调。
  builder.Services.AddCap(x =>
  {
      x.UseMySql(mysqlConn);
      x.UseRabbitMQ(x =>
      {
          x.HostName = rabbitMQHostName;
          x.UserName = rabbitMQUserName;
          x.Password = rabbitMQPassword;
          x.Port = rabbitMQPort;
      });
      //重试次数
      x.FailedRetryCount = 10;
      //失败的重试间隔
      x.FailedRetryInterval = 60;
      x.FailedThresholdCallback = failed =>
      {
          //当一个cap重试失败之后执行的回调(这里可以执行邮件通知,错误短信发送给运维工程师,进行人工介入处理)
          var logger = failed.ServiceProvider.GetService<ILogger<Program>>()!;
          logger.LogError($@"MessageType {failed.MessageType} 失败了, 重试了 {x.FailedRetryCount} 次, 消息参数: {failed.Message.Value}");
      };
  }); 
小结:
本文给出了一个完整的 CAP 事务方案,是一个可以实际运行的 CAP ,并解决高并发的使用场景,您只需要在这个示例的基础上进行简单修改,就能够用于解决您的真实问题。










