03 Oct 2018, 16:32

Converting Weak Listener to Rx Observable

Andrea Maglie has a nice writeup on how to nicely convert classic observable with listeners to Rx observable. Here is the code snippet:

public Observable<String> observableListenerWrapper() {
    return Observable.create(new Observable.OnSubscribe<String>() {

        @Override
        public void call(Subscriber<? super String> subscriber) {
            ValueUpdateListener listener = new ValueUpdateListener() {

                @Override
                public void onValueChanged(@NonNull String value) {
                    if (subscriber.isUnsubscribed()) {
                        valueHolder.unregisterListener(this);
                    } else {
                        subscriber.onNext(value);
                    }
                }
            };

            valueHolder.registerListener(listener);
        }
    });
}

where ValueHolder holds a collection on weak references to listeners:

class ValueHolder {
  private final List<WeakReference<ValueUpdateListener>> fObservers = new CopyOnWriteArrayList<>();
  
  private void notifyListeners(String newValue) {
    for (WeakReference<ValueUpdateListener> ref : fObservers) {        
      final var listener = ref.get();
      if (listener != null ) {
        listener.onValueChanged(newValue);
      }
      else {
        fObservers.remove(ref)        
      }
    }
  }
 
  public void addListener(ValueUpdateListener listener) {
    fObservers.add(new WeakReference<ValueUpdateListener>(listener));
  }
}

After the above code is converted to Kotlin and RxJava 2 we have the following

object Consumer {
    private var mDisposable: Disposable? = null

    fun initWith(valueHolder: ValueHolder) {
        mDisposable = Observable.create<String> { emitter ->
            valueHolder.registerListener(object : ValueUpdateListener {

                override fun onValueChanged(value: String) {
                    if (emitter.isDisposed) {
                        valueHolder.unregisterListener(this)
                        return
                    }

                    emitter.onNext(value);
                }
            })
        }
        .subscribe(
            { println(it) },
            { it.printStackTrace() }
        )
    }
}

which… does not work.

Apparently, the object implementing listener gets garbage collected. Consequently, when ValueHolder goes thru the list of listeners it notices that weak reference points to nothing and removes it from listener collection. First attempt to fix this was to extract listener object to a variable in hope it would be captured in emitter lambda.

object Consumer {
    private var mDisposable: Disposable? = null

    fun initWith(valueHolder: ValueHolder) {
        mDisposable = Observable.create<String> { emitter ->
            val listener = object : ValueUpdateListener {

                override fun onValueChanged(value: String) {
                    if (emitter.isDisposed) {
                        valueHolder.unregisterListener(this)
                        return
                    }

                    emitter.onNext(value);
                }
            }

            valueHolder.registerListener(listener)
        }
        .subscribe(
            { println(it) },
            { it.printStackTrace() }
        )
    }
}

That did not help. Somehow, the lambda was not capturing the reference to listener. Moving listener object all the way out to singleton corrected the issue, as it survived garbage collection and Rx observable continued to work as expected.

object Consumer {
    private var mDisposable: Disposable? = null
    private lateinit var mListener: ValueUpdateListener

    fun initWith(valueHolder: ValueHolder) {
        mDisposable = Observable.create<String> { emitter ->
            mListener = object : ValueUpdateListener {

                override fun onValueChanged(value: String) {
                    if (emitter.isDisposed) {
                        valueHolder.unregisterListener(this)
                        return
                    }

                    emitter.onNext(value);
                }
            }

            valueHolder.registerListener(mListener)
        }
        .subscribe(
             { println(it) },
             { it.printStackTrace() }
        )
    }
}

Conclusion

Be cautious when mixing lambdas, Rx and weak references, as the end result can be unexpected.

Updated on 2018-12-23

Added implementation of ValueHolder.