RxJava: Consumers of Hot Observables


I am trying to get a grip on the intricacies of RxJava, but I have hit a newbie problem:

I am trying to create a hot observable from a cold one, and subscribe two consumers that process the pushed events at different speeds. Here is the code snippet:

    // sleep(n) is a helper (not shown) that blocks the calling thread for n seconds
    ConnectableObservable<Long> ob = Observable.interval(200, TimeUnit.MILLISECONDS)
        .publish();
    ob.connect();

    Consumer<Long> withSleep = (Long t) -> {
        System.out.println("second : " + t);
        sleep(1);
    };

    Consumer<Long> noSleep = (Long t) -> {
        System.out.println("first : " + t);
    };

    sleep(2);

    ob.observeOn(Schedulers.newThread()).subscribe(noSleep);
    ob.observeOn(Schedulers.newThread()).subscribe(withSleep);

    sleep(5);

The sleep(2) is there to see whether the observable has already started firing. And indeed, it prints what I expected:

  1. first : 10
  2. second : 10
  3. first : 11
  4. first : 12
  5. first : 13
  6. first : 14
  7. second : 11
  8. first : 15
  9. first : 16
  10. first : 17

But the second consumer (the one with the longer processing time, simulated by a 1-second sleep) picks up the next event in the sequence (output line 7), rather than the current event (no. 14), which is what I would expect from a hot observable. Isn't the idea of a hot observable that it keeps firing, irrespective of subscribers, and that subscribers pick up whatever is pushed at that moment (assuming no specific, explicit backpressure strategy)?

What do I need to change to have the second consumer pick up whatever is produced at that moment (i.e. displaying 14 instead of 11 in the example above)?

Any help is appreciated.

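Hot or cold, operators such as publish and observeOn stay continuous once subscribed, so you'll get all events even if the processing takes longer than the emission rate. Here observeOn queues every event for its subscriber, which is why the slow consumer sees 11 right after 10.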

To avoid processing old entries in the second case, you have to chain operators that drop events and don't buffer them either:

    ob.toFlowable(BackpressureStrategy.DROP)
      .delay(0, TimeUnit.MILLISECONDS, Schedulers.newThread())
      .rebatchRequests(1)
      .subscribe(withSleep);

or

    ob.toFlowable(BackpressureStrategy.LATEST)
      .delay(0, TimeUnit.MILLISECONDS, Schedulers.newThread())
      .rebatchRequests(1)
      .subscribe(withSleep);
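
To make the difference between buffered delivery and "latest only" delivery concrete, here is a minimal sketch in plain Java. It is not RxJava and does not reproduce the library's internals; it is a single-threaded, tick-based simulation under the assumption that the source emits one value per tick and the slow consumer is free once every `consumerPeriod` ticks. The class and method names (`SlowConsumerSketch`, `buffered`, `latestOnly`) are made up for this illustration.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

final class SlowConsumerSketch {

    // Buffered delivery: every emitted value is queued, and the slow
    // consumer takes the OLDEST queued value whenever it becomes free.
    static List<Long> buffered(int ticks, int consumerPeriod) {
        Queue<Long> queue = new ArrayDeque<>();
        List<Long> seen = new ArrayList<>();
        for (long tick = 0; tick < ticks; tick++) {
            queue.add(tick);                 // the source emits one value per tick
            if (tick % consumerPeriod == 0) {
                seen.add(queue.poll());      // consumer is free: oldest value first
            }
        }
        return seen;
    }

    // Latest-only delivery: a single slot is overwritten on every emission,
    // so the slow consumer sees the NEWEST value and older ones are lost.
    static List<Long> latestOnly(int ticks, int consumerPeriod) {
        Long slot = null;
        List<Long> seen = new ArrayList<>();
        for (long tick = 0; tick < ticks; tick++) {
            slot = tick;                     // overwrite whatever was waiting
            if (tick % consumerPeriod == 0) {
                seen.add(slot);              // consumer is free: newest value only
                slot = null;
            }
        }
        return seen;
    }

    public static void main(String[] args) {
        // One emission per tick, consumer free every 5 ticks
        // (analogous to 200 ms emissions and a 1 s sleep).
        System.out.println(buffered(16, 5));   // prints [0, 1, 2, 3]
        System.out.println(latestOnly(16, 5)); // prints [0, 5, 10, 15]
    }
}
```

The buffered variant corresponds to the behaviour seen in the question, where the slow consumer receives consecutive values and falls further behind. The latest-only variant corresponds roughly to what the second consumer was expected to do: skip ahead to whatever is current when it is ready again.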
