java - Zip reactive flow with itself


I'm using Java Reactor Core, and I have a reactive Flux of objects. For each object of the Flux I need to run an external query that returns one different object for each input. The newly generated Flux then needs to be zipped with the original one, so the items of the two Fluxes must be synchronized and generated in the same order.

I'm re-using the same flow twice, like this:

    Flux<MyObj> aTest = Flux.fromIterable(aListOfObj);

    Flux<String> myObjLists = aTest
        .map(o -> myRepository.findById(o.name))
        .map(o -> {
            if (!o.isPresent()) {
                System.out.println("fallback empty-object");
                return "";
            }
            List<String> l = o.get();
            if (l.size() > 1) {
                System.out.println("that's bad");
            }
            return l.get(0);
        });

    Flux.zip(aTest, myObjLists, (a, b) -> doSomethingWith(a, b))

Is this the right way to do it? If myObjLists emits an error, how do I prevent the zip phase from skipping the failing iteration?
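One possible way to keep the two sequences aligned when a lookup fails is to handle the error per element, so that the derived Flux still emits exactly one item for each input. The sketch below is only an illustration under assumptions: `findById` is a hypothetical stand-in for `myRepository.findById`, plain strings replace `MyObj`, and string concatenation replaces `doSomethingWith`. It wraps each lookup in `Mono.fromCallable`, falls back to an empty string with `onErrorReturn`, and uses `concatMap` so results come out in source order.

```java
import java.util.List;
import java.util.Optional;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class ZipWithFallback {

    // Hypothetical stand-in for myRepository.findById (an assumption, not the real repository):
    // throws for "boom", finds nothing for "missing", otherwise returns a one-element list.
    static Optional<List<String>> findById(String name) {
        if (name.equals("boom")) {
            throw new IllegalStateException("lookup failed for " + name);
        }
        return name.equals("missing")
                ? Optional.empty()
                : Optional.of(List.of(name.toUpperCase()));
    }

    static Flux<String> pairUp(List<String> names) {
        Flux<String> source = Flux.fromIterable(names);

        // One output item per input item: a missing result or a failed lookup
        // becomes "" instead of terminating the whole sequence.
        Flux<String> looked = source.concatMap(n ->
                Mono.fromCallable(() -> findById(n))
                    .map(o -> o.map(l -> l.get(0)).orElse(""))
                    .onErrorReturn(""));

        // Both sides emit the same number of items in the same order,
        // so zip pairs each input with its own lookup result.
        return Flux.zip(source, looked, (a, b) -> a + "=" + b);
    }

    public static void main(String[] args) {
        pairUp(List.of("a", "missing", "boom", "b")).subscribe(System.out::println);
    }
}
```

Note that zipping a Flux with a Flux derived from it subscribes to the source twice. That is harmless for a cold `Flux.fromIterable`, but it may matter for sources that are expensive or not replayable.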

I've opted for using tuples and Optionals (to prevent null items from breaking the Flux), so I don't need to re-use the initial Flux:

    // Tuple2 and Tuples come from reactor.util.function
    Flux<Tuple2<MyObj, String>> myObjLists = Flux.fromIterable(aListOfObj)
        // pair each input with its (possibly missing) lookup result
        .map(o -> Tuples.of(o, Optional.ofNullable(myRepository.findById(o.name))))
        .flatMap(t -> {
            if (!t.getT2().isPresent()) {
                System.out.println("discarding item");
                return Flux.empty();
            }
            List<String> l = t.getT2().get();
            if (l.size() > 1) {
                System.out.println("that's bad");
            }
            // flatMap must return a Publisher, so wrap the tuple
            return Flux.just(Tuples.of(t.getT1(), l.get(0)));
        });

    myObjLists.map(t -> doSomethingWith(t.getT1(), t.getT2()))

Note that the flatMap could be replaced with a .map().filter() combination, removing the tuples with missing Optional items.
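The `.map().filter()` variant mentioned in the note could look roughly like the following. This is a minimal sketch under assumptions, not the poster's actual code: `findById` is a hypothetical stand-in that returns `null` for unknown keys, plain strings replace `MyObj`, and string concatenation stands in for `doSomethingWith`.

```java
import java.util.List;
import java.util.Optional;
import reactor.core.publisher.Flux;
import reactor.util.function.Tuples;

public class MapFilterVariant {

    // Hypothetical stand-in for myRepository.findById (an assumption):
    // returns null for names starting with "x", otherwise a one-element list.
    static List<String> findById(String name) {
        return name.startsWith("x") ? null : List.of(name + "-value");
    }

    static Flux<String> combine(List<String> names) {
        return Flux.fromIterable(names)
                // keep the input next to its lookup result; Optional guards against null
                .map(n -> Tuples.of(n, Optional.ofNullable(findById(n))))
                // drop the tuples whose lookup returned nothing
                .filter(t -> t.getT2().isPresent())
                // stand-in for doSomethingWith(input, firstResult)
                .map(t -> t.getT1() + ":" + t.getT2().get().get(0));
    }

    public static void main(String[] args) {
        combine(List.of("a", "x1", "b")).subscribe(System.out::println);
    }
}
```

Because `map` and `filter` are synchronous one-to-one operators, the surviving items keep the order of the source, which avoids the ordering question that `flatMap` can raise with asynchronous inner publishers.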

