Redis 帮助类RedisHelper之前发布过帖子,指路.Net Core WebApi Redis消息订阅与发布
本帖着重主要记录一下WebApi 订阅Redis消息。
故事的起因
我们需要使用Redis缓存,进行数据存储于处理,起初直接更新Redis,由于需要频繁更新和多线程操作,导致redis操作时卡死,经过一番某度了解,决定使用 消息的订阅-发布 进行操作,然后做为菜鸟的我就开始发愁了,然后又经过某度一番了解,故事徐徐展开......
故事的发展
首先说一下业务逻辑,我们的程序分为三块Service服务、WebApi接口、前端页面,每块负责的主要职责大致如下图:
我们要做的就是在WebApi接口中订阅Redis通道,并消费消息。
众所周知,普通的WebApi接口中是不信呢个直接订阅消息的,因为controller中的方法不能在后台主动运行,然后做为菜鸟的我开始发愁,怎么去实现消息订阅呢,然后经过度娘的谆谆教诲,发现需要借助host的一些接口来实现后台任务,在任务中进行订阅。
然后就迎来了我的第一个坑,使用IHostedService,然后我就一波操作猛如虎,回头一看好痛苦。首先,新建ChennelSubTest类,实现IHostedService接口及方法,代码如下:
public class ChennelSubTest : IHostedService, IDisposable
{
private readonly IServiceProvider _provider;
private readonly ILogger _logger;
private readonly RedisHelper _helper;
public ChennelSubTest (ILogger<ChennelSubTest > logger, IServiceProvider provider, RedisHelper helper)
{
_logger = logger;
_provider = provider;
_helper = helper;
}
//消息订阅
private const string Channel_test = "test";
public static MemoryCache memoryCache = new MemoryCache(new MemoryCacheOptions() { });
public Task StartAsync(CancellationToken cancellationToken)
{
//缓存过期时间
int seconds = Convert.ToInt32(ConfigurationHelper.GetConfigValueByKey("AppSettings:CatchTimeoutSecond"));
_logger.LogInformation("程序启动");
Task.Run(async () =>
{
using (var scope = _provider.GetRequiredService<IServiceScopeFactory>().CreateScope())
{
//redis对象
//var _redis = scope.ServiceProvider.GetService<ICacheService>();
await _helper.SubscribeAsync(Channel_test, new Action<RedisChannel, RedisValue>((channel, msg) =>
{
_logger.LogInformation("收到订阅结果:" + msg);
string key = 1;
memoryCache.Set(key, msg, DateTime.Now.AddSeconds(seconds));
}));
}
});
return Task.CompletedTask;
}
public Task StopAsync(CancellationToken cancellationToken)
{
_logger.LogInformation("结束");
return Task.CompletedTask;
}
public void Dispose()
{
_logger.LogInformation("退出");
}
}
示例代码中的数据处理比较简单,直接存在了缓存中。实际的业务是,我需要数据处理存储到数据库中,常规操作,我就直接在构造方法中注入了,结果程序运行不起来,一直报错,
又经过一番百度,原来 AddHostedService默认的注入默认生存时间是Transient,而我的ReadRateBLL是以Scoped 添加到容器的,所以会报上面截图中的错误,因为:
服务不能依赖于生命周期小于其自身的服务。
所以,我就把我的业务方法注入改成了Transient生命周期,然后程序可正常启动了,这个坑暂且迈过去了。也可以有其他方法解决这个问题,可参考:
netcore的依赖注入,在BackgroundService中使用Scope注入的服务_atzqtzq的博客-CSDN博客
翻过了一座山 ~ 越过了一道弯~ 撩动白云蓝天蓝~~ 望眼平川大步迈向前~
激动的心,颤抖的手,赶紧跑起来程序进行测试,发不了几条测试消息之后,程序却无动于衷,我....心如死灰
没办法,又问百度,发现IHostedService 接口,只是一次注册了服务,不是后台一直运行的,继续肝~发现微软其实为了解决后台运行问题,提供了BackgroundService 这个基类,心又活了~~继续唱:翻过了一座山 ~ 越过了一道弯~
看到了大佬的大佬教大佬做事的一篇帖子,借鉴之后才看到看到了解决问题的曙光,此时,我也想拥有大佬慈爱教我做事的目光,这样就不会翻那么多山了,呜呜~BackgroundService 大佬教的好_段丛磊的博客-CSDN博客
借鉴大佬的InitBackgroundWork 基类之后,在DoWork 方法中订阅通道:
protected override void DoWork(object state)
{
_helper.Unsubscribe(ReadReturn);
_helper.Subscribe(ReadReturn, (channel, msg) => SetCmdData(channel, msg));
}
这里是先取消订阅,然后再重新订阅通道,因为Init方法中是while(true)一直在执行的,如果不先取消再订阅,会出现每次收到消息都订阅一次的情况,这样会重复订阅多次,明显是不对的,所以我就先取消订阅,再进行订阅,这种方式感觉不大好,但是暂时想不到其他更好的方法,希望有大佬看到帖子能指正一下。
最终,问题全部解决,运行程序,发布消息,可以成功订阅,喜大普奔~翻过了一座山 ~ 越过了一道弯~ 撩动白云蓝天蓝~~ 望眼平川大步迈向前~