rx java - RxJava : Consumers of Hot Observables -
i trying grip intricacies of rxjava, hit newbie problem:
i trying create hot observable cold one, , subscribe 2 consumers process pushed events @ different speeds. here code snippet:
connectableobservable<long> ob = observable.interval(200, timeunit.milliseconds) .publish(); ob.connect(); consumer<long> withsleep = (long t) -> { system.out.println("second : " + t); sleep(1); }; consumer<long> nosleep = (long t) -> { system.out.println("first : " + t); }; sleep(2); ob.observeon(schedulers.newthread()).subscribe(nosleep); ob.observeon(schedulers.newthread()).subscribe(withsleep); sleep(5);
the sleep(2) see whether observable started firing. , indeed, prints expected.
- first : 10
- second : 10
- first : 11
- first : 12
- first : 13
- first : 14
- second : 11
- first : 15
- first : 16
- first : 17
but second consumer (the 1 longer processing time, simulated 1 second sleep), picks event in sequence (output line 7), rather current event (no. 14) expect hot observable. isn't idea of hot observable keep firing, irrespective of subscribers, , subscribers pick whatever pushed @ moment (assuming no specific, explicit backpressure strategy)?
what need change have second consumer pick whatever produced @ moment (i.e. displaying 14 instead of 11 in example above)?
any appreciated.
hot or cold, operators such publish , observeon stay continuous once subscribed, you'll events if processing takes longer emission rate.
to avoid processing old entries in second case, have chain operators drop events , don't buffer either:
ob.toflowable(backpressurestrategy.drop) .delay(0, timeunit.milliseconds, schedulers.newthread()) .rebatchrequests(1) .subscribe(withsleep);
or
ob.toflowable(backpressurestrategy.latest) .delay(0, timeunit.milliseconds, schedulers.newthread()) .rebatchrequests(1) .subscribe(withsleep);
Comments
Post a Comment