1. fork/join
1.1 分而治之

1.2 工作密取
当前线程的 Task 已经全被执行完毕,则自动取到其他线程的 Task 池中取出 Task 继续执行。
ForkJoinPool 中维护着多个线程(一般为 CPU 核数)在不断地执行 Task,每个线程除了执行自己职务内的 Task 之外,还会根据自己工作线程的闲置情况去获取其他繁忙的工作线程的 Task,如此一来就能能够减少线程阻塞或是闲置的时间,提高 CPU 利用率。

1.3 标准范式

public class MakeArray {
public static final int ARRAY_LENGTH = 40000000;
public static int[] makeArray() {
Random r = new Random();
int[] result = new int[ARRAY_LENGTH];
for (int i = 0; i < ARRAY_LENGTH; i++) {
result[i] = r.nextInt(ARRAY_LENGTH * 3);
}
return result;
}
}
public class SumArray {
private static class SumTask extends RecursiveTask<Integer> {
private final static int THRESHOLD = MakeArray.ARRAY_LENGTH / 10;
private int[] src;
private int fromIndex;
private int toIndex;
public SumTask(int[] src, int fromIndex, int toIndex) {
this.src = src;
this.fromIndex = fromIndex;
this.toIndex = toIndex;
}
@Override
protected Integer compute() {
/*任务的大小是否合适*/
if (toIndex - fromIndex < THRESHOLD) {
int count = 0;
for (int i = fromIndex; i <= toIndex; i++) {
SleepTools.ms(1);
count = count + src[i];
}
return count;
} else {
//fromIndex....mid.....toIndex
int mid = (fromIndex + toIndex) / 2;
SumTask left = new SumTask(src, fromIndex, mid);
SumTask right = new SumTask(src, mid + 1, toIndex);
invokeAll(left, right);
return left.join() + right.join();
}
}
}
public static void main(String[] args) {
int[] src = MakeArray.makeArray();
ForkJoinPool pool = new ForkJoinPool();
SumTask innerFind = new SumTask(src, 0, src.length - 1);
pool.invoke(innerFind);
}
}
2. CountDownLatch
public class UseCountDownLatch {
static CountDownLatch latch = new CountDownLatch(6);
private static class InitThread implements Runnable {
public void run() {
for (int i = 0; i < 2; i++) {
System.out.println("123");
}
latch.countDown();
}
}
public static void main(String[] args) throws InterruptedException {
for (int i = 0; i < 6; i++) {
new Thread(new InitThread()).start();
}
latch.await();
System.out.println("Main do work........");
}
}
3. CyclicBarrier
大家到一个点,再同时执行。
public class UseCyclicBarrier {
private static CyclicBarrier barrier = new CyclicBarrier(4, new CollectThread());
private static ConcurrentHashMap<String, Long> resultMap = new ConcurrentHashMap<>();
public static void main(String[] args) {
for (int i = 0; i < 4; i++) {
Thread thread = new Thread(new SubThread());
thread.start();
}
}
private static class CollectThread implements Runnable {
@Override
public void run() {
StringBuilder result = new StringBuilder();
for (Map.Entry<String, Long> workResult : resultMap.entrySet()) {
result.append("[" + workResult.getValue() + "]");
}
System.out.println(" the result = " + result);
}
}
private static class SubThread implements Runnable {
@Override
public void run() {
long id = Thread.currentThread().getId();
resultMap.put(Thread.currentThread().getId() + "", id);
try {
Thread.sleep(1000 + id);
barrier.await();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
4. Semaphore
acquire、release
注意使用 Semaphore 时,先要调用 acquire,如果先调用 release,会凭空多一个资源。
public class DBPoolSemaphore {
private final static int POOL_SIZE = 10;
private final Semaphore useful, useless;
//存放数据库连接的容器
private static LinkedList<Connection> pool = new LinkedList<Connection>();
//初始化池
static {
for (int i = 0; i < POOL_SIZE; i++) {
pool.addLast(SqlConnectImpl.fetchConnection());
}
}
public DBPoolSemaphore() {
this.useful = new Semaphore(10);
this.useless = new Semaphore(0);
}
/*归还连接*/
public void returnConnect(Connection connection) throws InterruptedException {
if (connection != null) {
useless.acquire();
synchronized (pool) {
pool.addLast(connection);
}
useful.release();
}
}
/*取连接*/
public Connection takeConnect() throws InterruptedException {
useful.acquire();
Connection connection;
synchronized (pool) {
connection = pool.removeFirst();
}
useless.release();
return connection;
}
}
5. Exchange
用于两个线程之间的数据交换。

public class UseExchange {
private static final Exchanger<Set<String>> exchange = new Exchanger<Set<String>>();
public static void main(String[] args) {
new Thread(new Runnable() {
@Override
public void run() {
Set<String> setA = new HashSet<String>();//存放数据的容器
try {
setA.add("1");
setA = exchange.exchange(setA);//交换set
/*处理交换后的数据*/
} catch (InterruptedException e) {
}
}
}).start();
new Thread(new Runnable() {
@Override
public void run() {
Set<String> setB = new HashSet<String>();//存放数据的容器
try {
setB.add("2");
setB = exchange.exchange(setB);//交换set
/*处理交换后的数据*/
} catch (InterruptedException e) {
}
}
}).start();
}
}
6. Callable、Future、FutureTask
FutureTask 可取消任务。

public class UseFutureTask {
private static class UseCallable implements Callable<Integer> {
private int sum;
@Override
public Integer call() {
System.out.println("Callable子线程开始计算!");
for (int i = 0; i < 5000; i++) {
if (Thread.currentThread().isInterrupted()) {
System.out.println("Callable子线程计算任务中断!");
return null;
}
sum = sum + i;
System.out.println("sum=" + sum);
}
System.out.println("Callable子线程计算结束!结果为: " + sum);
return sum;
}
}
public static void main(String[] args)
throws InterruptedException, ExecutionException {
UseCallable useCallable = new UseCallable();
//包装
FutureTask<Integer> futureTask = new FutureTask<>(useCallable);
new Thread(futureTask).start();
Thread.sleep(1);
Random r = new Random();
if (r.nextInt(100) > 50) {
System.out.println("Get UseCallable result = " + futureTask.get());
} else {
System.out.println("Cancel................. ");
futureTask.cancel(true);
}
}
}
