Getting Started With ReactiveX
What is ReactiveX?
ReactiveX is a library for composing asynchronous and event-based programs using the Observer pattern, the Iterator pattern and functional programming. It originated at Microsoft as Rx.NET; RxJava, the Java implementation, was developed at Netflix.
Which languages does it support?
Implementations of ReactiveX exist for many programming languages, including Java, JavaScript and C#. Each implementation has its own name, as listed below:
Java - RxJava
JavaScript - RxJS
C++ - RxCpp
C# - Rx.NET
Scala - RxScala
On Android, RxJava is typically used together with RxAndroid, which adds Android-specific bindings such as a scheduler for the main thread. The explanation below is based on RxJava and RxAndroid. RxJava is one of the most discussed libraries for enabling reactive programming in Android development.
What problem does it solve, and how does it simplify things?
A complex Android application performs many database operations, network calls and bitmap processing tasks, all of which tend to introduce nested callbacks. As a result the application becomes complex and error prone, and the code becomes difficult to understand. ReactiveX offers an alternative approach to managing asynchronous tasks and events that is both clear and concise.
Basics of RxJava
There are two main actors in the Rx world:
Observables - objects that emit a stream of data or events
Observers/Subscribers - objects that act upon the emitted data
In RxJava and RxAndroid, observers are instances of the Observer interface, and observables are instances of the Observable class.
The standard flow of an Observable is to emit zero or more data items and then either complete successfully or terminate with an error. An Observable can have multiple Observers, in which case each data item emitted by the Observable is received by each Observer.
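The flow described above can be sketched in plain Java without any library. This is only an illustration of the contract: MiniObserver, MiniObservable and recorder are hypothetical names invented for this sketch, and the real RxJava classes do considerably more (operators, schedulers, unsubscription).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical, simplified stand-ins for illustration; these are NOT the RxJava types.
final class ObserverContractSketch {

    /** Mirrors the three callbacks of the RxJava 1.x Observer interface. */
    interface MiniObserver<T> {
        void onNext(T item);
        void onError(Throwable e);
        void onCompleted();
    }

    /** Every subscriber gets the full sequence, followed by exactly one terminal event. */
    static final class MiniObservable<T> {
        private final List<T> items;
        private final Throwable failure; // null means the sequence ends normally

        MiniObservable(List<T> items, Throwable failure) {
            this.items = new ArrayList<>(items);
            this.failure = failure;
        }

        void subscribe(MiniObserver<T> observer) {
            for (T item : items) {
                observer.onNext(item);
            }
            if (failure != null) {
                observer.onError(failure);   // terminal: nothing follows an error
            } else {
                observer.onCompleted();      // terminal: nothing follows completion
            }
        }
    }

    /** Builds an observer that appends every event it sees to the shared log. */
    static MiniObserver<Integer> recorder(final String name, final List<String> log) {
        return new MiniObserver<Integer>() {
            @Override public void onNext(Integer item) { log.add(name + ":" + item); }
            @Override public void onError(Throwable e) { log.add(name + ":error"); }
            @Override public void onCompleted() { log.add(name + ":done"); }
        };
    }

    /** Two observers on a normal source, one observer on a failing source. */
    static List<String> run() {
        List<String> log = new ArrayList<>();
        MiniObservable<Integer> ok = new MiniObservable<>(Arrays.asList(2, 4), null);
        ok.subscribe(recorder("A", log));
        ok.subscribe(recorder("B", log));
        MiniObservable<Integer> failing =
                new MiniObservable<>(Arrays.asList(1), new IllegalStateException("boom"));
        failing.subscribe(recorder("C", log));
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Running main prints [A:2, A:4, A:done, B:2, B:4, B:done, C:1, C:error]: observers A and B each receive every item and then the completion signal, while C receives its item followed by the error.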
Setting up RxAndroid
To use RxAndroid, add it as a compile dependency in the build.gradle file of your Android Studio project:
compile 'io.reactivex:rxandroid:1.1.0'
Basics of Observable & Observer
Create Observable-
The Observable class has many methods, known as operators, some of which are static factory methods used to create Observable objects. RxJava groups its operators into categories. Some of them are:
· Creating Observables: Create, Defer, Empty/Never/Throw, From, Interval, Just, Range
· Transforming Observable Items: Buffer, FlatMap, GroupBy, Map, Scan, and Window
· Filtering Observables: Debounce, Distinct, ElementAt, Filter, First, IgnoreElements, Last, Sample, Skip, SkipLast, Take, and TakeLast
· Combining Observables: And/Then/When, CombineLatest, Join, Merge, StartWith, Switch, and Zip
· Utility Operators: Delay, Do, Materialize/Dematerialize, ObserveOn, Serialize, Subscribe, SubscribeOn, TimeInterval, Timeout, Timestamp, and Using
For the full list of operator groups, refer to the ReactiveX documentation.
The following code shows how to use the just operator to create a very simple Observable that emits a single String value:
Observable<String> stringObservable = Observable.just("Hello Android");
When an observer subscribes, this Observable emits "Hello Android".
The code below creates an Observable that emits multiple Integer values:
Observable<Integer> integerObservable = Observable.just(2, 4, 8, 13, 23);
Create Observer-
To create an observer, create a class that implements the Observer interface. The Observer interface declares three methods, which you override to handle the different types of notifications received from the Observable.
Here is an observer that receives the String value emitted by the observable:
Observer<String> stringObserver = new Observer<String>() {
    @Override
    public void onCompleted() {
        // this method gets called when the observable has no more data to emit
    }

    @Override
    public void onError(Throwable e) {
        // this method gets called when the observable encounters an error
    }

    @Override
    public void onNext(String s) {
        // this method gets called each time the observable emits data
    }
};
As explained above, onNext() is called to consume each data item emitted by the Observable.
onCompleted() is called when no more items are coming and the Observable has finished its work.
onError() is called in case of an unrecoverable error.
Subscribe Observer to Observable
To attach an Observer to an Observable, use the subscribe() method, which returns a Subscription object. The following code makes stringObserver observe stringObservable:
Subscription stringSubscription = stringObservable.subscribe(stringObserver);
Unsubscribe Observer from Observable
To stop observing items, simply call the unsubscribe() method on the returned Subscription object:
stringSubscription.unsubscribe();
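Unsubscribing matters most for sources that keep emitting over time; an Observable created with just() and no scheduler has already delivered all of its items by the time subscribe() returns. The effect can be illustrated with a small plain-Java sketch. MiniSubscription and MiniSource are hypothetical names invented for this example and are not RxJava classes; only the method names unsubscribe() and isUnsubscribed() mirror the RxJava 1.x Subscription interface.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical stand-ins for illustration; these are NOT the RxJava classes.
final class UnsubscribeSketch {

    /** Mirrors the two methods of the RxJava 1.x Subscription interface. */
    static final class MiniSubscription {
        private volatile boolean unsubscribed;

        void unsubscribe() { unsubscribed = true; }

        boolean isUnsubscribed() { return unsubscribed; }
    }

    /** A source that pushes each published item to observers that are still subscribed. */
    static final class MiniSource<T> {
        private final Map<MiniSubscription, Consumer<T>> observers = new LinkedHashMap<>();

        MiniSubscription subscribe(Consumer<T> onNext) {
            MiniSubscription subscription = new MiniSubscription();
            observers.put(subscription, onNext);
            return subscription;
        }

        void publish(T item) {
            // Drop observers that have unsubscribed, then deliver to the rest.
            observers.keySet().removeIf(MiniSubscription::isUnsubscribed);
            for (Consumer<T> onNext : observers.values()) {
                onNext.accept(item);
            }
        }
    }

    /** Returns the items the observer received before and after unsubscribing. */
    static List<String> run() {
        MiniSource<String> source = new MiniSource<>();
        List<String> received = new ArrayList<>();
        MiniSubscription subscription = source.subscribe(received::add);
        source.publish("first");     // delivered: the observer is still subscribed
        subscription.unsubscribe();
        source.publish("second");    // not delivered: the observer has unsubscribed
        return received;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Running main prints [first]. On Android, the usual reason to unsubscribe is to stop callbacks from reaching a screen that is no longer visible.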
Handling Asynchronous Operations
Everything discussed so far about Observers and Observables runs on a single thread (typically the main/UI thread). ReactiveX also lets you spread work across multiple threads without falling back on nested callbacks.
If you want to introduce multithreading into a chain of Observable operators, you can instruct particular Observables to operate on particular Schedulers. By default, an Observable notifies its observers on the same thread on which its subscribe() method is called.
The SubscribeOn operator changes this behavior by specifying a different Scheduler on which the Observable should operate. The ObserveOn operator specifies a different Scheduler that the Observable will use to send notifications to its observers. Using the subscribeOn() and observeOn() operators, you can explicitly specify which thread should run the background job and which thread should handle the user interface updates.
Example-
stringObservable
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(stringObserver);
Here stringObservable does its work on a thread provided by Schedulers.io(), and stringObserver receives the results on the UI thread through AndroidSchedulers.mainThread().
For more on Schedulers, refer to the Scheduler section of the ReactiveX documentation.
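The idea of producing a value on one thread and delivering it on another can be seen in a plain-Java analogy. This sketch uses only java.util.concurrent and no RxJava or Android classes; the two executors and their thread names ("io-thread", "ui-thread") are stand-ins chosen for this example, not what Schedulers.io() or AndroidSchedulers.mainThread() actually use internally.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Plain-Java analogy only; it does not use RxJava or Android classes.
final class ThreadHopSketch {

    /** Returns {thread that produced the value, thread that received it}. */
    static String[] run() {
        // Stand-in for a background scheduler: a single worker thread.
        ExecutorService io =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "io-thread"));
        // Stand-in for the main thread: a single "UI" thread.
        ExecutorService ui =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "ui-thread"));
        try {
            return CompletableFuture
                    // The work runs on the background executor (the role of subscribeOn).
                    .supplyAsync(() -> Thread.currentThread().getName(), io)
                    // The result is handled on the UI executor (the role of observeOn).
                    .thenApplyAsync(
                            producer -> new String[] {producer, Thread.currentThread().getName()},
                            ui)
                    .join();
        } finally {
            io.shutdown();
            ui.shutdown();
        }
    }

    public static void main(String[] args) {
        String[] threads = run();
        System.out.println("produced on " + threads[0] + ", delivered on " + threads[1]);
    }
}
```

Running main prints "produced on io-thread, delivered on ui-thread". In the RxJava chain, subscribeOn() and observeOn() express the same hand-off declaratively, so the observer code itself never has to deal with threads.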
Conclusion
I hope this serves as a useful introduction to ReactiveX and an overview of its basic capabilities. The library offers many more powerful concepts than those covered here. To learn more about reactive extensions, I encourage you to browse the resources available at ReactiveX.
To find more interesting topics on software development, follow me at https://medium.com/@ankit.sinhal
You can also find my Android applications on the Play Store.