0
点赞
收藏
分享

微信扫一扫

redisson使用阻塞队列引发的异常解决

认真的老去 2021-09-21 阅读 159

背景

  • 升级redisson版本后解决了redisson的队列丢消息问题
<dependency>
   <groupId>org.redisson</groupId>
   <artifactId>redisson</artifactId>
   <version>3.15.0</version>
</dependency>
  • 随之又出现了其它异常问题,报错信息有如下几种:
  1. PingConnectionHandler类报异常,之前分析过这个类,这个异常报得特别多 一晚上将近10w条
    

  1. RedisExecutor类异常,堆栈如下
        at org.redisson.command.RedisExecutor$2.run(RedisExecutor.java:205) ~[redisson-3.15.0.jar!/:3.15.0]
        at io.netty.util.HashedWheelTimer$HashedWheelTimeout.expire(HashedWheelTimer.java:672) ~[netty-common-4.1.45.Final.jar!/:4.1.45.Final]
        at io.netty.util.HashedWheelTimer$HashedWheelBucket.expireTimeouts(HashedWheelTimer.java:747) ~[netty-common-4.1.45.Final.jar!/:4.1.45.Final]
        at io.netty.util.HashedWheelTimer$Worker.run(HashedWheelTimer.java:472) ~[netty-common-4.1.45.Final.jar!/:4.1.45.Final]
        at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[netty-common-4.1.45.Final.jar!/:4.1.45.Final]
        at java.lang.Thread.run(Thread.java:745) [?:1.8.0_121]

异常信息如下:

  1. Caused by: org.redisson.client.WriteRedisConnectionException: Channel has been closed!
    

分析结果

上面的报错信息redisson在最新的版本报错信息已经明确提示了"Avoid to use blocking commands in Async/JavaRx/Reactive handlers即不要用阻塞命令,否则会有问题。

经过分析之后,上面几种异常发生的原因都是因为发送PING命令引发的,原理就是代码使用了阻塞方法,由netty-threads执行的监听器或者带延迟时间的监听器导致了Redis的请求响应处理错误。

解决方案

  1. 不用redisson的阻塞相关方法,改用非阻塞方式
  2. 使用redisson的异步非阻塞方式实现代码,此种方式实现起来,代码更简洁,直观,并且节省了资源。下面是官方给的使用示例
  3. 禁用redisson ping即可解决使用阻塞方式引发的报错
官方示例地址:
https://github.com/redisson/redisson/wiki/3.-Operations-execution#31-async-way

结合我们自身业务代码伪代码如下:

  • redisQueueExecutorPool是一个自定义的线程池
public <T extends AbstractRedisQueueJob, R> void doTask(RedisQueueJobListener<T, R> taskEventListener, RBlockingQueue<T> blockingFairQueue) {
    //使用非阻塞方式
        blockingFairQueue.pollAsync().whenCompleteAsync((res, throwable) -> {
            try {
                if (Objects.nonNull(res)) {
                    log.info(res);
                    RedisQueueJobQueueResult<R> result = taskEventListener.invoke(res);
                    //判断是否进行重试 true:结束循环  false:继续循环
                    if (taskEventListener.postProcessor(res, result.getResult()) != RedisQueueJobListener.Result.PROCESSING_COMPLETE) {
                        if (res.getIsDelayQueue()) {
                            RDelayedQueue<T> delayedQueue = redissonClient.getDelayedQueue(blockingFairQueue);
                            delayedQueue.offer(res, res.getDelay(), TimeUnit.SECONDS);
                        } else {
                            blockingFairQueue.add(res);
                        }
                    } else {
                        log.info("redis队列任务执行完成,任务对象 {} 结果对象 {}", res, result);
                    }
                }
                if (Objects.nonNull(throwable)) {
                    log.error(throwable.getMessage(), throwable);
                }
            } catch (Exception e) {
                log.error(e.getMessage(), e);
            } finally {
                //在这里休眠一小会儿
                //注意这里不是递归,不会产生递归的方法栈溢出问题,这里是重新调用而已不会堆栈
                doTask(taskEventListener, blockingFairQueue);
            }
        }, redisQueueExecutorPool);
    }
举报

相关推荐

0 条评论