This is Not an RxJava Tutorial

After solving the summer murder mystery in the previous blog entry, we found that Observable is a construct in RxJava that provides a powerful abstraction: composable sequences. It was also mentioned that every interaction with an existing codebase or framework can also be expressed as a sequence, also known as "making your code reactive". In this entry, we'll explore how to wrap your existing code into Observable.

The Observable contract

An Observable represents a sequence of 0-N values, that may or may not terminate, and allows the possibility of unexpected errors. This is well presented by the callback Observable use, named Observer:

interface Observer {
    void onNext(T t);

    void onCompleted();

    void onError(Throwable e);
}

onNext() receives any amount of values, and either onCompleted() or onError() are called once if the sequence terminates or gets an unexpected error inside.

Once a sequence terminates or throws an error, no values will be received onNext().

Sequences are not required to terminate, i.e. view clicks, Bluetooth status, or gyroscopes.

If you've been following the series you'll see that this Observer interface can be split into three separate generic Action1 and Action0 functions. Luckily every method that takes Observer as a parameter has an override for one, two, and three separate functions.

Operators

Operators are the way we interact with sequences. Operators are a key part when developing in functional reactive style, and there are many of them. An operator can change the values inside a sequence, make it terminating or non-terminating, continue it on a different thread, aggregate its values into a collection, combine multiple sequences serially or in parallel, and many other descriptive ways of composing behavior into the sequence.

In the following series about the passive view and functional business logic I will be covering some of its usages and pitfalls in context. If you prefer an in-depth overview of operators I'll add some links at the end of this entry with some of my favourite RxJava tutorials.

Meanwhile, in this entry I'll focus on the central concepts you'll need to wrap existing APIs.

Converting existing code into a sequence

The RxJava team has been very smart about the way the sequences are created, and most of them are geared towards allowing you to interface with the parts of your app that are purely imperative, like the Android code to create a view, or that efficient compression algorithm you've worked on.

Observables are generally not replacements for computations, but rather a way of composing them. While some of those heavy computations can be expressed with functional code with operators, it is often more performant to fall back to imperative code and use the Observable as the response delivery.

The no-nos

In most RxJava starter series, Observables are created by one of two constructors: create() and defer(). Those two are good when used with caution and mostly used internally by the RxJava team to create new features. They were also used extensively in the early days of RxJava.

As the usage of the library evolved they were seen as a liability because they require care to handle requirements like backpressure. So, the contributors added two new recommended constructors, fromCallable() and fromEmitter() (called fromAsync() until release 1.1.10), that handle those concerns for you.

Terminating sequences (a.k.a. Cold Observables)

A terminating sequence is an Observable that emits 1-N values, then terminates. Remember that any sequence can terminate prematurely with an error! We'll cover when those happen too.

Synchronous response

This one is quite simple, if your API is a single synchronous request that comes immediately, fromCallable() is your constructor. If your request can fail and throw an exception, fromCallable will catch it and forward it to onError()

int defrombulation(int a, int z) { /* */ }

Observable<Integer> reactiveDefrombulation() {
    return Observable.fromCallable(new Callable() {

        // Call your method. No try/catch required
        return defrombulation(1, 2);
    });

Asynschronous response

Starting in version 1.1.7 a new api for bridging from regular callbacks was added. The constructorfromEmitter() (called fromAsync() until release 1.1.10) provides an AsyncEmitter<T> object used to forward your API results. The AsyncEmitter provides methods to add any cleanup your API may require.

The reason why fromEmitter() has the upper hand over the no-nos is that it also requires a second parameter that establishes its handling of backpressure. If you don't want to think about it I recommend to choose AsyncEmitter.BackpressureMode.BUFFER for terminating sequences, and AsyncEmitter.BackpressureMode.LATEST for non-terminating sequences.

In the example below PermissionRequester is a class that holds an Activity object, which means that if it's retained for longer than said Activity's lifecycle, it'll cause a memory leak. Bad! Bad! Luckily, the library creator reads this blog and added a cancellation method to clear that reference. Our subscriber needs to call that cancellation correctly on unsubscription.

class PermissionAskTask {
    Activity activity;

    PermissionAskTask(Activity activity) {
        this.activity = activity;
    }

    void permissionResult(int a, int z, Callback<Integer> cb, CbError e) { 
        internalRequestPermission(activity, a, z, cb, e); 
    }

    void cancel() {
        activity = null;
    }
}

Observable<Integer> reactivePermission(
                        PermissionAskTask requester,
                        Activity activity) {
    return Observable.fromEmitter(new AsyncSubscriber() {

        // This object is responsible for your lifecycle
        AsyncEmitter<Integer> subscriber -> 

            // Do your call and forward the result to subscriber
            requester.permissionResult(activity, 1, 2
            , new Callback<Integer>() {
                callbackResult -> 
                    // Forward the result, and completion
                    subscriber.onNext(callbackResult);
                    subscriber.onComplete();
            }, new CbError() {
                error ->
                    // Forward the error wrapped into any Exception
                    subscriber.onError(new RuntimeException(error));
            })

            // Clear once we're done or there isn't anyone listening
            subscriber.setDisposable(
                new MainThreadDisposable() {
                    requester.cancel();
                })
    }, AsyncEmitter.BackpressureMode.BUFFER);
}

Non-terminating sequences (a.k.a. Hot Observables)

These sequences represent values that have an infinite number of results, and the only way for them to terminate is with an exception. Exceptions are a rarity in these cases, and they represent something wrong with the app. Making non-terminating APIs into reactive sequences is the key component to the stateless UI.

Asynschronous response

This approach looks exactly like the terminating asynchronous sequence, except the callback, never calls onComplete() because the original callback will have responses until unsubscribed. This is the approach taken by most UI components, as illustrated on the implementation RxBinding provides for view clicks.

The example below illustrates the same case with a Bluetooth Status Service from a sample library, that originally takes the Colonel Mustard approach to callbacks.

interface BtStatusService {
    void addCallback(Callback cb);

    void removeCallback(Callback cb);
}

Observable<BtState> reactiveBtStatus(BluetoothStatusService service) {
    return Observable.fromEmitter(new AsyncSubscriber() {

        // This object is responsible for your lifecycle
        AsyncEmitter<Integer> subscriber -> 

            final Callback cb = new Callback<BtState>() {
                state -> 
                    // Forward the result, but without completion
                    subscriber.onNext(state);
            }

            // Subscribe for the service
            service.addCallback(cb);

            // Clear once we're done or there isn't anyone listening
            subscriber.setDisposable(
                new MainThreadDisposable() {
                    service.removeCallback(cb);
                })
    }, AsyncEmitter.BackpressureMode.LATEST)
}

Inheritance-based and closed APIs

This approach is the most time-efficient for legacy codebases, and transforming any kind of existing inheritance scheme APIs into composition models. It uses the tool RxJava provides to bridge from imperative programming: Subject.

A Subject is a specialisation of both Observable and Observer that exposes their onNext(), onComplete(), and onError() methods so they can be called at any time, without worrying about construction. Subjects come in different flavours, where each has a different caching policy for new subscriptions. A rundown of the different types will be available in the next section. For now, wrapping UI widgets with PublishSubject is the best choice.

Subject may seem great at the beginning, but they come at a cost as their implementation doesn't handle concerns the way other options described above. It also uses the base interface Observer which doesn't compose. Luckily, Jake Wharton to the rescue: RxRelays. Relay has an upper hand over Subject in their handling of backpressure, and their base interface is only one Action1<T> method, which makes them very reusable. I'm yet to use a Subject in production.

The example is another really well know inheritance-based API you can't wrap in fromEmitter():

class ComposableActivity extends Activity {

    enum Lifecycle { CREATE, RESUME, PAUSE, DESTROY }

    final PublishRelay<Lifecycle> lifecycleObservable = 
        PublishRelay.create();

    final PublishRelay<Triplet<Integer, Integer, Intent>> onResultObservable =
        PublishRelay.create();

    @Override onCreate(Bundle bundle) {
        lifecycle.call(CREATE);
    }

    @Override onResume(Bundle bundle) {
        lifecycle.call(RESUME);
    }

    @Override onPause(Bundle bundle) {
        lifecycle.call(PAUSE);
    }

    @Override onDestroy(Bundle bundle) {
        lifecycle.call(DESTROY);
    }

    @Override onActivityResult(int requestCode, int resultCode, Intent data) {
        onActivityResult.call(Triplet.with(requestCode, resultCode, data));
    }
}

class MyPresenter {
    static Subscriber start(ComposableActivity activity) {
        return activity
                .lifecycleObservable
                .filter(lc -> lc == CREATE)
                .subscribe(lc -> 
                    Log.d("Paco", "Reactive Create!");
                );
    }
}

Efficiently handling Observable results

One thing to understand is that every time an Observable is started by calling subscribe() on it, a new sequence is created starting the operation from scratch by calling the AsyncSubscriber or Callable you passed on the constructor. So, by default every subscriber gets different results even if they're subscribed immediately or for the same value. This may be inefficient for some operations that always yield the same results for the same parameters, so some operators are provided that allow re-usage of the same Observable object.

share()

If you want any Observable object to serve the same results to multiple subscribers but without keeping them in memory, use the operator share(). The operator share() makes sure that one sequence is kept while it contains at least 1 subscriber, and restarted whenever it drops down to 0 subscribers and is subscribed again.

Observable rand = Observable.fromCallable(() -> return Math.random())

// A new value per subscriber
rand.subscribe(val -> System.out.println(val))
// 0.824587465872465
rand.subscribe(val -> System.out.println(val))
// 0.5409345987

// Let's share
Observable shared = rand.share();

Subscription s1 = shared.subscribe(val -> System.out.println(val));
// 0.13423235235
Subscription s2 = shared.subscribe(val -> System.out.println(val));
// 0.13423235235

// If we want share() to create a new value we have to make sure it has no subscribers

s1.unsubscribe();
s2.unsubscribe();

shared.subscribe(val -> System.out.println(val));
// 0.5654654

cache()

If you want any terminating Observable object to execute just once and store its results in memory, use the operator cache(). Cache saves every value seen in the sequence.

Beware if your result is memory-heavy like a long long list or using it on a non-terminating sequence, as its values will be retained in memory while the Observable reference and any of its siblings are alive.

Observable cached = 
    Observable.fromEmitter(
        subscriber -> {
            subscriber.onNext(Math.random());
            subscriber.onNext(Math.random());
            subscriber.onNext(Math.random());
            subscriber.onNext(Math.random());
            subscriber.onNext(Math.random());
            subscriber.onComplete();
        })
        .cache()

Subscription s1 = cached.subscribe(val -> System.out.println(val));
// 0.446546541
// 0.974455432
// 0.365456465
// 0.987652165
// 0.065465465

s1.unsubscribe();

cached.subscribe(val -> System.out.println(val));
// 0.446546541
// 0.974455432
// 0.365456465
// 0.987652165
// 0.065465465

Subjects and Relays

The specialised cases of Subject (or Relay) by definition implement internally the equivalent of share(). If you want them to cache some of your values there are several sub-types: PublishSubject doesn't cache any values, BehaviourSubject caches and replays the latest value seen on subscription, and ReplaySubject stores all or some of the previous values seen and replays them for every new subscriber.

PublishSubject<Integer> subject = PublishSubject.create();
subject.onNext(1);
subject.onNext(2);
subject.onNext(3);
subject.subscribe(v -> System.out.println(v));
subject.onNext(4);
// 4

BehaviorSubject<Integer> s = BehaviorSubject.create();
s.onNext(0);
s.onNext(1);
s.onNext(2);
s.subscribe(v -> System.out.println(v)); 
s.onNext(3);
// 2
// 3

ReplaySubject<Integer> s = ReplaySubject.create(); 
s.onNext(0);
s.onNext(1);
s.onNext(2);
s.subscribe(v -> System.out.println(v)); 
s.onNext(3);
// 0
// 1
// 2
// 3

ReplaySubject<Integer> s = ReplaySubject.createwithSize(2); 
s.onNext(0);
s.onNext(1);
s.onNext(2);
s.subscribe(v -> System.out.println(v)); 
s.onNext(3);
// 1
// 2
// 3

The RxJava tutorial link dump

If you want to get more familiar with all the RxJava machinery that we'll also cover in the next blog series, here are several links I found very useful:

This is the best and most complete tutorial I've found so far: Intro-To-RxJava

This one is the tutorial that inspired it and gives even more content. It uses the .NET version but is quite readable: IntroToRx

If you just need a short introduction in context: The introduction to Reactive Programming you've been missing

Or if you prefer a video, "Railway oriented programming" covers reactive concepts really well: https://vimeo.com/113707214

Closing the Introduction To Functional Reactive Programming Series

This entry covers as much introduction as required before diving into the passive and reactive use of the view layer. I initially intended it to be only a couple of entries long, but it ended up a full series where we started from basic programming and ended up understanding composable sequences. It was also an overview of how common API designs can be improved by reducing their surface and friction.

Here's a recollection of everything covered by the blog so far:

  1. Functional libraries for Java
  2. Advantages of using generic method references, Functions and Actions as parameters
  3. Transforming APIs based on inheritance to simpler, composable, ones by means of Functions
  4. Review of Android lifecycle issues with traditional callbacks
  5. The reactive approach to composable and cancellable callbacks with observable sequences
  6. The Observable in RxJava as a composable sequence for your existing code

In the upcoming series, we'll use those principles to create simpler APIs to use several Android framework components in passive reactive style: EditText, ViewPager, RecyclerView, ImageView... And in parallel, I'll start another series on how to express your business requirements on purely functional style by using this reactive view.

Meanwhile, I will be sharing tips and tidbits and looking for feedback on my twitter account.