/ Android源码学习  

AsyncTask源码学习笔记

转载请附原文链接: AsyncTask 源码学习笔记

上一篇笔记分析了线程池的源码 ThreadPoolExecutor源码学习笔记

作为 Android 开发者不得不说说 Android 中的系统级异步框架 — AysncTask

几乎大部分 Android 开发者都用过AysncTask。这是一个专注于其他线程与 UI 线程通信的一个框架。官方的介绍:

AsyncTask enables proper and easy use of the UI thread. This class allows to
perform background operations and publish results on the UI thread without
having to manipulate threads and/or handlers.

本篇笔记依然会按照我的一贯风格从使用作为主线,捋一捋 AysncTask 的逻辑流程。

主要涉及到的类有(不包含内部类):

  • AsyncTask
  • Futuretask

AsyncTask使用

AsyncTask 是一个抽象类,在使用的时候需要继承这个类。

  • doInBackground — 必须实现,异步代码在这里面执行
  • onPostExecute(Result) — 在 UI 线程回调,传递数据
  • onProgressUpdate(Progress) — 任务执行时回调当前进度

以上是经常被重写的方法,AsyncTask 中其实在整个任务执行周期都有相应方法

  • onPreExecute — 执行前准备工作
  • onCancelled — 任务取消

AsyncTask 指定三个泛型,分别是异步任务所需要的参数,进度,和结果。通过 execute() 方法开始执行任务。

AsyncTask 的几种状态

AsyncTask 有一个内部类,用于标记当前状态

public enum Status {
/**
* Indicates that the task has not been executed yet.
*/
PENDING,//当前任务等待中
/**
* Indicates that the task is running.
*/
RUNNING,//当前任务执行中
/**
* Indicates that {@link AsyncTask#onPostExecute} has finished.
*/
FINISHED,//执行完毕
}

创建

十分简单的介绍了一下使用,准备进入源码的世界吧。按照一贯风格,先来看看 AsyncTask 在被创建的时候发生了什么事。首先来看几个比较重要的静态成员变量(常量)

// cpu核数
private static final int CPU_COUNT = Runtime.getRuntime().availableProcessors();
// 核心线程数 = cpu核数+1
private static final int CORE_POOL_SIZE = CPU_COUNT + 1;
// 最大线程数 = cpu核数*2 +1
private static final int MAXIMUM_POOL_SIZE = CPU_COUNT * 2 + 1;
// 线程闲置超时时间 1s
private static final int KEEP_ALIVE = 1;
//当前状态为等待
private volatile Status mStatus = Status.PENDING;
// 任务队列
private static final BlockingQueue<Runnable> sPoolWorkQueue =
new LinkedBlockingQueue<Runnable>(128);
// 创建一个线程池
public static final Executor THREAD_POOL_EXECUTOR
= new ThreadPoolExecutor(CORE_POOL_SIZE, MAXIMUM_POOL_SIZE, KEEP_ALIVE,
TimeUnit.SECONDS, sPoolWorkQueue, sThreadFactory);

可以看到创建了一个线程池,指定了核心线程数,最大线程数,任务队列等等。到这儿应该要理解一件事情

  • 这里的线程池是一个全局静态,也就是所有的AsyncTask使用的其实是同一个线程池
  • AsyncTask 应该理解为一个单一的异步任务,本身并不维护线程池,事实上因为 execute() 方法的重载,严格来说应该理解为是一个异步任务队列的调度器

这里提一个内容外的问题,为什么核心线程数是 cpu 核数+1,而不是 cpu 核数?或者+2,+3等等。很惭愧,半路出家,对于这种问题我也没有特别好的理解,这里简单提一下线程数的设计吧

线程数与 CPU 核数的关系

cpu 的核数代表了同一时间能够同时运行的线程数,比如双核的 cpu 在同一个时间内是可以运行两个线程的,对于程序而言,如果 cpu 是多核的,而我们还在使用单线程编程,那么就有其他几个 cpu 是空闲的,这样对于资源是极大的浪费。

但如果线程数特别的多远超出 cpu 的核数,也会导致线程切换带来较大的开销,一样是不合理的。因此线程数的设计应该考虑在完全利用 cpu 资源的情况下减少线程的切换 。需要注意的是并不是所有的线程都是占用 CPU 资源的,这里就要讲到两个概念

  • CPU密集型 — 在程序中有大量处理逻辑运算的需求,这样的操作是需要占用 CPU 资源的
  • IO密集型 — 有大量的 IO 操作,这样的操作是不需要占用 CPU 资源的

假设有一个四核的 CPU ,我们创建四个线程,其中有三个线程在执行运算工作,还有一个在做 IO 操作,这个时候 CPU 有一个其实是空闲的,也就是浪费了 CPU 的资源。我忘了是在哪儿看到过一句话,即便是 CPU再密集,也会有那么一点 IO 操作,所以对应设计一个 IO 线程是很有必要的。

另外线程除了执行时间,还有等待时间,在等待时间里面,CPU 其实也是处于空闲状态的。有一个公式来设计线程数

线程数 = ()(任务执行时间+等待时间)/任务执行时间)*cpu核数

因为 AsyncTask 核心线程是需要长时间运行不会被回收,因此减少线程的切换开销,将数量设计为核数+1,也就不难理解了。而最大线程数可以根据上面的公式算出一个理论的最大值。

AsyncTask 的一个坑

我在上一篇文章 ThreadPoolExecutor源码学习笔记 中在讲 AbortPolicy 的时候说过,在 AysncTask 中引发了血崩,现在来看看是为什么。

在初始化的源码中可以看到,任务队列的容量设计是128个,在 API19 以前的 AsyncTask 设计的是并行,同时会有5个线程在工作。假设现在有一个多任务需求,很容易就超过了128个,猜猜会发生什么?

还记得在线程池中如果无法加入队列中的时候会调用 reject 方法,而这里创建的线程池并没有传入自己的 handler ,因此用的是默认的 handler — AbortPolicy ,这个类会直接抛一个运行时异常,于是就这样华丽丽的崩溃了。因此在之后修改了 AsyncTask 的策略,默认为串行,只有一个线程在运行,一次执行一个任务,具体实现在后面的源码中来看。

构造方法

前面说了许多题外话,现在才开始真正进入到源码中来~

public AsyncTask() {
mWorker = new WorkerRunnable<Params, Result>() {
public Result call() throws Exception {
mTaskInvoked.set(true);
Process.setThreadPriority(Process.THREAD_PRIORITY_BACKGROUND);
//noinspection unchecked
Result result = doInBackground(mParams);
Binder.flushPendingCommands();
return postResult(result);
}
};
mFuture = new FutureTask<Result>(mWorker) {
@Override
protected void done() {
try {
postResultIfNotInvoked(get());
} catch (InterruptedException e) {
android.util.Log.w(LOG_TAG, e);
} catch (ExecutionException e) {
throw new RuntimeException("An error occurred while executing doInBackground()",
e.getCause());
} catch (CancellationException e) {
postResultIfNotInvoked(null);
}
}
};
}

上面是 AsyncTask 的构造方法,看起来长长的代码,我一句注释也没写,因为现在看到的这段代码貌似没有任何的突破口,可以看到初始化了两个变量,mWorker 和 mFuture。注意这里的 Worker 就不是线程池里的那个啦~

private static abstract class WorkerRunnable<Params, Result> implements Callable<Result> {
Params[] mParams;
}

WorkerRunnanble 是一个抽像类实现了 Callable 接口,这个接口中只有一个方法 call 返回指定的泛型对象,知道这个就可以啦。

创建的代码基本就告一段落了,后面我们还会回来~接下来看看任务的执行

任务执行

执行任务最先想到的就是 execute() 方法,这个方法有一个重载,先来看最常用的这个,也就是传入可变参数,注意这个方法只能在主线程调用,原因很简单,文章一开头就说了这个框架是为了与 UI 线程通信的

@MainThread
public final AsyncTask<Params, Progress, Result> execute(Params... params) {
return executeOnExecutor(sDefaultExecutor, params);
}

调用了 executeOnExecutor() 传入两个参数,第一从字面看是默认的任务执行器。

private static volatile Executor sDefaultExecutor = SERIAL_EXECUTOR;
public static final Executor SERIAL_EXECUTOR = new SerialExecutor();

SerialExecutor 是个什么鬼?这里打住,别着急点击去看,先把大体逻辑捋清楚,来看 executeOnExecutor(),这个方法同样也是需要在主线程中调用。代码稍微有点多,一句一句来看。

@MainThread
public final AsyncTask<Params, Progress, Result> executeOnExecutor(Executor exec,
Params... params) {
// 前面提到过 AsyncTask 的状态,如果状态不是等待中,则根据状态的不同抛出不同的异常
if (mStatus != Status.PENDING) {
switch (mStatus) {
case RUNNING:
throw new IllegalStateException("Cannot execute task:"
+ " the task is already running.");
case FINISHED:
throw new IllegalStateException("Cannot execute task:"
+ " the task has already been executed "
+ "(a task can be executed only once)");
}
}
// 状态标记为执行中
mStatus = Status.RUNNING;
// 调用准备方法
onPreExecute();
// 参数赋值给 mWorker
mWorker.mParams = params;
// 执行起开始执行,传入mFuture
exec.execute(mFuture);
return this;
}

首先判断了当前的状态,只有处于等待中的时候才会向下执行,也就是说 execute(params) 这个方法只应该被调用一次, 为什么把参数赋值给 mWorker 的成员变量后执行器使用的是 mFuture 呢?

还记得前面打住的那个地方,exec 就是 SerialExecutor,这会儿就可以看看它干了啥,做好心理准备,这段代码一开始我看的时候其实是拒绝的,对角懵逼。还是老方法,一句句来,先试试把逻辑理清。

该类实现了 Executor 接口,重写 execute 方法。参数是一个 Runnable 对象。

private static class SerialExecutor implements Executor {
// 任务队列
final ArrayDeque<Runnable> mTasks = new ArrayDeque<Runnable>();
// 当前正在执行run方法的Runnable任务
Runnable mActive;
public synchronized void execute(final Runnable r) {
//拆开看,1.将一个Runnable对象插入到队列中去
// 2.这个Runnanble对象重写了 run方法,调用了 r.run()
// r 也就是从上一步中传入的 mFuture(按照上文的流程,实际按照传入的为准)
// 3.最终执行 scheduleNext()方法
mTasks.offer(new Runnable() {
public void run() {
try {
r.run();
} finally {
scheduleNext();
}
}
});
// 判断如果当前正在执行的任务为空,调用scheduleNext方法
if (mActive == null) {
scheduleNext();
}
}
protected synchronized void scheduleNext() {
// 1.mActivite 从mTasks里面取出第一个任务,如果不为空,交给线程池处理
if ((mActive = mTasks.poll()) != null) {
THREAD_POOL_EXECUTOR.execute(mActive);
}
}
}

这段代码不是那么好懂,再来捋一下逻辑,在执行 execute 的时候,

1.首先将任务添加到了队列中,这个队列是执行器的队列,并不是线程池的队列

2.接下来判断 mActive 是否为空,第一次执行时当然是空,就会执行 scheduleNext 方法

3.从队列中拿出第一个任务,并赋值给 mActive,如果不为空则交给线程池进行处理

4.当 mActive 所指向的这个 Runnable 执行完run方法,无论如何都会再次调用 scheduleNext 方法

通过这一套流程,实现了任务的串行

注意:这个默认的 Executor 也是全局静态,只要是使用默认的方法,就可以保证任务串行,线程池只有一个线程在运行

问题来了,doInBackground() 去哪儿了?别着急,我们从 mFuture 入手

FutureTask

mFuture 是一个 FutureTask 对象,实现了 RunnableFuture 接口,有一个方法 run(),先来看它的构造方法

public FutureTask(Callable<V> callable) {
if (callable == null)
throw new NullPointerException();
this.callable = callable;
this.state = NEW; // ensure visibility of callable
}

构造方法接收一个 callable 对象,并赋值给成员变量 callable,诶,想想 mFuture 初始化的地方,这里又得贴一次 AsyncTask 的构造方法代码,传入 mFuture 中的正是 mWorker 对象,而 doInBackground() 方法正是在 mWorker 的 call() 方法中调用的,看到这里先打住,这段代码我们后面还要再看(囧)~

public AsyncTask() {
mWorker = new WorkerRunnable<Params, Result>() {
public Result call() throws Exception {
mTaskInvoked.set(true);
Process.setThreadPriority(Process.THREAD_PRIORITY_BACKGROUND);
//noinspection unchecked
Result result = doInBackground(mParams);
Binder.flushPendingCommands();
return postResult(result);
}
};
mFuture = new FutureTask<Result>(mWorker) {
@Override
protected void done() {
try {
postResultIfNotInvoked(get());
} catch (InterruptedException e) {
android.util.Log.w(LOG_TAG, e);
} catch (ExecutionException e) {
throw new RuntimeException("An error occurred while executing doInBackground()",
e.getCause());
} catch (CancellationException e) {
postResultIfNotInvoked(null);
}
}
};
}

回到 FutureTask 中来看看 run() 中的代码,为了简洁,只贴部分核心代码

Callable<V> c = callable;// 获取到这个 callable 对象
if (c != null && state == NEW) { // 如果不为空,并且状态为新任务
V result;
boolean ran;
try {
result = c.call(); // 调用call方法获取到result
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
setException(ex);
}
// 执行成功
if (ran)
set(result);
}

再追 set(result)

protected void set(V v) {
if (U.compareAndSwapInt(this, STATE, NEW, COMPLETING)) {
// 将结果赋值给要输出的值
outcome = v;
U.putOrderedInt(this, STATE, NORMAL); // final state
// 结束
finishCompletion();
}
}
// 注意只贴了部分代码
private void finishCompletion() {
done();
callable = null; // to reduce footprint
}
protected void done() { }

最后调用了 done() 方法,还得看之前的构造方法中的代码,初始化时重写了 done() 方法

@Override
protected void done() {
try {
// 将结果发布出去
postResultIfNotInvoked(get());
} catch (InterruptedException e) {
android.util.Log.w(LOG_TAG, e);
} catch (ExecutionException e) {
throw new RuntimeException("An error occurred while executing doInBackground()",
e.getCause());
} catch (CancellationException e) {
postResultIfNotInvoked(null);
}
}

又调用了一个方法 postResultIfNotInvoked(get()) ,如果我直接告诉你 get() 是获取到上面那个 outcome 你信么?。。。还是贴代码吧,严谨一点

public V get() throws InterruptedException, ExecutionException {
int s = state;
if (s <= COMPLETING)
s = awaitDone(false, 0L);
return report(s);
}
private V report(int s) throws ExecutionException {
Object x = outcome;
if (s == NORMAL)
return (V)x;
if (s >= CANCELLED)
throw new CancellationException();
throw new ExecutionException((Throwable)x);
}

没骗你吧。。。FutureTask 中的任务就完成了,又要回到 AsyncTask 中去

消息传递

上一节 mFuture 把接力棒扔给了postResultIfNotInvoked() ,此时任务执行完毕,但还是在子线程中

private void postResultIfNotInvoked(Result result) {
final boolean wasTaskInvoked = mTaskInvoked.get();
if (!wasTaskInvoked) {
postResult(result);
}
}
private Result postResult(Result result) {
@SuppressWarnings("unchecked")
Message message = getHandler().obtainMessage(MESSAGE_POST_RESULT,
new AsyncTaskResult<Result>(this, result));
message.sendToTarget();
return result;
}

上面的代码出现了熟悉的对象,Message,既然有 Message 基本可以判断是使用 Handler 进行线程间通信

private static Handler getHandler() {
synchronized (AsyncTask.class) {
if (sHandler == null) {
sHandler = new InternalHandler();
}
return sHandler;
}
}
private static class InternalHandler extends Handler {
public InternalHandler() {
//绑定了住线程的 Looper
super(Looper.getMainLooper());
}
@SuppressWarnings({"unchecked", "RawUseOfParameterizedType"})
@Override
public void handleMessage(Message msg) {
AsyncTaskResult<?> result = (AsyncTaskResult<?>) msg.obj;
switch (msg.what) {
case MESSAGE_POST_RESULT:
// There is only one result
result.mTask.finish(result.mData[0]);
break;
case MESSAGE_POST_PROGRESS:
result.mTask.onProgressUpdate(result.mData);
break;
}
}
}
private void finish(Result result) {
if (isCancelled()) {
onCancelled(result);
} else {
onPostExecute(result);
}
mStatus = Status.FINISHED;
}

看到熟悉的 onPostExecute() ,莫名的激动,这条路可算是走通了。别忘了还有一个 execute(Runnable) 方法的重载

execute(Runnable)

@MainThread
public static void execute(Runnable runnable) {
sDefaultExecutor.execute(runnable);
}

就这么点,没啥可说的。。。另外用 AsyncTask 用得早的同学还记得有一个 setDefaultExecutor() 的方法来指定默认执行器,这个方法现在也被 hide 了

/** @hide */
public static void setDefaultExecutor(Executor exec) {
sDefaultExecutor = exec;
}

有的同学说了那我就是想用自己的 Executor 怎么办?executeOnExecutor() 传入你自己的 Executor 就行。

总结

  • AsyncTask 只负责任务的调度和通信,其余工作由线程池来做
  • API19以后默认情况下 AsyncTask 任务是串行
  • AsyncTask 使用 Handler 进行通信

网上也有很多对 AsyncTask 的总结,今天的分析就结束了。希望在读源码的道路上越走越远~

如果你有不同的理解欢迎交流探讨,不喜请喷~欢迎来喷