Java线程池使用及原理分析

Java线程池使用及原理分析

Scroll Down

背景

我们使用线程的时候就去创建一个线程,这样实现起来非常简便,但是就会有一个问题:

如果并发的线程数量很多,并且每个线程都是执行一个时间很短的任务就结束了,这样频繁创建线程就会大大降低系统的效率,因为频繁创建线程和销毁线程需要时间。

那么有没有一种办法使得线程可以复用,就是执行完一个任务,并不被销毁,而是可以继续执行其他的任务?

1. 为什么要用线程池

池化技术相信大家都听说过,线程池、数据库连接池、HTTP连接池等等都是对这个思想的应用。

池化技术的思想主要是为了减少每次获取资源的消耗,提高对资源的利用率。

线程池提供了一种限制和管理资源,每个线程池还维护了一些基本的统计信息,例如:已完成任务的数量。

在《Java并发编程的艺术》中提到使用线程池的好处:

  • 降低资源消耗:通过重复利用已创建的线程降低线程创建和销毁造成的消耗。
  • 提高响应速度:当任务到达时,任务可以不需要等线程创建就能立即执行。
  • 提高线程的可管理性:线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的管理分配、调优和监控。

2. 实现Runnable接口和Callable接口的区别

Runnable接口自Java 1.0以来一直存在,但Callable仅在Java 1.5中引入,目的就是为了处理Runnable不支持的用例。Runnable接口不会返回结果或抛出检查异常,但Callable接口可以。如果任务不需要返回结果或者抛出异常推荐使用Runnable接口,这样代码看起来会更加简洁。

注:Executors 工具类可以实现Runnable和Callable对象直接的相互转换

@FunctionalInterface
public interface Runnable {
    /**
     * 该方法没有返回值,也无法抛出异常
     */
    public abstract void run();
}
@FunctionalInterface
public interface Callable<V> {
    /**
     * 计算结果,或在无法这样做的时候抛出异常
     * @return computed result 计算得出的结果
     * @throws Exception if unable to compute a result 如果无法计算结果,则抛出异常
     */
    V call() throws Exception;
}

3. 执行execute() 方法和submit()方法区别

  1. execute() 方法用于提交不需要返回值的任务,所以无法判断任务是否被线程池执行成功与否。
  2. submit() 方法用于提交需要返回值的任务,线程池会返回一个Future类型的对象,通过这个对象可以判断任务的执行成功与否。

查看 AbstractExecutorService 接口中的submit方法:

	/**
     * @throws RejectedExecutionException {@inheritDoc}
     * @throws NullPointerException       {@inheritDoc}
     */
    public Future<?> submit(Runnable task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<Void> ftask = newTaskFor(task, null);
        execute(ftask);
        return ftask;
    }

上面方法调用了newTaskFor 方法返回了一个FutureTask对象。

    protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
        return new FutureTask<T>(runnable, value);
    }

在来看看execute() 方法:

    public void execute(Runnable task) {
    }

4. 如何创建线程池

《阿里巴巴Java开发手册》中强制线程池不允许使用Executors去创建,而是通过ThreadPoolExecutor的方式,这样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险,也就常说的OOM。

方式一:通过构造方法实现

方式二:通过Executors 工具类来实现,本质都是调用ThreadPoolExecutor的构造方法。

我们可以创建三种类型的ThreadPoolExecutor:

  • FixedThreadPool:该方法返回一个固定线程数量的线程池。当有一个新任务提交时,线程池中如有空闲的线程,则立即执行。若没有,则新的任务会被暂存在一个任务队列中,待有线程空闲时,便会处理在任务队列中的任务。
  • SingleThreadExecutor:该方法返回只有一个线程的线程池。当有一个新任务提交时,新任务会被暂存在一个任务队列中,待线程空闲时,按先入先出的顺序执行队列中的任务。
  • CachedThreadPool:该方法返回一个可根据实际情况调整线程数量的线程池。如有空闲线程可以复用,则会优先使用可复用的线程。若所有线程均在工作,此时有一个新任务提交,则会创建新的线程处理任务。所有线程在当前任务执行完毕后,将返回线程池进行复用。

5. ThreadPoolExecutor 分析

ThreadPoolExecutor 类中提供了四个构造方法,我们拿最多参数的构造方法来分析。

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.acc = System.getSecurityManager() == null ?
                null :
                AccessController.getContext();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }

ThreadPoolExecutor 3个最重要的参数:

  • corePoolSize:核心线程数,线程数定义了最小可以同时运行的线程数量。
  • maximumPoolSize:当队列中存放的任务达到队列队列容量的时候,当前可以同时运行的线程数量变为最大线程数。
  • workQueue:当新任务来的时候,会先判断当前运行的线程数量是否达到核心线程数,如果达到,新任务就会被存放在队列中。

ThreadPoolExecutor 其他常见参数:

  • keepAliveTime:线程空闲时间,当线程空闲时间达到keepAliveTime时,线程会退出,直到线程数量=corePoolSize,如果allowCoreThreadTimeout=true,则会直到线程数量=0
  • unit:keepAliveTime 参数的时间单位
  • threadFactory:executor创建新线程的时候会用到。
  • handler:拒绝执行策略。当线程池的缓存队列已满并且线程池中的线程数目达到maximumPoolSize,如果还有任务到来就会采取任务拒绝策略,通常有以下四种策略:
// 丢弃任务并抛出RejectedExecutionException异常。
ThreadPoolExecutor.AbortPolicy
// 也是丢弃任务,但是不抛出异常。
ThreadPoolExecutor.DiscardPolicy
// 丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)
ThreadPoolExecutor.DiscardOldestPolicy
// 由调用线程处理该任务
ThreadPoolExecutor.CallerRunsPolicy

6. 动手实现一个线程池

/**
 * 这是一个Runnable类,规定5秒执行任务
 * @author linchaokun
 */
public class MyRunnable implements Runnable{

    private String msg;

    public MyRunnable(String msg) {
        this.msg = msg;
    }

    @Override
    public void run() {
        System.out.println(Thread.currentThread().getName() + "StartTime:" + LocalDateTime.now());
        processMsg();
        System.out.println(Thread.currentThread().getName() + "EndTime:" + LocalDateTime.now());
    }

    private void processMsg() {
        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    @Override
    public String toString() {
        return "MyRunnable{" +
                "msg='" + msg + '\'' +
                '}';
    }
}
/**
 * 创建线程池
 * @author linchaokun
 */
public class ThreadPoolExecutorDemo {

    private static final int CORE_POOL_SIZE = 5;
    private static final int MAX_POOL_SIZE = 10;
    private static final long KEEP_ALIVE_TIME = 1L;
    private static final int QUEUE_CAPACITY = 100;


    public static void main(String[] args) {

        // 通过构造函数 创建线程池
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                CORE_POOL_SIZE,
                MAX_POOL_SIZE,
                KEEP_ALIVE_TIME,
                TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(QUEUE_CAPACITY),
                new ThreadPoolExecutor.CallerRunsPolicy());

        for (int i = 0; i < 10; i++) {
            // 创建WorkerThread对象
            Runnable worker = new MyRunnable(i + "");
            // 执行Runnable
            executor.execute(worker);
        }

        // 终止线程
        executor.shutdown();

        while (!executor.isTerminated()){
        }
        System.out.println("停止所有线程。。。");

    }
}

可以看到我们上面的代码指定了:

  1. corePoolSize:核心线程数为5
  2. maximumPoolSize:最大线程数为10
  3. keepAliveTime:等待时间为1L
  4. unit:单位为TimeUnit.SECONDS
  5. workQueue:任务队列为ArrayBlockingQueue,并且容量为100
  6. handler:饱和此略为 CallerRunsPolicy

执行代码结果:

pool-1-thread-3StartTime:2020-07-14T11:09:04.301
pool-1-thread-4StartTime:2020-07-14T11:09:04.301
pool-1-thread-2StartTime:2020-07-14T11:09:04.301
pool-1-thread-1StartTime:2020-07-14T11:09:04.301
pool-1-thread-5StartTime:2020-07-14T11:09:04.301
pool-1-thread-4EndTime:2020-07-14T11:09:09.302
pool-1-thread-1EndTime:2020-07-14T11:09:09.302
pool-1-thread-3EndTime:2020-07-14T11:09:09.302
pool-1-thread-1StartTime:2020-07-14T11:09:09.302
pool-1-thread-3StartTime:2020-07-14T11:09:09.302
pool-1-thread-2EndTime:2020-07-14T11:09:09.302
pool-1-thread-5EndTime:2020-07-14T11:09:09.302
pool-1-thread-5StartTime:2020-07-14T11:09:09.302
pool-1-thread-2StartTime:2020-07-14T11:09:09.302
pool-1-thread-4StartTime:2020-07-14T11:09:09.302
pool-1-thread-1EndTime:2020-07-14T11:09:14.302
pool-1-thread-5EndTime:2020-07-14T11:09:14.302
pool-1-thread-3EndTime:2020-07-14T11:09:14.302
pool-1-thread-4EndTime:2020-07-14T11:09:14.302
pool-1-thread-2EndTime:2020-07-14T11:09:14.302
停止所有线程。。。

7. 线程池原理分析

通过上面执行结果我们可以看出:线程池每次会同时执行5个任务,这5个任务执行完之后,剩余的5个任务才会执行。

为了搞懂线程池的原理,我们需要分析一下 execute方法。我们先来看看它的源码:

// 存放线程池的运行状态(runState)和线程池内有效线程的数量(workerCount)
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

private static int workerCountOf(int c)  { return c & CAPACITY; }

public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        // ctl中保存的线程池当前的一些状态信息
        int c = ctl.get();
    	// 1.首先判断当前线程池中执行的任务数量是否小于corePoolSize
    	// 如果小于,通过addWorker方法新建一个线程,并将任务command添加到线程中,然后启动该线程执行任务。
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
    	// 2.如果当前执行的任务数量大于等于corePoolSize的时候就会走到这里
    	// 通过isRunning方法判断线程池状态,线程池处于RUNNING状态,该任务才会被加进去
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            // 再次获取线程池状态,如果线程池状态不是RUNNING状态,就需要从任务队列中移除任务,并尝试判断线程是否全部执行完毕,同时执行拒绝策略。
            if (! isRunning(recheck) && remove(command))
                reject(command);
            // 如果当前线程池为空,就创建一个新的线程并执行
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
    	// 3.通过addWorker新建一个线程,并将任务command添加到该线程中,然后启动该线程执行任务。
    	// 如果addWorker执行失败,则通过reject执行相应的拒绝策略。
        else if (!addWorker(command, false))
            reject(command);
    }

用一张图总结一下上面的3个步骤: