public class MyThread extends Thread {
public void run() {
System.out.println("关注一角钱技术,获取Java架构资料");
}
}
MyThread myThread1 = new MyThread();
MyThread myThread2 = new MyThread();
myThread1.start();
myThread2.start();
public void run() {
if (target != null) {
target.run();
}
}
实现Callable接口通过FutureTask包装器来创建Thread线程
Callable接口(也只有一个方法)定义如下:
public interface Callable {
V call() throws Exception;
}
//Callable同样是任务,与Runnable接口的区别在于它接收泛型,同时它执行任务后带有返回内容
public class SomeCallable implements Callable {
// 相对于run方法的带有返回值的call方法
@Override
public V call() throws Exception {
// TODO Auto-generated method stub
return null;
}
}
Callable oneCallable = new SomeCallable();
//由Callable创建一个FutureTask对象:
FutureTask oneTask = new FutureTask(oneCallable);
//注释:FutureTask是一个包装器,它通过接受Callable来创建,它同时实现了Future和Runnable接口。
//由FutureTask创建一个Thread对象:
Thread oneThread = new Thread(oneTask);
oneThread.start();
//至此,一个线程就创建完成了。
*/
public class MyThread {
public static void main(String[] args) throws ExecutionException,
InterruptedException {
System.out.println(("----程序开始运行----"));
Date date1 = new Date();
int taskSize = 5;
// 创建一个线程池
ExecutorService pool = Executors.newFixedThreadPool(taskSize);
// 创建多个有返回值的任务
List list = new ArrayList();
for (int i = 0; i >>" + f.get().toString());
list.add(f);
}
// 关闭线程池
pool.shutdown();
// 获取所有并发任务的运行结果
for (Future f : list) {
// 从Future对象上获取任务的返回值,并输出到控制台
System.out.println(">>>" + f.get().toString());
}
Date date2 = new Date();
System.out.println("----程序结束运行----,程序运行时间【"
+ (date2.getTime() - date1.getTime()) + "毫秒】");
}
}
class MyCallable implements Callable
public abstract class AbstractExecutorService implements ExecutorService {
//此方法很简单就是对runnable保证,将其包装为一个FutureTask
protected RunnableFuture newTaskFor(Runnable runnable, T value) {
return new FutureTask(runnable, value);
}
//包装callable为FutureTask
//FutureTask其实就是对Callable的一个封装
protected RunnableFuture newTaskFor(Callable callable) {
return new FutureTask(callable);
}
//提交一个Runnable类型的任务
public Future> submit(Runnable task) {
//如果为null则抛出NPE
if (task == null) throw new NullPointerException();
//包装任务为一个Future
RunnableFuture ftask = newTaskFor(task, null);
//将任务丢给执行器,而此处会抛出拒绝异常,在讲述ThreadPoolExecutor的时候有讲述,不记得的读者可以去再看看
execute(ftask);
return ftask;
}
//与上方方法相同只不过指定了返回结果
public Future submit(Runnable task, T result) {
if (task == null) throw new NullPointerException();
RunnableFuture ftask = newTaskFor(task, result);
execute(ftask);
return ftask;
}
//与上方方法相同只是换成了callable
public Future submit(Callable task) {
if (task == null) throw new NullPointerException();
RunnableFuture ftask = newTaskFor(task);
execute(ftask);
return ftask;
}
//执行集合tasks结果是最后一个执行结束的任务结果
//可以设置超时 timed为true并且nanos是未来的一个时间
//任何一个任务完成都将会返回结果
private T doInvokeAny(Collection extends Callable> tasks,
boolean timed, long nanos)
throws InterruptedException, ExecutionException, TimeoutException {
//传入的任务集合不能为null
if (tasks == null)
throw new NullPointerException();
//传入的任务数不能是0
int ntasks = tasks.size();
if (ntasks == 0)
throw new IllegalArgumentException();
//满足上面的校验后将任务分装到一个ArrayList中
ArrayList> futures = new ArrayList>(ntasks);
//并且创建一个执行器传入this
//这里简单讲述他的执行原理,传入this会使用传入的this(类型为Executor)作为执行器用于执行任务,当submit提交任务的时候回将任务
//封装为一个内部的Future并且重写他的done而此方法就是在future完成的时候调用的,而他的写法则是将当前完成的future添加到esc
//维护的结果队列中
ExecutorCompletionService ecs =
new ExecutorCompletionService(this);
try {
//创建一个执行异常,以便后面抛出
ExecutionException ee = null;
//如果开启了超时则计算死线时间如果时间是0则代表没有开启执行超时
final long deadline = timed ? System.nanoTime() + nanos : 0L;
//获取任务的迭代器
Iterator extends Callable> it = tasks.iterator();
//先获取迭代器中的第一个任务提交给前面创建的ecs执行器
futures.add(ecs.submit(it.next()));
//前面记录的任务数减一
--ntasks;
//当前激活数为1
int active = 1;
//进入死循环
for (;;) {
//获取刚才提价的任务是否完成如果完成则f不是null否则为null
Future f = ecs.poll();
//如果为null则代表任务还在继续
if (f == null) {
//如果当前任务大于0 说明除了刚才的任务还有别的任务存在
if (ntasks > 0) {
//则任务数减一
--ntasks;
//并且再次提交新的任务
futures.add(ecs.submit(it.next()));
//当前的存活的执行任务加一
++active;
}
//如果当前存活任务数是0则代表没有任务在执行了从而跳出循环
else if (active == 0)
break;
//如果当前任务执行设置了超时时间
else if (timed) {
//则设置指定的超时时间获取
f = ecs.poll(nanos, TimeUnit.NANOSECONDS);
//等待执行超时还没有获取到则抛出超时异常
if (f == null)
throw new TimeoutException();
//否则使用当前时间计算剩下的超时时间用于下一个循环使用
nanos = deadline - System.nanoTime();
}
//如果没有设置超时则直接获取任务
else
f = ecs.take();
}
//如果获取到了任务结果f!=null
if (f != null) {
//激活数减一
--active;
try {
//返回获取到的结果
return f.get();
//如果获取结果出错则包装异常
} catch (ExecutionException eex) {
ee = eex;
} catch (RuntimeException rex) {
ee = new ExecutionException(rex);
}
}
}
//如果异常不是null则抛出如果是则创建一个
if (ee == null)
ee = new ExecutionException();
throw ee;
} finally {
//其他任务则设置取消
for (int i = 0, size = futures.size(); i T invokeAny(Collection extends Callable> tasks)
throws InterruptedException, ExecutionException {
try {
return doInvokeAny(tasks, false, 0);
} catch (TimeoutException cannotHappen) {
assert false;
return null;
}
}
//对上方法的封装
public T invokeAny(Collection extends Callable> tasks,
long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
return doInvokeAny(tasks, true, unit.toNanos(timeout));
}
//相对于上一个方法执行成功任何一个则返回结果而此方法是全部执行完然后统一返回结果
public List> invokeAll(Collection extends Callable> tasks)
throws InterruptedException {
//传入的任务集合不能是null
if (tasks == null)
throw new NullPointerException();
//创建一个集合用来保存获取到的执行future
ArrayList> futures = new ArrayList>(tasks.size());
//任务是否执行完成
boolean done = false;
try {
//遍历传入的任务并且调用执行方法将创建的future添加到集合中
for (Callable t : tasks) {
RunnableFuture f = newTaskFor(t);
futures.add(f);
execute(f);
}
//遍历获取到的future
for (int i = 0, size = futures.size(); i f = futures.get(i);
//如果当前任务没有成功则进行f.get方法等待此方法执行成功,如果方法执行异常或者被取消将忽略异常
if (!f.isDone()) {
try {
f.get();
} catch (CancellationException ignore) {
} catch (ExecutionException ignore) {
}
}
}
//到这一步则代表所有的任务都已经有了确切的结果
done = true;
//返回任务结果集合
return futures;
} finally {
//如果不是true是false 则代表执行过程中被中断了则需要对任务进行取消操作,如果正常完成则不会被取消
if (!done)
for (int i = 0, size = futures.size(); i List> invokeAll(Collection extends Callable> tasks,
long timeout, TimeUnit unit)
throws InterruptedException {
if (tasks == null)
throw new NullPointerException();
//计算设置时长的纳秒时间
long nanos = unit.toNanos(timeout);
ArrayList> futures = new ArrayList>(tasks.size());
boolean done = false;
try {
for (Callable t : tasks)
futures.add(newTaskFor(t));
//计算最终计算的确切时间点,运行时长不能超过此时间也就是时间死线
//这里是个细节future创建的时间并没有算作执行时间
final long deadline = System.nanoTime() + nanos;
//获取当前结果数
final int size = futures.size();
//遍历将任务进行执行
for (int i = 0; i f = futures.get(i);
if (!f.isDone()) {
//如果在等待过程中已经超时则返回当前等待结合
if (nanos
线程池的具体实现
ThreadPoolExecutor 默认线程池
ScheduledThreadPoolExecutor 定时线程池 (下篇再做介绍)
ThreadPoolExecutor
线程池重点属性
//用来标记线程池状态(高3位),线程个数(低29位)
//默认是RUNNING状态,线程个数为0
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
//线程个数掩码位数,并不是所有平台int类型是32位,所以准确说是具体平台下Integer的二进制位数-3后的剩余位数才是线程的个数,
private static final int COUNT_BITS = Integer.SIZE - 3;
//线程最大个数(低29位)000 11111111111111111111111111111
private static final int CAPACITY = (1
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
线程池监控
public long getTaskCount() //线程池已执行与未执行的任务总数
public long getCompletedTaskCount() //已完成的任务数
public int getPoolSize() //线程池当前的线程数
public int getActiveCount() //线程池中正在执行任务的线程数量
线程池原理
核心方法分析
execute方法
execute 方法是提交任务 command 到线程池进行执行
public void execute(Runnable command) {
//如果任务为null,则抛出NPE异常
if (command == null)
throw new NullPointerException();
/*
* clt记录着runState和workerCount
*/
int c = ctl.get();
/*
* workerCountOf方法取出低29位的值,表示当前活动的线程数;
* 如果当前活动线程数小于corePoolSize,则新建一个线程放入线程池中;
* 并把任务添加到该线程中。
*/
if (workerCountOf(c) = corePoolSize并且workQueue已满。
* 这时,再次调用addWorker方法,但第二个参数传入为false,将线程池的有限线程数量的上限设置为maximumPoolSize;
* 如果失败则拒绝该任务
*/
else if (!addWorker(command, false))
reject(command);
}
public static Callable callable(Runnable task, T result) {
if (task == null)
throw new NullPointerException();
return new RunnableAdapter(task, result);
}
RunnableAdapter类
/**
* A callable that runs given task and returns given result
*/
static final class RunnableAdapter implements Callable {
final Runnable task;
final T result;
RunnableAdapter(Runnable task, T result) {
this.task = task;
this.result = result;
}
public T call() {
task.run();
return result;
}
}
Callable callable(final PrivilegedAction action)
返回 Callable 对象,调用它时可运行给定特权的操作并返回其结果。
public static Callable callable(final PrivilegedAction> action) {
if (action == null)
throw new NullPointerException();
return new Callable() {
public Object call() { return action.run(); }
};
}
public static Callable privilegedCallable(Callable callable) {
if (callable == null)
throw new NullPointerException();
return new PrivilegedCallable(callable);
}
public static Callable privilegedCallableUsingCurrentClassLoader(Callable callable) {
if (callable == null)
throw new NullPointerException();
return new PrivilegedCallableUsingCurrentClassLoader(callable);
}
关于ThreadFactory的支持
根据需要创建新线程的对象。使用线程工厂就无需再手工编写对 new Thread 的调用了,从而允许应用程序使用特殊的线程子类、属性等等。
// 返回用于创建新线程的默认线程工厂。
public static ThreadFactory defaultThreadFactory() {
return new DefaultThreadFactory();
}
// 返回用于创建新线程的线程工厂,这些新线程与当前线程具有相同的权限。
public static ThreadFactory privilegedThreadFactory() {
return new PrivilegedThreadFactory();
}
DefaultThreadFactory
static class DefaultThreadFactory implements ThreadFactory {
static final AtomicInteger poolNumber = new AtomicInteger(1);
final ThreadGroup group;
final AtomicInteger threadNumber = new AtomicInteger(1);
final String namePrefix;
DefaultThreadFactory() {
SecurityManager s = System.getSecurityManager();
group = (s != null)? s.getThreadGroup() :
Thread.currentThread().getThreadGroup();
namePrefix = "pool-" +
poolNumber.getAndIncrement() +
"-thread-";
}
public Thread newThread(Runnable r) {
Thread t = new Thread(group, r,
namePrefix + threadNumber.getAndIncrement(),
0);
if (t.isDaemon())
t.setDaemon(false);
if (t.getPriority() != Thread.NORM_PRIORITY)
t.setPriority(Thread.NORM_PRIORITY);
return t;
}
}
从源码看出DefaultThreadFactory就是创建一个普通的线程,非守护线程,优先级为5。
新线程具有可通过 pool-N-thread-M 的 Thread.getName() 来访问的名称,其中 N 是此工厂的序列号, M 是此工厂所创建线程的序列号。
PrivilegedThreadFactory
static class PrivilegedThreadFactory extends DefaultThreadFactory {
private final ClassLoader ccl;
private final AccessControlContext acc;
PrivilegedThreadFactory() {
super();
this.ccl = Thread.currentThread().getContextClassLoader();
this.acc = AccessController.getContext();
acc.checkPermission(new RuntimePermission("setContextClassLoader"));
}
public Thread newThread(final Runnable r) {
return super.newThread(new Runnable() {
public void run() {
AccessController.doPrivileged(new PrivilegedAction() {
public Object run() {
Thread.currentThread().setContextClassLoader(ccl);
r.run();
return null;
}
}, acc);
}
});
}
}
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue()));
}
//使用自己的线程工厂
public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue(),
threadFactory));
}
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
//使用自定义的线程工厂
public static ScheduledExecutorService newScheduledThreadPool(
int corePoolSize, ThreadFactory threadFactory) {
return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
}