首页 雷火电竞官网正文

羽毛球,剖析源码,学会正确运用 Java 线程池-雷火电竞登录

admin 雷火电竞官网 2019-11-06 285 0

在日常的开发作业傍边,线程池往往承载着一个运用中最重要的事务逻辑,因而咱们有必要更多地去重视线程池的履行状况,包含反常的处理和剖析等。本文首要聚集在怎么正确运用线程池上,以及供给一些有用的主张。文中会略微涉及到一些线程池完成原理方面的常识,可是不会过多打开。

线程池的反常处理

UncaughtExceptionHandler

咱们都知道Runnable接口中的run办法是不允许抛出反常的,因而派生出这个线程的主线程或许无法直接取得该线程在履行过程中的反常信息。如下例:

为什么会这样呢?其实咱们看一下Thread中的源码就会发现,Thread在履行过程中假设遇到了反常,会先判别当时线程是否有设置UncaughtExceptionHandler,假设没有,则会从线程地点的ThreadGroup中获取。

留意:每个线程都有自己的ThreadGroup,即便你没有指定,并且它完成了UncaughtExceptionHandler接口。

咱们看下ThreadGroup中默许的对UncaughtExceptionHandler接口的完成:

这个ThreadGroup假设有父ThreadGroup,则调用父ThreadGroup的uncaughtException,不然调用大局默许的Thread.DefaultUncaughtExceptionHandler,假设大局的handler也没有设置,则仅仅简略地将反常信息定位到System.err中,这便是为什么咱们应当在创立线程的时分,去完成它的UncaughtExceptionHandler接口的原因,这么做能够让你更方便地去排查问题。

经过execute提交使命给线程池

回到线程池这个论题,假设咱们向线程池提交的使命中,没有对反常进行try...catch处理,并且运转的时分呈现了反常,那会对线程池形成什么影响呢?答案是没有影响,线程池仍旧能够正常作业,可是反常却被吞掉了。这一般来说不是一个好工作,由于咱们需求拿到原始的反常方针去剖析问题。

那么怎样才能拿到原始的反常方针呢?咱们从线程池的源码着手开端研讨这个问题。当然网上关于线程池的源码解析文章有许多,这儿限于篇幅,直接给出最相关的部分代码:

这个办法便是真实去履行提交给线程池的使命的代码。

这儿咱们省略其间不相关的逻辑,要点重视第19行到第32行的逻辑,其间第23行是真实开端履行提交给线程池的使命,那么第20行是干什么的呢?其实便是在履行提交给线程池的使命之前能够做一些前置作业,相同的,咱们看到第31行,这个是在履行完提交的使命之后,能够做一些后置作业。

beforeExecute这个咱们暂时不论,要点重视下afterExecute这个办法。咱们能够看到,在履行使命过程中,一旦抛出任何类型的反常,都会提交给afterExecute这个办法,可是检查线程池的源代码咱们能够发现,默许的afterExecute是个空完成,因而,咱们有必要承继ThreadPoolExecutor去完成这个afterExecute办法。

看源码咱们能够发现这个afterExecute办法是protected类型的,从官方注释上也能够看到,这个办法便是引荐子类去完成的。

当然,这个办法不能随意去完成,需求遵从必定的过程,详细的官方注释也有讲,这儿摘录如下

那么经过这种办法,就能够将原先或许被线程池吞掉的反常成功捕获到,然后便于排查问题。

可是这儿还有个小问题,咱们留意到在runWorker办法中,履行task.run();句子之后,各种类型的反常都被抛出了,那这些被抛出的反常去了哪里?事实上这儿的反常方针终究会被传入到Thread的dispatchUncaughtException办法中,源码如下:

能够看到它会去获取UncaughtExceptionHandler的完成类,然后调用其间的uncaughtException办法,这也就回到了咱们上一末节所剖析的UncaughtExceptionHandler完成的详细逻辑。那么为了拿到最原始的反常方针,除了完成UncaughtExceptionHandler接口之外,也能够考虑完成afterExecute办法。

经过submit提交使命到线程池

这个相同很简略,咱们仍是先回到submit办法的源码:

这儿的execute办法调用的是ThreadPoolExecutor中的execute办法,履行逻辑跟经过execute提交使命到线程池是相同的。咱们先要点重视这儿的newTaskFor办法,其源码如下:

能够看到提交的Callable方针用FutureTask封装起来了。咱们知道终究会履行到上述runWorker这个办法中,并且最中心的履行逻辑便是task.run();这行代码。咱们知道这儿的task其实是FutureTask类型,因而咱们有必要看一下FutureTask中的run办法的完成:

能够看到这其间跟反常相关的最要害的代码就在第17行,也便是setException(ex);这个当地。咱们看一下这个当地的完成:

这儿最要害的当地便是将反常方针赋值给了outcome,outcome是FutureTask中的成员变量,咱们经过调用submit办法,拿到一个Future方针之后,再调用它的get办法,其间最中心的办法便是report办法,下面给出每个办法的源码:

首要是get办法:

能够看到终究调用了report办法,其源码如下:

上面是一些状况判别,假设当时使命不是正常履行结束,或许被撤销的话,那么这儿的x其实便是原始的反常方针,能够看到会被ExecutionException包装。因而在你调用get办法时,或许会抛出ExecutionException反常,那么调用它的getCause办法就能够拿到最原始的反常方针了。

综上所述,针对提交给线程池的使命或许会抛出反常这一问题,首要有以下两种处理思路:

  1. 在提交的使命傍边自行try...catch,但这儿有个欠好的当地便是假设你会提交多种类型的使命到线程池中,每种类型的使命都需求自即将反常try...catch住,比较繁琐。并且假设你仅仅catch(Exception e),或许仍然会漏掉一些包含Error类型的反常,那为了稳妥起见,能够考虑catch(Throwable t)。
  2. 自行完成线程池的afterExecute办法,或许完成Thread的UncaughtExceptionHandler接口。

下面给出我个人创立线程池的一个示例,供咱们参阅:

BlockingQueue queue = new ArrayBlockingQueue<>(DEFAULT_QUEUE_SIZE);
statisticsThreadPool = new ThreadPoolExecutor(DEFAULT_CORE_POOL_SIZE, DEFAULT_MAX_POOL_SIZE,
60, TimeUnit.SECONDS, queue, new ThreadFactoryBuilder()
.setThreadFactory(new ThreadFactory() {
private int count = 0;
private String prefix = "StatisticsTask";
@Override
public Thread newThread(Runnable r) {
return new Thread(r, prefix + "-" + count++);
}
}).setUncaughtExceptionHandler((t, e) -> {
String threadName = t.getName();
logger.error("statisticsThreadPool error occurred! threadName: {}, error msg: {}", threadName, e.getMessage(), e);
}).build(), (r, executor) -> {
if (!executor.isShutdown()) {
logger.warn("statisticsThreadPool is too busy! waiting to insert task to queue! ");
Uninterruptibles.putUninterruptibly(executor.getQueue(), r);
}
}) {
@Override
protected void afterExecute(Runnable r, Throwable t) {
super.afterExecute(r, t);
if (t == null && r instanceof Future
try {
Future
future.get();
} catch (CancellationException ce) {
t = ce;
} catch (ExecutionException ee) {
t = ee.getCause();
} catch (InterruptedException ie) {
Thread.currentThread().interrupt(); // ignore/reset
}
}
if (t != null) {
logger.error("statisticsThreadPool error msg: {}", t.getMessage(), t);
}
}
};
statisticsThreadPool.prestartAllCoreThreads();

线程数的设置

咱们知道使命一般有两种:CPU密集型和IO密集型。那么面临CPU密集型的使命,线程数不宜过多,一般挑选CPU中心数+1或许中心数的2倍是比较合理的一个值。因而咱们能够考虑将corePoolSize设置为CPU中心数+1,maxPoolSize设置为中心数的2倍。

相同的,面临IO密集型使命时,咱们能够考虑以中心数乘以4倍作为中心线程数,然后中心数乘以5倍作为最大线程数的办法去设置线程数,这样的设置会比直接拍脑袋设置一个值会更合理一些。

当然总的线程数不宜过多,控制在100个线程以内比较合理,不然线程数过多或许会导致频频地上下文切换,导致体系功能反不如前。

怎么正确封闭一个线程池

说到怎么正确去封闭一个线程池,这儿面也有点考究。为了完成高雅停机的方针,咱们应当先调用shutdown办法,调用这个办法也就意味着,这个线程池不会再接纳任何新的使命,可是现已提交的使命还会持续履行,包含行列中的。所以,之后你还应当调用awaitTermination办法,这个办法能够设定线程池在封闭之前的最大超时时刻,假设在超时时刻结束之前线程池能够正常封闭,这个办法会回来true,不然,一旦超时,就会回来false。一般来说咱们不或许无限制地等候下去,因而需求咱们事前预估一个合理的超时时刻,然后去运用这个办法。

假设awaitTermination办法回来false,你又期望尽或许在线程池封闭之后再做其他资源收回作业,能够考虑再调用一下shutdownNow办法,此刻行列中一切尚未被处理的使命都会被丢掉,一起会设置线程池中每个线程的中止标志位。shutdownNow并不确保必定能够让正在运转的线程停止作业,除非提交给线程的使命能够正确呼应中止。到了这一步,能够考虑持续调用awaitTermination办法,或许直接抛弃,去做接下来要做的工作。

线程池中的其他有用办法

咱们或许有留意到,我在创立线程池的时分,还调用了这个办法:prestartAllCoreThreads。这个办法有什么效果呢?咱们知道一个线程池创立出来之后,在没有给它提交任何使命之前,这个线程池中的线程数为0。有时分咱们事前知道会有许多使命会提交给这个线程池,可是等它一个个去创立新线程开支太大,影响体系功能,因而能够考虑在创立线程池的时分就将一切的中心线程悉数一次性创立结束,这样体系起来之后就能够直接运用了。

其实线程池中还供给了其他一些比较有意思的办法。比方咱们现在想象一个场景,当一个线程池负载很高,快要撑爆导致触发回绝战略时,有没有什么办法能够缓解这一问题?其实是有的,由于线程池供给了设置中心线程数和最大线程数的办法,它们分别是setCorePoolSize办法setMaximumPoolSize办法。是的,线程池创立结束之后也是能够更改其线程数的!因而,面临线程池高负荷运转的状况,咱们能够这么处理:

  1. 起一个守时轮询线程(看护类型),守时检测线程池中的线程数,详细来说便是调用getActiveCount办法。
  2. 当发现线程数超过了中心线程数巨细时,能够考虑将CorePoolSize和MaximumPoolSize的数值一起乘以2,当然这儿不主张设置很大的线程数,由于并不是线程越多越好的,能够考虑设置一个上限值,比方50、100之类的。
  3. 一起,去获取行列中的使命数,详细来说是调用getQueue办法再调用size办法。当行列中的使命数少于行列巨细的二分之一时,咱们能够以为现在线程池的负载没有那么高了,因而能够考虑在线程池从前有扩容过的状况下,将CorePoolSize和MaximumPoolSize复原回去,也便是除以2。

详细来说如下图:

以上是我个人主张的一种运用线程池的办法。

线程池必定是最佳计划吗?

线程池并非在任何状况下都是功能最优的计划。假设是一个寻求极致功能的场景,能够考虑运用Disruptor,这是一个高功能行列。扫除Disruptor不谈,单纯根据JDK的话会不会有更好的计划?答案是有的。

咱们知道在一个线程池中,多个线程是共用一个行列的,因而在使命许多的状况下,需求对这个行列进行频频读写,为了避免抵触因而需求加锁。事实上在阅览线程池源代码的时分就能够发现,里边充满着各种加锁的代码,那有没有更好的完成办法呢?

其实咱们能够考虑创立一个由单线程线程池构成的列表,每个线程池都运用有界行列这种办法去完成多线程。这么做的优点是,每个线程池中的行列都只会被一个线程去操作,这样就没有竞赛的问题。

其实这种用空间换时刻的思路学习了Netty中EventLoop的完成机制。试想,假设线程池的功能真的有那么好,为什么Netty不必呢?

其他需求留意的当地

  1. 任何状况下都不该该运用可弹性线程池(线程的创立和毁掉开支是很大的)。
  2. 任何状况下都不该该运用无界行列,单测在外。有界行列常用的有ArrayBlockingQueue和LinkedBlockingQueue,前者根据数组完成,后者根据链表。从功能体现上来看,LinkedBlockingQueue的吞吐量更高可是功能并不安稳,实践状况下应当运用哪一种主张自行测验之后决议。趁便说一句,Executors的newFixedThreadPool选用的是LinkedBlockingQueue。
  3. 引荐自行完成RejectedExecutionHandler,JDK自带的都不是很好用,你能够在里边完成自己的逻辑。假设需求一些特定的上下文信息,能够在Runnable完成类中添加一些自己的东西,这样在RejectedExecutionHandler中就能够直接运用了。

怎样做到不丢使命

这儿其实指的是一种特殊状况,便是比方忽然遇到了一股流量尖峰,导致线程池负载现已十分高了,即快要触发回绝战略的时分,咱们能够怎么做来尽量避免提交的使命丢掉。一般来说当遇到这种状况的时分,应当赶快触发报警告诉研制人员来处理。之后不论是限流也好,仍是添加机器也好,乃至是上Kafka、Redis乃至是数据库用来暂存使命数据也是能够的,但毕竟远水救不了近火,假设咱们期望在正式处理这个问题之前,先尽或许地缓解,能够考虑怎么做呢?

首要能够考虑的便是我前面说到的动态增大线程池中的线程数,可是假设现已扩容过了,此刻不该持续扩容,不然或许导致体系的吞吐量更低。在这种状况下,应当自行完成RejectedExecutionHandler,详细来说便是在完成类中,独自开一个单线程的线程池,然后调用原线程池的getQueue办法的put办法,将塞不进去的使命再次测验塞进去。当然在行列满的时分是塞不进去的,但那至少也仅仅堵塞了这个独自的线程罢了,并不影响主流程。

当然,这种计划是治标不治本的,面临流量激增这种场景其实业界有许多老练的做法,仅仅单纯从线程池的视点来看的话,这种办法不失为一种暂时有用的处理计划。


重视我私信回复【材料】即可获取包含但不限于:分布式架构、高可扩展、高功能、高并发、Jvm功能调优、Spring,MyBatis,Nginx源码剖析,Redis,ActiveMQ、Mycat、Netty、Kafka、Mysql、Zookeeper、Tomcat、Docker、Dubbo、Nginx等多个常识点高档进阶干货

雷火电竞版权声明

本文仅代表作者观点,不代表本站立场。
本文系作者授权发表,未经许可,不得转载。

最近发表

    雷火电竞登录_雷火电竞安卓app_雷火苹果app

    http://www.tujidotimes.com/

    |

    Powered By

    使用手机软件扫描微信二维码

    关注我们可获取更多热点资讯

    雷火电竞出品