0
点赞
收藏
分享

微信扫一扫

.Net Core WebApi Redis消息订阅

小磊z 2022-04-01 阅读 90

        Redis 帮助类RedisHelper之前发布过帖子,指路.Net Core WebApi Redis消息订阅与发布

本帖着重主要记录一下WebApi 订阅Redis消息。

故事的起因

        我们需要使用Redis缓存,进行数据存储于处理,起初直接更新Redis,由于需要频繁更新和多线程操作,导致redis操作时卡死,经过一番某度了解,决定使用 消息的订阅-发布 进行操作,然后做为菜鸟的我就开始发愁了,然后又经过某度一番了解,故事徐徐展开......

故事的发展

        首先说一下业务逻辑,我们的程序分为三块Service服务、WebApi接口、前端页面,每块负责的主要职责大致如下图:

我们要做的就是在WebApi接口中订阅Redis通道,并消费消息。

        众所周知,普通的WebApi接口中是不信呢个直接订阅消息的,因为controller中的方法不能在后台主动运行,然后做为菜鸟的我开始发愁,怎么去实现消息订阅呢,然后经过度娘的谆谆教诲,发现需要借助host的一些接口来实现后台任务,在任务中进行订阅。

        然后就迎来了我的第一个坑,使用IHostedService,然后我就一波操作猛如虎,回头一看好痛苦。首先,新建ChennelSubTest类,实现IHostedService接口及方法,代码如下:

       

 public class ChennelSubTest : IHostedService, IDisposable
    {

        private readonly IServiceProvider _provider;
        private readonly ILogger _logger;
        private readonly RedisHelper _helper;
        public ChennelSubTest (ILogger<ChennelSubTest > logger, IServiceProvider provider, RedisHelper helper)
        {
            _logger = logger;
            _provider = provider;
            _helper = helper;
        }
        //消息订阅
        private const string Channel_test = "test";
        public static MemoryCache memoryCache = new MemoryCache(new MemoryCacheOptions() { });

        public Task StartAsync(CancellationToken cancellationToken)
        {
            //缓存过期时间
            int seconds = Convert.ToInt32(ConfigurationHelper.GetConfigValueByKey("AppSettings:CatchTimeoutSecond"));
            _logger.LogInformation("程序启动");
            Task.Run(async () =>
            {
                using (var scope = _provider.GetRequiredService<IServiceScopeFactory>().CreateScope())
                {
                    //redis对象
                    //var _redis = scope.ServiceProvider.GetService<ICacheService>();
                    await _helper.SubscribeAsync(Channel_test, new Action<RedisChannel, RedisValue>((channel, msg) =>
                    {
                        _logger.LogInformation("收到订阅结果:" + msg);
                        string key =  1;
                        memoryCache.Set(key, msg, DateTime.Now.AddSeconds(seconds));
                    }));

                }
            });
            return Task.CompletedTask;
        }

        public Task StopAsync(CancellationToken cancellationToken)
        {
            _logger.LogInformation("结束");
            return Task.CompletedTask;
        }

        public void Dispose()
        {
            _logger.LogInformation("退出");
        }

        
    }

示例代码中的数据处理比较简单,直接存在了缓存中。实际的业务是,我需要数据处理存储到数据库中,常规操作,我就直接在构造方法中注入了,结果程序运行不起来,一直报错,

         又经过一番百度,原来 AddHostedService默认的注入默认生存时间是Transient,而我的ReadRateBLL是以Scoped 添加到容器的,所以会报上面截图中的错误,因为:

服务不能依赖于生命周期小于其自身的服务。

所以,我就把我的业务方法注入改成了Transient生命周期,然后程序可正常启动了,这个坑暂且迈过去了。也可以有其他方法解决这个问题,可参考:

netcore的依赖注入,在BackgroundService中使用Scope注入的服务_atzqtzq的博客-CSDN博客

翻过了一座山 ~    越过了一道弯~ 撩动白云蓝天蓝~~    望眼平川大步迈向前~

激动的心,颤抖的手,赶紧跑起来程序进行测试,发不了几条测试消息之后,程序却无动于衷,我....心如死灰

        没办法,又问百度,发现IHostedService 接口,只是一次注册了服务,不是后台一直运行的,继续肝~发现微软其实为了解决后台运行问题,提供了BackgroundService 这个基类,心又活了~~继续唱:翻过了一座山 ~    越过了一道弯~

看到了大佬的大佬教大佬做事的一篇帖子,借鉴之后才看到看到了解决问题的曙光,此时,我也想拥有大佬慈爱教我做事的目光,这样就不会翻那么多山了,呜呜~BackgroundService 大佬教的好_段丛磊的博客-CSDN博客

借鉴大佬的InitBackgroundWork 基类之后,在DoWork 方法中订阅通道:

 protected override void DoWork(object state)
        {
            _helper.Unsubscribe(ReadReturn);
            _helper.Subscribe(ReadReturn, (channel, msg) => SetCmdData(channel, msg));      
        }

这里是先取消订阅,然后再重新订阅通道,因为Init方法中是while(true)一直在执行的,如果不先取消再订阅,会出现每次收到消息都订阅一次的情况,这样会重复订阅多次,明显是不对的,所以我就先取消订阅,再进行订阅,这种方式感觉不大好,但是暂时想不到其他更好的方法,希望有大佬看到帖子能指正一下。

        最终,问题全部解决,运行程序,发布消息,可以成功订阅,喜大普奔~翻过了一座山 ~    越过了一道弯~ 撩动白云蓝天蓝~~    望眼平川大步迈向前~

举报

相关推荐

0 条评论