Resources:
- What's different in 2.0
- Reactive Streams Specification for the JVM
- Droidcon NYC 2016 - Looking Ahead to RxJava 2
- GOTO 2016 - Exploring RxJava 2 for Android
public interface Publisher<T> {
    public void subscribe(Subscriber<? super T> s);
}

public interface Subscriber<T> {
    public void onSubscribe(Subscription s);
    public void onNext(T t);
    public void onError(Throwable t);
    public void onComplete();
}

public interface Subscription {
    public void request(long n);
    public void cancel();
}

public interface Processor<T, R> extends Subscriber<T>, Publisher<R> {
}

The callbacks onSubscribe, onNext, onError, and onComplete are not allowed to throw exceptions. They must return normally, except that they throw a NullPointerException when passed a null parameter (RxJava 2 does not let null values through anyway). RxJava 2 provides wrappers, SafeSubscriber and SafeObserver, that handle exceptions automatically.

onSubscribe onNext* (onError | onComplete)?
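
The protocol above can be traced with a small sketch. To stay runnable without RxJava on the classpath, it assumes JDK 9+ and uses the `java.util.concurrent.Flow` interfaces, which mirror the Reactive Streams ones listed earlier. `RangePublisher` and `SignalOrderDemo` are hypothetical names made up for this illustration, and the publisher is single-threaded only, so it is not a spec-compliant implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

class SignalOrderDemo {

    /** Hypothetical publisher emitting 1..count; honors request(n) and cancel(). Not thread-safe. */
    static final class RangePublisher implements Flow.Publisher<Integer> {
        private final int count;

        RangePublisher(int count) { this.count = count; }

        @Override
        public void subscribe(Flow.Subscriber<? super Integer> s) {
            s.onSubscribe(new Flow.Subscription() {
                private int next = 1;
                private long requested;   // outstanding demand
                private boolean emitting; // true while the emission loop is running
                private boolean done;

                @Override
                public void request(long n) {
                    if (done) return;
                    if (n <= 0) {
                        done = true;
                        s.onError(new IllegalArgumentException("n must be > 0"));
                        return;
                    }
                    long sum = requested + n;
                    requested = sum < 0 ? Long.MAX_VALUE : sum; // cap on overflow
                    if (emitting) return; // re-entrant call: the running loop sees the new demand
                    emitting = true;
                    while (!done && requested > 0 && next <= count) {
                        requested--;
                        s.onNext(next++);
                    }
                    emitting = false;
                    if (!done && next > count) {
                        done = true;
                        s.onComplete();
                    }
                }

                @Override
                public void cancel() { done = true; }
            });
        }
    }

    /** Subscribes, requests one item at a time, and returns the signals in arrival order. */
    static List<String> run(int count) {
        List<String> log = new ArrayList<>();
        new RangePublisher(count).subscribe(new Flow.Subscriber<Integer>() {
            private Flow.Subscription subscription;

            @Override public void onSubscribe(Flow.Subscription s) {
                log.add("onSubscribe");
                subscription = s;
                s.request(1);
            }
            @Override public void onNext(Integer item) {
                log.add("onNext(" + item + ")");
                subscription.request(1);
            }
            @Override public void onError(Throwable t) { log.add("onError"); }
            @Override public void onComplete() { log.add("onComplete"); }
        });
        return log;
    }

    public static void main(String[] args) {
        // prints [onSubscribe, onNext(1), onNext(2), onNext(3), onComplete]
        System.out.println(run(3));
    }
}
```

Requesting one item per onNext call is only there to make the demand visible; a real subscriber would usually request in larger batches.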
- Flowable is an implementation of the Reactive Streams Publisher.
  - Observed with a Reactive Streams Subscriber.
  - Produces a Subscription on subscribe.
- FlowableProcessor is an implementation of the Reactive Streams Processor with backpressure support, aka a Publisher (via Flowable) + Subscriber.

- Observable is an implementation of ObservableSource.
  - Observed with an Observer.
  - Produces a Disposable on subscribe.
- Subject is an implementation of Observable + Observer.

Single is a "reactive scalar".
onSubscribe (onSuccess | onError)?
Completable is a "reactive runnable".
onSubscribe (onComplete | onError)?
Maybe is a "reactive optional".
onSubscribe (onSuccess | onError | onComplete)?
How to subscribe:
observable.subscribe(new Observer<Object>() {
    @Override public void onSubscribe(Disposable d) { ... }
    @Override public void onNext(Object o) { ... }
    @Override public void onError(Throwable t) { ... }
    @Override public void onComplete() { ... }
});

Better way:
Disposable disposable = observable.subscribeWith(new DisposableObserver<Object>() {
    @Override public void onNext(Object o) { ... }
    @Override public void onError(Throwable t) { ... }
    @Override public void onComplete() { ... }
});
disposable.dispose();

You can subscribe with DisposableObserver, DisposableSingleObserver, DisposableCompletableObserver, DisposableMaybeObserver, or DisposableSubscriber. The subscribeWith method returns the observer or subscriber it was given, and all of these classes implement Disposable.

How to dispose all:
CompositeDisposable disposables = new CompositeDisposable();
disposables.add(disposable);
disposables.dispose();

All of RxJava 2's functional interfaces declare throws Exception.
Flowable.just("file.txt")
    .map(name -> Files.readAllLines(Paths.get(name)))
    .subscribe(lines -> System.out.println(lines.size()), Throwable::printStackTrace);

If the file doesn't exist, the IOException goes straight to the end consumer, which prints its stack trace. Note that Files.readAllLines() is invoked without a try-catch.

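
Why the lambda needs no try-catch can be shown without RxJava at all. The sketch below is a JDK-only illustration: `ThrowingFunction` is a hypothetical stand-in with the same shape as an interface whose method declares `throws Exception`, and `mapAndDescribe` is an invented helper that plays the role of an operator routing a failure to the error path. It uses `java.nio.file.Files.readAllLines` and assumes that no file named `no-such-file.txt` exists in the working directory.

```java
import java.nio.file.Files;
import java.nio.file.Paths;

class ThrowingFunctionDemo {

    /** Hypothetical stand-in: like a mapper interface whose apply method may throw. */
    @FunctionalInterface
    interface ThrowingFunction<T, R> {
        R apply(T t) throws Exception;
    }

    /**
     * Applies the mapper and describes the outcome.
     * The checked exception is caught once, here, instead of in every lambda,
     * much like an operator forwarding a failure to onError.
     */
    static <T, R> String mapAndDescribe(T input, ThrowingFunction<T, R> mapper) {
        try {
            return "onNext: " + mapper.apply(input);
        } catch (Exception e) {
            return "onError: " + e.getClass().getSimpleName();
        }
    }

    public static void main(String[] args) {
        // readAllLines declares IOException, yet this lambda compiles without try-catch.
        System.out.println(mapAndDescribe("no-such-file.txt",
                name -> Files.readAllLines(Paths.get(name)).size()));

        // A mapper that does not fail takes the normal path.
        System.out.println(mapAndDescribe("21", s -> Integer.parseInt(s) * 2));
    }
}
```

With `java.util.function.Function` the first lambda would not compile, because its `apply` method declares no checked exceptions.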
- Observable with backpressure -> Flowable
- Subject with backpressure -> FlowableProcessor
- Observable.Transformer -> ObservableTransformer
- Subscription without backpressure -> Disposable
- CompositeSubscription -> CompositeDisposable
- Subscriber without backpressure -> Observer | DisposableObserver
- Subscriber with backpressure -> Subscriber | DisposableSubscriber
- Subscriber.onCompleted -> Observer.onComplete | Subscriber.onComplete
- Action0 -> Action
- Action1 -> Consumer
- Action2 -> BiConsumer
- Observable.fromEmitter -> Observable.create
- Observable.doOnCompleted -> Observable.doOnComplete
- Observable.doOnUnsubscribe -> Observable.doOnDispose | Flowable.doOnCancel