After solving the summer murder mystery in the previous blog entry, we found that Observable is a construct in RxJava that provides a powerful abstraction: composable sequences. It was also mentioned that every interaction with an existing codebase or framework can be expressed as a sequence, also known as "making your code reactive". In this entry, we'll explore how to wrap your existing code into an Observable.
The Observable contract
An Observable represents a sequence of 0-N values that may or may not terminate, and that may fail with an unexpected error. This contract is captured by Observer, the callback interface that Observable uses:
interface Observer<T> {
void onNext(T t);
void onCompleted();
void onError(Throwable e);
}
onNext() receives any number of values, and either onCompleted() or onError() is called exactly once when the sequence terminates or hits an unexpected error.
Once a sequence completes or fails, no further values are delivered to onNext().
Sequences are not required to terminate, e.g. view clicks, Bluetooth status, or gyroscopes.
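To make the contract concrete, here is a minimal plain-Java sketch that does not depend on RxJava. ContractObserver and RecordingObserver are hypothetical names invented for this illustration; the wrapper simply drops any event that arrives after a terminal one, which is the guarantee described above.

```java
import java.util.ArrayList;
import java.util.List;

// Same shape as the Observer interface above.
interface Observer<T> {
    void onNext(T t);
    void onCompleted();
    void onError(Throwable e);
}

// Hypothetical wrapper: ignores every event that arrives after a terminal one.
final class ContractObserver<T> implements Observer<T> {
    private final Observer<T> downstream;
    private boolean terminated;

    ContractObserver(Observer<T> downstream) {
        this.downstream = downstream;
    }

    @Override public void onNext(T t) {
        if (!terminated) downstream.onNext(t);
    }

    @Override public void onCompleted() {
        if (terminated) return;
        terminated = true;
        downstream.onCompleted();
    }

    @Override public void onError(Throwable e) {
        if (terminated) return;
        terminated = true;
        downstream.onError(e);
    }
}

// Hypothetical helper: records events as strings so their order is easy to inspect.
final class RecordingObserver<T> implements Observer<T> {
    final List<String> events = new ArrayList<>();

    @Override public void onNext(T t) { events.add("next:" + t); }
    @Override public void onCompleted() { events.add("completed"); }
    @Override public void onError(Throwable e) { events.add("error:" + e.getMessage()); }
}
```

Wrapping a RecordingObserver and sending two values, a completion, and then a late value and a late error leaves only the two values and the completion in the recorded events.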
If you've been following the series you'll see that this Observer interface can be split into three separate generic functions: an Action1 for values, an Action1 for errors, and an Action0 for completion. Luckily every method that takes an Observer as a parameter has an overload for one, two, and three separate functions.
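The split into functions can be sketched in plain Java with java.util.function types standing in for Action1 and Action0. The Observers helper below is hypothetical, and the choice to throw when the one-function overload receives an error is an assumption made for this sketch.

```java
import java.util.function.Consumer;

// Same shape as the Observer interface above.
interface Observer<T> {
    void onNext(T t);
    void onCompleted();
    void onError(Throwable e);
}

// Hypothetical helper: builds an Observer out of independent functions.
final class Observers {
    private Observers() { }

    // Three-function form: values, errors, completion.
    static <T> Observer<T> from(Consumer<T> next, Consumer<Throwable> error, Runnable completed) {
        return new Observer<T>() {
            @Override public void onNext(T t) { next.accept(t); }
            @Override public void onCompleted() { completed.run(); }
            @Override public void onError(Throwable e) { error.accept(e); }
        };
    }

    // One-function form: completion is ignored, errors are rethrown (an assumption of this sketch).
    static <T> Observer<T> from(Consumer<T> next) {
        return from(next, e -> { throw new IllegalStateException("unhandled error", e); }, () -> { });
    }
}
```

Because each piece is just a function, the same value handler can be reused across unrelated sequences without defining a new Observer class each time.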
Operators
Operators are the way we interact with sequences. Operators are a key part when developing in functional reactive style, and there are many of them. An operator can change the values inside a sequence, make it terminating or non-terminating, continue it on a different thread, aggregate its values into a collection, combine multiple sequences serially or in parallel, and many other descriptive ways of composing behavior into the sequence.
In the following series about the passive view and functional business logic I will cover some of their uses and pitfalls in context. If you prefer an in-depth overview of operators, I've added links to some of my favourite RxJava tutorials at the end of this entry.
Meanwhile, in this entry I'll focus on the central concepts you'll need to wrap existing APIs.
Converting existing code into a sequence
The RxJava team has been very smart about the way the sequences are created, and most of them are geared towards allowing you to interface with the parts of your app that are purely imperative, like the Android code to create a view, or that efficient compression algorithm you've worked on.
Observables are generally not replacements for computations, but rather a way of composing them. While some of those heavy computations can be expressed as functional code with operators, it is often more performant to fall back to imperative code and use the Observable to deliver the response.
The no-nos
In most RxJava starter series, Observables are created by one of two constructors: create() and defer(). Those two are fine when used with caution, and are mostly used internally by the RxJava team to create new features. They were also used extensively in the early days of RxJava.
As the usage of the library evolved they came to be seen as a liability, because they require care to handle requirements like backpressure. So the contributors added two new recommended constructors, fromCallable() and fromEmitter() (called fromAsync() until release 1.1.10), that handle those concerns for you.
Terminating sequences (a.k.a. Cold Observables)
A terminating sequence is an Observable that emits 1-N values, then terminates. Remember that any sequence can terminate prematurely with an error! We'll cover when those happen too.
Synchronous response
This one is quite simple: if your API is a single synchronous request that returns immediately, fromCallable() is your constructor. If your request can fail and throw an exception, fromCallable() will catch it and forward it to onError().
int defrombulation(int a, int z) { /* */ }
Observable<Integer> reactiveDefrombulation() {
  return Observable.fromCallable(new Callable<Integer>() {
    @Override public Integer call() {
      // Call your method. No try/catch required
      return defrombulation(1, 2);
    }
  });
}
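To see why no try/catch is needed, here is a plain-Java sketch of what a fromCallable()-style bridge does at subscription time. CallableBridge is a hypothetical stand-in written for this entry, not RxJava's implementation: it runs the Callable, then forwards either the value plus completion, or the exception.

```java
import java.util.concurrent.Callable;

// Same shape as the Observer interface above.
interface Observer<T> {
    void onNext(T t);
    void onCompleted();
    void onError(Throwable e);
}

// Hypothetical stand-in for the work done when someone subscribes.
final class CallableBridge {
    private CallableBridge() { }

    static <T> void subscribe(Callable<T> work, Observer<T> observer) {
        T value;
        try {
            // Run the wrapped synchronous call.
            value = work.call();
        } catch (Exception e) {
            // Any exception becomes the terminal error event; nothing else follows.
            observer.onError(e);
            return;
        }
        // Exactly one value, then completion.
        observer.onNext(value);
        observer.onCompleted();
    }
}
```

The wrapped method stays purely imperative, and the caller sees its failures through the same channel as its results.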
Asynchronous response
Starting in version 1.1.7, a new API for bridging from regular callbacks was added. The constructor fromEmitter() (called fromAsync() until release 1.1.10) provides an AsyncEmitter<T> object used to forward your API results. The AsyncEmitter also provides methods to add any cleanup your API may require.
The reason why fromEmitter() has the upper hand over the no-nos is that it also requires a second parameter that establishes how it handles backpressure. If you don't want to think about it, I recommend choosing AsyncEmitter.BackpressureMode.BUFFER for terminating sequences, and AsyncEmitter.BackpressureMode.LATEST for non-terminating sequences.
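As a rough intuition for those two modes, the plain-Java sketch below models what happens to values that arrive while the consumer isn't ready. PendingValues is a hypothetical class written for this illustration and has nothing to do with RxJava's internals: BUFFER keeps everything that is waiting, LATEST keeps only the most recent value.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical model of values waiting for a slow consumer.
final class PendingValues<T> {
    enum Mode { BUFFER, LATEST }

    private final Mode mode;
    private final Deque<T> pending = new ArrayDeque<>();

    PendingValues(Mode mode) {
        this.mode = mode;
    }

    // Producer side: called whenever the source has a value, ready or not.
    void offer(T value) {
        if (mode == Mode.LATEST) {
            pending.clear(); // overwrite whatever was still waiting
        }
        pending.addLast(value);
    }

    // Consumer side: takes everything that is currently waiting.
    List<T> drain() {
        List<T> out = new ArrayList<>(pending);
        pending.clear();
        return out;
    }
}
```

A terminating request usually wants every result it produced, hence BUFFER; a stream of status updates usually only cares about the current state, hence LATEST.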
In the example below PermissionAskTask is a class that holds an Activity object, which means that if it's retained for longer than said Activity's lifecycle, it'll cause a memory leak. Bad! Bad! Luckily, the library creator reads this blog and added a cancellation method to clear that reference. Our Observable needs to call that cancellation correctly on unsubscription.
class PermissionAskTask {
  Activity activity;
  PermissionAskTask(Activity activity) {
    this.activity = activity;
  }
  void permissionResult(int a, int z, Callback<Integer> cb, CbError e) {
    internalRequestPermission(activity, a, z, cb, e);
  }
  void cancel() {
    activity = null;
  }
}
Observable<Integer> reactivePermission(PermissionAskTask requester) {
  // Assumes Callback and CbError are single-method interfaces, so lambdas fit
  return Observable.fromEmitter(
    // The emitter is responsible for your lifecycle
    (AsyncEmitter<Integer> emitter) -> {
      // Do your call and forward the result to the emitter
      requester.permissionResult(1, 2,
        callbackResult -> {
          // Forward the result, and completion
          emitter.onNext(callbackResult);
          emitter.onCompleted();
        },
        error ->
          // Forward the error wrapped into any Exception
          emitter.onError(new RuntimeException(error)));
      // Clear once we're done or there isn't anyone listening
      emitter.setSubscription(new MainThreadSubscription() {
        @Override protected void onUnsubscribe() {
          requester.cancel();
        }
      });
    }, AsyncEmitter.BackpressureMode.BUFFER);
}
Non-terminating sequences (a.k.a. Hot Observables)
These sequences represent sources that can produce an unbounded number of values, and the only way for them to terminate is with an exception. Exceptions are a rarity in these cases, and they mean something is wrong with the app. Making non-terminating APIs into reactive sequences is the key component of the stateless UI.
Asynchronous response
This approach looks exactly like the terminating asynchronous sequence, except that the callback never calls onCompleted(), because the original callback keeps delivering responses until unsubscribed. This is the approach taken by most UI components, as illustrated by the implementation RxBinding provides for view clicks.
The example below illustrates the same case with a Bluetooth Status Service from a sample library, that originally takes the Colonel Mustard approach to callbacks.
interface BtStatusService {
  void addCallback(Callback<BtState> cb);
  void removeCallback(Callback<BtState> cb);
}
Observable<BtState> reactiveBtStatus(BtStatusService service) {
  // Assumes Callback is a single-method interface, so a lambda fits
  return Observable.fromEmitter(
    // The emitter is responsible for your lifecycle
    (AsyncEmitter<BtState> emitter) -> {
      // Forward every result, but without completion
      final Callback<BtState> cb = state -> emitter.onNext(state);
      // Subscribe to the service
      service.addCallback(cb);
      // Clear once we're done or there isn't anyone listening
      emitter.setSubscription(new MainThreadSubscription() {
        @Override protected void onUnsubscribe() {
          service.removeCallback(cb);
        }
      });
    }, AsyncEmitter.BackpressureMode.LATEST);
}
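The register-then-clean-up shape of that bridge can be tried out in plain Java, without Android or RxJava. StatusService and StatusBridge below are hypothetical stand-ins for the Bluetooth service and the wrapper: subscribing registers a callback and hands back the action that removes it again.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical listener-based service, standing in for the Bluetooth status service.
final class StatusService {
    private final List<Consumer<String>> callbacks = new ArrayList<>();

    void addCallback(Consumer<String> cb) { callbacks.add(cb); }
    void removeCallback(Consumer<String> cb) { callbacks.remove(cb); }
    int callbackCount() { return callbacks.size(); }

    void publish(String state) {
        // Iterate over a copy so callbacks may unregister while being notified.
        for (Consumer<String> cb : new ArrayList<>(callbacks)) {
            cb.accept(state);
        }
    }
}

// Hypothetical bridge: registers on subscription, returns the cleanup for unsubscription.
final class StatusBridge {
    private StatusBridge() { }

    static Runnable listen(StatusService service, Consumer<String> onNext) {
        // A fresh callback object, so removal targets exactly this registration.
        Consumer<String> cb = onNext::accept;
        service.addCallback(cb);
        // Never completes on its own: the only way out is running the cleanup.
        return () -> service.removeCallback(cb);
    }
}
```

Running the returned Runnable is the equivalent of unsubscribing: after it runs, the service no longer holds a reference to the listener.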
Inheritance-based and closed APIs
This approach is the most time-efficient for legacy codebases, and for transforming any kind of existing inheritance-based API into a composition model. It uses the tool RxJava provides to bridge from imperative programming: Subject.
A Subject is a specialisation of both Observable and Observer that exposes its onNext(), onCompleted(), and onError() methods so they can be called at any time, without worrying about construction. Subjects come in different flavours, each with a different caching policy for new subscriptions. A rundown of the different types is available later in this entry. For now, wrapping UI widgets with PublishSubject is the best choice.
Subjects may seem great at the beginning, but they come at a cost, as their implementation doesn't handle those concerns the way the options described above do. They also use the base interface Observer, which doesn't compose. Luckily, Jake Wharton comes to the rescue: RxRelays.Relay has the upper hand over Subject in its handling of backpressure, and its base interface is a single Action1<T> method, which makes it very reusable. I'm yet to use a Subject in production.
The example wraps another really well-known inheritance-based API that you can't wrap in fromEmitter():
class ComposableActivity extends Activity {
  enum Lifecycle { CREATE, RESUME, PAUSE, DESTROY }
  final PublishRelay<Lifecycle> lifecycleObservable =
      PublishRelay.create();
  final PublishRelay<Triplet<Integer, Integer, Intent>> onResultObservable =
      PublishRelay.create();
  @Override protected void onCreate(Bundle bundle) {
    super.onCreate(bundle);
    lifecycleObservable.call(Lifecycle.CREATE);
  }
  @Override protected void onResume() {
    super.onResume();
    lifecycleObservable.call(Lifecycle.RESUME);
  }
  @Override protected void onPause() {
    super.onPause();
    lifecycleObservable.call(Lifecycle.PAUSE);
  }
  @Override protected void onDestroy() {
    super.onDestroy();
    lifecycleObservable.call(Lifecycle.DESTROY);
  }
  @Override protected void onActivityResult(int requestCode, int resultCode, Intent data) {
    super.onActivityResult(requestCode, resultCode, data);
    onResultObservable.call(Triplet.with(requestCode, resultCode, data));
  }
}
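class MyPresenter {
  // subscribe() returns a Subscription that the caller can use to unsubscribe
  static Subscription start(ComposableActivity activity) {
    return activity
        .lifecycleObservable
        .filter(lc -> lc == ComposableActivity.Lifecycle.CREATE)
        .subscribe(lc -> Log.d("Paco", "Reactive Create!"));
  }
}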
Efficiently handling Observable results
One thing to understand is that every time an Observable is started by calling subscribe() on it, a new sequence is created, starting the operation from scratch by calling the emitter function or Callable you passed to the constructor. So, by default every subscriber gets different results, even if they subscribe at the same time or for the same value. This may be inefficient for operations that always yield the same results for the same parameters, so some operators are provided that allow re-use of the same Observable object.
share()
If you want any Observable object to serve the same results to multiple subscribers but without keeping them in memory, use the operator share(). It makes sure that one sequence is kept alive while it has at least 1 subscriber, and restarted whenever it drops down to 0 subscribers and is subscribed again.
Observable<Double> rand = Observable.fromCallable(() -> Math.random());
// A new value per subscriber
rand.subscribe(val -> System.out.println(val));
// 0.824587465872465
rand.subscribe(val -> System.out.println(val));
// 0.5409345987
// Let's share
Observable<Double> shared = rand.share();
// The two identical values below assume the source is still active when s2 subscribes.
// A synchronous source such as rand completes inside the first subscribe(), so share() would re-run it for s2.
Subscription s1 = shared.subscribe(val -> System.out.println(val));
// 0.13423235235
Subscription s2 = shared.subscribe(val -> System.out.println(val));
// 0.13423235235
// If we want share() to create a new value we have to make sure it has no subscribers
s1.unsubscribe();
s2.unsubscribe();
shared.subscribe(val -> System.out.println(val));
// 0.5654654
cache()
If you want any terminating Observable object to execute just once and store its results in memory, use the operator cache(). It saves every value seen in the sequence.
Beware of using it when your result is memory-heavy, like a very long list, or on a non-terminating sequence, as its values will be retained in memory while the Observable reference and any of its siblings are alive.
Observable<Double> cached =
    Observable.<Double>fromEmitter(
        subscriber -> {
          subscriber.onNext(Math.random());
          subscriber.onNext(Math.random());
          subscriber.onNext(Math.random());
          subscriber.onNext(Math.random());
          subscriber.onNext(Math.random());
          subscriber.onCompleted();
        }, AsyncEmitter.BackpressureMode.BUFFER)
    .cache();
Subscription s1 = cached.subscribe(val -> System.out.println(val));
// 0.446546541
// 0.974455432
// 0.365456465
// 0.987652165
// 0.065465465
s1.unsubscribe();
cached.subscribe(val -> System.out.println(val));
// 0.446546541
// 0.974455432
// 0.365456465
// 0.987652165
// 0.065465465
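The run-once-then-replay behaviour shown above can be approximated in a few lines of plain Java. CachedSequence is a hypothetical class for illustration only, and it simplifies a terminating sequence down to a Supplier that returns all of its values at once.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Hypothetical model of cache(): the source runs on the first subscription only.
final class CachedSequence<T> {
    private final Supplier<List<T>> source;
    private List<T> cached; // null until the first subscription

    CachedSequence(Supplier<List<T>> source) {
        this.source = source;
    }

    void subscribe(Consumer<T> onNext) {
        if (cached == null) {
            // First subscriber triggers the work; results stay in memory afterwards.
            cached = new ArrayList<>(source.get());
        }
        // Every subscriber, first or later, sees the same stored values.
        cached.forEach(onNext);
    }
}
```

The stored list lives for as long as the CachedSequence itself, which is the memory cost the warning above refers to.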
Subjects and Relays
The specialised cases of Subject (or Relay) by definition implement internally the equivalent of share(). If you want them to cache some of your values there are several sub-types: PublishSubject doesn't cache any values, BehaviorSubject caches the latest value seen and replays it on subscription, and ReplaySubject stores all or some of the previous values seen and replays them for every new subscriber.
PublishSubject<Integer> subject = PublishSubject.create();
subject.onNext(1);
subject.onNext(2);
subject.onNext(3);
subject.subscribe(v -> System.out.println(v));
subject.onNext(4);
// 4
BehaviorSubject<Integer> s = BehaviorSubject.create();
s.onNext(0);
s.onNext(1);
s.onNext(2);
s.subscribe(v -> System.out.println(v));
s.onNext(3);
// 2
// 3
ReplaySubject<Integer> s = ReplaySubject.create();
s.onNext(0);
s.onNext(1);
s.onNext(2);
s.subscribe(v -> System.out.println(v));
s.onNext(3);
// 0
// 1
// 2
// 3
ReplaySubject<Integer> s = ReplaySubject.createWithSize(2);
s.onNext(0);
s.onNext(1);
s.onNext(2);
s.subscribe(v -> System.out.println(v));
s.onNext(3);
// 1
// 2
// 3
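The only difference between those flavours is how many past values a late subscriber gets. The plain-Java sketch below makes that explicit; MiniSubject is a hypothetical class, not how RxJava implements its subjects, and it models the policies as a replay size of 0 (publish), 1 (behavior), or N (replay).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical minimal subject: replays at most replaySize past values to new subscribers.
final class MiniSubject<T> {
    private final int replaySize;
    private final List<T> buffer = new ArrayList<>();
    private final List<Consumer<T>> subscribers = new ArrayList<>();

    private MiniSubject(int replaySize) {
        this.replaySize = replaySize;
    }

    static <T> MiniSubject<T> publish() { return new MiniSubject<>(0); }
    static <T> MiniSubject<T> behavior() { return new MiniSubject<>(1); }
    static <T> MiniSubject<T> replay(int size) { return new MiniSubject<>(size); }

    void onNext(T value) {
        if (replaySize > 0) {
            buffer.add(value);
            // Drop the oldest value once the buffer exceeds its limit.
            if (buffer.size() > replaySize) buffer.remove(0);
        }
        // Iterate over a copy so subscribers may be added during delivery.
        for (Consumer<T> s : new ArrayList<>(subscribers)) {
            s.accept(value);
        }
    }

    void subscribe(Consumer<T> subscriber) {
        // Late subscribers first receive whatever the caching policy kept.
        buffer.forEach(subscriber);
        subscribers.add(subscriber);
    }
}
```

Feeding each variant the same values as the snippets above and subscribing at the same point gives the same printed sequences.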
The RxJava tutorial link dump
If you want to get more familiar with all the RxJava machinery that we'll also cover in the next blog series, here are several links I found very useful:
This is the best and most complete tutorial I've found so far: Intro-To-RxJava
This one is the tutorial that inspired it and gives even more content. It uses the .NET version but is quite readable: IntroToRx
If you just need a short introduction in context: The introduction to Reactive Programming you've been missing
Or if you prefer a video, "Railway oriented programming" covers reactive concepts really well: https://vimeo.com/113707214
Closing the Introduction To Functional Reactive Programming Series
This entry covers as much introduction as required before diving into the passive and reactive use of the view layer. I initially intended it to be only a couple of entries long, but it ended up a full series where we started from basic programming and ended up understanding composable sequences. It was also an overview of how common API designs can be improved by reducing their surface and friction.
Here's a recap of everything covered by the blog so far:
- Functional libraries for Java
- Advantages of using generic method references, Functions and Actions as parameters
- Transforming APIs based on inheritance to simpler, composable, ones by means of Functions
- Review of Android lifecycle issues with traditional callbacks
- The reactive approach to composable and cancellable callbacks with observable sequences
- The Observable in RxJava as a composable sequence for your existing code
In the upcoming series, we'll use those principles to create simpler APIs to use several Android framework components in passive reactive style: EditText, ViewPager, RecyclerView, ImageView... And in parallel, I'll start another series on how to express your business requirements in purely functional style by using this reactive view.
Meanwhile, I will be sharing tips and tidbits and looking for feedback on my twitter account.