ThreadPoolExecutor 的使用


2/6/2016 JavaSE Concurrent

使用线程池主要为了解决一下几个问题:

  • 通过重用线程池中的线程, 来减少每个线程创建和销毁的性能开销.
  • 对线程进行一些维护和管理, 比如定时开始, 周期执行, 并发数控制等等.

Executor

Executor是一个接口, 跟线程池有关的基本都要跟他打交道. 下面是常用的ThreadPoolExecutor的关系.

Executor接口很简单, 只有一个execute方法.

ExecutorService是Executor的子接口, 增加了一些常用的对线程的控制方法, 之后使用线程池主要也是使用这些方法.

AbstractExecutorService是一个抽象类. ThreadPoolExecutor就是实现了这个类.

ThreadPoolExecutor

构造方法

ThreadPoolExecutor是线程池的真正实现, 他通过构造方法的一系列参数, 来构成不同配置的线程池. 常用的构造方法有下面四个:

ThreadPoolExecutor(int corePoolSize,
                        int maximumPoolSize,
                        long keepAliveTime,
                        TimeUnit unit,
                        BlockingQueue<Runnable> workQueue) 
1
2
3
4
5
ThreadPoolExecutor(int corePoolSize,
                        int maximumPoolSize,
                        long keepAliveTime,
                        TimeUnit unit,
                        BlockingQueue<Runnable> workQueue,
                        ThreadFactory threadFactory)
1
2
3
4
5
6
ThreadPoolExecutor(int corePoolSize,
                        int maximumPoolSize,
                        long keepAliveTime,
                        TimeUnit unit,
                        BlockingQueue<Runnable> workQueue,
                        RejectedExecutionHandler handler)
1
2
3
4
5
6
ThreadPoolExecutor(int corePoolSize,
                        int maximumPoolSize,
                        long keepAliveTime,
                        TimeUnit unit,
                        BlockingQueue<Runnable> workQueue,
                        ThreadFactory threadFactory,
                        RejectedExecutionHandler handler)
1
2
3
4
5
6
7

构造方法参数说明

  • corePoolSize

    核心线程数, 默认情况下核心线程会一直存活, 即使处于闲置状态也不会受存keepAliveTime限制. 除非将allowCoreThreadTimeOut设置为true.

  • maximumPoolSize

    线程池所能容纳的最大线程数. 超过这个数的线程将被阻塞. 当任务队列为没有设置大小的LinkedBlockingDeque时, 这个值无效.

  • keepAliveTime

    非核心线程的闲置超时时间, 超过这个时间就会被回收.

  • unit

    指定keepAliveTime的单位, 如TimeUnit.SECONDS. 当将allowCoreThreadTimeOut设置为true时对corePoolSize生效.

  • workQueue

    线程池中的任务队列.

    常用的有三种队列, SynchronousQueue,LinkedBlockingDeque,ArrayBlockingQueue.

  • threadFactory

    线程工厂, 提供创建新线程的功能. ThreadFactory是一个接口, 只有一个方法

    public interface ThreadFactory {
      Thread newThread(Runnable r);
    }
    
    1
    2
    3

    通过线程工厂可以对线程的一些属性进行定制.

    默认的工厂:

    static class DefaultThreadFactory implements ThreadFactory {
      private static final AtomicInteger poolNumber = new AtomicInteger(1);
      private final ThreadGroup group;
      private final AtomicInteger threadNumber = new AtomicInteger(1);
      private final String namePrefix;
    
      DefaultThreadFactory() {
          SecurityManager var1 = System.getSecurityManager();
          this.group = var1 != null?var1.getThreadGroup():Thread.currentThread().getThreadGroup();
          this.namePrefix = "pool-" + poolNumber.getAndIncrement() + "-thread-";
      }
    
      public Thread newThread(Runnable var1) {
          Thread var2 = new Thread(this.group, var1, this.namePrefix + this.threadNumber.getAndIncrement(), 0L);
          if(var2.isDaemon()) {
              var2.setDaemon(false);
          }
    
          if(var2.getPriority() != 5) {
              var2.setPriority(5);
          }
    
          return var2;
      }
    }
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
  • RejectedExecutionHandler

    RejectedExecutionHandler也是一个接口, 只有一个方法

    public interface RejectedExecutionHandler {
      void rejectedExecution(Runnable var1, ThreadPoolExecutor var2);
    }
    
    1
    2
    3

    当线程池中的资源已经全部使用, 添加新线程被拒绝时, 会调用RejectedExecutionHandler的rejectedExecution方法.

线程池规则

线程池的线程执行规则跟任务队列有很大的关系.

  • 下面都假设任务队列没有大小限制:

    1. 如果线程数量核心线程数, 但核心线程数, 但核心线程数, 并且>最大线程数, 当任务队列是LinkedBlockingDeque, 会将超过核心线程的任务放在任务队列中排队. 也就是当任务队列是LinkedBlockingDeque并且没有大小限制时, 线程池的最大线程数设置是无效的, 他的线程数最多不会超过核心线程数.
    2. 如果线程数量>核心线程数, 并且>最大线程数, 当任务队列是SynchronousQueue的时候, 会因为线程池拒绝添加任务而抛出异常.
  • 任务队列大小有限时

    1. 当LinkedBlockingDeque塞满时, 新增的任务会直接创建新线程来执行, 当创建的线程数量超过最大线程数量时会抛异常.
    2. SynchronousQueue没有数量限制. 因为他根本不保持这些任务, 而是直接交给线程池去执行. 当任务数量超过最大线程数时会直接抛异常.

规则验证

前提

所有的任务都是下面这样的, 睡眠两秒后打印一行日志:

Runnable myRunnable = new Runnable() {
    @Override
    public void run() {
        try {
            Thread.sleep(2000);
            System.out.println(Thread.currentThread().getName() + " run");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

    }
};
1
2
3
4
5
6
7
8
9
10
11
12

所有验证过程都是下面这样, 先执行三个, 再执行三个, 8秒后, 各看一次信息

executor.execute(myRunnable);
executor.execute(myRunnable);
executor.execute(myRunnable);
System.out.println("---先开三个---");
System.out.println("核心线程数" + executor.getCorePoolSize());
System.out.println("线程池数" + executor.getPoolSize());
System.out.println("队列任务数" + executor.getQueue().size());
executor.execute(myRunnable);
executor.execute(myRunnable);
executor.execute(myRunnable);
System.out.println("---再开三个---");
System.out.println("核心线程数" + executor.getCorePoolSize());
System.out.println("线程池数" + executor.getPoolSize());
System.out.println("队列任务数" + executor.getQueue().size());
Thread.sleep(8000);
System.out.println("----8秒之后----");
System.out.println("核心线程数" + executor.getCorePoolSize());
System.out.println("线程池数" + executor.getPoolSize());
System.out.println("队列任务数" + executor.getQueue().size());
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19

验证1

  1. 核心线程数为6, 最大线程数为10. 超时时间为5秒

    ThreadPoolExecutor executor = new ThreadPoolExecutor(6, 10, 5, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
    
    1
    ---先开三个---
    核心线程数6
    线程池线程数3
    队列任务数0
    ---再开三个---
    核心线程数6
    线程池线程数6
    队列任务数0
    pool-1-thread-1 run
    pool-1-thread-6 run
    pool-1-thread-5 run
    pool-1-thread-3 run
    pool-1-thread-4 run
    pool-1-thread-2 run
    ----8秒之后----
    核心线程数6
    线程池线程数6
    队列任务数0
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18

可以看到每个任务都是是直接启动一个核心线程来执行任务, 一共创建了6个线程, 不会放入队列中. 8秒后线程池还是6个线程, 核心线程默认情况下不会被回收, 不收超时时间限制.

验证2

  1. 核心线程数为3, 最大线程数为6. 超时时间为5秒,队列是LinkedBlockingDeque

    ThreadPoolExecutor executor = new ThreadPoolExecutor(3, 6, 5, TimeUnit.SECONDS, new LinkedBlockingDeque<Runnable>());
    
    1
    ---先开三个---
    核心线程数3
    线程池线程数3
    队列任务数0
    ---再开三个---
    核心线程数3
    线程池线程数3
    队列任务数3
    pool-1-thread-3 run
    pool-1-thread-1 run
    pool-1-thread-2 run
    pool-1-thread-3 run
    pool-1-thread-1 run
    pool-1-thread-2 run
    ----8秒之后----
    核心线程数3
    线程池线程数3
    队列任务数0
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18

    当任务数超过核心线程数时, 会将超出的任务放在队列中, 只会创建3个线程重复利用.

验证3

  1. 核心线程数为3, 最大线程数为6. 超时时间为5秒,队列是SynchronousQueue
ThreadPoolExecutor executor = new ThreadPoolExecutor(3, 6, 5, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
1
---先开三个---
核心线程数3
线程池线程数3
队列任务数0
---再开三个---
核心线程数3
线程池线程数6
队列任务数0
pool-1-thread-2 run
pool-1-thread-3 run
pool-1-thread-6 run
pool-1-thread-4 run
pool-1-thread-5 run
pool-1-thread-1 run
----8秒之后----
核心线程数3
线程池线程数3
队列任务数0
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18

当队列是SynchronousQueue时, 超出核心线程的任务会创建新的线程来执行, 看到一共有6个线程. 但是这些线程是费核心线程, 收超时时间限制, 在任务完成后限制超过5秒就会被回收. 所以最后看到线程池还是只有三个线程.

验证4

  1. 核心线程数是3, 最大线程数是4, 队列是LinkedBlockingDeque

    ThreadPoolExecutor executor = new ThreadPoolExecutor(3, 4, 5, TimeUnit.SECONDS, new LinkedBlockingDeque<Runnable>());
    
    1
---先开三个---
核心线程数3
线程池线程数3
队列任务数0
---再开三个---
核心线程数3
线程池线程数3
队列任务数3
pool-1-thread-3 run
pool-1-thread-1 run
pool-1-thread-2 run
pool-1-thread-3 run
pool-1-thread-1 run
pool-1-thread-2 run
----8秒之后----
核心线程数3
线程池线程数3
队列任务数0
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18

LinkedBlockingDeque根本不受最大线程数影响.

但是当LinkedBlockingDeque有大小限制时就会受最大线程数影响了

4.1 比如下面, 将队列大小设置为2.

ThreadPoolExecutor executor = new ThreadPoolExecutor(3, 4, 5, TimeUnit.SECONDS, new LinkedBlockingDeque<Runnable>(2));
1
---先开三个---
核心线程数3
线程池线程数3
队列任务数0
---再开三个---
核心线程数3
线程池线程数4
队列任务数2
pool-1-thread-2 run
pool-1-thread-1 run
pool-1-thread-4 run
pool-1-thread-3 run
pool-1-thread-1 run
pool-1-thread-2 run
----8秒之后----
核心线程数3
线程池线程数3
队列任务数0
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18

首先为三个任务开启了三个核心线程1, 2, 3, 然后第四个任务和第五个任务加入到队列中, 第六个任务因为队列满了, 就直接创建一个新线程4, 这是一共有四个线程, 没有超过最大线程数. 8秒后, 非核心线程收超时时间影响回收了, 因此线程池只剩3个线程了.

4.2 将队列大小设置为1

ThreadPoolExecutor executor = new ThreadPoolExecutor(3, 4, 5, TimeUnit.SECONDS, new LinkedBlockingDeque<Runnable>(1));
1
Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task com.sunlinlin.threaddemo.Main$1@677327b6 rejected from java.util.concurrent.ThreadPoolExecutor@14ae5a5[Running, pool size = 4, active threads = 4, queued tasks = 1, completed tasks = 0]
    at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2047)
    at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823)
    at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1369)
    at com.sunlinlin.threaddemo.Main.main(Main.java:35)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147)
---先开三个---
核心线程数3
线程池线程数3
队列任务数0
pool-1-thread-1 run
pool-1-thread-2 run
pool-1-thread-3 run
pool-1-thread-4 run
pool-1-thread-1 run
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19

直接出错在第6个execute方法上. 因为核心线程是3个, 当加入第四个任务的时候, 就把第四个放在队列中. 加入第五个任务时, 因为队列满了, 就创建新线程执行, 创建了线程4. 当加入第六个线程时, 也会尝试创建线程, 但是因为已经达到了线程池最大线程数, 所以直接抛异常了.

验证5

  1. 核心线程数是3 , 最大线程数是4, 队列是SynchronousQueue

    ThreadPoolExecutor executor = new ThreadPoolExecutor(3, 4, 5, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
    
    1
Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task com.sunlinlin.threaddemo.Main$1@14ae5a5 rejected from java.util.concurrent.ThreadPoolExecutor@7f31245a[Running, pool size = 4, active threads = 4, queued tasks = 0, completed tasks = 0]
    at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2047)
    at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823)
    at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1369)
    at com.sunlinlin.threaddemo.Main.main(Main.java:34)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147)
---先开三个---
核心线程数3
线程池线程数3
队列任务数0
pool-1-thread-2 run
pool-1-thread-3 run
pool-1-thread-4 run
pool-1-thread-1 run
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18

这次在添加第五个任务时就报错了, 因为SynchronousQueue各奔不保存任务, 收到一个任务就去创建新线程. 所以第五个就会抛异常了.

Last Updated: 7/3/2019, 6:17:56 PM