线程池
约 8541 字大约 28 分钟
2025-03-12
什么是线程池?为什么要用线程池?
在 Java 中,线程池是一种用于管理和复用线程的机制,属于并发编程的重要组成部分。线程池的主要目的是控制线程的数量,重用已创建的线程,减少线程创建和销毁的开销,提高性能。Java 提供了 java.util.concurrent
包下的线程池框架,以便简化多线程编程。
以下是 Java 线程池的基本概念、实现方式、常用类型以及它的优缺点。
1. 为什么要使用线程池
使用线程池的主要原因包括:
- 降低资源消耗:通过重用线程,减少频繁创建和销毁线程的开销。
- 提高响应速度:当任务到达时,如果有空闲线程可用,可以立即执行任务。
- 提高线程管理的可控性:线程池可以控制线程的数量,避免过多线程导致系统资源耗尽。
- 便于管理:线程池可以提供各种管理机制,比如任务队列、线程的调度、超时处理等。
2. Java 中的线程池框架
Java 中的线程池框架主要通过 Executor
接口以及 ExecutorService
和 Executors
类提供。
Executor
:是一个顶层接口,定义了执行任务的基本方法execute(Runnable command)
。ExecutorService
:继承自Executor
,增加了管理线程池的生命周期和任务提交的方法,比如submit()
、shutdown()
等。Executors
:提供了静态工厂方法,用于创建不同类型的线程池。
3. 常见的线程池类型
Java 提供了几种常见的线程池实现,通过 Executors
类的静态方法创建:
3.1 newFixedThreadPool(int nThreads)
- 描述:创建一个固定大小的线程池,线程池中的线程数固定为
nThreads
,无论多少任务提交到线程池,最多同时有nThreads
个线程在执行。 - 特点:空闲的线程会被重用,如果所有线程都在忙,新的任务会被放入等待队列。有上限。
- 适用场景:适合需要限制线程数量的场景,比如服务器处理有限数量的请求。
ExecutorService fixedThreadPool = Executors.newFixedThreadPool(5);
for (int i = 0; i < 10; i++) {
fixedThreadPool.execute(() -> {
System.out.println(Thread.currentThread().getName() + " is executing.");
});
}
fixedThreadPool.shutdown();
3.2 newCachedThreadPool()
- 描述:创建一个可缓存的线程池,线程池会根据需要创建新线程,闲置的线程会被回收。
- 特点:如果线程池中有空闲线程可以复用,就会使用空闲线程,否则创建新的线程。如果线程在 60 秒内没有被使用,将被终止并从池中移除。无上限+自动回收,伸缩性强。
- 适用场景:适合执行大量的短期任务,或负载较轻的服务器。
ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
for (int i = 0; i < 10; i++) {
cachedThreadPool.execute(() -> {
System.out.println(Thread.currentThread().getName() + " is executing.");
});
}
cachedThreadPool.shutdown();
3.3 newSingleThreadExecutor()
- 描述:创建一个单线程的线程池,线程池中只有一个线程。
- 特点:如果该线程异常终止,会创建新的线程继续执行后续任务。所有任务会被顺序执行(FIFO,LIFO,优先级)。
- 适用场景:适用于需要确保任务按顺序执行的场景,比如某些顺序性要求高的任务。
ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();
for (int i = 0; i < 10; i++) {
singleThreadExecutor.execute(() -> {
System.out.println(Thread.currentThread().getName() + " is executing.");
});
}
singleThreadExecutor.shutdown();
3.4 newScheduledThreadPool(int corePoolSize)
- 描述:创建一个定时任务线程池,支持定时和周期性任务执行。
- 特点:用于执行延时任务和周期性任务。
- 适用场景:适用于需要定期执行的任务,或者需要延时执行的任务,比如定时发送通知。
import java.util.concurrent.*;
public class ScheduledThreadPoolExample {
public static void main(String[] args) {
ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(3);
// 延迟3秒后执行一次任务
scheduledThreadPool.schedule(() -> {
System.out.println("Task executed after 3 seconds delay.");
}, 3, TimeUnit.SECONDS);
// 每隔5秒执行一次任务,不考虑上次任务是否完成
scheduledThreadPool.scheduleAtFixedRate(() -> {
System.out.println("Task executed at fixed rate of 5 seconds.");
}, 0, 5, TimeUnit.SECONDS);
// 每隔3秒执行一次任务,在上次任务完成后才开始新的任务
scheduledThreadPool.scheduleWithFixedDelay(() -> {
System.out.println("Task executed with a delay of 3 seconds after the previous task completion.");
}, 0, 3, TimeUnit.SECONDS);
}
}
4. 线程池的工作原理
线程池的工作原理通常包括以下几个部分:
- 任务提交:当有任务提交到线程池时,线程池会首先检查是否有空闲的线程。如果有,则直接使用空闲线程执行任务;否则,新任务进入任务队列。
- 任务队列:线程池内部有一个任务队列,保存着等待执行的任务。线程池中的线程从任务队列中取出任务并执行。如果所有线程都在忙,新的任务会被放入任务队列等待。
- 线程的创建与销毁:线程池中的线程数根据配置和负载情况动态调整。对于
newCachedThreadPool()
类型的线程池,空闲时间超过 60 秒的线程会被销毁。 - 任务执行与回收:当线程完成任务后,如果线程池还需要该线程执行新任务,则该线程会被重用;否则会被回收。
上面的1有错误,当有任务提交到线程池,且任务队列已满,且当前线程数小于核心线程数时,线程池会直接创建新线程来执行任务。因为找空闲线程是很慢的,而创建新线程很快。
5. 线程池的参数
使用 ThreadPoolExecutor
类可以创建更灵活的线程池,允许指定更多的参数:
ThreadPoolExecutor executor = new ThreadPoolExecutor(
corePoolSize, // 核心线程数
maximumPoolSize, // 最大线程数
keepAliveTime, // 线程空闲保活时间
unit, // 时间单位
workQueue, // 任务队列
threadFactory, // 线程工厂
handler // 拒绝策略
);
参数解释:
- corePoolSize:核心线程数,线程池中会保持的最小线程数。
- maximumPoolSize:线程池中允许的最大线程数。
- keepAliveTime:当线程池中线程数超过核心线程数时,多余的空闲线程最大等待时间,超过这个时间会被销毁。
- unit:
keepAliveTime
的时间单位。 - workQueue:任务等待队列,用于存储等待执行的任务。
- threadFactory:用于创建新线程的工厂。
- handler:当任务无法执行时的拒绝策略。
6. 拒绝策略
当线程池无法处理任务时,可以使用拒绝策略来处理。常见的拒绝策略包括:
- AbortPolicy(默认):直接抛出
RejectedExecutionException
异常。 - CallerRunsPolicy:由提交任务的线程执行任务(一般是主线程),避免任务丢失。
- DiscardPolicy:直接丢弃任务,不抛出异常。AbortPolicy的silent版本。
- DiscardOldestPolicy:丢弃队列中最早的任务,然后尝试执行新任务。
7. 线程池的优缺点
优点
- 性能提升:重用线程减少了线程创建和销毁的开销,特别是在高频率并发任务中效果明显。
- 资源管理:控制线程数量,避免系统资源耗尽。
- 任务调度:线程池支持多种任务调度机制,便于管理和控制任务的执行。
缺点
- 资源消耗:线程池本身也占用资源,特别是在高并发时,线程池大小和任务队列的合理配置尤为重要。
- 复杂性:合理配置线程池参数需要经验,错误的参数配置可能会导致性能问题甚至系统崩溃。
- 死锁与资源泄露:不当使用线程池可能导致线程池中的线程因死锁而阻塞,或者由于线程未正确回收而造成资源泄露。
8. 使用注意事项
- 选择合适的线程池类型:根据任务类型选择固定线程池、缓存线程池、单线程池或定时线程池。
- 合理配置核心参数:根据系统资源和任务特性配置核心线程数、最大线程数、队列长度等参数。
- 线程池的关闭:任务完成后,使用
shutdown()
或shutdownNow()
关闭线程池,防止资源泄露。 - 异常处理:在线程池中运行的任务可能会抛出异常,确保任务中的异常得到妥善处理,以防线程提前退出。
总结
Java 中的线程池通过 Executor
框架提供了多种线程管理方式,通过合理配置线程池可以提高系统性能,减少资源消耗。线程池可以帮助程序更好地控制线程的数量和执行顺序,是并发编程中的重要工具。在实际使用时,选择合适的线程池类型和参数,并合理处理拒绝策略和异常处理,能够避免许多并发编程中的常见问题。
线程池的拒绝策略有哪些
AbortPolicy, 默认
该策略是线程池的默认策略。使用该策略时,如果线程池队列满了丢掉这个任务并且抛出RejectedExecutionException异常。
DiscardPolicy
这个策略和AbortPolicy的slient版本,如果线程池队列满了,会直接丢掉这个任务并且不会有任何异常。
DiscardOldestPolicy
这个策略从字面上也很好理解,丢弃最老的。也就是说如果队列满了,会将最早进入队列的任务删掉腾出空间,再尝试加入队列。 因为队列是队尾进,队头出,所以队头元素是最老的,因此每次都是移除对头元素后再尝试入队。
CallerRunsPolicy
使用此策略,如果添加到线程池失败,那么主线程会自己去执行该任务,不会等待线程池中的线程去执行。就像是个急脾气的人,我等不到别人来做这件事就干脆自己干。
线程池都有哪些状态?
RUNNING:这是最正常的状态,接受新的任务,处理等待队列中的任务。
SHUTDOWN:不接受新的任务提交,但是会继续处理等待队列中的任务。
STOP:不接受新的任务提交,不再处理等待队列中的任务,中断正在执行任务的线程。
TIDYING:所有的任务都销毁了,workerCount 为 0,线程池的状态在转换为 TIDYING 状态时,会执行钩子方法 terminated()。
TERMINATED:terminated()方法结束后,线程池的状态就会变成这个。
线程池中 submit() 和 execute() 方法有什么区别?
相同点:
相同点就是都可以开启线程执行池中的任务。
不同点:
接收参数:execute()只能执行 Runnable 类型的任务。submit()可以执行 Runnable 和 Callable 类型的任务。
返回值:submit()方法可以返回持有计算结果的 Future 对象(Callable 类型),而execute()没有
异常处理:submit()方便Exception处理。通过返回的Future对象的get方法获取。
分析线程池的实现原理和线程的调度过程
提交一个任务到线程池中,线程池的处理流程如下:
- 如果当前运行的线程数小于核心线程数,那么就会新建一个线程来执行任务。
- 如果当前运行的线程数等于或大于核心线程数,但是小于最大线程数,那么就把该任务放入到任务队列里等待执行。
- 如果向任务队列满了,但是当前运行的线程数是小于最大线程数的,就新建一个线程来执行任务。最大线程是在必要时才创建的。
- 如果当前运行的线程数已经等同于最大线程数了,新建线程将会使当前运行的线程超出最大线程数,那么当前任务会被拒绝,调用拒绝策略方法。
无界阻塞队列永远都不会满,所以用无界阻塞队列的线程池,就算核心线程数小于最大线程数也永远不会去新增线程来执行任务。
线程池的最大线程数目根据什么确定
确定线程池的最大线程数目是一个关键的设计决策,它直接影响系统的性能和资源利用。线程池的大小设置过大或过小都可能导致性能问题。过小的线程池会导致任务排队等待时间过长,而过大的线程池会耗尽系统资源。因此,合理地设置线程池的最大线程数需要考虑多个因素,如任务的类型、系统的资源、吞吐量等。
以下是确定线程池大小的几个重要因素和方法:
1. 任务类型
首先需要明确任务是CPU 密集型还是I/O 密集型。两种类型的任务对线程池大小的需求不同:
- CPU 密集型任务:CPU 密集型任务主要是指计算密集型任务,比如数据处理、数学运算等。这类任务主要消耗 CPU 资源,几乎不涉及 I/O 操作,且需要频繁使用 CPU。
- 建议的线程池大小:对于 CPU 密集型任务,线程池大小一般设置为
CPU 核心数 + 1
。多一个线程可以应对偶尔的线程上下文切换或阻塞。 - 公式:
最大线程数 = CPU 核心数 + 1
- 建议的线程池大小:对于 CPU 密集型任务,线程池大小一般设置为
- I/O 密集型任务:I/O 密集型任务会涉及大量的 I/O 操作,比如文件读写、网络请求等,这类任务会频繁等待 I/O 操作完成,而在等待期间不占用 CPU。
- 建议的线程池大小:对于 I/O 密集型任务,线程池大小可以设置为
CPU 核心数 * 2
或更多,具体值取决于 I/O 等待时间和并发需求。 - 公式:
最大线程数 = CPU 核心数 * (1 + I/O 耗时 / CPU 耗时)
,这个公式可以根据任务的 I/O 等待时间比率进行调整。
- 建议的线程池大小:对于 I/O 密集型任务,线程池大小可以设置为
2. 系统资源
线程池大小受限于系统资源,特别是 CPU 和内存。如果线程池设置过大,系统资源(如 CPU、内存)可能被耗尽,导致系统崩溃或显著的性能下降。因此,线程池的大小需要根据系统资源进行合理配置。
- CPU 资源:过多的线程会导致频繁的线程上下文切换,增加 CPU 开销,反而降低了系统的效率。对于 CPU 密集型任务,可以将线程数限制在 CPU 核心数附近。
- 内存资源:每个线程的运行都需要内存,特别是在高并发场景中,线程池中线程过多会导致内存耗尽。因此,需要确保线程数不会超过系统内存可以支撑的范围。
3. 吞吐量需求
不同系统对于吞吐量的要求不同。如果系统需要处理大量请求或任务,可以考虑适当增大线程池。但也要注意在高吞吐量要求下,必须保证系统的稳定性,避免因线程数过多而导致资源耗尽。
- 测试:可以通过压力测试来确定线程池的最佳大小。通过模拟不同并发量的请求,观察系统的响应时间和资源使用情况,找到一个合适的线程数。线程池的最佳大小通常是保证系统达到较高吞吐量的同时,资源消耗在合理范围内的配置。
4. 基于公式计算线程池大小
可以使用以下公式来近似计算线程池的最佳大小:
公式:
Nthreads=NCPU×(1+CW)
- Nthreads:最佳线程数。
- NCPU:CPU 核心数。
- W:等待时间(I/O 或网络请求的等待时间)。
- C:计算时间(CPU 时间)。
此公式的核心思想是增加线程数,以弥补 I/O 操作造成的等待时间,从而更好地利用 CPU 资源。I/O 密集型任务通常有较大的 CW 值,因此需要更多的线程来覆盖等待时间。而对于 CPU 密集型任务,W≈0,所以线程数接近 CPU 核心数即可。
5. 动态调整线程池大小
在生产环境中,任务的类型和数量可能会动态变化。可以使用支持动态调整的线程池(如 ThreadPoolExecutor
),根据系统的负载自动调整线程池的大小。这样可以在高负载时增加线程数,而在低负载时减少线程数,从而更加高效地利用系统资源。
示例:动态线程池配置
使用 ThreadPoolExecutor
类可以手动配置线程池的核心线程数、最大线程数、队列大小等参数,提供更灵活的线程池管理。
import java.util.concurrent.*;
public class DynamicThreadPoolExample {
public static void main(String[] args) {
int corePoolSize = 4; // 核心线程数
int maximumPoolSize = 10; // 最大线程数
long keepAliveTime = 60; // 线程空闲时间
ThreadPoolExecutor executor = new ThreadPoolExecutor(
corePoolSize,
maximumPoolSize,
keepAliveTime,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(100) // 阻塞队列的大小
);
for (int i = 0; i < 20; i++) {
executor.execute(() -> {
System.out.println(Thread.currentThread().getName() + " is executing.");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
executor.shutdown();
}
}
在这个例子中,我们设置了核心线程数为 4,最大线程数为 10,空闲线程的存活时间为 60 秒。根据任务数量和执行时间,线程池会动态调整线程数。
6. 使用合理的任务队列
线程池中的任务队列(如 LinkedBlockingQueue
、ArrayBlockingQueue
)也会影响线程池的最大线程数。不同类型的队列会影响线程的创建和调度:
- 无界队列(如
LinkedBlockingQueue
):可以容纳任意多的任务,线程池会根据核心线程数来控制线程的数量,不会轻易创建新的线程。这适合核心线程数足够多、任务执行时间较短的场景,但可能导致队列任务过多,内存溢出。 - 有界队列(如
ArrayBlockingQueue
):设置队列大小限制,任务超出队列大小时,线程池会创建新的线程,直到达到最大线程数。适合任务数量较大、执行时间较长的场景,能有效控制内存使用。
7. 实际建议
- CPU 密集型任务:建议将线程数设置为 CPU 核心数的数量,可以稍微增加一个线程以应对上下文切换。
- I/O 密集型任务:一般可以设置为 CPU 核心数的 2 倍或更大,因为 I/O 操作会导致线程在等待,增加线程数可以更好地利用 CPU。
- 混合型任务:如果一个系统包含 CPU 密集型和 I/O 密集型任务,可以考虑将两者分开处理,使用不同的线程池来管理不同类型的任务,分别设置合理的线程池大小。
8. 线程池大小的测试与优化
确定线程池大小的一个重要方法是通过性能测试和调优。在测试环境中,模拟实际的并发场景,通过压力测试找到线程池的最佳配置,观察以下几个指标:
- CPU 使用率:CPU 使用率应该接近合理范围(通常为 70%-90%),过高可能导致过多的线程上下文切换,过低则意味着 CPU 没有得到充分利用。
- 内存消耗:观察内存消耗是否稳定,如果线程池太大,可能会导致内存不足。
- 响应时间:保证响应时间在可接受范围内,测试不同线程池大小对任务处理时间的影响。
- 吞吐量:观察系统能处理的最大任务数,找到系统的瓶颈并适当调整线程池大小。
通过测试和优化,可以找到适合系统需求的线程池大小配置。实际应用中,由于任务的负载会动态变化,因此根据负载调整线程池大小(如 ThreadPoolExecutor
的动态配置)是一种更灵活的策略。
总结
- CPU 密集型任务:设置线程池大小为
CPU 核心数 + 1
。 - I/O 密集型任务:设置线程池大小为
CPU 核心数 * (1 + I/O 时间 / CPU 时间)
,通常约为CPU 核心数 * 2
或更大。 - 混合任务:分开处理 CPU 密集型和 I/O 密集型任务,使用不同线程池。
- 动态调整和测试:通过压力测试找到最佳配置,并结合实际负载进行动态调整。
线程池如何调优
线程数:
CPU 密集型任务配置尽可能小的线程,cpu核数+1。
IO 密集型任务则由于线程并不是一直在执行任务,则配置尽可能多的线程,如2*cpu核数。
依赖数据库连接池的任务,因为线程提交 SQL 后需要等待数据库返回结果,线程数应该设置得较大,这样才能更好的利用 CPU。
拆分任务:
混合型任务,如果可以拆分,则将其拆分成一个 CPU 密集型任务和一个 IO 密集型任务。只要这两个任务执行的时间相差不是太大,那么分解后并发执行的吞吐率要高于串行执行的吞吐率;如果这两个任务执行时间相差太大,则没必要进行分解。
队列:
优先级不同的任务可以使用优先级队列 PriorityBlockingQueue 来处理,它可以让优先级高的任务先得到执行。
执行时间不同的任务可以交给不同线程池来处理,或者也可以使用优先级队列,让执行时间短的任务先执行。
建议使用有界队列,有界队列能增加系统的稳定性和预警能力。可以根据需要设大一点,比如几千。使用无界队列,线程池的队列就会越来越大,有可能会撑满内存,导致整个系统不可用。
线程池如何实现动态修改?
首先线程池提供了部分setter方法可以设置线程池的参数,修改核心线程数,最大线程数,空闲线程停留时间,拒绝策略等。
可以将线程池的配置参数放入配置中心(nacos),当需要调整的时候,去配置中心修改就行。
什么时候修改呢?
这里需要监控报警策略,获取线程池状态指标,当指标判定为异常之后进行报警
分析指标异常原因,评估处理策略,最后通过上述线程池提供的接口进行动态修改。
运行时更改核心线程数不需要重建线程池。
在 Java 中,可以通过 ThreadPoolExecutor
类提供的方法实现线程池的动态调整。ThreadPoolExecutor
允许我们在运行时动态修改线程池的核心线程数、最大线程数等参数,以便根据实际情况对线程池进行优化。这种动态调整可以有效地应对任务负载的变化,提高系统资源利用率。
以下是 Java 线程池实现动态修改的几种方式:
1. 动态修改核心线程数和最大线程数
ThreadPoolExecutor
提供了以下两个方法,可以在运行时动态调整线程池的核心线程数和最大线程数:
setCorePoolSize(int corePoolSize)
:设置线程池的核心线程数。核心线程数决定了线程池在空闲时的最低线程数量,低于此数量的线程不会被回收。setMaximumPoolSize(int maximumPoolSize)
:设置线程池的最大线程数。最大线程数决定了线程池在高负载时最多能创建的线程数。
示例代码:
import java.util.concurrent.*;
public class DynamicThreadPoolExample {
public static void main(String[] args) throws InterruptedException {
// 创建一个线程池,初始核心线程数为2,最大线程数为4
ThreadPoolExecutor executor = new ThreadPoolExecutor(
2, // 初始核心线程数
4, // 初始最大线程数
60, // 空闲线程存活时间
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(10) // 阻塞队列容量
);
// 提交几个任务到线程池中
for (int i = 0; i < 5; i++) {
executor.execute(() -> {
try {
System.out.println(Thread.currentThread().getName() + " is executing.");
Thread.sleep(2000); // 模拟任务执行
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
// 动态调整核心线程数和最大线程数
System.out.println("Modifying core and max pool size...");
executor.setCorePoolSize(3); // 动态调整核心线程数
executor.setMaximumPoolSize(6); // 动态调整最大线程数
// 提交额外任务以观察线程池调整效果
for (int i = 0; i < 5; i++) {
executor.execute(() -> {
try {
System.out.println(Thread.currentThread().getName() + " is executing.");
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
executor.shutdown();
executor.awaitTermination(1, TimeUnit.MINUTES);
}
}
说明:
- 线程池创建时核心线程数为 2,最大线程数为 4。
- 运行时动态将核心线程数调整为 3,最大线程数调整为 6。
- 新任务提交后,线程池将根据新的配置调整线程的创建和使用。
2. 动态调整线程池的队列容量
在某些场景下,除了调整线程数,我们还可能需要调整任务队列的大小。ThreadPoolExecutor
默认不提供动态调整队列大小的直接方法,但是可以通过替换队列来实现这一目的。
示例代码:
import java.util.concurrent.*;
public class DynamicQueueExample {
public static void main(String[] args) throws InterruptedException {
// 创建一个线程池,使用初始队列
ThreadPoolExecutor executor = new ThreadPoolExecutor(
2, // 核心线程数
4, // 最大线程数
60, // 空闲线程存活时间
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(5) // 初始阻塞队列容量为5
);
// 提交一些任务到线程池中
for (int i = 0; i < 7; i++) {
executor.execute(() -> {
try {
System.out.println(Thread.currentThread().getName() + " is executing.");
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
// 创建一个新的更大的队列
BlockingQueue<Runnable> newQueue = new LinkedBlockingQueue<>(10);
// 替换旧队列(注意:需要确保没有正在等待的任务在队列中)
executor.getQueue().drainTo(newQueue); // 将现有队列的任务转移到新队列
executor.setQueue(newQueue); // 使用反射设置新队列
System.out.println("Queue size adjusted to 10.");
executor.shutdown();
executor.awaitTermination(1, TimeUnit.MINUTES);
}
}
注意:由于
ThreadPoolExecutor
没有公开的setQueue()
方法,因此在某些场景下直接替换队列比较复杂。通常推荐在创建线程池时选择合适的队列大小,而不是在运行时动态更改队列大小。
3. 动态调整线程空闲时间
可以通过 setKeepAliveTime``(long time, TimeUnit unit)
方法调整线程的空闲存活时间。此方法设置了非核心线程在空闲时等待新任务的最长时间,当时间超过指定值时,空闲线程将被终止。
executor.setKeepAliveTime(30, TimeUnit.SECONDS);
- 如果任务量变化较大,可以适当缩短空闲时间,以便非核心线程更快地释放资源。
- 对于需要长期保持线程的任务,可以增加空闲时间,确保线程保持活动状态。
4. 动态调整线程池拒绝策略
在高负载时,线程池可能会无法处理所有任务,导致任务被拒绝。可以通过 setRejectedExecutionHandler
****(RejectedExecutionHandler handler)
方法设置新的拒绝策略,以动态应对高负载的情况。Java 提供了以下内置拒绝策略:
- AbortPolicy(默认):直接抛出
RejectedExecutionException
异常。 - CallerRunsPolicy:由调用线程执行被拒绝的任务(通常是主线程)。
- DiscardPolicy:直接丢弃被拒绝的任务,不抛出异常。
- DiscardOldestPolicy:丢弃等待队列中最早的任务,然后尝试重新执行被拒绝的任务。
示例:
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
在运行时根据实际需求调整拒绝策略,可以帮助系统在高负载时更平稳地处理任务。例如,当任务量突然增加时,可以使用 CallerRunsPolicy
来减轻线程池的压力。
5. 定期动态调整线程池大小
如果任务负载经常变化,可以通过定期检测系统的负载,自动调整线程池的大小。以下是使用定时任务动态调整线程池大小的示例:
import java.util.concurrent.*;
public class ScheduledThreadPoolAdjustment {
public static void main(String[] args) throws InterruptedException {
ThreadPoolExecutor executor = new ThreadPoolExecutor(
2,
10,
60,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(100)
);
// 定时任务定期调整线程池大小
ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
scheduler.scheduleAtFixedRate(() -> {
int queueSize = executor.getQueue().size();
System.out.println("Current queue size: " + queueSize);
if (queueSize > 50) {
executor.setCorePoolSize(5); // 增加核心线程数
executor.setMaximumPoolSize(15); // 增加最大线程数
System.out.println("Increased thread pool size due to high load.");
} else if (queueSize < 10) {
executor.setCorePoolSize(2); // 减少核心线程数
executor.setMaximumPoolSize(10); // 减少最大线程数
System.out.println("Decreased thread pool size due to low load.");
}
}, 0, 5, TimeUnit.SECONDS); // 每 5 秒调整一次
// 模拟提交任务
for (int i = 0; i < 100; i++) {
executor.execute(() -> {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
Thread.sleep(30000); // 运行 30 秒
scheduler.shutdown();
executor.shutdown();
}
}
说明:
- 使用
ScheduledExecutorService
定时检查线程池的任务队列长度。 - 根据队列长度动态调整线程池的核心线程数和最大线程数。
- 当任务量大时,增加线程池大小;当任务量减少时,恢复到默认大小。
总结
Java 的 ThreadPoolExecutor
提供了多种动态调整线程池的方法。动态调整线程池的常用方法包括:
- 动态调整核心线程数和最大线程数:使用
setCorePoolSize
和setMaximumPoolSize
。 - 动态调整线程空闲时间:使用
setKeepAliveTime
。 - 动态调整拒绝策略:使用
setRejectedExecutionHandler
。 - 定期动态调整:结合
ScheduledExecutorService
定期检测系统负载并动态调整线程池大小。
通过合理使用这些方法,可以在运行时更高效地管理线程池资源,提高系统的响应速度和性能。
使用无界队列的线程池会导致什么问题?
例如newFixedThreadPool使用了无界的阻塞队列LinkedBlockingQueue,如果线程获取一个任务后,任务的执行时间比较长,会导致队列的任务越积越多,导致机器内存使用不停飙升,最终导致OOM。
如果线程池当前处于空闲的状态,核心线程数量是不会被销毁的,那这几个核心线程处于什么状态?为什么处于这个状态?
考察线程复用的逻辑,以及对线程状态机的理解。
首先线程本身创建和销毁都是成本比较高的,那就排除 new 和 terminated 状态,没有任务运行排除 runnable 状态,剩下阻塞和等待,因为线程不会销毁需要一直等待执行任务,超时等待也不太可能,最后同步锁(重量级锁)才会进入阻塞状态,所以我猜是一直等待。
你能设计实现一个线程池吗?
⭐这道题在阿里的面试中出现频率比较高
我们自己的实现就是完成这个核心流程:
- 线程池中有N个工作线程
- 把任务提交给线程池运行
- 如果线程池已满,把任务放入队列
- 最后当有空闲时,获取队列中任务来执行
实现代码[6]:
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;
public class MyThreadPoolExecutor implements Executor {
// 用于记录线程池中当前线程数量
private final AtomicInteger ctl = new AtomicInteger(0);
// 核心线程数,即最小线程池大小
private volatile int corePoolSize;
// 最大线程数,即线程池允许的最大线程数
private volatile int maximumPoolSize;
// 存放待执行任务的阻塞队列
private final BlockingQueue<Runnable> workQueue;
/**
* 构造线程池
*
* @param corePoolSize 核心线程数
* @param maximumPoolSize 最大线程数
* @param workQueue 阻塞队列,用于存放任务
*/
public MyThreadPoolExecutor(int corePoolSize, int maximumPoolSize, BlockingQueue<Runnable> workQueue) {
if (corePoolSize < 0 || maximumPoolSize <= 0 || corePoolSize > maximumPoolSize) {
throw new IllegalArgumentException("Invalid pool size");
}
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
}
/**
* 执行提交的任务
*
* @param command 任务
*/
@Override
public void execute(Runnable command) {
if (command == null) throw new NullPointerException("Task cannot be null");
int currentPoolSize = ctl.get();
// 如果当前线程数小于核心线程数,则优先创建核心线程
if (currentPoolSize < corePoolSize) {
if (addWorker(command)) {
return;
}
}
// 如果核心线程已满,则尝试将任务加入队列
if (!workQueue.offer(command)) {
// 如果任务队列已满,尝试创建新的线程执行任务
if (!addWorker(command)) {
// 如果线程池已满,执行拒绝策略
reject();
}
}
}
/**
* 拒绝策略:在无法执行任务时抛出异常
*/
private void reject() {
throw new RuntimeException("Task rejected! Current pool size: "
+ ctl.get() + ", workQueue size: " + workQueue.size());
}
/**
* 添加一个工作线程
*
* @param firstTask 要执行的任务
* @return 添加是否成功
*/
private boolean addWorker(Runnable firstTask) {
while (true) {
int currentPoolSize = ctl.get();
// 如果线程数量超过最大线程数,不创建新线程
if (currentPoolSize >= maximumPoolSize) return false;
// 通过CAS增加线程计数,避免并发问题
if (ctl.compareAndSet(currentPoolSize, currentPoolSize + 1)) {
Worker worker = new Worker(firstTask);
worker.thread.start(); // 启动线程
return true;
}
}
}
/**
* 内部工作线程,用于执行任务
*/
private final class Worker implements Runnable {
final Thread thread;
Runnable firstTask;
/**
* Worker构造方法
*
* @param firstTask 首个要执行的任务
*/
public Worker(Runnable firstTask) {
this.thread = new Thread(this);
this.firstTask = firstTask;
}
/**
* Worker线程的run方法,循环执行任务
*/
@Override
public void run() {
Runnable task = firstTask;
try {
// 持续获取并执行任务,直到线程池大小超过最大线程数
while (task != null || (task = getTask()) != null) {
try {
task.run();
} catch (RuntimeException e) {
System.out.println("Task execution failed: " + e.getMessage());
} finally {
task = null; // 设置为null,以便下次循环中重新获取任务
}
// 如果线程池当前大小超过最大线程数,终止循环
if (ctl.get() > maximumPoolSize) {
break;
}
}
} finally {
// 线程完成后,减少线程计数
ctl.decrementAndGet();
}
}
/**
* 从任务队列中获取任务
*
* @return 返回任务队列中的任务
*/
private Runnable getTask() {
try {
// 阻塞获取队列中的任务
return workQueue.take();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return null;
}
}
}
// 测试线程池
public static void main(String[] args) {
// 创建线程池,核心线程数为2,最大线程数为4,任务队列容量为10
MyThreadPoolExecutor myThreadPoolExecutor = new MyThreadPoolExecutor(2, 4,
new ArrayBlockingQueue<>(10));
// 提交多个任务到线程池
for (int i = 0; i < 15; i++) {
int taskNum = i;
myThreadPoolExecutor.execute(() -> {
try {
Thread.sleep(1500); // 模拟任务执行
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
System.out.println("任务编号:" + taskNum + " 由线程:" + Thread.currentThread().getName() + " 执行");
});
}
}
}
这样,一个实现了线程池主要流程的类就完成了。
单机线程池执行断电了应该怎么处理?
- 阻塞队列持久化,服务器重启后阻塞队列中的数据再加载。
- 正在处理任务事务控制
- 断电之后正在处理任务的回滚,undo
- 通过日志恢复该次操作,redo