reactive programming - Delay item emission until an item is emitted from another observable


While playing with RxJava, I stumbled upon the following problem:

I have 2 different streams:

  • a stream of items
  • a stream (with just 1 item) that emits transformation information for the first stream.

So I have a stream of items, and I want all of those items to be combined with the single item from the 2nd stream:

----a1----a2----a3----a4----a5----|--------------->

-------------b1--|----------------------------------->

^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

------------a1b1-a2b1-a3b1-a4b1-a5b1-------->

It looks similar to the combineLatest operator, but combineLatest will ignore all items from the first stream except the one closest to the item from the second stream. That means I will not receive a1b1: the first resulting item emitted is going to be a2b1.
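To make the difference concrete, here is a minimal plain-Java sketch (no RxJava involved) that replays the marble diagram above as an ordered list of events. The class and method names are made up for illustration, and it assumes the second stream emits exactly one item; it only models the pairing rules, not real asynchronous streams.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class CombineLatestDemo {
    // One emission on the shared timeline: which stream it came from and its value.
    static final class Event {
        final boolean fromLeft;
        final String value;

        Event(boolean fromLeft, String value) {
            this.fromLeft = fromLeft;
            this.value = value;
        }
    }

    // combineLatest-like rule: emit only once both sides have a latest value.
    static List<String> combineLatestLike(List<Event> timeline) {
        List<String> out = new ArrayList<>();
        String latestLeft = null;
        String latestRight = null;
        for (Event e : timeline) {
            if (e.fromLeft) {
                latestLeft = e.value;
            } else {
                latestRight = e.value;
            }
            if (latestLeft != null && latestRight != null) {
                out.add(latestLeft + latestRight);
            }
        }
        return out;
    }

    // Desired rule: hold left items until the right item arrives, then pair every one.
    static List<String> bufferUntilRight(List<Event> timeline) {
        List<String> out = new ArrayList<>();
        List<String> pending = new ArrayList<>();
        String right = null;
        for (Event e : timeline) {
            if (!e.fromLeft) {
                right = e.value;
                for (String a : pending) {
                    out.add(a + right);
                }
                pending.clear();
            } else if (right == null) {
                pending.add(e.value);
            } else {
                out.add(e.value + right);
            }
        }
        return out;
    }

    // The order of emissions in the marble diagram: a1, a2, b1, a3, a4, a5.
    static List<Event> marbleTimeline() {
        return Arrays.asList(
                new Event(true, "a1"), new Event(true, "a2"),
                new Event(false, "b1"),
                new Event(true, "a3"), new Event(true, "a4"), new Event(true, "a5"));
    }

    public static void main(String[] args) {
        System.out.println(combineLatestLike(marbleTimeline())); // [a2b1, a3b1, a4b1, a5b1]
        System.out.println(bufferUntilRight(marbleTimeline()));  // [a1b1, a2b1, a3b1, a4b1, a5b1]
    }
}
```

The first list is missing a1b1 because a1 had already been overwritten by a2 when b1 arrived; the second list is the output I am after.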

I also looked at the delay operator, but it doesn't allow me to specify a closing stream the way it is done with the buffer operator.

Is there any fancy operator that solves the problem above?

AFAIK, there is no built-in operator that achieves the behavior you've described. You can either implement a custom operator or build one on top of existing operators. I think the second option is easier to implement, and here is the code:

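import rx.Observable;
import rx.functions.Action0;
import rx.functions.Func0;
import rx.functions.Func1;
import rx.functions.Func2;
import rx.observables.ConnectableObservable;
import rx.subscriptions.SerialSubscription;

public static <L, R, T> Observable<T> zipper(
        final Observable<? extends L> left,
        final Observable<? extends R> right,
        final Func2<? super L, ? super R, ? extends T> function) {
    // defer() gives every subscriber its own cache and connection.
    return Observable.defer(new Func0<Observable<T>>() {
        @Override
        public Observable<T> call() {
            final SerialSubscription subscription = new SerialSubscription();
            // replay() caches whatever right emits for later subscribers.
            final ConnectableObservable<? extends R> cached = right.replay();

            return left.flatMap(new Func1<L, Observable<T>>() {
                @Override
                public Observable<T> call(final L valueLeft) {
                    // Pair each left item with the cached right item(s).
                    return cached.map(new Func1<R, T>() {
                        @Override
                        public T call(final R valueRight) {
                            return function.call(valueLeft, valueRight);
                        }
                    });
                }
            }).doOnSubscribe(new Action0() {
                @Override
                public void call() {
                    // Start right immediately, without waiting for left.
                    subscription.set(cached.connect());
                }
            }).doOnUnsubscribe(new Action0() {
                @Override
                public void call() {
                    subscription.unsubscribe();
                }
            });
        }
    });
}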

If you have any questions regarding the code, I can explain it in detail.
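For readers without RxJava at hand, the core idea of the code above (start the right side once, keep its single value, and pair every left item with it whenever it arrives) can be imitated with the JDK's CompletableFuture. This is only an analogy under stated assumptions: the left side is a plain list rather than a stream, the right side yields exactly one value, and FutureZipperDemo, pairAll and demo are hypothetical names, not part of any library.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiFunction;

class FutureZipperDemo {
    // Registers one dependent computation per left item on the single right future.
    static <L, R, T> List<CompletableFuture<T>> pairAll(
            List<L> left,
            CompletableFuture<R> right,
            BiFunction<? super L, ? super R, ? extends T> fn) {
        List<CompletableFuture<T>> results = new ArrayList<>();
        for (L a : left) {
            CompletableFuture<T> paired = right.thenApply(b -> fn.apply(a, b));
            results.add(paired);
        }
        return results;
    }

    // Left items are registered before the right value exists; none is lost.
    static List<String> demo() {
        CompletableFuture<String> right = new CompletableFuture<>();
        List<CompletableFuture<String>> pending =
                pairAll(Arrays.asList("a1", "a2", "a3"), right, (a, b) -> a + b);

        right.complete("b1"); // the right value arrives late

        List<String> out = new ArrayList<>();
        for (CompletableFuture<String> f : pending) {
            out.add(f.join());
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [a1b1, a2b1, a3b1]
    }
}
```

The future plays the role of the replayed right observable: it is resolved once, and every dependent sees the same value regardless of when it was registered.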

Update

Regarding the question of how my solution is different from the following one:

left.flatMap(valueLeft -> right.map(valueRight -> together(valueLeft, valueRight)));
  1. Parallel execution - in my implementation both the left and right observables are executing in parallel. The right observable doesn't have to wait for the left one to emit its first item.
  2. Caching - my solution subscribes only once to the right observable and caches its result. That's why b1 will always be the same for all aXXX items. The solution provided by akarnokd subscribes to the right observable every time the left one emits an item. That means:

    • There is no guarantee that b1 won't change its value. For example, with the following observable you will get a different b for each a.

      final Observable<Double> right = Observable.defer(new Func0<Observable<Double>>() {
          @Override
          public Observable<Double> call() {
              return Observable.just(Math.random());
          }
      });
    • If the right observable is a time-consuming operation (e.g. a network call), you will have to wait for its completion every time the left observable emits a new item.
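The caching point can be illustrated without RxJava. The sketch below is a plain-Java analogy, not the RxJava mechanism itself: a Supplier stands in for the right observable, a call counter replaces Math.random() so the result is deterministic, and CachingDemo, memoize and pair are hypothetical names introduced for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

class CachingDemo {
    // Wraps a supplier so the underlying computation runs at most once.
    static <T> Supplier<T> memoize(final Supplier<T> source) {
        return new Supplier<T>() {
            private boolean computed;
            private T value;

            @Override
            public synchronized T get() {
                if (!computed) {
                    value = source.get();
                    computed = true;
                }
                return value;
            }
        };
    }

    // Pairs each left item with whatever the right side yields at that moment.
    static List<String> pair(List<String> left, Supplier<Integer> right) {
        List<String> out = new ArrayList<>();
        for (String a : left) {
            out.add(a + "b" + right.get());
        }
        return out;
    }

    // A right side that yields a new value on every evaluation (1, 2, 3, ...).
    static Supplier<Integer> changingRight() {
        final AtomicInteger calls = new AtomicInteger();
        return calls::incrementAndGet;
    }

    public static void main(String[] args) {
        List<String> left = Arrays.asList("a1", "a2", "a3");
        // Re-evaluating right per left item: [a1b1, a2b2, a3b3]
        System.out.println(pair(left, changingRight()));
        // Evaluating right once and caching it: [a1b1, a2b1, a3b1]
        System.out.println(pair(left, memoize(changingRight())));
    }
}
```

The uncached run corresponds to re-subscribing to right for every left item, so each a sees a different b and pays the cost of the right-hand computation again; the memoized run corresponds to the replay-based approach, where the value is computed once and shared.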

