Android并发工具

HandlerThread、AsyncTask、IntentService

Posted by candy1126xx on May 24, 2017

HandlerThread

原理

HandlerThread继承自Thread,在其执行体内会创建一个消息队列。

public void run() {
    // 创建MessageQueue
    Looper.prepare();
    synchronized (this) {
        mLooper = Looper.myLooper();
        // 与getLooper()中的wait()对应
        notifyAll();
    }
    Process.setThreadPriority(mPriority);
    // 子类可重写以在MessageQueue开始运转前做些什么
    onLooperPrepared();
    // 使MessageQueue运转起来
    Looper.loop();
}

若要创建向这个消息队列发送消息的Handler,需要先获取它的Looper。

public Looper getLooper() {
    // 没有调用过start()则返回null
    if (!isAlive()) {
        return null;
    }
        
    // 调用过start()而mLooper还未创建,则阻塞直到创建好
    synchronized (this) {
        while (isAlive() && mLooper == null) {
            try {
                wait();
            } catch (InterruptedException e) {
            }
        }
    }
    return mLooper;
}

使用

// 创建HandlerThread
HandlerThread thread = new HandlerThread(“Thread1”);
thread.start();
// 获取Looper
Looper looper = thread.getLooper();
// 创建Handler
Handler handler = new Handler(looper){...};
// 发送信息
handler.sendMessage(...);

AsyncTask

原理

AsyncTask用于执行几秒种的后台任务。内部封装了ThreadPoolExecutor、FutureTask和Callable。

public AsyncTask() {
    // WorkerRunnable是Callable的实现类
    mWorker = new WorkerRunnable<Params, Result>() {
        public Result call() throws Exception {
            // mTaskInvoked是AtomicBoolean类型
            mTaskInvoked.set(true);
            Process.setThreadPriority(Process.THREAD_PRIORITY_BACKGROUND);
            // 子类重写doInBackground()以实现任务处理逻辑
            Result result = doInBackground(mParams);
            Binder.flushPendingCommands();
            // 这里获取的Handler是向主线程发送信息的
            Message message = getHandler().obtainMessage(MESSAGE_POST_RESULT,
                new AsyncTaskResult<Result>(this, result));
            message.sendToTarget();
            return result;
        }
    };

    mFuture = new FutureTask<Result>(mWorker) {
        @Override
        protected void done() {
            try {
                Message message = getHandler().obtainMessage(MESSAGE_POST_RESULT,
                    new AsyncTaskResult<Result>(this, get()));
                message.sendToTarget();
            } catch (InterruptedException e) {
                ...
            }
        }
    };
}

// 内部封装的线程池
private static final int CORE_POOL_SIZE = CPU_COUNT + 1;

private static final int MAXIMUM_POOL_SIZE = CPU_COUNT * 2 + 1;

private static final int KEEP_ALIVE = 1;

private static final BlockingQueue<Runnable> sPoolWorkQueue =
            new LinkedBlockingQueue<Runnable>(128);
            
private static final ThreadFactory sThreadFactory = new ThreadFactory() {
    private final AtomicInteger mCount = new AtomicInteger(1);

    public Thread newThread(Runnable r) {
        return new Thread(r, "AsyncTask #" + mCount.getAndIncrement());
    }
};
public static final Executor THREAD_POOL_EXECUTOR
            = new ThreadPoolExecutor(CORE_POOL_SIZE, MAXIMUM_POOL_SIZE, KEEP_ALIVE,
                    TimeUnit.SECONDS, sPoolWorkQueue, sThreadFactory);

// 缓存队列。把THREAD_POOL_EXECUTOR包装一层以实现串行化。
private static class SerialExecutor implements Executor {
    // 缓存队列
    final ArrayDeque<Runnable> mTasks = new ArrayDeque<Runnable>();
    // 正在执行的Runnable
    Runnable mActive;

    public synchronized void execute(final Runnable r) {
        // 添加到缓存队列
        mTasks.offer(new Runnable() {
            public void run() {
                try {
                    r.run();
                } finally {
                    scheduleNext();
                }
            }
        });
        if (mActive == null) {
            scheduleNext();
        }
    }
    // 从缓存队列中取出并执行
    protected synchronized void scheduleNext() {
        if ((mActive = mTasks.poll()) != null) {
            THREAD_POOL_EXECUTOR.execute(mActive);
        }
    }
}

// Handler处理MESSAGE_POST_RESULT的逻辑
// isCancelled()是一个AtomicBoolean类型的值
if (isCancelled()) {
   // 子类重写以实现任务取消时的逻辑
	onCancelled(result);
} else {
   // 子类重写以实现任务完成时的逻辑
   onPostExecute(result);
}

使用

static AsyncTask<...> task = new AsyncTask<>(){
   // 任务执行前
	onPreExecute(){...}
	// 任务执行中
	doInBackground(){...}
	// 任务进度回调。默认不会回调,必须在doInBackground()中主动调用publishProgress()
	onProgressUpdate(){...}
	// 任务执行完
	onPostExecute(){...}
	// 任务取消
	onCancelled(){...}
};
task.开始执行

其中“开始执行”的方法有:

  • get():不使用线程池
  • execute():使用内部封装的线程池串行执行
  • executeOnExecutor():使用用户指定的线程池或THREAD_POOL_EXECUTOR并行执行

IntentService

原理

普通的Service是运行在主线程的。IntentService继承自Service,内部封装了HandlerThread,使其工作在子线程。并且在执行完后会自动停止服务。

// 在onCreate()中创建HandlerThread
public void onCreate() {
    super.onCreate();
    HandlerThread thread = new HandlerThread("IntentService[" + mName + "]");
    thread.start();

    mServiceLooper = thread.getLooper();
    mServiceHandler = new ServiceHandler(mServiceLooper);
}

// 在onStart()中用Handler向工作线程发送信息
public void onStart(Intent intent, int startId) {
    Message msg = mServiceHandler.obtainMessage();
    msg.arg1 = startId;
    msg.obj = intent;
    mServiceHandler.sendMessage(msg);
}

// ServiceHandler中处理消息的逻辑
public void handleMessage(Message msg) {
    // 子类重写onHandleIntent()以实现任务逻辑
    onHandleIntent((Intent)msg.obj);
    // 任务执行完后自动停止服务
    stopSelf(msg.arg1);
}

使用

和普通服务相似。注意只能startService()。所以也无法通信。

如果要通信,考虑使用Service + Messenger + Handler的组合。