Future和Callable接口
Future接口**(FutureTask实现类)定义了操作异步任务执行一些方法**,如获取异步任务的执行结果、取消任务的执行、判断任务是否被取消、判断任务执行是否完毕等。(异步:可以被叫停,可以被取消)
一句话:Future接口可以为主线程开一个分支任务,专门为主线程处理耗时和费力的复杂业务。
eg.比如主线程让一个子线程去执行任务,子线程可能比较耗时,启动子线程开始执行任务后,主线程就去做其他事情了,过了一会才去获取子任务的执行结果。老师在上课,但是口渴,于是让班长这个线程去买水,自己可以继续上课,实现了异步任务。
有个目的:异步多线程任务执行且有返回结果,三个特点:多线程/有返回/异步任务(班长作为老师去买水作为新启动的异步多线程任务且买到水有结果返回)
FutureTask实现类
- FutureTak(实现了x接口,x接口又继承了a和v接口)
- 在源码可以看到,他既继承了
RunnableFuture
接口,也在构造方法中实现了Callable
接口(有返回值、可抛出异常)和Runnable
接口
- 在源码可以看到,他既继承了
(ctrl
+alt
+u
)
1 | public class FutureTaskDemo { |
Future到CompletableFuture
Future优点
- future+线程池异步多线程任务配合,能显著提高程序的执行效率。
- 方案一,3个任务1个main线程处理,大概1130ms
方案二,3个任务3个线程,利用线程池(假如每次new一个Thread,太浪费资源,会有GC这些工作),大概500毫秒。
Future缺点
get()阻塞
- 一旦调用get()方法,不管是否计算完成,都会导致阻塞(所以一般get方法放到最后)
isDone()轮询
- 利用if(futureTask.isDone())的方式使得他在结束之后才get(),但是也会消耗cpu
Future应用现状
对于简单的业务场景使用Future完全OK
回调通知
- 前面的isDone()方法耗费cpu资源,一般应该还是利用回调函数,在Future结束时自动调用该回调函数。应对Future的完成时间,完成了可以告诉我,也就是我们的回调通知
创建异步任务
- Future+线程池配合
多个任务前后依赖可以组合处理(水煮鱼)
想将多个异步任务的计算结果组合起来,后一个异步任务的计算结果需要前一个异步任务的值,将两个或多个异步计算合成一个异步计算,这几个异步计算相互独立,同时后面这个又依赖前一个处理的结果
比如买鱼-加料-烹饪
对计算速度选最快完成的(并返回结果)
- 当Future集合中某个任务最快结束时,返回结果,返回第一名处理结果。
CompletableFuture基本介绍
- 阻塞的方式和异步编程的设计理念相违背,而轮询的方式会消耗无畏的CPU资源。因此,JDK8设计出CompletableFuture
CompletableFuture
1 | public class CompletableFuture<T> implements Future<T>, CompletionStage<T> { |
在Java 8中, Complet able Future提供了非常强大的Future的扩展功能, 可以帮助我们简化异步编程的复杂性, 并且提供了函数式编程的能力, 可以通过回调的方式处理计算结果, 也提供了转换和组合Complet able Future的方法。
它可能代表一个明确完成的Future, 也有可能代表一个完成阶段(Completion Stage) , 它支持在计算完成以后触发一些函数或执行某些动作。
它实现了Future和Completion Stage接口
CompletionStage
Completion Stage代表异步计算过程中的某一个阶段, 一个阶段完成以后可能会触发另外一个阶段
一个阶段的计算执行可以是一个Function, Consumer或者Runnable
比如:
stage.then Apply(x->square(x) ) .then Accept(x->System.out.print(x) ) .then Run() ->System.out.print In() )
一个阶段的执行可能是被单个阶段的完成触发,也可能是由多个阶段一起触发。
核心的四个静态方法(分为两组)
利用核心的四个静态方法创建一个异步操作 | 不建议用new
关键就是 |有没有返回值|是否用了线程池|
参数说明:
没有指定Executor的方法,直接使用默认的ForkJoinPool.commPool()作为它的线程池执行异步代码。
如果指定线程池,则使用我们定义的或者特别指定的线程池执行异步代码。
runAsync无返回值
runAsync
1 | public static CompletableFuture<Void> runAsync(Runnable runnable) |
1 | public class CompletableFutureBuildDemo { |
runAsync+线程池
1 | public static CompletableFuture<Void> runAsync(Runnable runnable,Executor executor) |
1 | public class CompletableFutureBuildDemo { |
supplyAsync有返回值
supplyAsync
1 | public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) |
1 | public class CompletableFutureBuildDemo { |
supplyAsync+线程池
1 | public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,Executor executor) |
1 | public class CompletableFutureBuildDemo { |
CompletableFuture使用演示(日常使用)
基本功能
CompletableFuture
可以完成Future
的功能
1 | public class CompletableFutureUseDemo { |
减少阻塞和轮询whenComplete
CompletableFuture
通过whenComplete
来减少阻塞和轮询(自动回调)
1 | public class CompletableFutureUseDemo { |
- 假如换用自定义线程池
1 | public class CompletableFutureUseDemo { |
- 异常情况的展示,设置一个异常
int i = 10 / 0 ;
1 | public class CompletableFutureUseDemo { |
CompletableFuture优点总结
- 异步任务结束时,会自动回调某个对象的方法;
- 主线程设置好毁掉后,不再关心异步任务的执行,异步任务之间可以顺序执行
- 异步任务出错时,会自动回调某个对象的方法。
CompletableFuture案例
准备
函数式接口
- 函数式接口的定义:
- 任何接口,如果只包含唯一一个抽象方法,那么它就是一个函数式接口。对于函数式接口,我们可以通过lambda表达式来创建该接口的对象。
1 | public interface Runnable{ |
常见的函数式接口
Runnable
1
2
3
4
public interface Runnable {
public abstract void run();
}Function
1
2
3
4
public interface Function<T, R> {
R apply(T t);
}Consumer
1
2
3
4
public interface Consumer<T> {
void accept(T t);
}Supplier
1
2
3
4
5
6
7
8
9
10
public interface Supplier<T> {
/**
* Gets a result.
*
* @return a result
*/
T get();
}Biconsumer(Bi代表两个的意思,我们要传入两个参数,在上面的案例中是v和e)
1
2
3
4
5
public interface BiConsumer<T, U> {
void accept(T t, U u);
}
函数式接口名称 方法名称 参数 返回值 Runnable run 无参数 无返回值 Function apply 1个参数 有返回值 Consume accept 1个参数 无返回值 Supplier get 没有参数 有返回值 Biconsumer accept 2个参数 无返回值
链式调用|链式编程|链式写法
1 | public class Chain { |
join和get对比
- 功能几乎一样,区别在于编码时是否需要抛出异常
- get()方法需要抛出异常
- join()方法不需要抛出异常
1 | public class Chain { |
实战精讲-比价网站case
需求
- 需求说明
- 同一款产品,同时搜索出同款产品在各大电商平台的售价;
- 同一款产品,同时搜索出本产品在同一个电商平台下,各个入驻卖家售价是多少
- 输出返回:
出来结果希望是同款产品的在不同地方的价格清单列表, 返回一个List
《mysql》in jd price is 88.05
《mysql》in dang dang price is 86.11
《mysql》in tao bao price is 90.43 - 解决方案,比对同一个商品在各个平台上的价格,要求获得一个清单列表
- stepbystep , 按部就班, 查完京东查淘宝, 查完淘宝查天猫……
- all in ,万箭齐发,一口气多线程异步任务同时查询。。。
基本框架搭建
- 相当于是一个一个按部就班
1 | public class Case { |
从功能到性能:利用CompletableFuture
- 这里是利用异步线程
- 此处用了两步流式编程。
- 性能差距巨大
1 | public class Case { |
CompletableFuture常用API
- getNow调用的时候如果计算完了,就拿取这个计算完的值;否则就拿备胎值
获得结果和触发计算
获取结果
public T get() 不见不散,容易阻塞
public T get(long timeout,TimeUnit unit) 过时不候,超过时间会爆异常
public T join() 类似于get(),区别在于是否需要抛出异常
public T getNow(T valueIfAbsent)
没有计算完成的情况下,给一个替代结果
立即获取结果不阻塞
计算完,返回计算完成后的结果
没算完,返回设定的valueAbsent(直接返回了备胎值xxx)
主动触发计算
public boolean complete(T value) 是否立即打断get()方法返回括号值
- (执行要2s,等待只有1s,所以还没执行完就被打断了。返回true表示打断了获取这个过程,直接返回了备胎值complete;如果没打断,返回false 和原来的abc)
1 | public class CompletableFutureAPIDemo { |
对计算结果进行处理
thenApply
计算结果存在在依赖关系,使得线程串行化。因为依赖关系,所以一旦有异常,直接叫停。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39public class CompletableFutureDemo2
{
public static void main(String[] args) throws ExecutionException, InterruptedException
{
//当一个线程依赖另一个线程时用 thenApply 方法来把这两个线程串行化,
CompletableFuture.supplyAsync(() -> {
//暂停几秒钟线程
try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); }
System.out.println("111");
return 1024;
}).thenApply(f -> {
System.out.println("222");
return f + 1;
}).thenApply(f -> {
//int age = 10/0; // 异常情况:那步出错就停在那步。
System.out.println("333");
return f + 1;
}).whenCompleteAsync((v,e) -> {
System.out.println("*****v: "+v);
}).exceptionally(e -> {
e.printStackTrace();
return null;
});
System.out.println("-----主线程结束,END");
// 主线程不要立刻结束,否则CompletableFuture默认使用的线程池会立刻关闭:
try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); }
}
}
//-----正常情况
//111
//222
//333
//----计算结果: 6
//-----异常情况
//111
//异常.....handle
类似于thenApply,但是有异常的话仍然可以往下走一步。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36public class CompletableFutureDemo2
{
public static void main(String[] args) throws ExecutionException, InterruptedException
{
//当一个线程依赖另一个线程时用 handle 方法来把这两个线程串行化,
// 异常情况:有异常也可以往下一步走,根据带的异常参数可以进一步处理
CompletableFuture.supplyAsync(() -> {
//暂停几秒钟线程
try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); }
System.out.println("111");
return 1024;
}).handle((f,e) -> {
int age = 10/0;//异常语句
System.out.println("222");
return f + 1;
}).handle((f,e) -> {
System.out.println("333");
return f + 1;
}).whenCompleteAsync((v,e) -> {
System.out.println("*****v: "+v);
}).exceptionally(e -> {
e.printStackTrace();
return null;
});
System.out.println("-----主线程结束,END");
// 主线程不要立刻结束,否则CompletableFuture默认使用的线程池会立刻关闭:
try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); }
}
}
//-----异常情况
//111
//333
//异常,可以看到多走了一步333一般用thenApply
对计算结果进行消费
接收任务的处理结果,并消费处理,无返回结果|消费型函数式接口,之前的是Function
thenAccept
1
2
3
4
5
6
7
8
9
10
11
12
13
14public static void main(String[] args) throws ExecutionException, InterruptedException
{
CompletableFuture.supplyAsync(() -> {
return 1;
}).thenApply(f -> {
return f + 2;
}).thenApply(f -> {
return f + 3;
}).thenApply(f -> {
return f + 4;
}).thenAccept(r -> System.out.println(r));
}
//6
//消费一下,直接得到6补充:Code之任务之间的顺序执行
thenRun
thenRun(Runnable runnable)
任务A执行完执行B,并且B不需要A的结果
thenAccept
thenAccept(Consumer action)
任务A执行完执行B,B需要A的结果,但是任务B无返回值
thenApply
thenApply(Function fn)
任务A执行完执行B,B需要A的结果,同时任务B有返回值
1
2
3
4
5
6
7
8
9
System.out.println(CompletableFuture.supplyAsync(() -> "resultA").thenRun(() -> {}).join());
//null
System.out.println(CompletableFuture.supplyAsync(() -> "resultA").thenAccept(resultA -> {}).join());
//resultA打印出来的 null因为没有返回值
System.out.println(CompletableFuture.supplyAsync(() -> "resultA").thenApply(resultA -> resultA + " resultB").join());
//resultAresultB 返回值
CompleteFuture和线程池说明(非常重要)
上面的几个方法都有普通版本和后面加Async的版本
以thenRun和thenRunAsync为例,有什么区别?
先看结论
没有传入自定义线程池,都用默认线程池ForkJoinPool
传入了一个自定义线程池如果你执行第一个任务的时候,传入了一个自定义线程池
调用thenRun方法执行第二个任务的时候,则第二个任务和第一个任务是用同一个线程池
调用thenRunAsync执行第二个任务的时候,则第一个任务使用的是你自己传入的线程池,第二个任务使用的是ForkJoin线程池
也有可能处理太快,系统优化切换原则,直接使用main线程处理(把sleep去掉)
1 | //2-1 |
1 | //2-2 |
1 | public class CompletableFutureAPIDemo { |
源码
1
2
3
4
5
6
7
8//CompletableFuture.java 2009行
public CompletableFuture<Void> thenRun(Runnable action) {//传入值是一样的
return uniRunStage(null, action);
}
public CompletableFuture<Void> thenRunAsync(Runnable action) {
return uniRunStage(asyncPool, action);//但是这里有个异步的线程池asyncPool
}1
2
3
4
5
6
7
8
9
10//进入asyncPool
private static final boolean useCommonPool =
(ForkJoinPool.getCommonPoolParallelism() > 1);//一般大于1都是成立的
/**
* Default executor -- ForkJoinPool.commonPool() unless it cannot
* support parallelism.
*/
private static final Executor asyncPool = useCommonPool ?
ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();//所以这里会调用forkJoin线程池
对计算速度进行选用
applyToEither
方法,快的那个掌权
1 | public class CompletableFutureDemo2 { |
对计算结果进行合并
thenCombine
合并- 两个CompletionStage任务都完成后,最终能把两个任务的结果一起交给thenCombine来处理
- 先完成的先等着,等待其它分支任务
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23public class CompletableFutureDemo2
{
public static void main(String[] args) throws ExecutionException, InterruptedException
{
CompletableFuture<Integer> completableFuture1 = CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread().getName() + "\t" + "---come in ");
return 10;
});
CompletableFuture<Integer> completableFuture2 = CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread().getName() + "\t" + "---come in ");
return 20;
});
CompletableFuture<Integer> thenCombineResult = completableFuture1.thenCombine(completableFuture2, (x, y) -> {
System.out.println(Thread.currentThread().getName() + "\t" + "---come in ");
return x + y;
});
System.out.println(thenCombineResult.get());
}
}
//30合并版本
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28public class CompletableFutureDemo2
{
public static void main(String[] args) throws ExecutionException, InterruptedException
{
CompletableFuture<Integer> thenCombineResult = CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread().getName() + "\t" + "---come in 1");
return 10;
}).thenCombine(CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread().getName() + "\t" + "---come in 2");
return 20;
}), (x,y) -> {
System.out.println(Thread.currentThread().getName() + "\t" + "---come in 3");
return x + y;
}).thenCombine(CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread().getName() + "\t" + "---come in 4");
return 30;
}),(a,b) -> {
System.out.println(Thread.currentThread().getName() + "\t" + "---come in 5");
return a + b;
});
System.out.println("-----主线程结束,END");
System.out.println(thenCombineResult.get());
// 主线程不要立刻结束,否则CompletableFuture默认使用的线程池会立刻关闭:
try { TimeUnit.SECONDS.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); }
}
}