reactive programming - Delay items emission until item is emitted from another observable -
playing rxjava , stumbled upon following problem:
i have 2 different streams:
- stream items
- stream (with 1 item) emits transformation information first stream.
so have stream of items , want items combined single item 2nd stream:
----a1----a2----a3----a4----a5----|--------------->
-------------b1--|----------------------------------->
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
------------a1b1-a2b1-a3b1-a4b1-a5b1-------->
it looks similar combilelatest
operator, combinelatest ignore items first stream except closest item second stream. means not receive a1b1
- first resulting item emitted gonna a2b1
.
i looked @ delay
operator, doesn't allow me specify close
stream done buffer
operatior
is there fancy operator solves problem above?
afaik, there no built-in operator achieve behavior you've described. can implement custom operator or build on top of existing operators. think second option easier implement , here code:
public static <l, r, t> observable<t> zipper(final observable<? extends l> left, final observable<? extends r> right, final func2<? super l, ? super r, ? extends t> function) { return observable.defer(new func0<observable<t>>() { @override public observable<t> call() { final serialsubscription subscription = new serialsubscription(); final connectableobservable<? extends r> cached = right.replay(); return left.flatmap(new func1<l, observable<t>>() { @override public observable<t> call(final l valueleft) { return cached.map(new func1<r, t>() { @override public t call(final r valueright) { return function.call(valueleft, valueright); } }); } }).doonsubscribe(new action0() { @override public void call() { subscription.set(cached.connect()); } }).doonunsubscribe(new action0() { @override public void call() { subscription.unsubscribe(); } }); } }); }
if have questions regarding code, can explain in details.
update
regarding questing how solution different following one:
left.flatmap(valueleft -> right.map(valueright -> together(valueleft, valueright)));
- parallel execution - in implementation both
left
,right
observables executing in parallel.right
observable doesn't have waitleft
1 emit first item. caching - solution subscribes once
right
observables , caches result. thats whyb1
sameaxxx
items. solution provided akarnokd subscribesright
observable every timeleft
1 emits item. means:there no guarantee
b1
won't change value. example following observable differentb
eacha
.final observable<double> right = observable.defer(new func0<observable<double>>() { @override public observable<double> call() { return observable.just(math.random()); } });
if
right
observable time consuming operation (e.g. network call), have wait completion every timeleft
observable emits new item.
Comments
Post a Comment