28 Dec 2019, 21:42

Fixing combineLatest()

The combineLatest() family of methods in RxJava combines multiple observable sources into one Observable. Once every source has emitted at least one item, each new emission from any source calls the combiner function with the latest item from each source, and the resulting item is emitted downstream.

What is there to fix?

Oddly, the combineLatest overloads do not allow specifying the scheduler on which the combiner function executes. When any of the sources emits, the combiner function runs on the thread that emitted. This is often inconvenient, e.g. when the combiner function accesses the network or a database and one of the sources emits on the main thread.
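
To make the threading behaviour concrete, here is a small sketch using plain RxJava 2 with two PublishSubject sources. The function name combinerThreads and the thread names are made up for illustration; it simply records the name of the thread on which the combiner function runs.

```kotlin
import io.reactivex.Observable
import io.reactivex.functions.BiFunction
import io.reactivex.subjects.PublishSubject
import java.util.concurrent.CopyOnWriteArrayList
import kotlin.concurrent.thread

// Returns the names of the threads on which the combiner function ran.
fun combinerThreads(): List<String> {
    val first = PublishSubject.create<Int>()
    val second = PublishSubject.create<Int>()
    val threadNames = CopyOnWriteArrayList<String>()

    val disposable = Observable
        .combineLatest(first, second, BiFunction<Int, Int, Int> { a, b ->
            threadNames.add(Thread.currentThread().name)
            a + b
        })
        .subscribe()

    // No combined item yet: `second` has not emitted anything.
    first.onNext(1)
    // Each emission below triggers the combiner on the emitting thread.
    thread(name = "emitter-A") { second.onNext(2) }.join()
    thread(name = "emitter-B") { first.onNext(3) }.join()

    disposable.dispose()
    return threadNames.toList()
}
```

Calling combinerThreads() should return the two emitter thread names, in that order: the combiner ran wherever the emission happened.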

Fix

The simplest fix is to combine the input values into a holder class and then use observeOn. This works out of the box for two and three source observables, as combineLatest from RxKotlin offers overloads which return Pair and Triple.

If an additional tuple class is defined, such as

data class Tuple4<out T1, out T2, out T3, out T4>(val v1: T1, val v2: T2, val v3: T3, val v4: T4)

and a matching combine method, such as

fun <T1 : Any, T2 : Any, T3 : Any, T4 : Any> combineLatest(
    o1: Observable<T1>,
    o2: Observable<T2>,
    o3: Observable<T3>,
    o4: Observable<T4>
): Observable<Tuple4<T1, T2, T3, T4>> =
    Observables.combineLatest(o1, o2, o3, o4) { v1, v2, v3, v4 ->
        Tuple4(v1, v2, v3, v4)
    }

then four observables can be combined into one and mapped to a new value on the desired scheduler:

val observable1 = ...
val observable2 = ...
val observable3 = ...
val observable4 = ...

val transformed: Observable<String> = combineLatest(observable1, observable2, observable3, observable4)
    .observeOn(Schedulers.computation())
    .map { (value1, value2, value3, value4) ->
        "$value1$value2$value3$value4"
    }

Here, the map operation runs on the computation scheduler. Note how, thanks to destructuring, Tuple4 never appears in the calling code. Also, because it is a data class, it behaves well where instances are compared for equality, such as in distinctUntilChanged.
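
The equality point can be checked without any Rx at all. The sketch below repeats the Tuple4 definition and contrasts it with a hypothetical PlainTuple4 (a regular class, invented here only for comparison); describe is likewise an illustrative helper showing destructuring.

```kotlin
data class Tuple4<out T1, out T2, out T3, out T4>(val v1: T1, val v2: T2, val v3: T3, val v4: T4)

// Same shape, but a regular class: equals() falls back to reference identity.
class PlainTuple4<out T1, out T2, out T3, out T4>(val v1: T1, val v2: T2, val v3: T3, val v4: T4)

// Destructuring relies on the componentN() functions generated for data classes.
fun describe(tuple: Tuple4<Int, String, Double, Boolean>): String {
    val (a, b, c, d) = tuple
    return "$a/$b/$c/$d"
}

// Two separately created data class instances with equal contents are equal.
fun dataTuplesEqual(): Boolean =
    Tuple4(1, "a", 2.0, true) == Tuple4(1, "a", 2.0, true)

// Two separately created regular class instances are not.
fun plainTuplesEqual(): Boolean =
    PlainTuple4(1, "a", 2.0, true) == PlainTuple4(1, "a", 2.0, true)
```

With a regular class, an operator that compares consecutive items via equals() would treat every freshly combined tuple as a change, even when the values are the same.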

Adding Tuple5 through Tuple9 data classes and corresponding combineLatest overloads allows combining up to 9 source observables.
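
As an illustration, the five-source variant might look like the sketch below. It follows the same pattern as Tuple4 and assumes the five-source Observables.combineLatest overload from RxKotlin 2; the remaining arities would be written the same way.

```kotlin
import io.reactivex.Observable
import io.reactivex.rxkotlin.Observables

data class Tuple5<out T1, out T2, out T3, out T4, out T5>(
    val v1: T1, val v2: T2, val v3: T3, val v4: T4, val v5: T5
)

// Combines five sources into a Tuple5 without doing any other work,
// so the real mapping can happen after observeOn.
fun <T1 : Any, T2 : Any, T3 : Any, T4 : Any, T5 : Any> combineLatest(
    o1: Observable<T1>,
    o2: Observable<T2>,
    o3: Observable<T3>,
    o4: Observable<T4>,
    o5: Observable<T5>
): Observable<Tuple5<T1, T2, T3, T4, T5>> =
    Observables.combineLatest(o1, o2, o3, o4, o5) { v1, v2, v3, v4, v5 ->
        Tuple5(v1, v2, v3, v4, v5)
    }
```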

24 Dec 2018, 21:23

Subscribing on Main Thread Might Be Delayed

In Android applications, some (many?) operations tend to be executed asynchronously. Consider the following code:

// in some activity
fun handleClick() {
    mObservable
        .subscribeOn(AndroidSchedulers.mainThread())
        .subscribe({
            toast("Operation finished")
        }, {
            // Log the failure together with its stack trace.
            Log.e("tag", "Operation failed", it)
        })

    mManager.doAsyncWork()
}

The intent is to start some possibly long-running work and later notify the user that the work is completed. doAsyncWork would (possibly indirectly) at some point emit a new item which signals completion of the work.

However, there is a bug here.

If doAsyncWork is fast enough, the new item is emitted before the subscription is actually connected to the observable. Unless the observable replays past items, the subscriber never receives it.

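Calling subscribeOn(AndroidSchedulers.mainThread()) changes how the subscription is made, even though it might seem that nothing should happen as the code is already executing on the main thread. The subscription is not performed inline; it is posted to the main thread's message queue and so takes effect only after handleClick has returned.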
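
The ordering problem can be reproduced without Android or Rx. The sketch below is only a simulation under stated assumptions: a single-thread executor stands in for the main looper, the hypothetical HotSource class stands in for a hot observable, and the "async" work emits immediately to make the outcome deterministic.

```kotlin
import java.util.concurrent.CopyOnWriteArrayList
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit

// Hypothetical stand-in for a hot observable: only listeners that are
// already registered when emit() is called see the item.
class HotSource<T> {
    private val listeners = CopyOnWriteArrayList<(T) -> Unit>()
    fun subscribe(listener: (T) -> Unit) { listeners.add(listener) }
    fun emit(item: T) { for (listener in listeners) listener(item) }
}

// Runs a simulated handleClick() on an executor that plays the role of the
// main looper, and returns the items the subscriber received.
fun simulateClick(postSubscription: Boolean): List<String> {
    val mainThread = Executors.newSingleThreadExecutor()
    val source = HotSource<String>()
    val received = CopyOnWriteArrayList<String>()
    try {
        mainThread.submit(Runnable {
            if (postSubscription) {
                // Like subscribeOn(mainThread()): the subscription is queued
                // and runs only after the current task has returned.
                mainThread.execute { source.subscribe { received.add(it) } }
            } else {
                // Subscribing directly takes effect immediately.
                source.subscribe { received.add(it) }
            }
            // A "fast" doAsyncWork(): the item is emitted right away.
            source.emit("Operation finished")
        }).get()
    } finally {
        mainThread.shutdown()
        mainThread.awaitTermination(1, TimeUnit.SECONDS)
    }
    return received.toList()
}
```

simulateClick(true) returns an empty list, while simulateClick(false) returns the emitted item. In the real code, one likely way to close the window is to drop subscribeOn so that the subscription happens inline, and to add observeOn(AndroidSchedulers.mainThread()) if the callbacks have to run on the main thread.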

03 Oct 2018, 16:32

Converting Weak Listener to Rx Observable

Andrea Maglie has a nice writeup on how to convert a classic listener-based observable to an Rx observable. Here is the code snippet:

public Observable<String> observableListenerWrapper() {
    return Observable.create(new Observable.OnSubscribe<String>() {

        @Override
        public void call(Subscriber<? super String> subscriber) {
            ValueUpdateListener listener = new ValueUpdateListener() {

                @Override
                public void onValueChanged(@NonNull String value) {
                    if (subscriber.isUnsubscribed()) {
                        valueHolder.unregisterListener(this);
                    } else {
                        subscriber.onNext(value);
                    }
                }
            };

            valueHolder.registerListener(listener);
        }
    });
}

where ValueHolder holds a collection of weak references to listeners:

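import java.lang.ref.WeakReference;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

class ValueHolder {
  private final List<WeakReference<ValueUpdateListener>> fObservers = new CopyOnWriteArrayList<>();

  private void notifyListeners(String newValue) {
    for (WeakReference<ValueUpdateListener> ref : fObservers) {
      final var listener = ref.get();
      if (listener != null) {
        listener.onValueChanged(newValue);
      } else {
        // The listener has been garbage collected; drop the stale reference.
        fObservers.remove(ref);
      }
    }
  }

  public void registerListener(ValueUpdateListener listener) {
    fObservers.add(new WeakReference<>(listener));
  }

  public void unregisterListener(ValueUpdateListener listener) {
    for (WeakReference<ValueUpdateListener> ref : fObservers) {
      if (ref.get() == listener) {
        fObservers.remove(ref);
      }
    }
  }
}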
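
Because the holder keeps only weak references, a listener stays registered only as long as something else refers to it strongly. The plain-Kotlin sketch below illustrates this. It is a simplified rendition of ValueHolder (notifyListeners is public here for the sake of the demo), and recordingListener and deliverAfterGc are hypothetical helpers.

```kotlin
import java.lang.ref.WeakReference
import java.util.concurrent.CopyOnWriteArrayList

interface ValueUpdateListener {
    fun onValueChanged(value: String)
}

// Simplified Kotlin rendition of ValueHolder, for illustration only.
class ValueHolder {
    private val observers = CopyOnWriteArrayList<WeakReference<ValueUpdateListener>>()

    fun registerListener(listener: ValueUpdateListener) {
        observers.add(WeakReference(listener))
    }

    fun notifyListeners(newValue: String) {
        for (ref in observers) {
            val listener = ref.get()
            if (listener != null) listener.onValueChanged(newValue) else observers.remove(ref)
        }
    }
}

// Creates a listener that appends every value it receives to `sink`.
fun recordingListener(sink: MutableList<String>): ValueUpdateListener =
    object : ValueUpdateListener {
        override fun onValueChanged(value: String) { sink.add(value) }
    }

fun deliverAfterGc(): Pair<List<String>, List<String>> {
    val holder = ValueHolder()
    val keptSink = mutableListOf<String>()
    val droppedSink = mutableListOf<String>()

    val kept = recordingListener(keptSink)                   // strongly referenced by a local
    holder.registerListener(kept)
    holder.registerListener(recordingListener(droppedSink)) // only the WeakReference points at it

    System.gc() // only a hint; collection of the second listener is not guaranteed
    holder.notifyListeners("hello")

    // Using `kept` after the notification keeps it reachable up to this point.
    kept.onValueChanged("direct")
    return keptSink to droppedSink
}
```

The first list always contains both values. Whether the second list is empty depends on whether the garbage collector got to the unreferenced listener before the notification.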

After the above code is converted to Kotlin and RxJava 2, we have the following:

object Consumer {
    private var mDisposable: Disposable? = null

    fun initWith(valueHolder: ValueHolder) {
        mDisposable = Observable.create<String> { emitter ->
            valueHolder.registerListener(object : ValueUpdateListener {

                override fun onValueChanged(value: String) {
                    if (emitter.isDisposed) {
                        valueHolder.unregisterListener(this)
                        return
                    }

                    emitter.onNext(value)
                }
            })
        }
        .subscribe(
            { println(it) },
            { it.printStackTrace() }
        )
    }
}

which… does not work.

Apparently, the object implementing the listener gets garbage collected, as the weak reference in ValueHolder is the only reference to it. Consequently, when ValueHolder goes through the list of listeners, it notices that the weak reference points to nothing and removes it from the listener collection. The first attempt to fix this was to extract the listener object to a variable, in the hope that it would be captured by the emitter lambda.

object Consumer {
    private var mDisposable: Disposable? = null

    fun initWith(valueHolder: ValueHolder) {
        mDisposable = Observable.create<String> { emitter ->
            val listener = object : ValueUpdateListener {

                override fun onValueChanged(value: String) {
                    if (emitter.isDisposed) {
                        valueHolder.unregisterListener(this)
                        return
                    }

                    emitter.onNext(value)
                }
            }

            valueHolder.registerListener(listener)
        }
        .subscribe(
            { println(it) },
            { it.printStackTrace() }
        )
    }
}

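That did not help. The listener is a local variable inside the lambda rather than something the lambda captures, so once the lambda returns, the weak reference in ValueHolder is again the only reference to it. Moving the listener object all the way out to the singleton corrected the issue: it survived garbage collection and the Rx observable continued to work as expected.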

object Consumer {
    private var mDisposable: Disposable? = null
    private lateinit var mListener: ValueUpdateListener

    fun initWith(valueHolder: ValueHolder) {
        mDisposable = Observable.create<String> { emitter ->
            mListener = object : ValueUpdateListener {

                override fun onValueChanged(value: String) {
                    if (emitter.isDisposed) {
                        valueHolder.unregisterListener(this)
                        return
                    }

                    emitter.onNext(value)
                }
            }

            valueHolder.registerListener(mListener)
        }
        .subscribe(
            { println(it) },
            { it.printStackTrace() }
        )
    }
}
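
An alternative that avoids the extra field is to hand the listener to emitter.setCancellable. The sketch below is a possible variant rather than the approach used above. It assumes that, in RxJava 2, the Disposable returned by subscribe keeps the emitter reachable and the emitter keeps its cancellable reachable, so that the listener captured by the cancellable lives as long as the Disposable is held. ValueHolder is again a simplified Kotlin rendition, and asObservable is a hypothetical helper name.

```kotlin
import io.reactivex.Observable
import java.lang.ref.WeakReference
import java.util.concurrent.CopyOnWriteArrayList

interface ValueUpdateListener {
    fun onValueChanged(value: String)
}

// Simplified Kotlin rendition of ValueHolder, for illustration only.
class ValueHolder {
    private val observers = CopyOnWriteArrayList<WeakReference<ValueUpdateListener>>()

    fun registerListener(listener: ValueUpdateListener) {
        observers.add(WeakReference(listener))
    }

    fun unregisterListener(listener: ValueUpdateListener) {
        for (ref in observers) {
            if (ref.get() === listener) observers.remove(ref)
        }
    }

    fun notifyListeners(newValue: String) {
        for (ref in observers) {
            val listener = ref.get()
            if (listener != null) listener.onValueChanged(newValue) else observers.remove(ref)
        }
    }
}

// Hypothetical helper: exposes value updates as an Observable.
fun ValueHolder.asObservable(): Observable<String> =
    Observable.create<String> { emitter ->
        val listener = object : ValueUpdateListener {
            override fun onValueChanged(value: String) {
                emitter.onNext(value)
            }
        }
        // The cancellable captures `listener`: it unregisters the listener on
        // dispose and, under the assumption above, keeps it strongly reachable
        // for as long as the subscription's Disposable is held.
        emitter.setCancellable { unregisterListener(listener) }
        registerListener(listener)
    }
```

The caller still has to hold on to the Disposable returned by subscribe, as Consumer does with mDisposable. If the Disposable is dropped, nothing refers to the chain strongly and the listener can be collected again.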

Conclusion

Be cautious when mixing lambdas, Rx and weak references, as the end result can be unexpected.

Updated on 2018-12-23

Added implementation of ValueHolder.