0
点赞
收藏
分享

微信扫一扫

单机线程池执行断电了应该怎么处理?Fork/Join框架是什么?

单机线程池执行断电了应该怎么处理?

我们可以对正在处理和阻塞队列的任务做事务管理或者对阻塞队列中的任务持久化处理,并且当断电或者系统崩溃,操作无法继续下去的时候,可以通过回溯日志的方式来撤销正在处理的已经执行成功的操作。然后重新执行整个阻塞队列。

也就是说,对阻塞队列持久化;正在处理任务事务控制;断电之后正在处理任务的回滚,通过日志恢复该次操作;服务器重启后阻塞队列中的数据再加载。

Fork/Join框架

Fork/Join框架是Java7提供的一个用于并行执行任务的框架,是一个把大任务分割成若干个小任务,最终汇总每个小任务结果后得到大任务结果的框架。

要想掌握Fork/Join框架,首先需要理解两个点,分而治之和工作窃取算法。

分而治之

Fork/Join框架的定义,其实就体现了分治思想:将一个规模为N的问题分解为K个规模较小的子问题,这些子问题相互独立且与原问题性质相同。求出子问题的解,就可得到原问题的解。

单机线程池执行断电了应该怎么处理?Fork/Join框架是什么?_阻塞队列

工作窃取算法

大任务拆成了若干个小任务,把这些小任务放到不同的队列里,各自创建单独线程来执行队列里的任务。

那么问题来了,有的线程干活块,有的线程干活慢。干完活的线程不能让它空下来,得让它去帮没干完活的线程干活。它去其它线程的队列里窃取一个任务来执行,这就是所谓的工作窃取。

工作窃取发生的时候,它们会访问同一个队列,为了减少窃取任务线程和被窃取任务线程之间的竞争,通常任务会使用双端队列,被窃取任务线程永远从双端队列的头部拿,而窃取任务的线程永远从双端队列的尾部拿任务执行。

单机线程池执行断电了应该怎么处理?Fork/Join框架是什么?_阻塞队列_02

看一个Fork/Join框架应用的例子,计算1~n之间的和:1+2+3+…+n

设置一个分割阈值,任务大于阈值就拆分任务

任务有结果,所以需要继承RecursiveTask

public class CountTask extends RecursiveTask<Integer> {
    private static final int THRESHOLD = 16; // 阈值
    private int start;
    private int end;
    public CountTask(int start, int end) {
        this.start = start;
        this.end = end;
    }
    @Override
    protected Integer compute() {
        int sum = 0;
        // 如果任务足够小就计算任务
        boolean canCompute = (end - start) <= THRESHOLD;
        if (canCompute) {
          for (int i = start; i <= end; i++) {
          sum += i;
        	}
        } else {
          // 如果任务大于阈值,就分裂成两个子任务计算
          int middle = (start + end) / 2;
          CountTask leftTask = new CountTask(start,middle);
          CountTask rightTask = new CountTask(middle + 1, end);
          // 执行子任务
          leftTask.fork();
          rightTask.fork(); // 等待子任务执行完,并得到其结果
          int leftResult = leftTask.join();
          int rightResult = rightTask.join(); // 合并子任务
          sum = leftResult + rightResult;
        }
        return sum;
        }
        public static void main(String[] args) {
          ForkJoinPool forkJoinPool = new ForkJoinPool(); // 生
          成一个计算任务,负责计算1+2+3+4
          CountTask task = new CountTask(1, 100); // 执行一个任务
          Future<Integer> result = forkJoinPool.submit(task);
          try {
            System.out.println(result.get());
            } catch (InterruptedException e) {
            } catch (ExecutionException e) {
          }
    }
}

ForkJoinTask与一般Task的主要区别在于它需要实现compute方法,在这个方法里,首先需要判断任务是否足够小,如果足够小就直接执行任务。如果比较大,就必须分割成两个子任务,每个子任务在调用fork方法时,又会进compute方法,看看当前子任务是否需要继续分割成子任务,如果不需要继续分割,则执行当前子任务并返回结果。使用join方法会等待子任务执行完并得到其结果。

举报

相关推荐

0 条评论