本文共 10357 字,大约阅读时间需要 34 分钟。
上个章节我们了解到了线程池类ThreadPoolExecutor,这次我们来看一个类:Executors,那么他是干嘛的呢?
Executors可以当做一个创建ThreadPoolExecutor线程池的一个工厂类,他提供了一系列的静态方法,用来创建ThreadPoolExecutor线程池。
老规矩,先看源码,源码注释太多,这次来个截图:
如图所示,我们重点来关注红色框框内的几种类型的线程池。1)newCachedThreadPool:缓存线程池
源码:public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue()); }public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue (), threadFactory);}说明:1:第一个方法使用默认的线程工厂:DefaultThreadFactory2:第二个方法使用自定义传入的线程工厂3:核心线程数为0,即根据需要创建线程,最大值为Inter.MAX_VALUE4:空闲超时为60秒,超过60秒空闲的线程就会从线程池中删除5:使用SynchronousQueue队列,空队列,来多少任务,创建多少线程,可能导致cpu占满
2)newFixedThreadPool:固定大小线程池
源码:public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue()); }public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue (), threadFactory);}说明:1:第一个方法使用默认的线程工厂:DefaultThreadFactory2:第二个方法使用自定义传入的线程工厂3:核心线程数等于最大线程数,只能创建固定数量的线程池,所有线程均为核心线程4:KeepAliveTime=0,即该线程池中中的所有线程一直处于存活5:当添加一个新任务,如果所有线程都处于工作状态,那么就会新建一个新的线程来执行该任务;如果新建失败,那么将该任务添加到阻塞队列中进行等待6:使用LinkedBlockingQueue阻塞队列,这个不多解释,有序队列
3)newScheduledThreadPool:固定大小线程池,带延迟执行和定时执行
源码:public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) { return new ScheduledThreadPoolExecutor(corePoolSize); }public static ScheduledExecutorService newScheduledThreadPool( int corePoolSize, ThreadFactory threadFactory) { return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory); }说明:1:第一个方法使用默认的线程工厂:DefaultThreadFactory2:第二个方法使用自定义传入的线程工厂3:返回值为ScheduledExecutorService,他也是继承了ExecutorService接口4:看一下new ScheduledThreadPoolExecutor(corePoolSize)的源码:public ScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory) { super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue(), threadFactory);}KeepAliveTime=0,根据需要创建线程且线程始终处于存活状态5:使用DelayedWorkQueue作为阻塞队列
4)newSingleThreadExecutor:单线程线程池
源码:public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue())); }public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue (), threadFactory)); } 说明:1:第一个方法使用默认的线程工厂:DefaultThreadFactory2:第二个方法使用自定义传入的线程工厂3:核心线程数和最大线程数都为1,根据需要创建一个线程4:KeepAliveTime=0,该单线程一直存活,执行队列中的任务5:使用LinkedBlockingQueue作为阻塞队列
5)newSingleThreadScheduledExecutor:单线程线程池,带延迟执行和定时执行
源码:public static ScheduledExecutorService newSingleThreadScheduledExecutor() { return new DelegatedScheduledExecutorService (new ScheduledThreadPoolExecutor(1)); } public static ScheduledExecutorService newSingleThreadScheduledExecutor(ThreadFactory threadFactory) { return new DelegatedScheduledExecutorService (new ScheduledThreadPoolExecutor(1, threadFactory)); }说明:1:第一个方法使用默认的线程工厂:DefaultThreadFactory2:第二个方法使用自定义传入的线程工厂3:该线程池和上面介绍的**newScheduledThreadPool**很类似,唯一的区别就是这里是只能创建1个单线程来执行任务4:该单线程被创建一直存活5:当线程发生异常终止,会另外创建1个线程来代替执行新的任务
针对线程池创建,来总结一下:
(到了不爱画图环节)1)execute
该方法是顶层接口Executor接口的唯一接口方法,代表执行一个任务。public interface Executor { /** * Executes the given command at some time in the future. The command * may execute in a new thread, in a pooled thread, or in the calling * thread, at the discretion of the {@code Executor} implementation. * * @param command the runnable task * @throws RejectedExecutionException if this task cannot be * accepted for execution * @throws NullPointerException if command is null */ void execute(Runnable command);}
他被在很多类型的ExecutorService中被实现和间接调用,该方法源码是线程池的核心方法,整个线程池的源码分析也是从整个方法的入口开始的,源码分析我们在后面一篇文章中阐述,这里只说他的作用;
execute方法的作用是执行一个新的任务,参数为Runnable接口,方法返回为void。2)submit
该方法代表提交一个任务,我们来看一下源码:/** * @throws RejectedExecutionException {@inheritDoc} * @throws NullPointerException {@inheritDoc} */ public Future submit(Runnable task) { if (task == null) throw new NullPointerException(); RunnableFutureftask = newTaskFor(task, null); execute(ftask); return ftask; } /** * @throws RejectedExecutionException {@inheritDoc} * @throws NullPointerException {@inheritDoc} */ public Future submit(Runnable task, T result) { if (task == null) throw new NullPointerException(); RunnableFuture ftask = newTaskFor(task, result); execute(ftask); return ftask; } /** * @throws RejectedExecutionException {@inheritDoc} * @throws NullPointerException {@inheritDoc} */ public Future submit(Callable task) { if (task == null) throw new NullPointerException(); RunnableFuture ftask = newTaskFor(task); execute(ftask); return ftask; }
该方法重载了3种,这里我们不细说他们的区别,我们只说他和上面execute方法的区别。
submit方法他是有返回值的,并且参数可以为Runnable,也可以是Callable。 我们就要来说一下Runnable和Callable的区别: 1)Runnable 源码:@FunctionalInterfacepublic interface Runnable { /** * When an object implementing interfaceRunnable
is used * to create a thread, starting the thread causes the object's *run
method to be called in that separately executing * thread. ** The general contract of the method
run
is that it may * take any action whatsoever. * * @see java.lang.Thread#run() */ public abstract void run();}说明:1:Runnable提供run方法,该方法是一个抽象方法,没有返回值2:无法thorws异常3:可以由线程单独调用Thread.run();
2)Callable
源码:@FunctionalInterfacepublic interface Callable{ /** * Computes a result, or throws an exception if unable to do so. * * @return computed result * @throws Exception if unable to compute a result */ V call() throws Exception;}说明:1:Callable提供的是call方法,该方法有返回值2:可以抛出异常3:无法由线程Thread单独执行,必须由线程池来发起调用
3)Future和FutureTask的用法和区别
1.Future 1.1:它是一个接口,提供了一个get方法。public interface Future{ V get() throws InterruptedException, ExecutionException;}
1.2:future只能用来接收线程池执行任务的结果,无法当做参数交给线程池执行。
2.FutureTask
2.1:它是一个类,实现了Runnable接口和Future接口。public class FutureTaskimplements RunnableFuture { }RunnableFuture源码:public interface RunnableFuture extends Runnable, Future { /** * Sets this Future to the result of its computation * unless it has been cancelled. */ void run();}
2.2:futureTask既可以用来接收线程池执行任务的结果,还可以传入Runnable或Callable子任务包装成FutureTask任务类,把这个类当做线程池的参数(因为它实现了Runnable接口)提交给线程池去执行。
代码示例:
package com.test;import org.omg.CORBA.PRIVATE_MEMBER;import java.util.concurrent.*;public class Test { public static void main(String[] args) { //创建一个单线程线程池 ExecutorService executorService = Executors.newSingleThreadExecutor(); //使用Future Futurefuture = executorService.submit(new Callable () { @Override public Integer call() throws Exception { return 666; } }); //使用FutureTask FutureTask futureTask = new FutureTask<>(new Callable () { @Override public Integer call() throws Exception { return 666; } }); executorService.submit(futureTask); //关闭线程池,使处于队列中的任务全部执行完毕 executorService.shutdown(); //获取线程执行结果 try { //使用future获取结果 int result = future.get(); //使用futureTask获取结果 int result2 = futureTask.get(); System.out.println(result); System.out.println(result2); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } }}
看完这篇,相信你对于线程池的创建和分类用了不错的认识吧,纯干货。
提示:一般来说这几种只是默认的线程池创建方法,很多参数包括线程工厂都是固定的,在实际开发当中,一般还是使用ThreadPoolExecutor构造函数来构造线程池,并且线程工厂也可以自定义,这样方便日志跟踪,举个简单的例子:创建线程池:ExecutorService executor = new ThreadPoolExecutor(1, 5, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue(100), new MyThreadPoolFactory());自定义线程工厂:import java.util.concurrent.ThreadFactory;import java.util.concurrent.atomic.AtomicInteger;/** * 自定义线程工厂 * * Created by wujian on 2017/8/31. */public class MyThreadPoolFactoryimplements ThreadFactory { private final ThreadGroup group; private final AtomicInteger threadNumber = new AtomicInteger(1); private final String namePrefix; public NamedAfterParentThreadPoolFactory() { SecurityManager s = System.getSecurityManager(); group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup(); String name = Thread.currentThread().getName(); namePrefix = name + "-我的线程-"; } @Override public Thread newThread(Runnable r) { Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0); if (t.isDaemon()) { t.setDaemon(false); } if (t.getPriority() != Thread.NORM_PRIORITY) { t.setPriority(Thread.NORM_PRIORITY); } return t; }}
好了,准备下一篇:
转载地址:http://pikmi.baihongyu.com/