Java线程池使用

本文源码基于JDK8

Java原生线程池

Java原生线程池核心类为java.util.concurrent.ThreadPoolExecutor

构造方法

public ThreadPoolExecutor(int corePoolSize,  
                          int maximumPoolSize,  
                          long keepAliveTime,  
                          TimeUnit unit,  
                          BlockingQueue<Runnable> workQueue,  
                          ThreadFactory threadFactory,  
                          RejectedExecutionHandler handler) {  
    if (corePoolSize < 0 ||  
        maximumPoolSize <= 0 ||  
        maximumPoolSize < corePoolSize ||  
        keepAliveTime < 0)  
        throw new IllegalArgumentException();  
    if (workQueue == null || threadFactory == null || handler == null)  
        throw new NullPointerException();  
    this.corePoolSize = corePoolSize;  
    this.maximumPoolSize = maximumPoolSize;  
    this.workQueue = workQueue;  
    this.keepAliveTime = unit.toNanos(keepAliveTime);  
    this.threadFactory = threadFactory;  
    this.handler = handler;  
}

参数含义

  • corePoolSize 线程池核心线程数
  • maximumPoolSize 线程池最大线程数
  • keepAliveTime 线程最长空闲时间
  • unit 线程最长空闲时间的时间单位
  • workQueue 任务队列
  • threadFactory 线程工厂
  • handler 饱和策略

例子

ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(5, 5, 1, TimeUnit.SECONDS,
    new ArrayBlockingQueue<>(10), 
    Executors.defaultThreadFactory(),
    new ThreadPoolExecutor.CallerRunsPolicy());

线程池运作流程

  • 典型情况下,当一个线程池ThreadPoolExecutor被创建后,线程池内的线程数为0
  • 在向线程池提交第一个任务时,创建第一个线程并执行任务。之后不管线程池是否有空闲的线程,每次再向线程池提交任务时,都会再创建一个新的线程,直到线程池线程数等于corePoolSize
  • 线程池的线程数量小于等于corePoolSize时,即使有线程空闲时间达到keepAliveTime,也不会回收线程
  • 继续向线程池提交要执行的任务,则任务进入任务队列workQueue,假如有空闲的线程,则由空闲的线程执行任务且任务移出队列。假如没有空闲的线程,则任务停留在任务队列workQueue,完成了之前任务的线程会从任务队列中获取要执行的下一个任务
  • 假如任务队列是有界的,任务队列有可能会被填满,如果再向线程池提交任务,则该任务无法被加入到队列当中,这时会创建新的线程从任务队列里获取任务并执行,原来无法加入队列的任务现在可以加入队列了
  • 假如后面继续出现任务无法加入队列的情况,则会继续创建新的线程,直到线程池的线程数等于maximumPoolSize
  • 当线程池的线程数等于maximumPoolSize,且有任务无法加入队列,则线程池将拒绝任务,并根据饱和策略作响应
  • 当线程池的线程数量大于corePoolSize时,如果有线程的空闲时间到达keepAliveTime的话,该线程将被回收

饱和策略

  • AbortPolicy 丢弃新任务,抛出异常
  • CallerRunsPolicy 由提交新任务的线程执行新任务
  • DiscardPolicy 丢弃新任务,不抛出异常
  • DiscardOldestPolicy 丢弃队列头的第一个任务,也即最老的未被执行的任务,再提交新任务

预定义线程池

通过Executors的静态方法创建

以下线程池都存在内存被耗尽的风险,不建议使用

fixedThreadPool,线程池的核心线程数和最大线程数相等,任务队列是无界的,最大长度为Integer.MAX_VALUE

public static ExecutorService newFixedThreadPool(int nThreads) {  
    return new ThreadPoolExecutor(nThreads, nThreads,  
                                  0L, TimeUnit.MILLISECONDS,  
                                  new LinkedBlockingQueue<Runnable>());  
}

singleThreadExecutor,线程池核心线程数和最大线程数都为1,任务队列是无界的,最大长度为Integer.MAX_VALUE

public static ExecutorService newSingleThreadExecutor() {  
    return new FinalizableDelegatedExecutorService  
        (new ThreadPoolExecutor(1, 1,  
                                0L, TimeUnit.MILLISECONDS,  
                                new LinkedBlockingQueue<Runnable>()));  
}

cachedThreadPool,线程池最大线程数为Integer.MAX_VALUE,任务队列SynchronousQueue是同步移交的,即要将一个任务放入队列,必须要有另一个线程正在等待获取任务。cachedThreadPool在当前所有线程都忙碌的时候,会创建一个新的线程,等待任务队列中的任务

public static ExecutorService newCachedThreadPool() {  
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,  
                                  60L, TimeUnit.SECONDS,  
                                  new SynchronousQueue<Runnable>());  
}

创建线程池最佳实践

  • 一个线程池只执行一类任务,不同类型的任务使用不同的线程池
  • 确定最佳的线程数,定义$N_{cpu}$为计算机逻辑处理器数,$U_{cpu}$为处理器的目标使用率,$\cfrac WC$为等待时间与计算时间的比率
    • 计算密集型任务的最佳线程数为$N_{cpu}+1$(多出的一个线程确保当某一线程偶尔由于页缺失故障或者其他原因而暂停时,CPU的时钟周期不会被浪费)
    • I/O密集型任务的最佳线程数为$N_{cpu} \cdot U_{cpu} \cdot (1 + \cfrac WC)$,假如$U_{cpu}$为1,$\cfrac WC$为1(即有50%的等待时间),则最佳线程数为$2N_{cpu}$
    • 程序中可通过Runtime.getRuntime().availableProcessors()获取当前计算机的逻辑处理器数
  • 线程需要指定有意义的名称,通过自定义的ThreadFactory实现
  • 任务队列使用有界队列

线程名称指定

自定义ThreadFactory

public class MyThreadFactory implements ThreadFactory {
    private final ThreadFactory defaultThreadFactory = Executors.defaultThreadFactory();
    private final AtomicInteger threadNumber = new AtomicInteger(1);

    public Thread newThread(Runnable r) {
        Thread thread = defaultThreadFactory.newThread();
        thread.setName("thread name -" + threadNumber.getAndIncrement());
    }
}

使用guava

import com.google.common.util.concurrent.ThreadFactoryBuilder;

ThreadFactoryBuilder threadFactoryBuilder = new ThreadFactoryBuilder();
threadFactoryBuilder.setNameFormat( "thread name -%d");
ThreadFactory threadFactory = threadFactoryBuilder.build();

向线程池提交任务

// Runnable
threadPoolExecutor.execute(() -> System.out.println(Thread.currentThread().getName()));
threadPoolExecutor.submit(() -> System.out.println(Thread.currentThread().getName()));
// Callable
threadPoolExecutor.submit(() -> "abc");

Spring线程池

Spring线程池核心类为org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor,设置的参数与ThreadPoolExecutor基本一致

创建线程池

ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
threadPoolTaskExecutor.setCorePoolSize(5);
threadPoolTaskExecutor.setMaxPoolSize(10);
threadPoolTaskExecutor.setQueueCapacity(100);
threadPoolTaskExecutor.setKeepAliveSeconds(10);
threadPoolTaskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
threadPoolTaskExecutor.setThreadNamePrefix("thread name -");

向线程池提交任务

与Java原生线程池使用一致

// Runnable
threadPoolTaskExecutor.execute(() -> System.out.println(Thread.currentThread().getName()));
threadPoolTaskExecutor.submit(() -> System.out.println(Thread.currentThread().getName()));
// Callable
threadPoolTaskExecutor.submit(() -> "abc");

Spring Boot自动配置线程池

从自动配置类org.springframework.boot.autoconfigure.task.TaskExecutionAutoConfiguration可以看到,当类路径中存在ThreadPoolTaskExecutor这个类,且没有注册Executor类型的bean时,会自动注册一个ThreadPoolTaskExecutor

@Lazy  
@Bean(name = { APPLICATION_TASK_EXECUTOR_BEAN_NAME,  
      AsyncAnnotationBeanPostProcessor.DEFAULT_TASK_EXECUTOR_BEAN_NAME })  
@ConditionalOnMissingBean(Executor.class)  
public ThreadPoolTaskExecutor applicationTaskExecutor(TaskExecutorBuilder builder) {  
   return builder.build();  
}

配置属性类org.springframework.boot.autoconfigure.task.TaskExecutionProperties中,有如下默认属性

private String threadNamePrefix = "task-";
private int queueCapacity = Integer.MAX_VALUE;
private int coreSize = 8;  
private int maxSize = Integer.MAX_VALUE;  
private boolean allowCoreThreadTimeout = true;  
private Duration keepAlive = Duration.ofSeconds(60);
private boolean awaitTermination;
private Duration awaitTerminationPeriod;

值得注意的是allowCoreThreadTimeout默认为true,表示线程池线程数小于等于corePoolSize时,空闲时间到达keepAliveTime的线程也将被回收

可通过application.yml修改配置属性

spring:  
  task:  
    execution:  
      pool:  
        core-size: 10  
        max-size: 20  
        queue-capacity: 2000  
        keep-alive: 60s  
  
      thread-name-prefix: doubucket-item-

ThreadPoolTaskExecutor的父类org.springframework.scheduling.concurrent.ExecutorConfigurationSupport可以看到默认的饱和策略为AbortPolicy,饱和策略无法通过application.yml修改

private RejectedExecutionHandler rejectedExecutionHandler = new ThreadPoolExecutor.AbortPolicy();

开启注解使用模式

Spring Boot启动类上加上注解@EnableAsync

@SpringBootApplication
@EnableAsync
public class TestApplication {  
    public static void main(String[] args) {  
        SpringApplication.run(DoubucketItemsApplication.class, args);  
    }  
}

在需要使用线程池执行的方法上加上注解@Async

@Service 
public class TestService {
    @Async 
    public void test() { 
        System.out.println("hello");
    }
}