线程池

线程池:三大方法,七大参数,4中拒绝策略

Executors

是一个工具类,三个常用方法

// 创建一个线程
var es = Executors.newSingleThreadExecutor();
// 创建固定长度的线程池,比如4个
var expool = Executors.newFixedThreadPool(4);
// 创建弹性可伸缩的线程池
Executors.newCachedThreadPool();

关闭线程池

// 关闭线程池
es.shutdown();

使用案例

package com.starry.service.starrythreads;

import java.util.concurrent.Executors;

public class ThreadPoolDemo {
    public static void main(String[] args) {
        // 创建一个线程
        var es = Executors.newSingleThreadExecutor();
        // 创建固定长度的线程池,比如4个
        var expool = Executors.newFixedThreadPool(4);
        // 创建弹性可伸缩的线程池
        Executors.newCachedThreadPool();

        try {
            for (int i = 0; i < 10; i++) {
                final int temp = i;
                es.execute(() -> {
                    System.out.printf(Thread.currentThread().getName() + String.valueOf(temp)); // 永远是同一个线程执行
                });
            }

            for (int i = 0; i < 10; i++) {
                final int temp = i;
                expool.execute(() -> {
                    System.out.printf(Thread.currentThread().getName() + String.valueOf(temp)); // 最多4个线程协作执行
                });
            }
        } finally {
            // 关闭线程池
            es.shutdown();

            // 关闭线程池
            expool.shutdown();
        }
    }
}

源码分析

  public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }
    
    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }
    
     public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }

本质是ThreadPoolExecutor

ThreadPoolExecutor运行原理:

ThreadPoolExecutor将根据corePoolSize和maximumPoolSize设置的值调整线程池大小。当新任务调用方法execute(Runnable)提交时,如果运行的线程少于corePoolSize,则创建新线程来处理请求。如果正在运行的线程数等于corePoolSize时,则新任务被添加到队列中,直到队列满。当队列满了后,会继续开辟新线程来处理任务,但不超过最大线程数。当任务队列满了并且已开辟了最大线程数,此时又来了新任务,ThreadPoolExecutor会拒绝服务。

只能用ThreadPoolExecutor类,==不可以使用Executors创建==

阿里巴巴规范只能用ThreadPoolExecutor类,==不可以使用Executors创建==

为啥不允许呢?

public LinkedBlockingQueue() {
     this(Integer.MAX_VALUE);
 }

结论:

​ 1)FixedThreadPool 和 SingleThreadPool:

允许的请求队列长度为 Integer.MAX_VALUE,可能会堆积大量的请求,从而导致 OOM。

 2)CachedThreadPool 和 ScheduledThreadPool: 

​ 允许的创建线程数量为 Integer.MAX_VALUE,可能会创建大量的线程,从而导致 OOM。

七大参数

==先看源码:ThreadPoolExecutor类==

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }
  • corePoolSize 核心线程池大小
  • maximumPoolSize 最大线程池大小
  • keepAliveTime 当线程空闲超过keepAliveTime,非核心线程会被回收,若allowCoreThreadTimeOut为true则核心线程也会被回收。
  • TimeUnit 超时单位
  • workQueue 阻塞队列,==当满负载的时候,再多的任务就会放在这个队列里等候执行,相当于等候区==
  • threadFactory 线程工厂,一般不用动
  • handler 拒绝策略 ==如果任务满载最大线程池且设置的队列中也满了,则执行这个拒绝策略处理超期的任务==

当线程空闲超过keepAliveTime,非核心线程会被回收,若allowCoreThreadTimeOut为true则核心线程也会被回收。

==默认情况下,核心线程不会被回收;当allowCoreThreadTimeOut为true,核心线程也会被回收。==

四大拒绝策略

线程池共包括4种拒绝策略,它们分别是:AbortPolicy, CallerRunsPolicy, DiscardOldestPolicyDiscardPolicy

AbortPolicy(jdk默认策略)

AbortPolicy         -- 当任务添加到线程池中被拒绝时,它将抛出 RejectedExecutionException 异常。(jdk默认策略)
CallerRunsPolicy    -- 当任务添加到线程池中被拒绝时,会在线程池当前正在运行的Thread线程池中处理被拒绝的任务。
DiscardOldestPolicy -- 当任务添加到线程池中被拒绝时,线程池会放弃等待队列中最旧的未处理任务,然后将被拒绝的任务添加到等待队列中。
DiscardPolicy       -- 当任务添加到线程池中被拒绝时,线程池将丢弃被拒绝的任务。