java - Zip reactive flow with itself
I'm using Java Reactor Core, and I have a reactive Flux of objects. For each object of the Flux I need to run an external query that returns one different object for each input. The newly generated Flux then needs to be zipped with the original one, so the items of the two Flux must be synchronized and generated in the same order.
I'm re-using the same flow twice, like this:
    import java.util.List;
    import reactor.core.publisher.Flux;

    Flux<MyObj> aTest = Flux.fromIterable(aListOfObj);

    Flux<String> myObjLists = aTest
        .map(o -> myRepository.findById(o.name))
        .map(o -> {
            if (!o.isPresent()) {
                System.out.println("fallback empty-object");
                return "";
            }
            List<String> l = o.get();
            if (l.size() > 1) {
                System.out.println("that's bad");
            }
            return l.get(0);
        });

    // aTest is subscribed to twice here: once directly and once through myObjLists.
    Flux.zip(aTest, myObjLists, (a, b) -> doSomethingWith(a, b));
Is this the right way to do it? If myObjLists emits an error, how can I prevent the zip phase from skipping the failing iteration?
I've opted for using Tuples and Optionals (to prevent null items from breaking the Flux), so that I don't need to re-use the initial Flux:
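    import java.util.List;
    import java.util.Optional;
    import reactor.core.publisher.Flux;
    import reactor.util.function.Tuple2;
    import reactor.util.function.Tuples;

    // Assumes findById returns a List<String> that may be null here.
    Flux<Tuple2<MyObj, String>> myObjLists = Flux.fromIterable(aListOfObj)
        // Pair each input with its (possibly missing) lookup result.
        .map(o -> Tuples.of(o, Optional.ofNullable(myRepository.findById(o.name))))
        .flatMap(t -> {
            if (!t.getT2().isPresent()) {
                System.out.println("discarding item");
                return Flux.empty();
            }
            List<String> l = t.getT2().get();
            if (l.size() > 1) {
                System.out.println("that's bad");
            }
            // flatMap expects a Publisher, so the tuple is wrapped in Flux.just.
            return Flux.just(Tuples.of(t.getT1(), l.get(0)));
        });

    // The result type depends on doSomethingWith, so this step is kept out of the typed declaration.
    myObjLists.map(t -> doSomethingWith(t.getT1(), t.getT2()));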
Note that the flatMap could be replaced with .map().filter(), removing the tuples with missing Optional items.
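For comparison, here is a minimal sketch of the .map().filter() variant. It is not the code from the original post: the class name PairAndFilter, the in-memory FAKE_REPO map and the String inputs are hypothetical stand-ins for MyObj and myRepository, and findById is assumed to return an Optional of a list. Each input stays paired with its lookup result, so nothing has to be zipped back together afterwards.

```java
import java.util.List;
import java.util.Map;
import java.util.Optional;

import reactor.core.publisher.Flux;
import reactor.util.function.Tuples;

public class PairAndFilter {

    // Hypothetical stand-in for the repository: "b" has no entry.
    static final Map<String, List<String>> FAKE_REPO = Map.of(
            "a", List.of("alpha"),
            "c", List.of("gamma", "extra"));

    // Assumed lookup shape: an Optional that is empty when nothing is found.
    static Optional<List<String>> findById(String name) {
        return Optional.ofNullable(FAKE_REPO.get(name));
    }

    static Flux<String> pairAndFilter(List<String> names) {
        return Flux.fromIterable(names)
                // Keep each input together with its lookup result.
                .map(name -> Tuples.of(name, findById(name)))
                // Drop the pairs whose lookup found nothing.
                .filter(t -> t.getT2().isPresent())
                // Unwrap the Optional and keep the first value of the list.
                .map(t -> Tuples.of(t.getT1(), t.getT2().get().get(0)))
                // Stand-in for doSomethingWith(a, b).
                .map(t -> t.getT1() + "=" + t.getT2());
    }

    public static void main(String[] args) {
        // block() is used only to print the result in this small demo.
        List<String> out = pairAndFilter(List.of("a", "b", "c")).collectList().block();
        System.out.println(out); // [a=alpha, c=gamma]
    }
}
```

Because map and filter are synchronous, the surviving items come out in the same order as the inputs, and the item without a lookup result ("b" here) is simply dropped.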