Tag Archives: RxJava

Introduction to RxJava for Android by examples (I)

rxjava

Reactive programming is one of the hottest topics amongst the Android community these days. Lot of big guys in the industry are embracing and using RxJava in their platforms and applications, even Google has release their own version of a reactive library called Agera.

One of the problems of Rx programming is the initial learning curve, the complexity of the paradigm itself and the extension of the library operators and classes.

The goal of this post is to make a very simple and easy to understand introduction with some examples and be able to start programming and design our apps using RxJava.
 

What is RxJava?

RxJava is just a library that implements ReactiveX extensions, which basically facilitate the implementation of  asynchronous programming using observables data streams.

It can be described as a combination of the Observer (there is one entity emitting data and one or many listening or it) and Iterator (the data flow are dispatched in onNext events sequences)  patterns using the Functional Programming paradigm (we get data in declarative style methods and avoiding mutable objects).

RxJava is one of the language implementations of the ReactiveX libraries. At the moment there are also other versions available for languages like JavaScript, C#, C++, Swift, Kotlin, etc…
 

Why RxJava?

So if there is the learning curve and complexity issue, why to bother learning RxJava? These are some of the main reasons:

  • Abstraction of complexity of async data flows and events. It’s quite common to fall into the hell of nested and multiple listeners, the hell of multiple events from everywhere when using an event bus, etc. With RxJava we can simplify all this using just asynchronous stream of transformable data.
  • Filtering, composition, transformation. Thanks to the massive amount of operators we can easily transform any stream of data transmitted. As we will see later we can filter, combine, transform, create, etc. any kind of data stream.
  • Error handling.  Very straightforward error handling by centralising in a single callback method any error, stopping the data sequence.
  • Easy testing. Having a single source/destination of the data source and testing helper classes make it quite easy to test.
  • Light and high performance. RxJava is light and fast, at the moment it weights around 1 MB and it’s quite fast streaming our data.

 

The key components

So let’s take a quick look to the key elements that compose the RxJava library.

The Observable

This is the data source, the object that will be emitting the sequences of data items that the entities who are interested in the data they provide, will be subscribing.  It implements all the operators mentioned before to be able to transform the data streams. Observables can be divided in two types: “Hot” observables which might emit data as soon as they are created  and “Cold” observables which wait until an Observer subscribes to it.

The Observer

This is the type of object that will be subscribing and “observing” the data emitted by the Observable, they will “react” somehow using the data provided. Typically they implement the three main events OnNext, OnError, OnComplete as we will see later.

More info in the official reference here.

Operators

These is one of the key and most powerful features of RxJava. There are quite a lot operators of different types that will help us to manipulate and transform in many ways the data streams. We can group them in different categories such as Creating, Transforming, Filtering, Combining, Mathematical, etc.

For a complete list of operators take a look to the documentation following this link.

Schedulers

The Schedulers will help us to implement multithreading in our data streaming. Calling the Observable method “SubscribeOn” and indicating one of the Schedulers we can select in which thread the Observable will do it’s data stream job. On the other hand, calling the method “ObserveOn”, will set up the thread where we will be receiving and observing its data stream.

In RxJava the most common Schedulers are “NewThread” (create a new thread for each job), “io” (for input/output operations), “computation” (event-loops, callbacks processing, etc), “immediate” (perform the job immediately in the current thread), etc.

Subject

A Subject is object that is able to act as Observable and Observer. It can emit data streams and at the same time can subscribe to another data stream source (Observable), so we can think in it like a sort of bridge between two data streams.
 

Example 1: Simple subscription to a source of data

For our first example, we have a method that create a list of numbers and in order to simulate a long operation, we have added a 100 milliseconds thread pause in the loop.

The method look like this:

private List<Integer> getOddNumbers() {
    List<Integer> numbers = new ArrayList<>();
    for(int i=0; i<100; i++) {
        if(i%2 == 0) {
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
    return numbers;
}

 
Now we create the Observable that is going to emit that list of numbers. To accomplish this, in this example we use the “fromCallable()” method that is going to build an observable which will emit the list of numbers retrievedby the callable method:

private Observable<List<Integer>> getOddNumbersObservable() {
    return Observable.fromCallable(new Callable<List<Integer>>() {
        @Override
        public List<Integer> call() throws Exception {
            return getOddNumbers();
        }
    });
}

 
Right, so now that we have an Observable which is going to emit some data, what we need next is to create an Observer that is going to subscribe to that source of data and receive the numbers list. Let’s have a look to a standard Observer:

private Observer getOddNumbersObserver() {
    return new Observer<List<Integer>>() {
        @Override
        public void onCompleted() {
            LogHelper.addLog(mTvLog, "getOddNumbers() - onCompleted()");
        }

        @Override
        public void onError(Throwable e) {
            LogHelper.addLog(mTvLog, "getOddNumbers() - onError()");
        }

        @Override
        public void onNext(List<Integer> integer) {
            LogHelper.addLog(mTvLog, "getOddNumbers() - onNext()" + integer);
        }
    };
}

 
As we can see, the Observer implements three fundamental methods: onNext will be called when the Observable emit an item. onCompleted will be called when the Observable has finished emitting data. And finally onError will be called if there has been any exception or error when emitting data.

We can simplify the Observer in case we just want a single event that will receive the list of numbers and we don’t need to know about the OnComplete and OnError events. To accomplish this we just need to create an Action instead of the Observable, something like this:

private Action<List<Integer>> getOddNumbersAction() {
    return new Action<List<Integer>() {
        @Override
        public void call(List<Integer> integers) {
            LogHelper.addLog(mTvLog, "getOddNumbersAction() - call: " + integers);
        }
    };
}

 
So the last step would be to subscribe this Observer to the Observable, and it would be as simple as this:

    observable.subscribe(observer);

 
And that’s it! When the getOddNumbers() method has finished the calculation of numbers and return, the Observable will dispatch the list of numbers and the Observer will receive the data.
 

Summary

So this has been the introduction and basic operations with RxJava.
You can find all the examples in this public repository.

In next posts we will be looking deeper into more elements of the Reactive extension, like the Operators and Schedulers, etc.

In the mean time please leave here any comments, suggestion or corrections you might find!
 

Tagged ,