文章目录
- 并发基础中的Future异步回调模式
- 背景
- 泡茶案例
- join异步阻塞
- 使用join实现异步泡茶喝的实践案例
- join合并方法
- FutureTask异步回调
- Callable接口
- Callable代码
- FutureTask类
- Future接口
- 使用FutureTask类实现异步泡茶喝的实践案例
- 结论
- Guava的异步回调
- FutureCallback介绍
- 使用Guava实现泡茶喝的实践案例
- Netty 的异步回调模式
并发基础中的Future异步回调模式
背景
随着业务模块系统越来越多,各个系统的业务架构变得越来越复杂,跨设别的接口调用越来越复杂,因此,面临一个问题: 如何高效率的异步去调用这些接口,然后同步去处理接口的返回结果? 涉及线程的异步回调问题,这也是高并发的一个基础问题
泡茶案例
join异步阻塞
使用join实现异步泡茶喝的实践案例
package com.wangyg.FutureTask.Join;
import util.Logger;
/**
* 程序中有三个线程:
* 主线程 main
* 烧水线程hThread
* 清洗线程 wThread
*
*/
public class JoinDemo {
public static final int SLEEP_GAP=500;
/**
* 获取当前线程的名字
* @return
*/
public static String getCurThreadName(){
return Thread.currentThread().getName();
}
/**
* 烧水线程
*/
static class HotWaterThread extends Thread{
//烧水线程
public HotWaterThread() {
super("** 烧水-Thread");
}
public void run(){
try {
Logger.info("洗好水壶");
Logger.info("灌上凉水");
Logger.info("放在火上");
//线程睡眠一段时间,表示烧水中
Thread.sleep(SLEEP_GAP);
Logger.info("水开了");
} catch (InterruptedException e) {
e.printStackTrace();
Logger.info("发生异常被中断");
}
Logger.info("运行结束");
}
}
static class WashThread extends Thread{
public WashThread() {
super("$$ 清洗-Thread");
}
public void run(){
try {
Logger.info("洗茶壶");
Logger.info("洗茶杯");
Logger.info("拿茶叶");
//线程睡眠一段时间,代表清洗中
Thread.sleep(SLEEP_GAP);
Logger.info("洗完了");
} catch (InterruptedException e) {
Logger.info("发生异常 被中断");
}
Logger.info("运行结束.");
}
}
public static void main(String[] args) {
long start = System.currentTimeMillis();
Thread hThread= new HotWaterThread(); //烧水线程
Thread wThread = new WashThread(); //洗水杯线程
hThread.start();
wThread.start();
try {
// 合并烧水-线程
hThread.join(); //使用join进行加塞线程
// 合并清洗-线程
wThread.join(); //使用join进行加塞线程
Thread.currentThread().setName("主线程");
Logger.info("泡茶喝");
} catch (InterruptedException e) {
Logger.info(getCurThreadName() + "发生异常被中断.");
}
Logger.info(getCurThreadName() + " 运行结束.");
long end = System.currentTimeMillis() - start;
System.out.println(end);
}
}
join合并方法
应用场景
A线程调用B线程的join方法,等待B线程执行完成,A线程阻塞
- join是实例方法,不是静态方法,需要使用线程对象调用,thread.join()
- join 调用时,当前线程阻塞
FutureTask异步回调
Java1.5版本后,提供一个新的多线程创建方式–FutureTask方式
Callable接口
Callable代码
public interface Callable<V>{
//call方法有返回值
V call() throws Exception;
}
- 有返回值
- 可以抛出异常
- 泛型接口
FutureTask类
FutureTask类代表一个未来执行的任务,表示新线程所执行的操作,FutureTask类位于 java.util.concurrent包中
FutureTask构造函数参数
Callable类型, 实际上是对Callable类型的二次封装
Future接口
Future接口并不复杂,主要是对并发任务的执行及获取结果的一些操作,主要提供三个功能:
- 判断并发任务是否执行完成
- 获取并发的任务的结果
- 取消并发执行中的任务
使用FutureTask类实现异步泡茶喝的实践案例
package com.wangyg.netty.ch05;
import util.Logger;
import java.util.concurrent.Callable;
import java.util.concurrent.FutureTask;
public class JavaFutureDemo {
public static final int SLEEP_GAP=500;
public static String getCurrentThreadName(){
return Thread.currentThread().getName();
}
static class HotWaterJob implements Callable<Boolean>{
//重写call方法
@Override
public Boolean call() throws Exception {
try {
Logger.info("洗好水壶");
Logger.info("灌上凉水");
Logger.info("放在火上");
Thread.sleep(SLEEP_GAP);
Logger.info("水开了");
} catch (Exception e) {
Logger.info("发生异常被中断。");
return false;
}
Logger.info("运行结束.");
return true;
}
}
static class WashJob implements Callable<Boolean>{
//重写call()方法
@Override
public Boolean call() throws Exception {
try {
Logger.info("洗茶壶");
Logger.info("洗茶杯");
Logger.info("拿茶叶");
Thread.sleep(SLEEP_GAP);
Logger.info("洗完了");
} catch (Exception e) {
Logger.info("清洗工作发生中断异常.");
return false;
}
Logger.info("清洗工作运行结束。");
return true;
}
}
public static void drinkTea(Boolean waterOk, boolean cupOk) {//两个参数:是否烧水成功,是否洗杯子成功
if(waterOk && cupOk){
Logger.info("泡茶喝");
}else if(!waterOk){
Logger.info("烧水失败, 没有茶喝了");
}else if(!cupOk){
Logger.info("杯子洗不了,没有茶喝了");
}
}
//main函数
public static void main(String[] args) {
long start = System.currentTimeMillis();
Callable<Boolean> hJob = new HotWaterJob();
//传入callable接口
FutureTask<Boolean> hTask = new FutureTask<>(hJob);
//在使用Thread 传入 runnable线程接口
Thread hThread = new Thread(hTask, "** 烧水-Thread");
Callable<Boolean> wJob = new WashJob();
//使用futureTask包装
FutureTask<Boolean> wTask = new FutureTask<>(wJob);
Thread wThread = new Thread(wTask, "$$ 清洗-Thread");
hThread.start();
wThread.start();
Thread.currentThread().setName("主线程");
try {
boolean hOk = hTask.get(); //获取异步线程执行结果
boolean wOk = wTask.get();//获取清洗线程异步执行的结果
} catch (Exception e) {
e.printStackTrace();
Logger.info(getCurrentThreadName()+"发生异常被中断.");
}
Logger.info(getCurrentThreadName() + " 运行结束.");
long end = System.currentTimeMillis();
System.out.println(end-start);
}
}
结论
目前这两种方式效率是差不多的,因为FutureTask类的get方法,获取异步结果时,主线程也会阻塞的,前两种方式(join,FutureTask) 都是异步阻塞模式
Guava的异步回调
谷歌公司提供的Java扩展包,提供了一种异步回调的解决方案,相关源码在com.google.common.util.concurrent包中
Guava对java的异步回调机制,做了一下增强:
引入一个新的接口ListennableFuture,继承Java的future接口,是的java的Future异步任务,在Guava中能被监控和获取非阻塞异步执行的结果
引入一个新的接口FutureCallback,这是一个独立的新接口,该接口的目的,是在异步任务执行完成后,根据异步结果,完成不同的回调任务,并且可以处理异步结果
FutureCallback介绍
FutureCallback有用两个回调方法
- onSuccess方法,在异步执行成功后被回调
- onFailure方法,异步任务执行过程中,抛出异常被执行
使用Guava实现泡茶喝的实践案例
package com.wangyg.netty.ch05;
import com.google.common.util.concurrent.*;
import util.Logger;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* Created by 尼恩 at 疯狂创客圈
*/
public class GuavaFutureDemo {
public static final int SLEEP_GAP = 500;
public static String getCurThreadName() {
return Thread.currentThread().getName();
}
static class HotWarterJob implements Callable<Boolean> {
@Override
public Boolean call() throws Exception {
try {
Logger.info("洗好水壶");
Logger.info("灌上凉水");
Logger.info("放在火上");
//线程睡眠一段时间,代表烧水中
Thread.sleep(SLEEP_GAP);
Logger.info("水开了");
} catch (InterruptedException e) {
Logger.info(" 发生异常被中断.");
return false;
}
Logger.info(" 烧水工作,运行结束.");
return true;
}
}
static class WashJob implements Callable<Boolean> {
@Override
public Boolean call() throws Exception {
try {
Logger.info("洗茶壶");
Logger.info("洗茶杯");
Logger.info("拿茶叶");
//线程睡眠一段时间,代表清洗中
Thread.sleep(SLEEP_GAP);
Logger.info("洗完了");
} catch (InterruptedException e) {
Logger.info(" 清洗工作 发生异常被中断.");
return false;
}
Logger.info(" 清洗工作 运行结束.");
return true;
}
}
//泡茶线程
static class MainJob implements Runnable {
boolean warterOk = false;
boolean cupOk = false;
int gap = SLEEP_GAP / 10;
@Override
public void run() {
boolean flg =true;
while (flg) {
try {
Thread.sleep(gap);
Logger.info("读书中......");
} catch (InterruptedException e) {
Logger.info(getCurThreadName() + "发生异常被中断.");
}
if (warterOk && cupOk) {
drinkTea(warterOk, cupOk);
flg=false;
}
}
}
public void drinkTea(Boolean wOk, Boolean cOK) {
if (wOk && cOK) {
Logger.info("泡茶喝,茶喝完");
this.warterOk = false;
this.gap = SLEEP_GAP * 100;
} else if (!wOk) {
Logger.info("烧水失败,没有茶喝了");
} else if (!cOK) {
Logger.info("杯子洗不了,没有茶喝了");
}
}
}
public static void main(String args[]) {
//新起一个线程,作为泡茶主线程
MainJob mainJob = new MainJob();
Thread mainThread = new Thread(mainJob);
mainThread.setName("主线程");
mainThread.start();
//烧水的业务逻辑
Callable<Boolean> hotJob = new HotWarterJob();
//清洗的业务逻辑
Callable<Boolean> washJob = new WashJob();
//创建java 线程池
ExecutorService jPool =
Executors.newFixedThreadPool(10);
//包装java线程池,构造guava 线程池
ListeningExecutorService gPool =
MoreExecutors.listeningDecorator(jPool);
//提交烧水的业务逻辑,取到异步任务
ListenableFuture<Boolean> hotFuture = gPool.submit(hotJob);
//绑定任务执行完成后的回调,到异步任务
Futures.addCallback(hotFuture, new FutureCallback<Boolean>() {
public void onSuccess(Boolean r) {
if (r) {
mainJob.warterOk = true;
}
}
public void onFailure(Throwable t) {
Logger.info("烧水失败,没有茶喝了");
}
});
//提交清洗的业务逻辑,取到异步任务
ListenableFuture<Boolean> washFuture = gPool.submit(washJob);
//绑定任务执行完成后的回调,到异步任务
Futures.addCallback(washFuture, new FutureCallback<Boolean>() {
public void onSuccess(Boolean r) {
if (r) {
mainJob.cupOk = true;
}
}
public void onFailure(Throwable t) {
Logger.info("杯子洗不了,没有茶喝了");
}
});
}
}
Netty 的异步回调模式
Netty和Guava一样,实现了自己的异步回调体系: Netty继承和扩展了JDK Future系列异步回调 API,定义了自身的Future系列接口和类,实现了异步任务的监控,异步执行结果的获取
总体来说,Netty对javaFuture异步任务的扩展如下:
- 继承了java的Future接口,得到一个新的属于Netty自己的Future异步任务接口,该接口对原有的接口进行了增强,是的netty异步任务能够以非阻塞的方式处理回调的结果
- 引入一个新接口–GennericFutureListener, 用于表示异步执行完成的监听器,这个接口和Guava的FutureCallback回调接口不同,netty使用了监听器的模式,异步任务的执行完成后的回调逻辑抽象成了Listener监听器接口