java - Zip a reactive flow with itself
I'm using Java Reactor Core, and I have a reactive Flux of objects. For each object in the Flux I need to run an external query that returns one different object per input. The newly generated Flux then needs to be zipped with the original one, so the items of the two Fluxes must be synchronized and generated in the same order.
I'm just re-using the same flow twice, like this:
    Flux<MyObj> aTest = Flux.fromIterable(aListOfObj);

    Flux<String> myObjLists = aTest
        .map(o -> myRepository.findById(o.name))
        .map(o -> {
            if (!o.isPresent()) {
                System.out.println("fallback empty-object");
                return "";
            }
            List<String> l = o.get();
            if (l.size() > 1) {
                System.out.println("that's bad");
            }
            return l.get(0);
        });

    Flux.zip(aTest, myObjLists, (a, b) -> doSomethingWith(a, b))

Is this the right way to do it? If myObjLists emits an error, how do I prevent the zip phase from skipping the failing iteration?
I've opted for using Tuples and Optionals (to prevent null items from breaking the Flux), so that I don't need to re-use the initial Flux:
    // Assumes findById returns a List<String> or null in this variant.
    Flux<Tuple2<MyObj, String>> myObjLists = Flux.fromIterable(aListOfObj)
        // Pair each item with its (possibly missing) lookup result.
        .map(o -> Tuples.of(o, Optional.ofNullable(myRepository.findById(o.name))))
        .flatMap(t -> {
            if (!t.getT2().isPresent()) {
                System.out.println("discarding item");
                return Flux.empty();
            }
            List<String> l = t.getT2().get();
            if (l.size() > 1) {
                System.out.println("that's bad");
            }
            // flatMap must return a Publisher, so the tuple is wrapped in Flux.just.
            return Flux.just(Tuples.of(t.getT1(), l.get(0)));
        });

    myObjLists.map(t -> doSomethingWith(t.getT1(), t.getT2()))

Note that the flatMap could be replaced with a .map().filter(), removing tuples with missing Optional items.
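For the error case raised in the question, here is a minimal sketch of one way to keep each item and its lookup result together while skipping lookups that fail. It is not the only option. The class `PairWithLookup`, the `findById` stub and the plain `String` names are hypothetical stand-ins for `MyObj` and `myRepository`. The idea is to run the lookup inside an inner `Mono` per item, so that an exception only terminates that inner `Mono` and can be turned into "no item" with `onErrorResume`. `concatMap` is used because it subscribes to the inner publishers one at a time and so preserves the source order.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.function.Tuple2;
import reactor.util.function.Tuples;

class PairWithLookup {

    // Hypothetical stand-in for myRepository.findById:
    // empty for "missing", throws for "boom", one value otherwise.
    static Optional<List<String>> findById(String name) {
        if (name.equals("boom")) {
            throw new IllegalStateException("lookup failed for " + name);
        }
        if (name.equals("missing")) {
            return Optional.empty();
        }
        return Optional.of(Arrays.asList(name + "-value"));
    }

    // Emits (name, firstResult) pairs in source order. Names whose lookup is
    // empty or throws are dropped, so a pair can never be misaligned.
    static Flux<Tuple2<String, String>> pairs(List<String> names) {
        return Flux.fromIterable(names)
            .concatMap(name ->
                // An exception thrown by findById becomes an error of this inner Mono only.
                Mono.fromCallable(() -> findById(name))
                    // An empty Optional becomes an empty Mono.
                    .flatMap(opt -> Mono.justOrEmpty(opt))
                    .filter(list -> !list.isEmpty())
                    .map(list -> Tuples.of(name, list.get(0)))
                    // Demo choice: swallow every error and skip the item.
                    .onErrorResume(e -> Mono.empty()));
    }

    public static void main(String[] args) {
        pairs(Arrays.asList("a", "missing", "boom", "b"))
            .map(t -> t.getT1() + "=" + t.getT2())
            .doOnNext(System.out::println)
            .blockLast();
        // prints:
        // a=a-value
        // b=b-value
    }
}
```

If a failing lookup should produce a fallback pair instead of being skipped, the `onErrorResume` branch could return `Mono.just(Tuples.of(name, ""))`, mirroring the empty-string fallback in the question.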