Reactive programming - Delay item emission until an item is emitted from another observable
While playing with RxJava, I stumbled upon the following problem:
I have 2 different streams:
- A stream of items
- A stream (with just 1 item) which emits transformation information for the first stream.
So I have a stream of items, and I want those items to be combined with that single item from the 2nd stream:
----a1----a2----a3----a4----a5----|--------------->
-------------b1--|----------------------------------->
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
------------a1b1-a2b1-a3b1-a4b1-a5b1-------->
It looks similar to the combineLatest operator, but combineLatest will ignore all items from the first stream except the one closest to the item from the second stream. This means I will not receive a1b1; the first resulting item emitted is going to be a2b1.
I also looked at the delay operator, but it doesn't allow me to specify a closing stream the way the buffer operator does.
Is there any fancy operator which solves the problem above?
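To make the difference in the diagram concrete, here is a minimal sketch that models both behaviors on a plain list of events, without any Rx library. The names `DelayUntilSketch`, `Event`, `delayUntilRight`, and `combineLatestLike` are hypothetical and exist only for this illustration; the sketch assumes the second stream emits exactly one item, as in the question.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical model of the marble diagram; this is not RxJava code.
class DelayUntilSketch {
    /** One event on the merged timeline: a left item (a1, a2, ...) or the right item (b1). */
    static final class Event {
        final boolean fromLeft;
        final String value;

        Event(boolean fromLeft, String value) {
            this.fromLeft = fromLeft;
            this.value = value;
        }
    }

    static Event left(String value) { return new Event(true, value); }

    static Event right(String value) { return new Event(false, value); }

    /** Desired behavior: hold left items until the right item arrives, then pair every left item with it. */
    static List<String> delayUntilRight(List<Event> timeline) {
        List<String> pending = new ArrayList<>();
        List<String> out = new ArrayList<>();
        String rightValue = null;
        for (Event e : timeline) {
            if (!e.fromLeft) {
                if (rightValue == null) {          // only the first right item is used
                    rightValue = e.value;
                    for (String l : pending) {     // flush everything that was waiting
                        out.add(l + rightValue);
                    }
                    pending.clear();
                }
            } else if (rightValue == null) {
                pending.add(e.value);              // right item not seen yet: buffer
            } else {
                out.add(e.value + rightValue);     // right item known: emit immediately
            }
        }
        return out;
    }

    /** combineLatest-like behavior: only the latest left item is kept while waiting. */
    static List<String> combineLatestLike(List<Event> timeline) {
        List<String> out = new ArrayList<>();
        String latestLeft = null;
        String latestRight = null;
        for (Event e : timeline) {
            if (e.fromLeft) {
                latestLeft = e.value;
            } else {
                latestRight = e.value;
            }
            if (latestLeft != null && latestRight != null) {
                out.add(latestLeft + latestRight);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // Same order of events as in the diagram: b1 arrives after a2.
        List<Event> timeline = Arrays.asList(
                left("a1"), left("a2"), right("b1"), left("a3"), left("a4"), left("a5"));
        System.out.println(delayUntilRight(timeline));   // [a1b1, a2b1, a3b1, a4b1, a5b1]
        System.out.println(combineLatestLike(timeline)); // [a2b1, a3b1, a4b1, a5b1]
    }
}
```

The only difference between the two methods is the `pending` buffer: the combineLatest-like version overwrites a1 with a2 before b1 arrives, which is exactly why a1b1 is lost.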
AFAIK, there is no built-in operator to achieve the behavior you've described. You can always implement a custom operator or build it on top of existing operators. I think the second option is easier to implement, and here is the code:
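    import rx.Observable;
    import rx.functions.Action0;
    import rx.functions.Func0;
    import rx.functions.Func1;
    import rx.functions.Func2;
    import rx.observables.ConnectableObservable;
    import rx.subscriptions.SerialSubscription;

    public static <L, R, T> Observable<T> zipper(final Observable<? extends L> left,
                                                 final Observable<? extends R> right,
                                                 final Func2<? super L, ? super R, ? extends T> function) {
        // defer() gives every subscriber its own replayed copy of the right observable
        return Observable.defer(new Func0<Observable<T>>() {
            @Override
            public Observable<T> call() {
                final SerialSubscription subscription = new SerialSubscription();
                // replay() caches the items of the right observable for later subscribers
                final ConnectableObservable<? extends R> cached = right.replay();

                return left.flatMap(new Func1<L, Observable<T>>() {
                    @Override
                    public Observable<T> call(final L valueLeft) {
                        // pair each left item with the cached right item(s)
                        return cached.map(new Func1<R, T>() {
                            @Override
                            public T call(final R valueRight) {
                                return function.call(valueLeft, valueRight);
                            }
                        });
                    }
                }).doOnSubscribe(new Action0() {
                    @Override
                    public void call() {
                        // start the right observable as soon as the result is subscribed to
                        subscription.set(cached.connect());
                    }
                }).doOnUnsubscribe(new Action0() {
                    @Override
                    public void call() {
                        subscription.unsubscribe();
                    }
                });
            }
        });
    }

If you have any questions regarding the code, I can explain it in detail.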
Update
Regarding the question of how my solution differs from the following one:
    left.flatMap(valueLeft -> right.map(valueRight -> together(valueLeft, valueRight)));

- Parallel execution - In my implementation both the left and right observables are executing in parallel. The right observable doesn't have to wait for the left one to emit its first item.
- Caching - My solution subscribes only once to the right observable and caches its result. That's why b1 will always be the same for all aXXX items. The solution provided by akarnokd subscribes to the right observable every time the left one emits an item. That means:
  - There is no guarantee that b1 won't change its value. For example, for the following observable you will get a different b for each a.

        final Observable<Double> right = Observable.defer(new Func0<Observable<Double>>() {
            @Override
            public Observable<Double> call() {
                return Observable.just(Math.random());
            }
        });

  - If the right observable is a time-consuming operation (e.g. a network call), you will have to wait for its completion every time the left observable emits a new item.
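The caching point can be illustrated without RxJava. The sketch below is only an analogy: a `Supplier` stands in for a cold right observable, calling `get()` stands in for subscribing to it, and a counter replaces `Math.random()` so the result is deterministic. The names `CachedRightSketch`, `resubscribeEachTime`, `subscribeOnce`, and `countingSource` are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Hypothetical analogy: Supplier.get() plays the role of subscribing to a cold observable.
class CachedRightSketch {
    /** Evaluates the right source again for every left item (models re-subscribing each time). */
    static List<String> resubscribeEachTime(List<String> left, Supplier<String> right) {
        List<String> out = new ArrayList<>();
        for (String l : left) {
            out.add(l + right.get());
        }
        return out;
    }

    /** Evaluates the right source once up front and reuses the value (models replay plus a single connect). */
    static List<String> subscribeOnce(List<String> left, Supplier<String> right) {
        String cached = right.get();
        List<String> out = new ArrayList<>();
        for (String l : left) {
            out.add(l + cached);
        }
        return out;
    }

    /** A source that yields a new value on every evaluation: b1, b2, b3, ... */
    static Supplier<String> countingSource() {
        AtomicInteger calls = new AtomicInteger();
        return () -> "b" + calls.incrementAndGet();
    }

    public static void main(String[] args) {
        List<String> left = Arrays.asList("a1", "a2", "a3");
        System.out.println(resubscribeEachTime(left, countingSource())); // [a1b1, a2b2, a3b3]
        System.out.println(subscribeOnce(left, countingSource()));       // [a1b1, a2b1, a3b1]
    }
}
```

If the source were slow (for example a network call), `resubscribeEachTime` would pay that cost once per left item, while `subscribeOnce` pays it a single time.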