observeOn
还是先说observeOn
,直接看源码:
public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) {
super(source);
this.scheduler = scheduler;
this.delayError = delayError;
this.bufferSize = bufferSize;
}
@Override
protected void subscribeActual(Observer<? super T> observer) {
if (scheduler instanceof TrampolineScheduler) {
source.subscribe(observer);
} else {
Scheduler.Worker w = scheduler.createWorker();
source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
}
}
复制代码
这段代码我们上篇看到过,这里再重复一下。obsererOn
是切换下游观察者线程,我们看ObserveOnObserver
中的onNext
方法是如何切换线程的。
static final class ObserveOnObserver<T> extends BasicIntQueueDisposable<T>
implements Observer<T>, Runnable {
ObserveOnObserver(Observer<? super T> actual, Scheduler.Worker worker, boolean delayError, int bufferSize) {
this.downstream = actual;
this.worker = worker;
this.delayError = delayError;
this.bufferSize = bufferSize;
}
@Override
public void onNext(T t) {
if (done) {
return;
}
if (sourceMode != QueueDisposable.ASYNC) {
queue.offer(t);
}
schedule();
}
void schedule() {
if (getAndIncrement() == 0) {
//ObserveOnObserver继承了runnable接口,意味着可以当做是线程任务来执行。这里代表着在新线程中执行run方法。
worker.schedule(this);
}
}
//ObserveOnObserver继承了runnable接口
@Override
public void run() {
if (outputFused) {
drainFused();
} else {
drainNormal();
}
}
void drainNormal() {
int missed = 1;
final SimpleQueue<T> q = queue;
final Observer<? super T> a = downstream;
for (;;) {
if (checkTerminated(done, q.isEmpty(), a)) {
return;
}
for (;;) {
····//省略一些判断的代码
v = q.poll();
//这里就可以看到,将下游的onNext方法,切换到新线程执行。
a.onNext(v);
}
···
}
}
}
复制代码
这是上游的处理器执行onNext
,传到这里,使用之前设置的线程执行下游的onNext
方法。
Worker
这个worker
到底是什么?我们先看schedule
r的createWorker
方法:
public abstract Worker createWorker();
复制代码
在Scheduler
类中,createWorker
只是一个接口,子类会重写这个方法,我们就以Schedulers.newThread()
这个方法创建的Scheduler
为例,来看看这里面的原理。
//Schedulers类中的newThread静态方法,这里的hock我们暂且不理,直接返回NEW_THREAD
public static Scheduler newThread() {
return RxJavaPlugins.onNewThreadScheduler(NEW_THREAD);
}
//Schedulers类中定义了NEW_THREAD和其他THREAD
static {
SINGLE = RxJavaPlugins.initSingleScheduler(new SingleTask());
COMPUTATION = RxJavaPlugins.initComputationScheduler(new ComputationTask());
IO = RxJavaPlugins.initIoScheduler(new IOTask());
TRAMPOLINE = TrampolineScheduler.instance();
NEW_THREAD = RxJavaPlugins.initNewThreadScheduler(new NewThreadTask());
}
//NewThreadTask是Schedulers的静态内部类,继承自Callable接口,其中call方法返回一个Scheduler
static final class NewThreadTask implements Callable<Scheduler> {
@Override
public Scheduler call() throws Exception {
return NewThreadHolder.DEFAULT;
}
}
//NewThreadHolder同样是一个静态内部类,里面只有一个静态参数DEFAULT,这里我们就找到了newThread方法返回的本尊NewThreadScheduler
static final class NewThreadHolder {
static final Scheduler DEFAULT = new NewThreadScheduler();
}
复制代码
如上面代码和注释所示,我们直接看NewThreadScheduler
的源码:
/**
* Schedules work on a new thread.
*/
public final class NewThreadScheduler extends Scheduler {
final ThreadFactory threadFactory;
private static final String THREAD_NAME_PREFIX = "RxNewThreadScheduler";
private static final RxThreadFactory THREAD_FACTORY;
/** The name of the system property for setting the thread priority for this Scheduler. */
private static final String KEY_NEWTHREAD_PRIORITY = "rx2.newthread-priority";
static {
//这里Thread.MIN_PRIORITY为1,Thread.MAX_PRIORITY为10.Thread.NORM_PRIORITY为5.如果我们不做任何更改,这里的priority的值就为5.
int priority = Math.max(Thread.MIN_PRIORITY, Math.min(Thread.MAX_PRIORITY,
Integer.getInteger(KEY_NEWTHREAD_PRIORITY, Thread.NORM_PRIORITY)));
THREAD_FACTORY = new RxThreadFactory(THREAD_NAME_PREFIX, priority);
}
public NewThreadScheduler() {
this(THREAD_FACTORY);
}
public NewThreadScheduler(ThreadFactory threadFactory) {
this.threadFactory = threadFactory;
}
@NonNull
@Override
public Worker createWorker() {
return new NewThreadWorker(threadFactory);
}
}
复制代码
这里createWorker
方法返回的是一个NewThreadWorker
对象。我们总算找到了worker
的来源,需要注意这里的构造参数是threadFactory
。来看看NewThreadWorker
的源码。
public class NewThreadWorker extends Scheduler.Worker implements Disposable {
private final ScheduledExecutorService executor;
volatile boolean disposed;
public NewThreadWorker(ThreadFactory threadFactory) {
executor = SchedulerPoolFactory.create(threadFactory);
}
@NonNull
@Override
public Disposable schedule(@NonNull final Runnable run) {
return schedule(run, 0, null);
}
@NonNull
@Override
public Disposable schedule(@NonNull final Runnable action, long delayTime, @NonNull TimeUnit unit) {
if (disposed) {
return EmptyDisposable.INSTANCE;
}
return scheduleActual(action, delayTime, unit, null);
}
@NonNull
public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
//hock机制
Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
//用一个ScheduledRunnable把传入的runnable包装一下,本质上没区别。
ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);
....//省略判断性代码
Future<?> f;
try {
if (delayTime <= 0) {
//executor由构造方法中创建
f = executor.submit((Callable<Object>)sr);
} else {
f = executor.schedule((Callable<Object>)sr, delayTime, unit);
}
sr.setFuture(f);
} catch (RejectedExecutionException ex) {
if (parent != null) {
parent.remove(sr);
}
RxJavaPlugins.onError(ex);
}
return sr;
}
....
}
复制代码
这里我们就可以看到,前面调用worker.schedule(this)
,最终走到了executor.submit(sr)
。这里的sr
只是前面ObserveOnObserver
的包装。executor
在构造方法中创建。来看看executor
是什么:
//SchedulerPoolFactory类中的静态方法
public static ScheduledExecutorService create(ThreadFactory factory) {
final ScheduledExecutorService exec = Executors.newScheduledThreadPool(1, factory);
tryPutIntoPool(PURGE_ENABLED, exec);
return exec;
}
复制代码
//Executors类的静态方法
public static ScheduledExecutorService newScheduledThreadPool(
int corePoolSize, ThreadFactory threadFactory) {
return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
}
复制代码
OK,executor
是一个ScheduledThreadPoolExecutor
,标准的工作线程池。核心线程数为1,threadFactory
是前面NewThreadWorker
构造参数中的RxThreadFactory
。他会给thread
按照命名格式进行命名。
public final class RxThreadFactory extends AtomicLong implements ThreadFactory {
public RxThreadFactory(String prefix) {
this(prefix, Thread.NORM_PRIORITY, false);
}
public RxThreadFactory(String prefix, int priority) {
this(prefix, priority, false);
}
public RxThreadFactory(String prefix, int priority, boolean nonBlocking) {
this.prefix = prefix;
this.priority = priority;
this.nonBlocking = nonBlocking;
}
@Override
public Thread newThread(Runnable r) {
StringBuilder nameBuilder = new StringBuilder(prefix).append('-').append(incrementAndGet());
String name = nameBuilder.toString();
Thread t = nonBlocking ? new RxCustomThread(r, name) : new Thread(r, name);
t.setPriority(priority);
t.setDaemon(true);
return t;
}
...
}
复制代码
总结一下:
observeOn
在subscribe
方法中,新建一个worker
对象。这个worker
对象是根据设置的scheduler
创建的。然后在新建一个ObserveOnObserver
对象,将上游与之订阅。- 在
ObserveOnObserver
的onNext
方法中,会调用worker.schedule(this)
,将本身作为runnable
传入到worker
中。 - 以
newThreadScheduler
为例,他创建的worker
是一个NewThreadWorker
实例。在实例构造方法中,会根据传入的threadFactory
新建一个ScheduledThreadPool
线程池。 NewThreadWorker
的shedule
方法,就是将ObserveOnObserver
作为一个runnable
放在一个新的线程池中执行。ObserveOnObserver
的run
方法,就是用来执行下游的onNext
,将数据传输下去。从而达到了,切换下游onNext
线程的目的。
subscribeOn
subscribeOn
是用来切换上游发射器线程。切换原理上一篇有说过,其中线程池相关跟上面observeOn
差不多,这里就不赘述了。
总结
上面就是rxjava2
线程切换原理分析了,后面再有人面试问你rxjava2
里面的线程池是哪一种,你就可以自信的说出:ScheduledThreadPool
。
最后贴出我做的一张类图:
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)