Spring Integration Java DSL with a defined IntegrationFlow - missing data in response and mismatched correlationIds -


i using spring integration java dsl defined integrationflow. seeing behavior response missing pieces of data , correlationid in aggregator response not match value in response received calling service.

background: have jmeter performance test running on server uses random data , running @ 600 requests per minute. on laptop, have soapui performance test running hits same server. soapui project sends requests same search criteria (we doing matching) @ rate of 60 requests per minute. responses should contain same result data.

approximately 0.5% of time response returned data missing. in these responses, correlationid of response logged aggregator , correlationid of response logged calling service (logged after response returned calling service , has passed through aggregator) not match.

any idea wrong? please see code snippets below.

@configuration @enableautoconfiguration @import(.....aserviceconfig.class) public class serviceconfig {  @bean(name = "inputchannel") public directchannel inputchannel() {     return new directchannel(); }  @bean(name = "outputchannel") public queuechannel outputchannel() {     return new queuechannel(); }  @bean(name = "transactionlogger") public ourlogger ourtransactionlogger() {     return ourloggerfactory.getlogger("ourapptrx", new ourloggerconfig(ourtransactionloggerkey.values())); }  public integrationflow ourflow() {      return integrationflows.from(inputchannel())             .split(splitter(ourtransactionlogger()))             .channel(messagechannels.executor(getexecutor()))             .handle(ourserviceactivator, "service")             .aggregate(t -> t.processor(ouraggregator, aggregate))             .channel(outputchannel())             .get(); }  @bean(name = "executor") public executor getexecutor() {     threadpoolexecutor executor = (threadpoolexecutor) executors.newcachedthreadpool();     executor.setrejectedexecutionhandler(new threadpoolexecutor.callerrunspolicy());     return executor; } } //snippet calling service  public inquiryresponse inquire(inquiryrequest request) {      inputchannel.send(messagebuilder.withpayload(request).build());     message<?> msgresponse = outputchannel.receive();        inquiryresponse response = (inquiryresponse) msgresponse.getpayload();     transactionlogger.debug("correlationid + msgresponse.getheaders().get("correlationid"));     transactionlogger.debug("inquireyservice inquire response = " + response.tostring());      return response; }  //snippet aggregator  @aggregator public <t> inquiryresponse aggregate(list<message> serviceresponses) {     inquiryresponse response = new inquiryresponse();      serviceresponses.foreach(serviceresponse -> {             object payload = serviceresponse.getpayload();              if (payload instanceof amatchresponse) {                 response.seta(((amatchresponse) payload).geta());             } else if (payload instanceof bvalueresponse) {                 response.setb(((bvalueresponse) payload).getb());             } else if (payload instanceof berror) {                  response.setb(new b().addberrorsitem((berror) payload));             } else if (payload instanceof aerror) {                 response.seta(new a().aerror((aerror) payload));             } else {                 transactionlogger.warn("unknown message type received. message not aggregated response. ||| model=" + payload.getclass().getname());             }      });     transactionlogger.debug("ouraggregator.response = " + response.tostring());     return response; }    


Comments

Popular posts from this blog

python Tkinter Capturing keyboard events save as one single string -

android - InAppBilling registering BroadcastReceiver in AndroidManifest -

javascript - Z-index in d3.js -