28 December 2019

Fixing combineLatest()

The combineLatest() family of methods in RxJava combines multiple observable sources into a single Observable. Once every source has emitted at least one item, each new emission from any source calls the combiner function with the latest item from each source, and the result is emitted downstream.
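To make the emission rule concrete, here is a minimal sketch. It assumes RxJava 2 package names (io.reactivex); the function name latestSums and the sample values are made up for illustration.

```kotlin
import io.reactivex.Observable
import io.reactivex.functions.BiFunction
import io.reactivex.subjects.PublishSubject

// Hypothetical helper: records every value combineLatest emits for two subjects.
fun latestSums(): List<Int> {
    val a = PublishSubject.create<Int>()
    val b = PublishSubject.create<Int>()
    val results = mutableListOf<Int>()

    val disposable = Observable
        .combineLatest(a, b, BiFunction<Int, Int, Int> { x, y -> x + y })
        .subscribe { results.add(it) }

    a.onNext(1)   // nothing emitted yet: b has no value
    b.onNext(10)  // combiner called with (1, 10)
    a.onNext(2)   // combiner called with (2, 10)
    b.onNext(20)  // combiner called with (2, 20)

    disposable.dispose()
    return results
}
```

The first emission of a produces nothing downstream, because b has not emitted yet. After that, every emission from either subject triggers the combiner.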

What needs fixing?

Oddly, the combineLatest overloads do not allow specifying a scheduler on which the combiner function executes. This means that when any source emits, the combiner function runs on the thread of that emission. This is often inconvenient, e.g. when the combiner function accesses the network or a database and a source emits on the main thread.
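The threading behaviour can be observed with a small sketch. It assumes RxJava 2 package names; combinerThreadName and the thread name "emitter-thread" are hypothetical, chosen only for this illustration.

```kotlin
import io.reactivex.Observable
import io.reactivex.functions.BiFunction
import io.reactivex.subjects.PublishSubject
import kotlin.concurrent.thread

// Hypothetical helper: returns the name of the thread the combiner ran on.
fun combinerThreadName(): String {
    val a = PublishSubject.create<Int>()
    val b = PublishSubject.create<Int>()
    var seen = ""

    val disposable = Observable
        .combineLatest(a, b, BiFunction<Int, Int, Int> { x, y ->
            seen = Thread.currentThread().name // record where the combiner executes
            x + y
        })
        .subscribe()

    a.onNext(1) // emitted on the calling thread; combiner not called yet

    // The second source emits on another thread, so the combiner runs there.
    thread(name = "emitter-thread") { b.onNext(2) }.join()

    disposable.dispose()
    return seen
}
```

Here the combiner runs on "emitter-thread", the thread on which b emitted, and not on the thread that subscribed.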

Fix

The simplest fix is to combine the input values into a holder class and then use observeOn. This works out of the box for two and three source observables, because RxKotlin offers combineLatest overloads that return Pair and Triple.
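For the two-source case this could look roughly as follows. The sketch assumes RxKotlin 2 (io.reactivex.rxkotlin.Observables); the function name describe, its parameters and the choice of Schedulers.io() are illustrative only.

```kotlin
import io.reactivex.Observable
import io.reactivex.rxkotlin.Observables
import io.reactivex.schedulers.Schedulers

// Hypothetical example: the Pair is built on the emitting thread,
// but the mapping itself runs on the io scheduler.
fun describe(names: Observable<String>, counts: Observable<Int>): Observable<String> =
    Observables.combineLatest(names, counts)   // Observable<Pair<String, Int>>
        .observeOn(Schedulers.io())            // switch threads before the real work
        .map { (name, count) -> "$name: $count" }
```

The combiner that creates the Pair is cheap, so it does not matter that it still runs on the emitting thread. The expensive part moves into map, after observeOn.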

If an additional tuple class is defined, such as

data class Tuple4<out T1, out T2, out T3, out T4>(val v1: T1, val v2: T2, val v3: T3, val v4: T4)

and a matching combine method, such as

fun <T1 : Any, T2 : Any, T3 : Any, T4 : Any> combineLatest(
    o1: Observable<T1>,
    o2: Observable<T2>,
    o3: Observable<T3>,
    o4: Observable<T4>
): Observable<Tuple4<T1, T2, T3, T4>> =
    Observables.combineLatest(o1, o2, o3, o4) { v1, v2, v3, v4 ->
        Tuple4(v1, v2, v3, v4)
    }

then four observables can be combined into one and mapped to a new value on the desired scheduler:

val observable1 = ...
val observable2 = ...
val observable3 = ...
val observable4 = ...

val transformed: Observable<String> = combineLatest(observable1, observable2, observable3, observable4)
    .observeOn(Schedulers.computation())
    .map { (value1, value2, value3, value4) ->
        "$value1$value2$value3$value4"
    }

Here, the map operation runs on the computation scheduler. Note that, thanks to destructuring, Tuple4 never appears in the calling code. Also, because it is a data class, it behaves well where instances are compared for equality, such as in distinctUntilChanged.
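The equality point can be checked with a short sketch. It assumes RxJava 2 package names; withoutRepeats and the sample values are hypothetical.

```kotlin
import io.reactivex.Observable

// Same shape as the Tuple4 defined earlier, repeated so the sketch is self-contained.
data class Tuple4<out T1, out T2, out T3, out T4>(val v1: T1, val v2: T2, val v3: T3, val v4: T4)

// Hypothetical helper: consecutive equal tuples are dropped by distinctUntilChanged,
// because data classes get a structural equals() generated by the compiler.
fun withoutRepeats(): List<Tuple4<Int, String, Boolean, Double>> =
    Observable.just(
        Tuple4(1, "a", true, 2.0),
        Tuple4(1, "a", true, 2.0), // equal to the previous item, so filtered out
        Tuple4(2, "a", true, 2.0)
    )
        .distinctUntilChanged()
        .toList()
        .blockingGet()
```

With a regular class instead of a data class, the two first items would be different instances with reference equality, and nothing would be filtered.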

Adding Tuple5 through Tuple9 data classes and the corresponding combineLatest overloads allows combining up to 9 source observables.
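As an illustration, a Tuple5 variant might be sketched like this, following the same pattern as Tuple4. It assumes RxKotlin 2, whose Observables.combineLatest has a five-source overload taking a combiner; the property and parameter names are placeholders.

```kotlin
import io.reactivex.Observable
import io.reactivex.rxkotlin.Observables

// Sketch of a five-element holder, mirroring Tuple4.
data class Tuple5<out T1, out T2, out T3, out T4, out T5>(
    val v1: T1, val v2: T2, val v3: T3, val v4: T4, val v5: T5
)

// Matching overload: only packs the latest values, no real work happens here.
fun <T1 : Any, T2 : Any, T3 : Any, T4 : Any, T5 : Any> combineLatest(
    o1: Observable<T1>,
    o2: Observable<T2>,
    o3: Observable<T3>,
    o4: Observable<T4>,
    o5: Observable<T5>
): Observable<Tuple5<T1, T2, T3, T4, T5>> =
    Observables.combineLatest(o1, o2, o3, o4, o5) { v1, v2, v3, v4, v5 ->
        Tuple5(v1, v2, v3, v4, v5)
    }
```

Tuple6 to Tuple9 would follow the same shape, each with one more type parameter and source.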