FlatMap


One of the most important concepts in mastering reactive extensions is understanding flatMap() and its uses, mainly because even a fairly simple application tends to work with multiple observables.


However, before tackling flatMap(), we need to learn its simpler relative, the map() operator, and the details of using it.

map(), Transformation On-the-fly

Sometimes you want to transform each upstream event as soon as it arrives and pack the results into a new observable for the downstream subscribers. In other words, map() constructs an Observable<R> by replacing every upstream value of type T with a value of type R, using the mapping function you supply.

The snippet below takes integer values and maps items to corresponding strings embellished with double quotes.

    Observable<Integer> observable =
        Observable.just(1, 2, 3);
    Observable<String> map1 =
        observable.map(n -> "\"" + n + "\"");
    map1.subscribe(log::info);

Output:

    [main] - "1"
    [main] - "2"
    [main] - "3"

We could also use multiple map() operators together.

    // here observable is assumed to be an Observable<Order>
    Observable<Instant> map = observable
        .map(order -> order.getUser())
        .map(user -> user.birthDate())
        .map(Date::toInstant);

So far, each event within the upstream was a simple object, but what if the computation used for mapping is asynchronous or already returns an Observable?

    Observable<Integer> observable =
        Observable.just(1, 2, 3);
    Observable<Observable<Integer>> map =
        observable.map(integer ->
            Observable.timer(1, SECONDS)
                .map(s -> integer)
        );
    map.subscribe(log::info);

Output:

    [main] - rx.Observable@36f6e879
    [main] - rx.Observable@5a61f5df
    [main] - rx.Observable@3551a94

Using flatMap()

Working with nested observables is cumbersome, because you need to subscribe to each inner Observable and synchronize the results back into a single stream. flatMap() was created to do this for you.

flatMap() does the following in order:

  • replaces each upstream value of type T with an Observable<R> (just like map()).
  • this conceptually constructs an Observable<Observable<R>> (an observable of observables).
  • then it automatically subscribes to these inner Observable<R> streams.
  • produces a single stream of type R, containing all values from all inner streams, as they come.
  • note that events produced from one inner Observable can interleave with events from another.
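
The difference between a nested result and a flattened one can also be seen without RxJava. The sketch below is only an analogy built on the JDK's CompletableFuture, which models a single asynchronous value rather than a stream: thenApply() plays the role of map() and yields a nested future, while thenCompose() plays the role of flatMap() and flattens it. The class ComposeVsApply and the doubleAsync() helper are hypothetical names used for illustration.

```java
import java.util.concurrent.CompletableFuture;

final class ComposeVsApply {

    // Hypothetical asynchronous operation: doubles its input on another thread.
    static CompletableFuture<Integer> doubleAsync(int n) {
        return CompletableFuture.supplyAsync(() -> n * 2);
    }

    // Analogue of map(): the async result stays wrapped, giving a future of a future.
    static CompletableFuture<CompletableFuture<Integer>> nested(int n) {
        return CompletableFuture.completedFuture(n)
                .thenApply(ComposeVsApply::doubleAsync);
    }

    // Analogue of flatMap(): the inner future is unwrapped into a single-level result.
    static CompletableFuture<Integer> flattened(int n) {
        return CompletableFuture.completedFuture(n)
                .thenCompose(ComposeVsApply::doubleAsync);
    }

    public static void main(String[] args) {
        // The nested variant needs two join() calls, the flattened one only needs one.
        System.out.println(nested(21).join().join());
        System.out.println(flattened(21).join());
    }
}
```

Both calls produce the same value; the flattened variant simply spares the caller from unwrapping the inner result by hand, which is the chore flatMap() removes for streams.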

The use cases of flatMap() mainly fall into the following categories; its other overloads cover further usages and are worth checking out.

Asynchronous Lengthy Operations

Use flatMap() when the mapping function itself returns an Observable, typically because it performs a lengthy asynchronous operation on each element of the upstream. Non-blocking IO operations are a common example.

The following example takes a stream of web page objects, each holding a valid URL, and first fetches and then parses every page.

    return webPages
        .flatMap(this::fetchAsync)
        .flatMap(this::parseAsync);

    Observable<Page> fetchAsync(Page webPage) {
        /* fetches page's document */
    }

    Observable<Page> parseAsync(Page webPage) {
        /* parses page's content */
    }

Another example is a lengthy operation such as recognizing people from their photos:

    Observable<Image> photos = getPhotos();
    Observable<String> names =
        photos.flatMap(this::recognize);

    Observable<String> recognize(Image photo) {
        /* recognizes name by photo */
    }

One-to-Many Transformation

This is the case where a single event is mapped into multiple sub-events, such as a stream of orders in which each order can contain several products.

Consider the following API and imagine that we need to create a stream of items (products) being purchased in order to run some statistical calculations and find the popular products.

    class OnlinePurchase {
        //...
        Observable<Order> getOrders() {
            /* an observable of orders made so far */
        }
        //...
    }

    class Order {
        //...
        List<Product> getProducts() {/*...*/}
        //...
    }

Notice that the Order class has a simple getter method, List<Product> getProducts(), which returns an Iterable. Now let's see how such a pipeline can be implemented using flatMap() or its counterpart flatMapIterable().

    Observable<Product> items =
        getOrders()
            .map(order -> order.getProducts())
            .flatMap(Observable::from);

    /* which is equivalent to */
    Observable<Product> items =
        getOrders()
            .flatMapIterable(order -> order.getProducts());
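
For comparison, the same one-to-many flattening can be sketched synchronously with java.util.stream, whose Stream.flatMap() is the pull-based cousin of the operator discussed here. This is an analogy rather than RxJava code; the PopularProducts class, the simplified Order type, and the use of plain strings as products are assumptions made for the example.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

final class PopularProducts {

    // Simplified stand-in for the Order class: products are plain strings here.
    static final class Order {
        private final List<String> products;

        Order(String... products) {
            this.products = Arrays.asList(products);
        }

        List<String> getProducts() {
            return products;
        }
    }

    // Flattens every order into its products, then counts how often each product appears.
    static Map<String, Long> countProducts(List<Order> orders) {
        return orders.stream()
                .flatMap(order -> order.getProducts().stream())
                .collect(Collectors.groupingBy(p -> p, TreeMap::new, Collectors.counting()));
    }

    public static void main(String[] args) {
        List<Order> orders = Arrays.asList(
                new Order("book", "pen"),
                new Order("pen"),
                new Order("lamp", "pen"));
        System.out.println(countProducts(orders));
    }
}
```

A TreeMap is used only so that the printed counts come out in a stable, alphabetical order.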

Using flatMap() Instead of subscribe()

Imagine that we can upload a video (while being notified of how much of it has been uploaded) and that we need to rate it right after the upload is done. Consider the following implementation based on subscribe():

    void store(UUID id) {
        upload(id).subscribe(
            bytes -> {}, //ignore
            e -> log.error("Error", e),
            () -> rate(id)
        );
    }

    Observable<Long> upload(UUID id) {
        //...
    }

    Observable<Rating> rate(UUID id) {
        //...
    }

We could replace the body of the store(..) method with the following, using flatMap():

    upload(id)
        .flatMap(
            bytes -> Observable.empty(),
            e -> Observable.error(e),
            () -> rate(id)
        );

We replace the completion notification with another Observable (i.e. rate(id)). So, even though the original Observable wants to terminate, we ignore that and, in a way, append a different Observable.

Remember that in practice we do not replace map() and filter() with flatMap(), for reasons of code clarity and performance.

The following is another example of flatMap():

    Observable<Sound> toMorseCode(char ch) {
        switch(ch) {
            case 'a': return just(DI, DAH);
            case 'b': return just(DAH, DI, DI, DI);
            case 'c': return just(DAH, DI, DAH, DI);
            //...
            case 'p': return just(DI, DAH, DAH, DI);
            case 'r': return just(DI, DAH, DI);
            case 's': return just(DI, DI, DI);
            case 't': return just(DAH);
            //...
            default:
                return empty();
        }
    }

    enum Sound { DI, DAH }

    //...
    just('S', 'p', 'a', 'r', 't', 'a')
        .map(Character::toLowerCase)
        .flatMap(this::toMorseCode);

Note: flatMap() does not preserve the original order of events. In other words, it gives no guarantee about the order in which sub-events arrive at the downstream operator or subscriber. It subscribes to all sub-streams immediately and merges them together, pushing events downstream whenever any of the inner streams emits anything.

Order of Events After flatMap()

    Observable
        .just(DayOfWeek.SUNDAY, DayOfWeek.MONDAY)
        .flatMap(this::loadRecordsFor);

    Observable<String> loadRecordsFor(DayOfWeek dow) {
        switch(dow) {
            case SUNDAY:
                return Observable
                    .interval(90, MILLISECONDS)
                    .take(5)
                    .map(i -> "Sun-" + i);
            case MONDAY:
                return Observable
                    .interval(65, MILLISECONDS)
                    .take(5)
                    .map(i -> "Mon-" + i);
            //...
        }
    }

You might expect all Sunday events to arrive before any Monday event. Instead, flatMap() subscribes to all substreams immediately and merges them together, pushing events downstream whenever any of the inner streams emits anything. All subsequences returned from the mapping function are treated equally, so the faster Monday stream interleaves with the Sunday stream. A typical output is:

Mon-0, Sun-0, Mon-1, Sun-1, Mon-2, Mon-3, Sun-2, Mon-4, Sun-3, Sun-4
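
The interleaving follows directly from the two periods of 90 ms and 65 ms. The sketch below does not run RxJava at all; it only computes the millisecond at which each event would fire and sorts by that time, which is roughly what a merge does when both inner streams are subscribed at the same moment. MergeOrderSketch and its methods are hypothetical names, and real timers add jitter that this idealized model ignores.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

final class MergeOrderSketch {

    // One timed emission: the value and the millisecond at which it would fire.
    static final class Emission {
        final long atMillis;
        final String value;

        Emission(long atMillis, String value) {
            this.atMillis = atMillis;
            this.value = value;
        }
    }

    // Models interval(period).take(count).map(i -> prefix + i): first tick after one period.
    static List<Emission> interval(String prefix, long periodMillis, int count) {
        List<Emission> out = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            out.add(new Emission((i + 1) * periodMillis, prefix + i));
        }
        return out;
    }

    // flatMap-like: both streams start at time zero, events are ordered by firing time.
    static List<String> merged() {
        List<Emission> all = new ArrayList<>();
        all.addAll(interval("Sun-", 90, 5));
        all.addAll(interval("Mon-", 65, 5));
        all.sort(Comparator.comparingLong(e -> e.atMillis));
        return values(all);
    }

    // concatMap-like: the second stream only starts once the first one has completed.
    static List<String> concatenated() {
        List<Emission> all = new ArrayList<>();
        all.addAll(interval("Sun-", 90, 5));
        all.addAll(interval("Mon-", 65, 5));
        return values(all);
    }

    private static List<String> values(List<Emission> emissions) {
        List<String> out = new ArrayList<>();
        for (Emission e : emissions) {
            out.add(e.value);
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(merged());
        System.out.println(concatenated());
    }
}
```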

Controlling the concurrency of flatMap()

Consider the following example where we load user profiles through HTTP requests.

    class User {
        Observable<Profile> loadProfile() {
            //Make HTTP request...
        }
    }

    class Profile {/* ... */}

    //...
    List<User> veryLargeList = //...
    Observable<Profile> profiles = Observable
        .from(veryLargeList)
        .flatMap(User::loadProfile);

This might look great at first glance, since Observable.from() emits all users pretty much instantaneously. However, subscribing to each inner Observable most likely opens a new HTTP connection for that user. Therefore, if we have, say, 10,000 users, we suddenly trigger 10,000 concurrent HTTP connections hitting the same server, and we might end up with:

  • Rejected connections
  • Long wait time and timeouts
  • Crashing the server
  • Hitting rate-limit or blacklisting
  • Overall latency increase
  • Issues on the client, including too many open sockets, threads, excessive memory usage

Increasing concurrency pays off only up to a certain point. If you try to run too many operations concurrently, you will most likely end up with a lot of context switches, high memory and CPU utilization, and overall performance degradation.

flatMap() has a very simple overloaded version that limits the
total number of concurrent subscriptions to inner streams:

    flatMap(User::loadProfile, 10);

The maxConcurrent parameter limits the number of background tasks that are forked from flatMap(). In our case, with the limit set to 10, when the 11th User appears from upstream, flatMap() will not even call loadProfile(); instead, it will wait for one of the ongoing inner streams to complete.
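
The effect of such a limit can be modelled with plain JDK tools. The following is a minimal sketch, not how RxJava implements the maxConcurrent overload: a Semaphore holds back each new task until a running one has finished. The names BoundedConcurrencySketch, mapWithLimit(), and peakConcurrency() are hypothetical, and unlike flatMap() the sketch returns results in submission order rather than as they complete.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

final class BoundedConcurrencySketch {

    // Runs task for every item, never allowing more than maxConcurrent tasks at once.
    static <T, R> List<R> mapWithLimit(List<T> items, Function<T, R> task, int maxConcurrent) {
        ExecutorService pool = Executors.newCachedThreadPool();
        Semaphore permits = new Semaphore(maxConcurrent);
        try {
            List<Future<R>> pending = new ArrayList<>();
            for (T item : items) {
                permits.acquire(); // blocks while maxConcurrent tasks are still running
                pending.add(pool.submit(() -> {
                    try {
                        return task.apply(item);
                    } finally {
                        permits.release(); // lets the next waiting item start
                    }
                }));
            }
            List<R> results = new ArrayList<>();
            for (Future<R> future : pending) {
                results.add(future.get());
            }
            return results;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    // Demo helper: runs short sleeping tasks and reports the highest concurrency observed.
    static int peakConcurrency(int tasks, int maxConcurrent) {
        AtomicInteger running = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        List<Integer> ids = new ArrayList<>();
        for (int i = 0; i < tasks; i++) {
            ids.add(i);
        }
        mapWithLimit(ids, id -> {
            peak.accumulateAndGet(running.incrementAndGet(), Math::max);
            try {
                Thread.sleep(10); // stands in for a slow HTTP call
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            running.decrementAndGet();
            return id;
        }, maxConcurrent);
        return peak.get();
    }

    public static void main(String[] args) {
        System.out.println("peak concurrency: " + peakConcurrency(12, 3));
    }
}
```

Because a permit is acquired before a task is submitted and released only after it finishes, the observed peak can never exceed the limit passed in.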

As can be deduced, concatMap(f), covered next, is semantically equivalent to flatMap(f, 1).

Preserving Order Using concatMap()

The concatMap() operator has exactly the same syntax as flatMap() but keeps downstream events in the same order as the upstream events that produced them. It introduces no concurrency whatsoever, so inner streams never overlap.

    Observable
        .just(DayOfWeek.SUNDAY, DayOfWeek.MONDAY)
        .concatMap(this::loadRecordsFor);

This time the output is exactly what we anticipated:

Sun-0, Sun-1, Sun-2, Sun-3, Sun-4, Mon-0, Mon-1, Mon-2, Mon-3, Mon-4

concatMap & flatMap Comparison

  • flatMap() subscribes to all sub-Observables at the same time and uses the merge() operator internally.
  • concatMap() subscribes only to the first underlying Observable and continues with the second one when the first one completes.

merge() & mergeWith()

The merge() operator subscribes to several Observables and forwards all of their events as a single stream.

    Observable<LicensePlate> all = Observable.merge(
        preciseAlgo(photo),
        fastAlgo(photo),
        experimentalAlgo(photo)
    );

As soon as someone subscribes to the Observable named all, it automatically subscribes to all upstream Observables at once. No matter which one emits a value first, that value is forwarded to the Observer. merge() also ensures that events are serialized (do not overlap), even if the underlying streams each emit a value at the same time.

The merge() operator is used extensively when you want to treat multiple sources of
events of the same type as a single source.

Errors appearing in any of the underlying Observables are eagerly propagated to Observers. You can use the mergeDelayError() variant of merge() to postpone any errors until all of the other streams have finished. mergeDelayError() will even make sure to collect all exceptions, not only the first one, and encapsulate them in rx.exceptions.CompositeException.

zip() & zipWith()

Zipping is the act of taking two (or more) streams and combining them in such a way that each element from one stream is paired with the corresponding element from the other.

Therefore, events appear downstream only when all upstream sources have emitted an event. This is useful when you want to combine results from multiple streams that are somehow related to one another or, quite the contrary, when two independent streams emit values but only their combination has business meaning.

    Observable.zip(s1, s2, s3...);
    s1.zipWith(s2, ...);
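
The pairing rule is easy to see on ordinary lists. The sketch below is a synchronous analogy using only the JDK, not the RxJava operator: elements are combined position by position, and nothing is produced once either side runs out, which mirrors how zip() emits only when every source has contributed an event. ZipSketch and its zip() method are hypothetical names, and the temperature and wind values are made up for the example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.function.BiFunction;

final class ZipSketch {

    // Pairs the n-th element of left with the n-th element of right.
    static <A, B, R> List<R> zip(List<A> left, List<B> right, BiFunction<A, B, R> combiner) {
        List<R> out = new ArrayList<>();
        Iterator<A> l = left.iterator();
        Iterator<B> r = right.iterator();
        // Stops as soon as either side has no partner left to pair with.
        while (l.hasNext() && r.hasNext()) {
            out.add(combiner.apply(l.next(), r.next()));
        }
        return out;
    }

    public static void main(String[] args) {
        List<Integer> temperatures = Arrays.asList(20, 21, 22);
        List<String> winds = Arrays.asList("N", "SW");
        // The third temperature has no matching wind reading, so it is never emitted.
        System.out.println(zip(temperatures, winds, (t, w) -> t + "C/" + w));
    }
}
```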

Example 1:

    class Weather {
        public Weather(Temperature temperature, Wind wind) {
            //...
        }
    }

    //...
    Observable<Temperature> temperatureMeasurements = station.temperature();
    Observable<Wind> windMeasurements = station.wind();
    temperatureMeasurements.zipWith(windMeasurements,
        (temperature, wind) -> new Weather(temperature, wind));

Example 2:

    Observable<LocalDate> nextTenDays =
        Observable
            .range(1, 10)
            .map(i -> LocalDate.now().plusDays(i));

    Observable<Vacation> possibleVacations = Observable
        .just(City.Warsaw, City.London, City.Paris)
        .flatMap(city -> nextTenDays.map(date -> new Vacation(city, date)))
        .flatMap(vacation ->
            Observable.zip(
                vacation.weather().filter(Weather::isSunny),
                vacation.cheapFlightFrom(City.NewYork),
                vacation.cheapHotel(),
                (w, f, h) -> vacation
            ));

    class Vacation {
        private final City where;
        private final LocalDate when;

        Vacation(City where, LocalDate when) {
            this.where = where;
            this.when = when;
        }

        public Observable<Weather> weather() {/*...*/}
        public Observable<Flight> cheapFlightFrom(City from) {/*...*/}
        public Observable<Hotel> cheapHotel() {/*...*/}
    }

The line (w, f, h) -> vacation basically means that, for each vacation, we want to make sure that sunny weather, a cheap flight, and a cheap hotel are all present. If all these conditions are met, we return the vacation instance; otherwise, zip() will not invoke our lambda expression at all.
