28 Dec 2019, 21:42

Fixing combineLatest()

The combineLatest() family of methods in RxJava combines multiple observable sources into one Observable. Once every source has emitted at least one item, each new emission from any source calls the combiner function with the latest item from each source, and the result is emitted downstream.
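As a minimal sketch of this behaviour, the snippet below drives two subjects by hand and records what comes out. The RxJava 2 package names (io.reactivex) and the function name combineLatestDemo are assumptions made for illustration.

```kotlin
import io.reactivex.Observable
import io.reactivex.functions.BiFunction
import io.reactivex.subjects.PublishSubject

// Hypothetical helper: returns everything combineLatest emitted.
fun combineLatestDemo(): List<String> {
    val letters = PublishSubject.create<String>()
    val numbers = PublishSubject.create<Int>()
    val results = mutableListOf<String>()

    Observable.combineLatest(
        letters,
        numbers,
        BiFunction<String, Int, String> { letter, number -> letter + number }
    ).subscribe { results.add(it) }

    letters.onNext("a") // nothing emitted yet: numbers has no item
    numbers.onNext(1)   // combiner called with ("a", 1)
    letters.onNext("b") // combiner called with ("b", 1)
    numbers.onNext(2)   // combiner called with ("b", 2)
    return results
}
```

Calling combineLatestDemo() should yield the list a1, b1, b2.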

What needs fixing?

Oddly, the combineLatest overloads do not allow specifying a scheduler on which the combiner function should execute. This means that when any source emits, the combiner function runs on the thread of that emission. This is often inconvenient, e.g. when the combiner function accesses the network or a database and one of the observables emits on the main thread.
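The threading behaviour can be observed with a small experiment, sketched here under the assumption of RxJava 2 package names. The function name combinerThreadName and the thread name "emitter-thread" are made up for the example.

```kotlin
import io.reactivex.Observable
import io.reactivex.functions.BiFunction
import io.reactivex.subjects.PublishSubject

// Hypothetical helper: reports which thread ran the combiner function.
fun combinerThreadName(): String {
    val first = PublishSubject.create<Int>()
    val second = PublishSubject.create<Int>()
    var combinerThread = ""

    Observable.combineLatest(
        first,
        second,
        BiFunction<Int, Int, Int> { x, y ->
            combinerThread = Thread.currentThread().name
            x + y
        }
    ).subscribe()

    // Emitted on the calling thread; the combiner does not run yet
    // because the second source has not emitted.
    first.onNext(1)

    // Emitted on a separate thread; the combiner runs there.
    val emitter = Thread({ second.onNext(2) }, "emitter-thread")
    emitter.start()
    emitter.join()
    return combinerThread
}
```

The returned name is that of the emitting thread, not of the thread that subscribed.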

Fix

The simplest fix is to combine the input values into a holder class and then apply observeOn, so that any subsequent work runs on the chosen scheduler. This works out of the box for two and three source observables, because combineLatest in RxKotlin offers overloads that return Pair and Triple.
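For the two-source case this might look like the sketch below. It assumes RxKotlin 2 (io.reactivex.rxkotlin.Observables); the function name describeOnIo and the sample values are illustrative only.

```kotlin
import io.reactivex.Observable
import io.reactivex.rxkotlin.Observables
import io.reactivex.schedulers.Schedulers

// Hypothetical helper: combines two sources into a Pair, then maps
// the Pair on the io scheduler instead of on the emitting thread.
fun describeOnIo(): String =
    Observables.combineLatest(Observable.just("user"), Observable.just(42))
        .observeOn(Schedulers.io())
        .map { (name, id) -> "$name#$id on ${Thread.currentThread().name}" }
        .blockingFirst()
```

The thread name in the result shows that the map lambda ran on a thread owned by the io scheduler.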

If an additional tuple class is defined, such as

data class Tuple4<out T1, out T2, out T3, out T4>(val v1: T1, val v2: T2, val v3: T3, val v4: T4)

and a matching combine method, such as

fun <T1 : Any, T2 : Any, T3 : Any, T4 : Any> combineLatest(
    o1: Observable<T1>,
    o2: Observable<T2>,
    o3: Observable<T3>,
    o4: Observable<T4>
): Observable<Tuple4<T1, T2, T3, T4>> =
    Observables.combineLatest(o1, o2, o3, o4) { v1, v2, v3, v4 ->
        Tuple4(v1, v2, v3, v4)
    }

then four observables can be combined into one and mapped to a new value on the desired scheduler:

val observable1 = ...
val observable2 = ...
val observable3 = ...
val observable4 = ...

val transformed: Observable<String> = combineLatest(observable1, observable2, observable3, observable4)
    .observeOn(Schedulers.computation())
    .map { (value1, value2, value3, value4) ->
        "$value1$value2$value3$value4"
    }

Here, the map operation runs on the computation scheduler. Note how, thanks to destructuring, Tuple4 never appears in the calling code. Also, because it is a data class, it behaves well wherever instances are compared for equality, such as in distinctUntilChanged.
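The equality point can be illustrated with a short sketch, again assuming RxJava 2 package names. The function name distinctTuples and the sample values are hypothetical.

```kotlin
import io.reactivex.Observable

data class Tuple4<out T1, out T2, out T3, out T4>(val v1: T1, val v2: T2, val v3: T3, val v4: T4)

// Hypothetical helper: the second tuple equals the first (data class
// equality compares all four components), so distinctUntilChanged drops it.
fun distinctTuples(): List<Tuple4<Int, Int, Int, Int>> =
    Observable.just(
        Tuple4(1, 2, 3, 4),
        Tuple4(1, 2, 3, 4),
        Tuple4(1, 2, 3, 5)
    )
        .distinctUntilChanged()
        .toList()
        .blockingGet()
```

With a regular class that does not override equals, all three instances would pass through.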

Adding Tuple5 through Tuple9 data classes and the corresponding combineLatest overloads allows combining up to 9 source observables.
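A five-source variant could follow the same pattern as Tuple4. This is only a sketch: it assumes RxKotlin 2 and its five-argument Observables.combineLatest overload, and the property names v1 to v5 simply continue the naming used above.

```kotlin
import io.reactivex.Observable
import io.reactivex.rxkotlin.Observables

data class Tuple5<out T1, out T2, out T3, out T4, out T5>(
    val v1: T1, val v2: T2, val v3: T3, val v4: T4, val v5: T5
)

// Packs the latest item from each of five sources into a Tuple5.
fun <T1 : Any, T2 : Any, T3 : Any, T4 : Any, T5 : Any> combineLatest(
    o1: Observable<T1>,
    o2: Observable<T2>,
    o3: Observable<T3>,
    o4: Observable<T4>,
    o5: Observable<T5>
): Observable<Tuple5<T1, T2, T3, T4, T5>> =
    Observables.combineLatest(o1, o2, o3, o4, o5) { v1, v2, v3, v4, v5 ->
        Tuple5(v1, v2, v3, v4, v5)
    }
```

Tuple6 through Tuple9 would differ only in the number of type parameters and properties.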

24 Dec 2018, 21:23

Subscribing on Main Thread Might Be Delayed

In Android applications, some (many?) operations tend to be executed asynchronously. Consider the following code:

// in some activity

fun handleClick() {

    mObservable
      .subscribeOn(AndroidSchedulers.mainThread())
      .subscribe({
        toast("Operation finished")
      }, {
        Log.e("tag", "Operation failed", it)
      })

    mManager.doAsyncWork()
}

The intent is to start some possibly long-running work and later notify the user that it has completed. doAsyncWork would at some point (possibly indirectly) cause a new item to be emitted, signalling that the work is complete.

However, there is a bug here.

If doAsyncWork is fast enough, the new item will be emitted before the subscription is actually connected to the observable, so the subscriber will never receive it.

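Calling subscribeOn(AndroidSchedulers.mainThread()) changes how the subscription is established, even though it might seem that nothing should change because the code is already executing on the main thread. subscribeOn does not check the current thread: it hands the subscribe call to the scheduler, and the main thread scheduler posts it to the main thread's message queue. The subscription therefore takes place only after handleClick has returned, by which time the item may already have been emitted.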
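The effect can be reproduced off-device with a rough sketch that uses RxJava's TestScheduler as a stand-in for the main thread's message queue. That substitution, the RxJava 2 package names, and the function name delayedSubscriptionDemo are assumptions made for illustration. A PublishSubject plays the role of mObservable.

```kotlin
import io.reactivex.schedulers.TestScheduler
import io.reactivex.subjects.PublishSubject

// Hypothetical helper: returns the items the subscriber actually received.
fun delayedSubscriptionDemo(): List<String> {
    val queue = TestScheduler() // stand-in for the main thread queue
    val subject = PublishSubject.create<String>()
    val received = mutableListOf<String>()

    // The subscribe call is only queued here, not performed.
    subject.subscribeOn(queue).subscribe { received.add(it) }

    // Emitted before the queued subscription has run: lost.
    subject.onNext("finished")

    // The queue is processed and the subscription is connected.
    queue.triggerActions()

    // Emitted after the subscription is connected: delivered.
    subject.onNext("later")
    return received
}
```

Only the item emitted after the queued subscription has run reaches the subscriber.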