Andrea Maglie has a nice writeup on how to nicely convert classic observable with listeners to Rx observable. Here is the code snippet:
public Observable<String> observableListenerWrapper() {
return Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
ValueUpdateListener listener = new ValueUpdateListener() {
@Override
public void onValueChanged(@NonNull String value) {
if (subscriber.isUnsubscribed()) {
valueHolder.unregisterListener(this);
} else {
subscriber.onNext(value);
}
}
};
valueHolder.registerListener(listener);
}
});
}
where ValueHolder
holds a collection on weak references to listeners:
class ValueHolder {
private final List<WeakReference<ValueUpdateListener>> fObservers = new CopyOnWriteArrayList<>();
private void notifyListeners(String newValue) {
for (WeakReference<ValueUpdateListener> ref : fObservers) {
final var listener = ref.get();
if (listener != null ) {
listener.onValueChanged(newValue);
}
else {
fObservers.remove(ref)
}
}
}
public void addListener(ValueUpdateListener listener) {
fObservers.add(new WeakReference<ValueUpdateListener>(listener));
}
}
After the above code is converted to Kotlin and RxJava 2 we have the following
object Consumer {
private var mDisposable: Disposable? = null
fun initWith(valueHolder: ValueHolder) {
mDisposable = Observable.create<String> { emitter ->
valueHolder.registerListener(object : ValueUpdateListener {
override fun onValueChanged(value: String) {
if (emitter.isDisposed) {
valueHolder.unregisterListener(this)
return
}
emitter.onNext(value);
}
})
}
.subscribe(
{ println(it) },
{ it.printStackTrace() }
)
}
}
which… does not work.
Apparently, the object implementing listener gets garbage collected. Consequently, when ValueHolder
goes thru the list of listeners it notices that weak reference points to nothing and removes it from listener collection.
First attempt to fix this was to extract listener object to a variable in hope it would be captured in emitter lambda.
object Consumer {
private var mDisposable: Disposable? = null
fun initWith(valueHolder: ValueHolder) {
mDisposable = Observable.create<String> { emitter ->
val listener = object : ValueUpdateListener {
override fun onValueChanged(value: String) {
if (emitter.isDisposed) {
valueHolder.unregisterListener(this)
return
}
emitter.onNext(value);
}
}
valueHolder.registerListener(listener)
}
.subscribe(
{ println(it) },
{ it.printStackTrace() }
)
}
}
That did not help. Somehow, the lambda was not capturing the reference to listener. Moving listener object all the way out to singleton corrected the issue, as it survived garbage collection and Rx observable continued to work as expected.
object Consumer {
private var mDisposable: Disposable? = null
private lateinit var mListener: ValueUpdateListener
fun initWith(valueHolder: ValueHolder) {
mDisposable = Observable.create<String> { emitter ->
mListener = object : ValueUpdateListener {
override fun onValueChanged(value: String) {
if (emitter.isDisposed) {
valueHolder.unregisterListener(this)
return
}
emitter.onNext(value);
}
}
valueHolder.registerListener(mListener)
}
.subscribe(
{ println(it) },
{ it.printStackTrace() }
)
}
}
Conclusion
Be cautious when mixing lambdas, Rx and weak references, as the end result can be unexpected.
Updated on 2018-12-23
Added implementation of ValueHolder
.