0
点赞
收藏
分享

微信扫一扫

高优异步任务解决双重异步集合点阻塞问题

在性能测试的实践当中,异步任务是离不开的。Java异步编程提高了应用程序的性能和响应性,通过避免线程阻塞提高了资源利用率,并简化了并发编程的复杂性。改善用户体验,避免死锁和线程阻塞等问题。异步编程利用CompletableFuture、Future等工具和API简化了开发流程,提高了系统的稳定性和可靠性。

缘起

我也参照了 Go 语言的 go 关键字,自定义了 fun 关键字Java自定义异步功能实践 。但是在使用过程中,遇到了一个略显尴尬的问题,就是如果当一个异步任务中,又增加一个异步任务,且使用集合点设置。那么就会阻塞线程池,导致大量任务阻塞的情况。

比如一个学校,200个班级,每个班级有一个班主任,要给30个学生发作业,之后再报告作业分发已完成。按照之前的思路,我会包装两个异步且设置集合点的任务,伪代码如下:

    static void main(String[] args) {
        200.times {
            fun {
                sleep(1.0)// 模拟业务处理
                pushHomework()// 布置作业
            }
        }

    }

    /**
     * 布置作业
     */
    static void pushHomework() {
        FunPhaser phaser = new FunPhaser()// 创建同步屏障
        30.times {
            fun {
                sleep(1.0)// 模拟业务处理
                output("布置作业")
            } , phaser
        }
        phaser.await()// 等待所有作业布置完成
    }

最终的结果就是,等于最大线程数的任务会阻塞在 pushHomework() 方法中,而 pushHomework() 方法需要完成的异步任务又全都等待在线程池的等待队列中。

初解

一开始我的思路采取优先级策略。如果区分任务的优先级,让有可能阻塞在等待队列的高优任务优先执行即可。所以我先想使用 java.util.concurrent.PriorityBlockingQueue 当做 java.util.concurrent.BlockingQueue 的实现当做异步线程池的等待队列。

但也无法解决问题,因为依然存在阻塞的问题,只不过概率变小了而已。看来不得不使用单独的异步线程池来实现了。

关于线程池的选择有两种选择:

  1. 选择最大线程数较小的线程池,只是作为辅助功能,防止阻塞。在普通异步任务执行时,优先执行高优任务,利用普通线程池优先执行高优任务。
  2. 选择最小线程数较大的线程池,大概率是缓存线程池。单独用来执行高优任务。同时也可以利用普通的线程池执行高优任务。

关于我的选择,也没有选择。根据实际的情况使用吧。高优任务的多少、需要限制的频率等等因素。我自己的项目用的是第二种,原因是我用到高优任务的机会不多,我可以在脚本中控制高优任务的数量。

方案

首先是线程池的实现代码:

priorityPool = createFixedPool(POOL_SIZE, "P")

创建时机放在了普通线程池中:

    /**
     * 获取异步任务连接池
     * @return
     */
    static ThreadPoolExecutor getFunPool() {
        if (asyncPool == null) {
            synchronized (ThreadPoolUtil.class) {
                if (asyncPool == null) {
                    asyncPool = createPool(POOL_SIZE, POOL_SIZE, ALIVE_TIME, new LinkedBlockingDeque<Runnable>(Constant.MAX_WAIT_TASK), getFactory("F"))
                    daemon()
                }
                priorityPool = createFixedPool(POOL_SIZE, "P")
//                priorityPool = createPool(1, POOL_MAX, ALIVE_TIME, new LinkedBlockingQueue<Runnable>(10), getFactory("P"), new ThreadPoolExecutor.DiscardOldestPolicy())
            }
        }
        return asyncPool
    }

下面是调用执行高优的异步任务的方法:

    /**
     * 执行高优异步任务
     * @param runnable
     */
    static void executeSyncPriority(Runnable runnable) {
	    if (priorityPool == null) getFunPool()
        priorityPool.execute(runnable)
    }

还有一个调用方法,用来普通线程池优先执行高优任务:

    /**
     * 执行高优任务
     */
    static void executePriority() {
        def locked = priorityLock.compareAndSet(false, true)//如果没有锁,则加锁
        if (locked) {//如果加锁成功
            while (priorityPool.getQueue().size() > 0) {
                def poll = priorityPool.getQueue().poll()
                def queue = (LinkedBlockingDeque<Runnable>) getFunPool().getQueue()
                if (poll != null) {
                    queue.offerFirst(poll)
                }

            }
            priorityLock.set(false)//解锁
        }
    }

这里用到了一个原子类,当做高优之行时候的锁 private static AtomicBoolean priorityLock = new AtomicBoolean(false) ,避免在这块浪费过多性能。这里没有 try-catch-finally ,此处没有使用,确实发生异常概率较小。

我重新修改了任务队列的实现,用的是 java.util.concurrent.LinkedBlockingDeque ,这样我就可以将高优任务直接插入到队列的最前头,可以优先执行高优任务。

对于异步关键字,我也进行了一些改动:

    /**
     * 使用自定义同步器{@link FunPhaser}进行多线程同步
     *
     * @param f
     * @param phaser
     * @param log
     */
    public static void fun(Closure f, FunPhaser phaser, boolean log) {
        if (phaser != null) phaser.register();
        ThreadPoolUtil.executeSync(() -> {
            try {
                ThreadPoolUtil.executePriority();
                f.call();
            } finally {
                if (phaser != null) {
                    phaser.done();
                    if (log) logger.info("async task {}", phaser.queryTaskNum());
                }
            }
        });
    }

执行高优任务的关键字,我也进行了同样的封装,只不过换了个关键字和线程池:

    /**
     * 提交高优任务
     *
     * @param f
     * @param phaser
     * @param log
     */
    public static void funny(Closure f, FunPhaser phaser, boolean log) {
        if (phaser != null) phaser.register();
        ThreadPoolUtil.executeSyncPriority(() -> {
            try {
                f.call();
            } finally {
                if (phaser != null) {
                    phaser.done();
                    if (log) logger.info("priority async task {}", phaser.queryTaskNum());
                }
            }
        });
    }

验证

我们修改一下开始的脚本:

    static void main(String[] args) {
        setPoolMax(2)
        6.times {
            fun {
                sleep(1.0)// 模拟业务处理
                pushHomework()// 布置作业
            }
        }

    }

    /**
     * 布置作业
     */
    static void pushHomework() {
        FunPhaser phaser = new FunPhaser()// 创建同步屏障
        4.times {
            fun {
                sleep(1.0)// 模拟业务处理
                output("布置作业")
            } , phaser
        }
        phaser.await()// 等待所有作业布置完成
    }

执行的话,线程池的 F 线程全都全都是 TIME_WAITING 状态。当把 pushHomework() 方法改成高优关键字 funny 之后问题便可迎刃而解。

控制台输出如下:

22:47:17:160 P-1  布置作业
22:47:17:160 P-1  布置作业
22:47:17:160 P-1  priority async task 3
22:47:17:160 P-1  priority async task 4
22:47:18:178 F-2  布置作业
22:47:18:179 F-2  priority async task 3
22:47:19:183 F-2  布置作业

可以看出,已经开始有了 F 线程执行高优任务了。

举报

相关推荐

0 条评论