ForkJoinPool使用介绍 CompletableFuture使用介绍

2018-05-06 09:41:00
admin
原创 6232
摘要:ForkJoinPool使用介绍 CompletableFuture使用介绍

一、ForkJoinPool使用介绍

abstract class ForkJoinTask<V> implements Future<V>:

abstract class RecursiveTask<V> extends ForkJoinTask<V>:

abstract class RecursiveAction extends ForkJoinTask<Void>:

1、RecursiveTask任务需要返回结果,RecursiveAction任务不需要返回结果;

2、ForkJoinTask<V> fork(),fork位于FJ任务中,则任务被放入线程关联的队列,否则任务被放入虚拟机自带FJ线程池;

3、V join(),任务还在队列则直接执行,否则帮助窃取者执行任务,任务执行完成则不再帮助窃取者;

4、V join(),最后等待任务执行完成,可能抛出RuntimeException或Error;


class ForkJoinWorkerThread extends Thread:

1、protected void onStart(),线程开始回调函数;

2、protected void onTermination(Throwable exception),线程结束回调函数;


class ForkJoinPool extends AbstractExecutorService:

1、ForkJoinPool(),创建一个线程池,并发级别是CPU核数,异步模式为假;

2、ForkJoinPool commonPool(),返回虚拟机自带FJ线程池,并发级别是CPU核数减1,异步模式为假;

3、ExecutorService newWorkStealingPool(),创建ForkJoinPool,并发级别是CPU核数,异步模式为真;

4、ForkJoinTask<T> submit(ForkJoinTask<T> task),提交任务,并返回任务监视器;

5、void execute(ForkJoinTask<?> task),提交任务,但不返回任务监视器;

6、T invoke(ForkJoinTask<T> task),提交任务,然后等待执行结果;

7、int getPoolSize(),获取线程池存活的线程数量;

8、void shutdown(),已提交任务继续执行,不再接受新任务;

9、List<Runnable> shutdownNow(),已执行任务强制终止,返回emptyList;


ForkJoinPool使用详解:

1、ForkJoinPool主要用分治法来解决问题,少量线程处理大量任务,适合计算密集型任务,任务最好不要有阻塞行为;

2、ForkJoinPool包含一个任务队列数组,数组长度是大于等于并发级别的最小2的幂的两倍,其中最多64个外部队列;

3、外部队列用于提交外部任务,即是用于submit|excute|invoke提交任务,外部队列不绑定具体线程;

4、线程池包含一个全局锁RSLOCK,每个队列包含一个队列锁QLOCK,这样效率比较高;

5、提交外部任务时,通过线程PROBE找到外部队列,PROBE是线程的一个随机数;

6、外部队列没有被锁,线程插入任务,否则线程修改PROBE,找到其他外部队列插入任务;

7、外部任务插入之后,signalWork方法创建或唤醒线程执行任务,然后整个提交任务完成;

8、创建线程是后台线程,同时创建线程对应任务队列,队列在队列数组中下标是奇数,线程名包含的数字是下标除二;

9、线程每次先窃取一个任务执行,然后再执行关联队列任务,关联队列任务全部执行完成之后,再重复上面步骤执行;

10、线程池是异步模式,线程按FIFO方式执行关联队列任务,线程池非异步模式,线程按LIFO方式执行关联队列任务


abstract class CountedCompleter<T> extends ForkJoinTask<T>:

1、CountedCompleter<?> getRoot(),获取根任务;

2、CountedCompleter<?> getCompleter(),获取父任务;

3、void setPendingCount(int count),设置依赖任务数量;

4、void addToPendingCount(int delta),增加依赖任务数量;

5、void propagateCompletion(),向上查找任务链中第一个不为零pending减一,或者完成根任务;

6、void tryComplete(),功能类似propagateCompletion,但是任务pending为零时会调用onCompletion;

7、CountedCompleter<?> nextComplete(),父任务pending不为零则减一,父任务不存在则调用quietlyComplete

8、void quietlyCompleteRoot(),直接完成根任务,即找到根任务并调用quietlyComplete


CountedCompleter使用详解:

1、CountedCompleter用于支持其他依赖方式,ForkJoinTask只能支持父子依赖

2、CountedCompleter实现compute时不会调用join,各个任务之间是完全并行关系;

3、任务完成之后没有设置状态为NORMAL,调用join会一直阻塞线程,直到调用quietlyComplete;

4、propagateCompletion和tryComplete在所有任务完成之后会调用quietlyComplete;

5、任务每次要么能拆分成2个子任务,要么是叶子任务,假如拆分n次,那么总任务是2n+1,叶子任务是n+1;

6、每个任务生成一个子任务,所有任务完成根任务才能完成,addToPendingCount(1)+所有任务propagateCompletion实现;

7、每个任务生成两个子任务,所有任务完成根任务才能完成,addToPendingCount(2)+所有任务propagateCompletion实现;

8、每个任务生成两个子任务,叶子任务完成根任务就算完成,addToPendingCount(1)+叶子任务propagateCompletion实现;

9、每个任务生成两个子任务,叶子任务完成根任务就算完成,addToPendingCount(1)+叶子任务nextComplete实现;


二、CompletableFuture使用介绍

class CompletableFuture<T> implements Future<T>, CompletionStage<T>:

1、static CompletableFuture<U> completedFuture(U value),生成一个已经完成的任务监控;

2、static CompletableFuture<U> supplyAsync(Supplier<U> supplier),生成一个异步任务并进行监控,任务有返回值;

3、static CompletableFuture<Void> runAsync(Runnable runnable),生成一个异步任务并进行监控,任务无返回值;

4、static CompletableFuture<Object> anyOf(CompletableFuture...cfs),组合异步任务,一个异步任务成功即可;

5、static CompletableFuture<Void> allOf(CompletableFuture...cfs),组合异步任务,所有异步任务都要成功;

6、CompletableFuture<T> whenComplete(BiConsumer),设置异步任务结束回调,可以处理异步任务异常;

7、CompletableFuture<Void> thenAccept(Consumer),设置异步任务结束回调,异步任务成功时才会执行;

8、CompletableFuture<Void> thenRun(Runnable),设置异步任务结束回调,异步任务成功时才会执行;

9、CompletableFuture<U> thenApply(Function),设置异步任务结束回调,异步任务成功时才会执行;

10、CompletableFuture<T> exceptionally(Function),设置异步任务结束回调,异步任务异常时才会执行;

11、CompletableFuture<U> thenCompose(Function),连接两个异步任务,前一个异步任务成功时才会执行

12、CompletableFuture<V> thenCombine(other,BiFunction),组合两个异步任务,两个异步任务都成功时才会执行;

13、T get(),获取元素,会抛出非运行时异常;
14、T join(),获取元素,只会抛出运行时异常;

15、boolean complete(T value),完成任务;


使用详解:

1、CompletableFuture包含一个线程池静态变量,ForkJoinPool默认线程池并发度大于1,使用此默认线程池初始化;

2、ForkJoinPool默认线程池并发度等于1,则创建一个线程池初始化此静态变量,每个任务创建一个线程;

3、CompletableFuture自身被提交到包含的线程池,通过包含的线程池来执行;

4、添加同步回调过程中,如果任务执行完成,直接执行回调;
5、添加异步回调过程中,如果任务执行完成,回调被提交到线程池;

6、添加回调过程中,如果任务未执行完成,回调被放到CF包含的stack

7、任务执行完成后会调用postComplete,从包含的stack获取回调执行,后设置的回调先执行;

8、如果是同步回调,postComplete直接执行如果是异步回调,postComplete提交回调到线程池;

9、同步回调执行完成之后,同步回调的回调被放到当前CF包含的stack,同步回调的回调先设置先执行

10、如果回调执行异常,异常会被吞噬,不会打印任何异常信息,不影响后续回调执行,但影响子回调执行;

发表评论
评论通过审核之后才会显示。