combineLatest() group of methods from RxJava allows combining multiple observable sources to one Observable
. When any of the sources emits, combiner function is called, fed with latest item from each source and resulting item is emitted downstream.
What is to fix?
Oddly, combineLatest
overloads do not allow specifying scheduler on which combining function is to execute. This means that when any of sources emits, combiner function will be executed on that thread. This is often inconvenient, e.g. when combiner function includes access to network or database and an observable emits on main thread.
Fix
Simplest way to fix this is combine input values into holder class and then use observeOn
. This works out of the box for two and three source observables as combineLatest
from RxKotlin offers overloads which return Pair
and Triple
.
If additional tuple class is defined, such as
data class Tuple4<out T1, out T2, out T3, out T4>(val v1: T1, val v2: T2, val v3: T3, val v4: T4)
and matching combine method, such as
fun <T1 : Any, T2 : Any, T3 : Any, T4 : Any> combineLatest(
o1: Observable<T1>,
o2: Observable<T2>,
o3: Observable<T3>,
o4: Observable<T4>
): Observable<Tuple4<T1, T2, T3, T4>> =
Observables.combineLatest(o1, o2, o3, o4) { v1, v2, v3, v4 ->
Tuple4(v1, v2, v3, v4)
}
then four observables can be combined into one and mapped to new value on desired scheduler:
val observable1 = ...
val observable2 = ...
val observable3 = ...
val observable4 = ...
val transformed : Observable<String> = combineLatest(observable1, observable2, observable3, observable4)
.observeOn(Schedulers.computation())
.map { (value1, value2, value3, value4) ->
"" + value1 + value2 + value3 + value4
}
Here, map
operation will be done on computation scheduler. Note how due to use of destructuring, Tuple4
is never visible in code. Also, by virtue of it being data class, we ensure it plays nicely in cases where instances are compared for equality, such as distinctUntilChanged
.
Adding Tuple5
to Tuple9
data classes and corresponding combineLatest
overloads allows combining of up to 9 source observables.