线程池的核心思想就是实现线程的复用,避免线程反复创建和销毁带来的性能开销。这里主要介绍了基本的 ThreadPoolExecutor 以及用于定时循环任务的 ScheduledThreadPoolExecutor ,而且线程池是面试的重点。

ThreadPoolExecutor

线程池的核心思想就是实现线程的复用,避免线程反复创建和销毁带来的性能开销。

Java 通过 ThreadPoolExecutor 来创建线程池,其核心参数如下:

初始化状态下的线程池

线程池 execute 工作

1
public void execute(Runnable command){}

execute 方法的实现

线程在线程池内部被封装成内部类 Worker 创建的对象。

在线程池的 execute 方法中是通过 addWorker 方法来创建线程并执行任务的,Worker 会将线程和任务一同封装到 Worker 对象中,在通过 runWorker 方法来执行任务。

runWorker 方法部分实现

在 runWorker 方法内部,通过 while 循环保证执行完任务的线程不被销毁,而是复用不断地 getTask 获取任务,只要能获取到任务就会调用 run 方法继续执行。

每次在执行任务时,都会对 Worker 进行加锁,执行完任务再解锁,这样就可以通过 Worker 的加锁状态判断当前线程是否在执行任务(调用 Worker 的 tryLock)。

通过 getTask 获取任务时,可以设置获取任务的线程的超时退出机制:

当线程池线程数已经超过了核心线程数 or 允许超时退出,会调用阻塞队列的 poll 方法传入 keepAliveTime 参数,等待 keepAliveTime 时间后如果仍然没有获取任务则返回 null,当 getTask 返回 null 时线程就会退出了。反之则调用阻塞队列的 take 方法无限期的等待任务的到来。

通过 getTask 方法的超时等待退出机制的流程总结如下:

线程池的5种状态

在线程池运行过程中,大多数操作都需要判断当前线程池处于什么状态,再决定是否继续执行操作。

将上述五个状态的状态转换总结为下面的流程图:

线程池的关闭

就是上面说到的 shutdown 方法 和shutdownNow 方法。

其中 shutdown 方法中断所有闲置(已经不从任务队列中获取任务)的工作线程——通过 Worker 调用 tryLock 方法判断线程是否工作。

在更改线程池状态的过程中通过加锁来保证状态更新的原子性,防止在多线程环境中有其他线程对线程池进行更改或提交新任务等。

而 shutdownNow 方法中,则会将任务队列中的 task 都移除,直接进入 STOP 状态。

线程池的使用场景

使用了 JDK 内部提供的 Executors 工具类来快速创建线程池。

  1. 模拟 Web 服务器

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    public class WebServer {
    private static final int NTHREADS = 100;
    private static final ExecutorService executor = Executors.newFixedThreadPool(NTHREADS);

    public static void main(String[] args) {
    while (true) {
    // 接受请求 -- 每一个请求都是一个新线程
    Runnable request = new Runnable() {
    @Override
    public void run() {
    // 处理请求
    System.out.println("处理请求: " + Thread.currentThread().getName());
    }
    };
    executor.execute(request);
    }
    }
    }
  2. 并行计算

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    public class ParallelCalculation {
    private static final int NTHREADS = 4;
    private static final ExecutorService executor = Executors.newFixedThreadPool(NTHREADS);

    public static void main(String[] args) {
    Callable<Double> task = new Callable<>() {
    @Override
    public Double call() throws Exception {
    // 模拟数值计算
    return Math.random() * 100;
    }
    };

    List<Future<Double>> results = new ArrayList<>();
    for (int i = 0; i < 10; i++) {
    results.add(executor.submit(task));
    }

    for (Future<Double> result : results) {
    try {
    System.out.println(result.get());
    } catch (Exception e) {
    throw new RuntimeException(e);
    }
    }
    executor.shutdown();
    }
    }
  3. 模拟处理异步任务

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    public class SyncTask {
    // private static final int NTHREADS = 1;
    private static final ExecutorService executor = Executors.newCachedThreadPool();

    public static void main(String[] args) {
    System.out.println("Main thread executes task...");

    executor.execute(() -> {
    System.out.println("Sub thread executes async task...");
    try {
    Thread.sleep(1000);
    } catch (InterruptedException e) {
    throw new RuntimeException(e);
    }
    System.out.println("Sub thread finishes async task...");
    });
    executor.shutdown();

    System.out.println("Main thread continues executing task...");
    }
    }

通过 Executors 工具类来创建线程池有多种方案,但是不推荐:

  • 固定线程数量的线程池

    1
    ExecutorService newFixedThreadPool(int nThreads)

    newFixedThreadPool 线程池由于使用了 LinkedBlockingQueue,队列的容量默认无限大,实际使用中出现任务过多时会导致内存溢出。

  • 单个线程数量的线程池

    1
    ExecutorService newSingleThreadExecutor()
  • 接近无限大线程数量的线程池

    1
    ExecutorService newCachedThreadPool()    

    newCachedThreadPool 线程池由于核心线程数无限大,当任务过多的时候会导致创建大量的线程,可能机器负载过高导致服务宕机。

  • 带定时调度功能的线程池

    1
    ScheduledExecutorService newScheduledThreadPool(int corePoolSize)

实际项目中合理使用线程池

线程数

Java 中用来获取 CPU 核心数的方法是:

1
Runtime.getRuntime().availableProcessors();

线程工厂

建议自定义,可以设置线程的名称,方便日志查询。

有界队列

在构造时可以传入参数来限制任务队列的大小,防止任务数量过多而导致内存溢出。

利用自定义线程池来改造升级前面 web 服务器、并行计算、异步任务处理的例子。

  1. 模拟 web 服务器

    Web服务器通常需要处理 I/O 操作,比如网络I/O,因此它们被视为 I/O 密集型任务。因此,我们将线程数设置为 2 * CPU 核心数。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    public class WebServer {
    private static final int CPU_COUNT = Runtime.getRuntime().availableProcessors();
    private static final int CORE_POOL_SIZE = 2 * CPU_COUNT;
    // 只有在任务队列满了之后才会创建非核心线程
    private static final int MAX_POOL_SIZE = 2 * CPU_COUNT + 1;

    private static final ThreadPoolExecutor executor = new ThreadPoolExecutor(
    CORE_POOL_SIZE,
    MAX_POOL_SIZE,
    60L,
    TimeUnit.SECONDS,
    new LinkedBlockingQueue<>(1000)
    );
    public static void main(String[] args) { /* ... */}
    }
  2. 并行计算

    并行计算任务主要用于计算,没有 I/O 阻塞,所以它们是** CPU 密集型**的。线程数设置为CPU核心数 + 1。

  3. 异步任务处理

    异步任务通常涉及到 I/O 操作,比如数据库查询或文件读写,因此它们被视为** I/O 密集型任务**。因此,我们将线程数设置为2 * CPU核心数。

ScheduledThreadPoolExecutor

定时任务类。指定延迟时间后执行任务 or 周期性执行任务。

相对于 ExecutorService 接口,ScheduledExecutorService 接口增加了几个与定时相关的方法。

主要方法

  • schedule 方法

接口和类关系图

  • scheduleAtFixedRate 方法(新增定时周期)

    以后每隔 period 时长再次执行任务。注意,period 是从任务开始执行算起的。

  • scheduleWithFixedDelay 方法(新增定时周期)

    以后每当任务执行完成后,等待 delay 时长,再次执行任务。

  • delayedExecute 方法

    定时任务执行的主要方法,前面两个定时方法都用到了它。通过检查线程池中的状态,将任务加入到等待队列当中,底层调用 addWorker 方法启动并执行任务。

DelayedWorkQueue

在 ScheduledThreadPoolExecutor 中来保存等待的任务。

DelayedWorkQueue 是一个无界优先队列,使用数组存储,底层使用堆结构来实现优先队列的功能。

假设,索引值从 0 开始,子节点的索引值为 k,父节点的索引值为 p,则:

在延迟等待队列中的线程逻辑身份:

condition的 available 的作用:等待和唤醒机制,每个 leader 在执行任务之前会唤醒一个 follower 线程使其称为新的 leader 等待下一个定时任务的到来。

DelayedWorkQueue 是一个优先级队列(插入删除的时间复杂度是 O(logN) ),它可以保证每次出队的任务都是当前队列中执行时间最靠前)。

take 方法

这里的 take 方法和前面线程池介绍的 getTask 方法的区别是:getTask 方法循环从队列中取任务,取出来就执行了;但是这里的定时队列可能取出来的任务还没有到预期的执行时间,所以在 take 中要保证到指定的执行时间才能被取走。

在定时等待队列中选取 leader,作用是为了减少不必要的等待时间。leader 只等待下一个任务到来的时间间隔,其他 followers 无限期等待(我的理解是:每次只唤醒一个线程成为新 leader,而不是通过定时等待 delay 后所有线程都唤醒,造成系统资源的浪费和频繁地上下文切换)。

如果不设计 leader 的话会造成无效的阻塞

offer 方法

实现向延迟队列插入一个任务的操作,并保证整个队列仍然满足最小堆的性质(通过比较新插入节点和 parent 节点的延迟时间)。