Observables & Subscriptions


Now that we are familiar with reactive programming concepts and know that Observable sits at their core, let’s look at the ways we can create and use simple observables.

Review

As mentioned earlier, observables are lazy by default, so an observable produces no output until at least one subscriber subscribes to it, as in the snippet below.

Observable<T> observable = //...
observable.subscribe(
    (i) -> System.out.println(i),           // onNext: one call per event
    (e) -> System.out.println(e),           // onError: terminal failure
    ( ) -> System.out.println("completed")  // onCompleted: normal termination
);

Notice that the event callback is invoked each time upstream pushes an event downstream, and that the callback is never invoked from more than one thread at a time. In the output shown for each snippet, every line is prefixed with the name of the thread on which the code is executing.

1- Observables

Factory Methods

There are a few factory methods for creating fixed Observables. Except for interval(interval, unit), these factory methods execute on the client thread, meaning that subscribing blocks the client thread until all events have been delivered.

Observable.just(value)

This method creates an Observable instance that emits only the given value or values and then completes.

Observable<String> observable =
    Observable.just("hello world!");

[main] - hello world!
[main] - completed

Observable<String> observable =
    Observable.just("one", "two", "three");

[main] - one
[main] - two
[main] - three
[main] - completed

Observable<Integer> observable =
    Observable.just(1, 2, 3);

[main] - 1
[main] - 2
[main] - 3
[main] - completed

Observable.from(values)

This method creates an Observable from the given array or Iterable and emits its elements in order.

Observable<Integer> fibonacci =
    Observable.from(new Integer[] { 1, 2, 3, 5, 8 });

[main] - 1
[main] - 2
[main] - 3
[main] - 5
[main] - 8
[main] - completed

MongoCollection<Document> employees = db.getCollection("employees");
Observable
    .from(employees.find())
    .doAfterTerminate(() -> {
        // what needs to be done after termination
    });

[main] - Document{{_id=589a3ebfb371aa47cfa03e1c, name=John Doe, salary=1234.0}}
[main] - Document{{_id=589a3edeb371aa47cfa03e1d, name=James Brown, salary=4321.0}}

Observable.range(from, n)

Produces an Observable that emits n consecutive integers starting from ‘from’. The second argument is a count, not an upper bound. For example, range(5, 3) will emit 5, 6, and 7 and then complete normally.

Observable<Integer> observable =
    Observable.range(10, 5);

[main] - 10
[main] - 11
[main] - 12
[main] - 13
[main] - 14
[main] - completed
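
The counting rule of range(from, n) is easy to get wrong, so here is a minimal plain-Java sketch of which values it covers. RangeModel is a hypothetical helper written for this illustration and has no RxJava dependency; it only models the values, not the events or threads.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

// Hypothetical helper, not part of RxJava: models which values range(from, n) covers.
class RangeModel {
    static List<Integer> range(int from, int n) {
        // IntStream.range takes an exclusive end, so the count n becomes from + n.
        return IntStream.range(from, from + n).boxed().collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(range(5, 3));   // [5, 6, 7]
        System.out.println(range(10, 5));  // [10, 11, 12, 13, 14]
    }
}
```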

Observable.interval(interval, unit)

This factory method creates a cold observable that emits long numbers starting from zero, with a fixed delay before each number, including the first one. Notice that since the numbers are emitted on a computation thread, you need to block the main thread in order to see any results.

Observable<Long> observable =
    Observable.interval(2, TimeUnit.SECONDS);
// 0, 1, 2, 3, 4, ...

[RxComputationScheduler-2] - 0
[RxComputationScheduler-2] - 1
[RxComputationScheduler-2] - 2
...

Observable.create(…)

Use this method if you need more control over unsubscription as well as over the production and consumption of events. The following snippet rapidly produces integers starting from zero.

Observable<Integer> observable = Observable.create(
    subscriber -> {
        Runnable r = () -> {
            int i = 0;
            // keep emitting until the subscriber unsubscribes
            while (!subscriber.isUnsubscribed())
                subscriber.onNext(i++);
        };
        new Thread(r).start();
    });
// subscribe so that events start flowing and we hold a handle to cancel with
Subscription subscription = observable.subscribe(log::info);
pause(1);
subscription.unsubscribe();
pause(0);

[Thread-0] - 0
[Thread-0] - 1
[Thread-0] - 2
[Thread-0] - 3
[Thread-0] - 4
[Thread-0] - 5
[Thread-0] - 6

When creating observables with the create() method, check isUnsubscribed() as often as possible to avoid sending unnecessary events when there are no subscribers.
Also notice that such custom Observable implementations must ensure that events coming from multiple threads are delivered to the subscriber in a serialized fashion, that is, never concurrently.
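
To make the serialization requirement concrete, here is a minimal plain-Java sketch. SerializedEmitter and countConcurrently are hypothetical names invented for this illustration; the sketch uses a plain intrinsic lock as the simplest way to get the guarantee and is not meant to mirror RxJava's internals.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical wrapper, not an RxJava class: serializes onNext calls with a lock.
class SerializedEmitter<T> {
    private final Consumer<T> downstream;

    SerializedEmitter(Consumer<T> downstream) {
        this.downstream = downstream;
    }

    // Only one thread at a time can be inside the downstream callback.
    synchronized void onNext(T value) {
        downstream.accept(value);
    }

    // Emits from several threads into a deliberately non-thread-safe counter.
    static int countConcurrently(int threads, int perThread) {
        int[] counter = {0};
        SerializedEmitter<Integer> emitter =
                new SerializedEmitter<Integer>(i -> counter[0]++);
        List<Thread> workers = new ArrayList<>();
        for (int t = 0; t < threads; t++) {
            Thread worker = new Thread(() -> {
                for (int i = 0; i < perThread; i++) {
                    emitter.onNext(i);
                }
            });
            workers.add(worker);
            worker.start();
        }
        try {
            for (Thread worker : workers) {
                worker.join();  // wait until every producer has finished
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return counter[0];
    }

    public static void main(String[] args) {
        System.out.println(countConcurrently(4, 1000));  // 4000
    }
}
```

Because every increment happens while holding the lock, no update is lost even though the counter itself is not thread-safe. Without the synchronized keyword the total could come out lower.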

Observable.fromCallable(…)

This factory method creates an Observable from the value that is returned by another method (a callable). The following snippet creates an Observable of a product by fetching the product by its ID. It is semantically equivalent to a create() call that emits the single returned value and then completes, or emits an error if the callable throws, but it is much more concise.

Observable<Product> load(int productId) {
    return Observable.fromCallable(() -> findById(productId));
}
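
The deferred behaviour described above can be modelled in a few lines of plain Java. DeferredValue is a hypothetical class written for this illustration, with no RxJava dependency, and the string "product-42" is a stand-in for whatever findById would return. The point of the sketch is that the callable runs when someone subscribes, not when the source is built.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

// Hypothetical model of a deferred single-value source; not RxJava's implementation.
class DeferredValue<T> {
    private final Callable<T> callable;

    DeferredValue(Callable<T> callable) {
        this.callable = callable;
    }

    // The callable runs here, once per subscriber, never at construction time.
    void subscribe(Consumer<T> onNext, Consumer<Throwable> onError, Runnable onCompleted) {
        T value;
        try {
            value = callable.call();
        } catch (Exception e) {
            onError.accept(e);  // a throwing callable becomes an error event
            return;
        }
        onNext.accept(value);
        onCompleted.run();
    }

    public static void main(String[] args) {
        AtomicInteger calls = new AtomicInteger();
        DeferredValue<String> product = new DeferredValue<String>(() -> {
            calls.incrementAndGet();
            return "product-42";  // stand-in for findById(productId)
        });
        System.out.println("calls before subscribe: " + calls.get());
        product.subscribe(
                System.out::println,
                Throwable::printStackTrace,
                () -> System.out.println("completed"));
        System.out.println("calls after subscribe: " + calls.get());
    }
}
```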

Composing Observables

When composing observables, we can:

  • A: Wait for corresponding events, process them and emit the results.

In the following example, the corresponding events of the two Observables are added together and the sums are emitted by a third one.

Observable<Integer> one = Observable.range(10, 100);
Observable<Long> two = Observable.interval(2, SECONDS);
Observable
    .zip(one, two,
        (l1, l2) -> l1 + l2)
    .subscribe(log::info);

[RxComputationScheduler-2] - 10 # 10 + 0
[RxComputationScheduler-2] - 12 # 11 + 1
[RxComputationScheduler-2] - 14 # 12 + 2
[RxComputationScheduler-2] - 16 # 13 + 3
[RxComputationScheduler-2] - 18 # 14 + 4
...

  • B: Compose a single stream from the two Observables that emits events as they arrive.

Observable<Integer> one = Observable.range(10, 2);
Observable<Long> two = Observable.interval(1, SECONDS);
Observable
    .merge(one, two)
    .subscribe(log::info);

[main] - 10
[main] - 11
[RxComputationScheduler-1] - 0
[RxComputationScheduler-1] - 1
[RxComputationScheduler-1] - 2
...
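
The pairing rule of option A can be modelled on plain lists. ZipModel is a hypothetical helper written for this illustration. It ignores timing and threads entirely and only shows that a result exists where both inputs have an element at the same position, so the shorter input decides how many results there are.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BiFunction;

// Hypothetical list-based model of zip's pairing rule; it ignores timing and threads.
class ZipModel {
    static <A, B, R> List<R> zip(List<A> one, List<B> two, BiFunction<A, B, R> combiner) {
        // A result exists only where both inputs have an element at the same index.
        int pairs = Math.min(one.size(), two.size());
        List<R> out = new ArrayList<>();
        for (int i = 0; i < pairs; i++) {
            out.add(combiner.apply(one.get(i), two.get(i)));
        }
        return out;
    }

    public static void main(String[] args) {
        List<Integer> one = Arrays.asList(10, 11, 12, 13, 14);
        List<Long> two = Arrays.asList(0L, 1L, 2L);
        List<Long> sums = zip(one, two, (l1, l2) -> l1 + l2);
        System.out.println(sums);  // [10, 12, 14]
    }
}
```

Option B has no such pairing: a merged stream simply forwards each event when it arrives, which is why its output above interleaves by time rather than by position.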

2- Subscriptions

As stated earlier, given a reference to a subscriber instance, we can use its add() method to be notified when it unsubscribes and perform a suitable action. We only need to wrap the unsubscribe action in a Subscription and add it to the subscriber’s list of subscriptions.

Subscription onUnsubscribe =
    Subscriptions.create(() -> in.close());
subscriber.add(onUnsubscribe);

It is important to unsubscribe from an Observable as soon as you no longer want to receive events, and Subscription provides a way to do exactly that through its unsubscribe() method.
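
As a rough mental model of a subscription that carries a cleanup action, consider the plain-Java sketch below. ActionSubscription is a hypothetical class written for this illustration, not an RxJava type. It assumes the cleanup should run at most once, however many times unsubscribe() is called.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for a subscription that carries a cleanup action.
class ActionSubscription {
    private final AtomicBoolean unsubscribed = new AtomicBoolean(false);
    private final Runnable onUnsubscribe;

    ActionSubscription(Runnable onUnsubscribe) {
        this.onUnsubscribe = onUnsubscribe;
    }

    // Runs the cleanup action on the first call only; later calls are no-ops.
    void unsubscribe() {
        if (unsubscribed.compareAndSet(false, true)) {
            onUnsubscribe.run();
        }
    }

    boolean isUnsubscribed() {
        return unsubscribed.get();
    }

    public static void main(String[] args) {
        AtomicInteger closed = new AtomicInteger();
        ActionSubscription s = new ActionSubscription(closed::incrementAndGet);
        s.unsubscribe();
        s.unsubscribe();  // second call does not run the action again
        System.out.println("closed " + closed.get() + " time(s)");  // closed 1 time(s)
    }
}
```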

Multiple Subscribers

Observable<Integer> o1 =
    Observable.create(subscriber -> {
        log.info("generating values");
        subscriber.onNext(1);
        subscriber.onNext(2);
        subscriber.onNext(3);
    });
o1.subscribe(log::info);
o1.subscribe(log::info);

[main] - generating values
[main] - 1
[main] - 2
[main] - 3
[main] - generating values
[main] - 1
[main] - 2
[main] - 3

Unsubscribing from Outside.

Observable<Long> one =
    Observable.interval(1, TimeUnit.SECONDS);
Subscription subscription = one.subscribe(log::info);
try {
    // let the interval run for two seconds on its computation thread
    Thread.sleep(2000);
} catch (InterruptedException e) {
    e.printStackTrace();
}
subscription.unsubscribe();

[RxComputationScheduler-1] - 0
[RxComputationScheduler-1] - 1
Process finished with exit code 0

Unsubscribing from Inside.

Observable<Integer> one =
    Observable.from(new Integer[] { -2, -1, 0, 1, 2});
Subscriber<Integer> subscriber = new Subscriber<Integer>() {
    @Override
    public void onNext(Integer i) {
        if (i == 0)
            unsubscribe();
        log.info(i);
    }
    @Override
    public void onError(Throwable e) { }
    @Override
    public void onCompleted() {
        log.info("completed");
    }
};
one.subscribe(subscriber);

[main] - -2
[main] - -1
[main] - 0
Process finished with exit code 0

Sharing an Observable

ConnectableObservable

Sometimes multiple subscribers are interested in your observable and you might want none of them to miss any early events. Without ConnectableObservable, a hot observable emits items regardless of who is listening, so late subscribers might miss some of them.

A ConnectableObservable is an Observable that starts emitting events to subscribers only once its connect() method is invoked. When this happens, a single dedicated subscriber subscribes to the upstream and relays exactly the same sequence of events to all downstream subscribers. This ensures that the upstream Observable has at most one Subscriber, because only one subscription to the original Observable is opened.

The following code compares a normal observable with a connectable observable, each with two subscribers. Notice how, in the case of ConnectableObservable, both subscribers experience the same events at the same time.

Observable<Integer> observable = Observable.create(
    subscriber -> {
        log.info("connecting...");
        Runnable r = () -> {
            subscriber.onNext(1);
            subscriber.onNext(2);
            subscriber.onNext(3);
        };
        new Thread(r).start();
        Subscription s = Subscriptions.create(
            () -> log.info("disconnecting...")
        );
        subscriber.add(s);
    });
log.info("before subscription");
Subscription sub1 = observable.subscribe(i -> log.info("sub1: " + i));
log.info("sub1 subscribed");
Subscription sub2 = observable.subscribe(i -> log.info("sub2: " + i));
log.info("sub2 subscribed");
sub1.unsubscribe();
log.info("sub1 unsubscribed");
sub2.unsubscribe();
log.info("sub2 unsubscribed");

[main] - before subscription
[main] - connecting...
[Thread-0] - sub1: 1
[Thread-0] - sub1: 2
[Thread-0] - sub1: 3
[main] - sub1 subscribed
[main] - connecting...
[main] - sub2 subscribed
[Thread-1] - sub2: 1
[Thread-1] - sub2: 2
[Thread-1] - sub2: 3
[main] - disconnecting...
[main] - sub1 unsubscribed
[main] - disconnecting...
[main] - sub2 unsubscribed

The publish() operator converts a normal observable to a ConnectableObservable. Finally, the connect() method causes the ConnectableObservable to start emitting events.

// the same observable as previous example
// ...
ConnectableObservable<Integer> shared = observable.publish();
log.info("before subscription");
Subscription sub1 = shared.subscribe(s -> log.info("sub1: " + s));
log.info("sub1 subscribed");
Subscription sub2 = shared.subscribe(s -> log.info("sub2: " + s));
log.info("sub2 subscribed");
shared.connect();
sub1.unsubscribe();
log.info("sub1 unsubscribed");
sub2.unsubscribe();
log.info("sub2 unsubscribed");

[main] - before subscription
[main] - sub1 subscribed
[main] - sub2 subscribed
[main] - connecting...
[Thread-0] - sub1: 1
[Thread-0] - sub2: 1
[Thread-0] - sub1: 2
[Thread-0] - sub2: 2
[Thread-0] - sub1: 3
[Thread-0] - sub2: 3
[main] - sub1 unsubscribed
[main] - sub2 unsubscribed

Notice and compare the threads on which subscribers receive the events.
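
The fan-out idea behind publish() and connect() can be sketched in plain Java. PublishModel is a hypothetical, single-threaded class written for this illustration. It is not how RxJava implements ConnectableObservable and it leaves out unsubscription, errors and completion. It only shows that subscribing registers a listener, while connect() opens the single upstream subscription.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical single-threaded model of publish()/connect(); not RxJava's implementation.
class PublishModel<T> {
    private final Consumer<Consumer<T>> source;  // the upstream "subscribe" function
    private final List<Consumer<T>> subscribers = new ArrayList<>();

    PublishModel(Consumer<Consumer<T>> source) {
        this.source = source;
    }

    // Subscribing only registers the listener; nothing is emitted yet.
    void subscribe(Consumer<T> subscriber) {
        subscribers.add(subscriber);
    }

    // connect() subscribes to the upstream once and fans each event out
    // to all registered subscribers.
    void connect() {
        source.accept(event -> subscribers.forEach(s -> s.accept(event)));
    }

    public static void main(String[] args) {
        PublishModel<Integer> shared = new PublishModel<Integer>(emitter -> {
            System.out.println("connecting...");
            emitter.accept(1);
            emitter.accept(2);
            emitter.accept(3);
        });
        shared.subscribe(i -> System.out.println("sub1: " + i));
        shared.subscribe(i -> System.out.println("sub2: " + i));
        shared.connect();  // "connecting..." is printed once, then events interleave
    }
}
```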

publish().refCount() or share()

The publish().refCount() pair is so commonly used for sharing observables that the alias share() was created for it. share() subscribes to the upstream Observable when the first subscriber arrives and shares that single subscription for as long as there is at least one active subscriber. It unsubscribes from upstream as soon as the last active subscriber unsubscribes.

Observable<Integer> observable = Observable.create(
    subscriber -> {
        log.info("connecting...");
        Runnable r = () -> {
            try {
                subscriber.onNext(1);
                Thread.sleep(1000);
                subscriber.onNext(2);
                Thread.sleep(1000);
                subscriber.onNext(3);
            } catch (InterruptedException e) {
                log.error(e);
            }
        };
        new Thread(r).start();
        Subscription s = Subscriptions.create(
            () -> log.info("disconnecting...")
        );
        subscriber.add(s);
    });
Observable<Integer> shared = observable.share();
log.info("before subscription");
Subscription sub1 = shared.subscribe(s -> log.info("sub1: " + s));
log.info("sub1 subscribed");
pause(500);
Subscription sub2 = shared.subscribe(s -> log.info("sub2: " + s));
log.info("sub2 subscribed");

[main] - before subscription
[main] - connecting...
[main] - sub1 subscribed
[Thread-0] - sub1: 1
[main] - sub2 subscribed
[Thread-0] - sub1: 2
[Thread-0] - sub2: 2
[Thread-0] - sub1: 3
[Thread-0] - sub2: 3
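
The reference counting behind share() can be sketched in plain Java as follows. RefCountModel is a hypothetical class written for this illustration. It tracks only the number of active subscribers and the connect/disconnect moments, not the events themselves, and it is not RxJava's implementation.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the reference counting behind refCount()/share().
class RefCountModel {
    private final Runnable connect;     // opens the single upstream subscription
    private final Runnable disconnect;  // closes it again
    private int subscribers = 0;

    RefCountModel(Runnable connect, Runnable disconnect) {
        this.connect = connect;
        this.disconnect = disconnect;
    }

    // The first subscriber triggers the upstream connection; later ones reuse it.
    synchronized void subscribe() {
        if (subscribers++ == 0) {
            connect.run();
        }
    }

    // The last subscriber to leave closes the upstream connection.
    synchronized void unsubscribe() {
        if (subscribers > 0 && --subscribers == 0) {
            disconnect.run();
        }
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        RefCountModel shared = new RefCountModel(
                () -> log.add("connecting..."),
                () -> log.add("disconnecting..."));
        shared.subscribe();    // connects
        shared.subscribe();    // reuses the connection
        shared.unsubscribe();  // one subscriber is still active
        shared.unsubscribe();  // last one leaves, so disconnect
        System.out.println(log);  // [connecting..., disconnecting...]
    }
}
```

This also explains the output above: the second subscriber joins an upstream subscription that is already running, so it sees only the events emitted after it subscribed.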