Python微信订餐小程序课程视频
https://edu.csdn.net/course/detail/36074
Python实战量化交易理财系统
https://edu.csdn.net/course/detail/35475
线程池的使用
本章将介绍对线程池进行配置与调优的一些高级选项, 并分析在使用任务执行框架时需要注意的各种危险, 以及一些使用 Executor的高级示例。
一、任务与执行策略之间的隐形耦合
1.1 隐形耦合关系
我们已经知道,Executor框架可以将任务的提交与任务的执行策略解耦开来。这为制定和修改执行策略都提供了相当大的灵活性,但并非所有的任务都能适用所有的执行策略。有些类型的任务需要明确地指定执行策略,包括:
a.依赖性任务。
大多数行为正确的任务都是独立的: 它们不依赖于其他任务的执行时序、 执行结果或其他效果。 当在线程池中执行独立的任务时, 可以随意地改变线程池的大小和配置, 这些修改只会对执行性能产生影响。 然而,如果提交给线程池的任务需要依赖其他的任务, 那么就隐含地给执行策略带来了约束, 此时必须小心地维持这些执行策略以避免产生活跃性问题。
b.使用线程封闭机制的任务。
与线程池相比, 单线程的Executor能够对并发性做出更强的承诺。 它们能确保任务不会并发地执行, 使你能够放宽代码对线程安全的要求。 对象可以封闭在 任务线程中,使得在该线程中执行的任务在访问该对象时不需要同步, 即使这些资源不是线程 安全的也没有问题。 这种情形将在任务与执行策略之间形成隐式的耦合---任务要求其执行所在的Executor是单线程的e。如果将Executor从单线程环境改为线程池环境, 那么将会失去线程安全性。
c.对响应时间敏感的任务。
如果将一个运行时间较长的任务提交到单线程的Executor中,或者将多个运行时间较长的任务提交到一个只包含少量线程的线程池中,那么将降低由该Executor管理的服务的响应性。
d.使用ThreadLocal的任务
ThreadLocal使每个线程都可以拥有某个变量的一个私有“版本“。然而,只要条件允许,Executor可以自由地重用这些线程。在标准的Executor实现中,当执行需求较低时将回收空闲线程,而当需求增加时将添加新的线程,并且如果从任务中抛出了一个未检查异常,那么将用一个新的工作者线程来替代抛出异常的线程。只有当线程本地值 的生命周期受限于任务的生命周期时,在线程池的线程中使用ThreadLocal才有意义,而在线 程池的线程中不应该使用 ThreadLocal在任务之间传递值。
只有当任务都是同类型的并且相互独立时,线程池的性能才能达到最佳。如果将运行时间较长的与运行时间较短的任务混合在一起,那么除非线程池很大,否则将可能造成 “拥塞 ”。如果提交的任务依赖于其他任务,那么除非线程池无限大,否则将可能造成死锁。
1.2 线程饥饿死锁
在线程池中,如果任务依赖于其他任务,那么可能产生死锁。在单线程的Executor中,如 果一个任务将另一个任务提交到同一个Executor,并且等待这个被提交任务的结果,那么通常会引发死锁。第二个任务停留在工作队列中,并等待第一个任务完成,而第一个任务又无法完 成,因为它在等待第二个任务的完成。
在更大的线程池中, 如果所有正在执行任务的线程都由于等待其他仍处在工作队列中的任务而阻塞,那么会发生同样的问题。这种现象被称为饥饿死锁(Thread Starvation Deadlock)。
1.3 运行时间较长的任务
如果任务阻塞的时间过长, 那么即使不出现死锁, 线程池的响应性也会变得糟糕。执行时 间较长的任务不仅会造成线程池堵塞, 甚至还会增加执行时间较短任务的服务时间。如果线程 池中线程的数量远小于在稳定状态下执行时间较长任务的数量, 那么到最后可能所有的线程都会运行这些执行时间较长的任务, 从而影响整体的响应性。
有一项技术可以缓解执行时间较长任务造成的影响, 即限定任务等待资源的时间, 而不要无限制地等待。在平台类库的大多数可阻塞方法中, 都同时定义了限时版本和无限时版本, 例如Thread.join、BlockingQueue.put、CountDownLatch.await以及Selector.select等。如果等待超时,那么可以把任务标识为失败,然后中止任务或者将任务重新放回队列以便随后执行。这样, 无论任务的最终结果是否成功, 这种办法都能确保任务总能继续执行下去, 并将线程释放 出来以执行一些能更快完成的任务。如果在线程池中总是充满了被阻塞的任务, 那么也可能表明线程池的规模过小。
二、设置线程池的大小
要设置线程池的大小也并不困难, 只需要避免 “过大” 和 “过小” 这两种极端情况。如果线程池过大,那么大量的线程将在相对很少的CPU和内存资源上发生竞争, 这不仅会导致更高的内存使用量, 而且还可能耗尽资源。如果线程池过小, 那么将导致许多空闲的处理器无法执行工作,从而降低吞吐率。
要想正确地设置线程池的大小,必须分析计算环境、资源预算和任务的特性。在部署的系统中有多少个CPU? 多大的内存?任务是计算密集型、I/0密集型还是二者皆可?等等。
-
对于计算密集型的任务,在拥有N个处理器的系统上,当线程池的大小为N+1时,通常能实现最优的利用率。(即使当计算密集型的线程偶尔由于页缺失故障或者其他原因而暂停时,这个 “额外 ” 的线程也能确保CPU的时钟周期不会被浪费。)
-
对于I/O密集型或者其他阻塞操作的任务,由于线程并不会一直执行,因此线程池的规模应该更大。要正确地设置线程 池的大小,一种方法是通过另来调节线程池的大小:在某个基准负载下,分别设置不同大小的线程池来运行应用程序,并观察CPU利用率的水平。还可以估算出任务的等待时间与计算时间的比值,通过计算获得合适的线程池大小:
Ncpu = number of CPUs
Ucpu = target CPU utilization, 0 ≤ Ucpu ≤ 1
W / C = ratio of wait time to compute time
要使处理器达到期望的使用率,线程池的最优大小等于
Nthread = Ncpu Ucpu (1 + W / C)
可以通过 Runtime获得CPU的数目:
Int N_CPUS = Runtime.getRuntime().availableProcessors();
当然,CPU周期并不是唯一影响线程池大小的资源,还包括内存、文件句柄、套接字句柄和数据库连接等。计算这些资源对线程池的约束条件是更容易的:计算每个任务对该资源的需求量,然后用该资源的可用总量除以每个任务的需求量,所得结果就是线程池大小的上限。
当任务需要某种通过资源池来管理的资源时,例如数据库连接,那么线程池和资源池的大小将会相互影响。如果每个任务都需要一个数据库连接,那么连接池的大小就限制了线程池的 大小。同样,当线程池中的任务是数据库连接的唯一使用者时,那么线程池的大小又将限制连 接池的大小。
三、配置 ThreadPoolExecutor
ThreadPoolExecutor为一些Executor提供了基本的实现,这些Executor是由 Executors 中 的newCachedThreadPool、newFixedThreadPool和newScheduledThreadExecutor等工厂方法返回的。 ThreadPoolExecutor是一个灵活的、稳定的线程池,允许进行各种定制。
如果默认的执行策略不能满足需求,那么可以通过 ThreadPoolExecutor的构造函数来实例化一个对象,并根据自己的需求来定制,并且可以参考Executors的源代码来了解默认配置下的执行策略, 然后再以这些执行策略为基础进行修改。ThreadPoolExecutor定义了很多构造数, 在程序清单8-2中给出了最常见的形式。
| 代码 8-2 ThreadPoolExecutor**的通用构造函数** |
| public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) { … } |
3.1 线程的创建与销毁
线程池的基本大小(Core Pool Size)、最大大小(Maximum Pool Size)以及存活时间等因素共同负责线程的创建与销毁。
- 基本大小也就是线程池的目标大小,即在没有任务执行时线程池的大小,并且只有在工作队列满了的情况下才会创建超出这个数量的线程。
- 线程池的最大大小表示可同时活动的线程数最的上限。如果某个线程的空闲时间超过了存活时间,那么将被标记为可回收的,并且当线程池的当前大小超过了基本大小时,这个线程将被终止。
通过调节线程池的基本大小和存活时间,可以帮助线程池回收空闲线程占有的资源,从而使得这些资源可以用于执行其他工作。显然,这是种折衷: 回收空闲线程会产生额外的延迟,因为当需求增加时,必须创建新的线程来满足需求。
- newFixedThreadPool工厂方法将线程池的基本大小和最大大小设置为参数中指定的值,而且创建的线程池不会超时。
- newCachedThreadPool工厂方法将线程池的最大大小设置为Integer.MAX_VALUE, 而将基本大小设置为零,并将超时设置为1分钟,这种方法创建出来的线程池可以被无限扩展,并且当需求降低时会自动收缩。
其他形式的线程池可以通过显式的 ThreadPoolExecutor构造函数来构造。
3.2 管理队列任务
在有限的线程池中会限制可并发执行的任务数量。(单线程的Executor是一种值得注意的特例:它们能确保不会有任务并发执行,因为它们通过线程封闭来实现线程安全性。)
如果无限制地创建线程,那么将导致不稳定性,并通过采用固定大小的线程池来解决这个问题,而不是每收到一个请求就创建一个新线程 。然而,这个方案并不完整。在高负载情况下,应用程序仍可能耗尽资源,只是出现问题的概率较小。
- 如果新请求的到达速率超过了线程池的处理速率,那么新到来的请求将累积起来。在线程池中,这些请求会在一个由 Executor管理的 Runnable队列中等待,而不会像线程那样去竞争CPU资源。通过一个Runnable和一个链表节点来表现一个等待中的任务,当然比使用线程来表示的开销低很多,但如果客户提交给服务器请求的速率超过了服务器的处理速率,那么仍可能会耗尽资源。
- 即使请求的平均到达速率很稳定,也仍然会出现请求突增的情况。尽管队列有助于缓解任务的突增问题,但如果任务持续高速地到来,那么最终还是会抑制请求的到达率以避免耗尽内存。甚至在耗尽内存之前,响应性能也将随着任务队列的增长而变得越来越糟。
ThreadPoolExecutor 允许提供一个 BlockingQueue 来保存等待执行的任务。 基本的任务排队方法有 3 种:无界队列、有界队列和同步移交 (Synchronous Handoff)。队列的选择与其他的配置参数有关,例如线程池的大小等。
newFixedThreadPool 和 newSingleThreadExecutor在默认情况下将使用一个无界的 LinkedBlockingQueue。
- 如果所有工作者线程都处于忙碌状态,那么任务将在队列中等候。
- 如果任务持续快速地到达,并且超过了线程池处理它们的速度, 那么队列将无限制地增加。
一种更稳妥的资源管理策略是使用有界队列,例如 ArrayBlockingQueue、有界的LinkedBlockingQueue、 PriorityBlockingQueue。在使用有界的工作队列时,队列的大小与线程池的大小必须一起调节。如果线程池较小而队列较大,那么有助于减少内存使用量,降低 CPU的使用率,同时还可以减少上下文切换,但付出的代价是可能会限制吞吐量。
有界队列有助于避免资源耗尽的情况发生,但它又带来了新的问题:当队列填满后,新的任务该怎么办?(有许多饱和策略 (Saturation Policy] 可以解决这个问题)
对于非常大的或者无界的线程池,可以通过使用 SynchronousQueue 来避免任务排队,以及直接将任务从生产者移交给工作者线程。SynchronousQueue 不是一个真正的队列,而是一 种在线程之间进行移交的机制。要将一个元素 放入 SynchronousQueue 中,必须有另一个线程正在等待接受这个元素。如果没有线程正在等待,并且线程池的当前大小小于最大值,那么ThreadPoolExecutor 将创建一个新的线程, 否则根据饱和策略,这个任务将被拒绝。
使用直接移交将更高效,因为任务会直接移交给执行它的线程,而不是被首先放在队列中,然后由工作者线程从队列中提取该任务。但是,只有当线程池是无界的或者可以拒绝任务时, SynchronousQueue才有实际价值。在newCachedThreadPool 工厂方法中就使用了 SynchronousQueue。
当使用像 LinkedBlockingQueue 或 ArrayBlockingQueue 这样的 FIFO(先进先出)队列时,任务的执行顺序与它们的到达顺序相同。如果想进一步控制任务执行顺序,还可以使用PriorityBlockingQueue,这个队列将根据优先级来安排任务。任务的优先级是通过自然顺序或Comparator(如果任务实现了Comparable)来定义的。
3.3 饱和策略
当有界队列被填满后,饱和策略开始发挥作用。ThreadPoolExecutor的饱和策略可以通过调用setRejectedExecutionHandler来修改。(如果某个任务被提交到一个巳被关闭的Executor时,也会用到饱和策略。)JDK提供了几种不同的RejectedExecutionHandler实现,每种实现都包含有不固的饱和策略:AbortPolicy、CallerRunsPolicy、DiscardPolicy和DiscardOldestPolicy。
- "中止(Abort)"策略是默认的饱和策略,该策略将抛出未检查的RejectedExecution-Exception。调用者可以捕获这个异常,然后根据需求编写自己的处理代码。
- 当新提交的任务法保存到队列中等待执行时,“抛弃(Discard)"策略会悄悄抛弃该任务。
- “抛弃最旧的( Discard-Oldest)"策略则会抛弃下一个将被执行的任务,然后尝试重新提交新的任务。如果工 作队列是一个优先队列,那么“抛弃最旧的”策略将导致抛弃优先级最高的任务,因此最好不要将“抛弃最旧的"饱和策略和优先级队列放在一起使用。
- “调用者运行(Caller-Runs)"策略实现了一种调节机制,该策略既不会抛弃任务,也不会抛出异常, 而是将某些任务回退到调用者,从而降低新任务的流量。它不会在线程池的某个线程中执行新提交的任务, 而是在一个调用了execute的线程中执行该任务。
我们可以将WebServer示例修改为使用有界队列和“调用者运行” 饱和策略,当线程池中的所有线程都被占用,并且工作队列被填满后,下一个任务会在调用execute时在主线程中执行由于执行任务需要一定的时间,因此主线程至少在一段时间内不能提交任何任务,从而使得工作者线程有时间来处理完正在执行的任务。在这期间,主线程不会调用accept, 因此到达的请求将被保存 在TCP层的队列中而不是在应用程序的队列中。如果持续过载,那么TCP层将最终发现它的请求队列被填满,因此同样会开始抛弃请求。当服务器过载时,这种过载情况会逐渐向外蔓延开来-从线程池到工作队列到应用程序再到TCP层,最终达到客户端,导致服务器在高负载下实现一种平缓的性能降低。
| 代码 8-3 创建一个固定大小的线程池,采用有届队列以及**“调用者运营”**饱和策略 |
| ThreadPoolExecutor executor = new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue());
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); |
当工作队列被填满后,没有预定义的饱和策略来阻塞execute。然而,通过使用Semaphore(信号量)来限制任务的到达率,就可以实现这个功能。在程序清单 8-4 的BoundedExecutor中给出了这种方法。该方法使用了一个无界队列(因为不能限制队列的大小和任务的到达率),并设置信号量的上界设置为线程池的大小加上可排队任务的数量,这是因为信号量需要控制正在执行的和等待执行的任务数量。
| 代码 8-4 使用**Semaphore**来控制任务的提交速率 |
| class BoundedExecutor {
private final Executor exec;
private final Semaphore semaphore;
public BoundedExecutor(Executor exec, int bound) {
this.exec = exec;
this.semaphore = new Semaphore(bound);
}
public void submitTask(final Runnable command) {
try {
//提交任务前请求信号量
semaphore.acquire();
exec.execute(new Runnable() {
@Override
public void run() {
try {
command.run();
} finally {
//执行完释放信号
semaphore.release();
}
}
});
} catch (InterruptedException e) {
// handle exception
}
}
} |
3.4 线程工厂
每当线程池需要创建一个线程时,都是通过线程工厂方法(请参见程序清单8-5)来完成的。默认的线程工厂方法将创建一个新的、非守护的线程,并且不包含特殊的配置信息。通过指定一个线程工厂方法,可以定制线程池的配置信息。在ThreadFactory 中只定义了一个方法newThread, 每当线程池需要创建一个新线程时都会调用这个方法。
然而,在许多情况下都需要使用定制的线程工厂方法。例如,
- 为线程池中的线程指定一个UncaughtExceptionHandler,
- 实例化一个定制的Thread 类用于执行调试信息的记录。
- 修改线程的优先级(这通常并不是一个好主意。请参见10.3.1节)或者守护状态(同样, 这也不是一个好主意。请参见7.4.2节)。
- 给线程取一个更有意义的名称,用来解释线程的转储信息和错误日志。
| 代码 8-5 ThreadFactory**接口 |
| public interface ThreadFactory {
/
- Constructs a new {@code Thread}. Implementations may also initialize
- priority, name, daemon status, {@code ThreadGroup}, etc.
- @param r a runnable to be executed by new thread instance
- @return constructed thread, or {@code null} if the request to
- create a thread is rejected
*/
Thread newThread(Runnable r);
}
|
在程序清单8-6 的MyThreadFactory 中给出了一个自定义的线程工厂。它创建了一个新的My App Thread 实例, 并将一个特定千线程池的名字传递给MyAppThread的构造函数,从而可以在线程转储和错误日志信息中区分来自不同线程池的线程。在应用程序的其他地方也可以使用MyAppThread, 以便所有线程都能使用它的调试
在MyAppThread中还可以定制其他行为,如程序清单8-6所示,包括:为线程指定名字,设置自定义UncaughtExceptionHandler 向Logger 中写入信息,维护一些统计信息(包括有多少个线程被创建和销毁),以及在线程被创建或者终止时把调试消息写入日志。
代码 8-6 |
---|
public class MyThreadFactory implements ThreadFactory {
private final String poolName;
public MyThreadFactory(String poolName) {
super();
this.poolName = poolName;
}
@Override
public Thread newThread(Runnable r) {
return new MyAppThread(r);
}
}
public class MyAppThread extends Thread {
public static final String DEFAULT_NAME = "MyAppThread";
private static volatile boolean debugLifecycle = false;
private static final AtomicInteger created = new AtomicInteger();
private static final AtomicInteger alive = new AtomicInteger();
private static final Logger log = Logger.getAnonymousLogger();
public MyAppThread(Runnable r) {
this(r, DEFAULT\_NAME);
}
public MyAppThread(Runnable r, String name) {
super(r, name + "-" + created.incrementAndGet());
setUncaughtExceptionHandler( //设置未捕获的异常发生时的处理器
new Thread.UncaughtExceptionHandler() {
@Override
public void uncaughtException(Thread t, Throwable e) {
log.log(Level.SEVERE, "UNCAUGHT in thread " + t.getName(), e);
}
});
}
@Override
public void run() {
boolean debug = debugLifecycle;
if (debug)
log.log(Level.FINE, "running thread " + getName());
try {
alive.incrementAndGet();
super.run();
} finally {
alive.decrementAndGet();
if (debug)
log.log(Level.FINE, "existing thread " + getName());
}
}
public static boolean getDebug() { return debugLifecycle; }
public static void setDebug(boolean b) { debugLifecycle = b; }
} |
如果在应用程序中需要利用安全策略来控制对某些特殊代码库的访问权限, 那么可以通过 Executor 中的 privilegedThreadFactory 工厂来定制自己的线程工厂。 通过这种方式创建出来的 线程, 将与创建 privilegedThreadFactory 的线程拥有相同的访问权限、 AccessControlContext 和 contextClassLoader。
如果不使用 privilegedThreadFactory, 线程池创建的线程将从在需要新 线程时调用 execute 或 submit 的客户程序中继承访问权限, 从而导致令人困惑的安全性异常。
3.5 在调用构造函数后再定制**ThreadPoolExecutor**
在调用完 ThreadPoolExecutor 的构造函数后, 仍然可以通过设置函数 (Setter) 来修改大多数传递给它的构造函数的参数(例如线程池的基本大小、 最大大小、 存活时间、 线程工厂以及拒绝执行处理器 (Rejected Execution Handler)) 。 如果Executor 是通过 Executors 中 的某个 (newSingleTbreadExecutor 除外)工厂方法创建的, 那么可以将结果的类型转换为 ThreadPoolExecutor 以访问设置器, 如程序清单 8-7 所示。
| 代码 8-7 对标准工厂方法创建的线程池进行修改 |
| ExecutorService exec = Executors.newCachedThreadPool();
if (exec instanceof ThreadPoolExecutor) {
((ThreadPoolExecutor) exec).setCorePoolSize(10);
} else {
throw new AssertionError("不能转换");
} |
在 Executors中包含一个 unconfiurableExecutorService 工厂方法, 该方法对一个现有的 ExecutorService 进行包装, 使其只暴露出 ExecutorService 的方法, 因此不能对它进行配置。 newSingleThreadExecutor 返回按这种方式封装的 ExecutorService, 而不是最初的 ThreadPoolExecutor。虽然单线程的 Executor 实际上被实现为一个只包含唯一线程的线程池,但它同样确保了不会并发地执行任务。如果在代码中增加单线程 Executor 的线程池大小, 那么将破坏它的执行语义。
你可以在自己的 Executor 中使用这项技术以防止执行策略被修改。如果将 ExecutorService 暴露给不信任的代码, 又不希望对其进行修改,就可以通过 unconfigurableExecutorService 来 包装它。·
四、扩展 ThreadPoolExecutor
ThreadPoolExecutor 是可扩展的, 它提供了几个可以在子类化中改写的方法: beforeExecute、 afteExecute 和 terminated, 这些方法可以用于扩展 ThreadPoolExecutor 的行为。
在执行任务的线程中将调用 beforeExecute 和 afterExecute 等方法,在这些方法中还可以添加日志、计时、监视或统计信息收集的功能。无论任务是从 run 中正常返回,还是抛出一个 异常而返回, afterExecute 都会被调用。(如果任务在完成后带有一个 Error, 那么就不会调用 after Execute。)如果 beforeExecute 抛出一个 RuntimeException, 那么任务将不被执行, 并且 afterExecute 也不会被调用。JDK版本不同,此处逻辑也有所不同。
在线程池完成关闭操作时调用 terminated, 也就是在所有任务都已经完成并且所有工作者线程也巳经关闭后。 terminated 可以用来释放 Executor 在其生命周期里分配的各种资源, 此外还可以执行发送通知、 记录日志或者收集 finalize 统计信息等操作。
示例:给线程池添加统计信息,如代码 8-8,TimingThreadPool增加了日志记录和任务运行时间统计,并记录已经处理的任务数和总的时间,以及输出任务平均执行时间的日志消息。
| 代码 8-8 增加了日志和计时等功能的线程池 |
| public class TimingThreadPool extends ThreadPoolExecutor {
private final ThreadLocal startTime = new ThreadLocal(); private final Logger log = Logger.getAnonymousLogger();
private final AtomicLong numTasks = new AtomicLong(); //统计任务数
private final AtomicLong totalTime = new AtomicLong(); //线程池运行总时间
public TimingThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
long keepAliveTime, TimeUnit unit, BlockingQueue workQueue) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
}
@Override
protected void beforeExecute(Thread t, Runnable r) {
super.beforeExecute(t, r);
log.fine(String.format("Thread %s: start %s", t, r));
startTime.set(System.nanoTime());
}
@Override
protected void afterExecute(Runnable r, Throwable t) {
try {
long endTime = System.nanoTime();
long taskTime = endTime - startTime.get();
numTasks.incrementAndGet();
totalTime.addAndGet(taskTime);
log.fine(String.format("Thread %s: end %s, time=%dns", t, r, taskTime));
} finally {
super.afterExecute(r, t);
}
}
@Override
protected void terminated() {
try {
//任务执行平均时间
[log.info](https://blog.csdn.net/biggbang)(String.format("Terminated: average time=%dns", totalTime.get() / numTasks.get()));
} finally {
super.terminated();
}
}
} |
转载请注明:xuhss » 深入浅出线程池+高级选项的使用