Spring Integration Java DSL with a defined IntegrationFlow - missing data in response and mismatched correlationIds -
i using spring integration java dsl defined integrationflow. seeing behavior response missing pieces of data , correlationid in aggregator response not match value in response received calling service.
background: have jmeter performance test running on server uses random data , running @ 600 requests per minute. on laptop, have soapui performance test running hits same server. soapui project sends requests same search criteria (we doing matching) @ rate of 60 requests per minute. responses should contain same result data.
approximately 0.5% of time response returned data missing. in these responses, correlationid of response logged aggregator , correlationid of response logged calling service (logged after response returned calling service , has passed through aggregator) not match.
any idea wrong? please see code snippets below.
@configuration @enableautoconfiguration @import(.....aserviceconfig.class) public class serviceconfig { @bean(name = "inputchannel") public directchannel inputchannel() { return new directchannel(); } @bean(name = "outputchannel") public queuechannel outputchannel() { return new queuechannel(); } @bean(name = "transactionlogger") public ourlogger ourtransactionlogger() { return ourloggerfactory.getlogger("ourapptrx", new ourloggerconfig(ourtransactionloggerkey.values())); } public integrationflow ourflow() { return integrationflows.from(inputchannel()) .split(splitter(ourtransactionlogger())) .channel(messagechannels.executor(getexecutor())) .handle(ourserviceactivator, "service") .aggregate(t -> t.processor(ouraggregator, aggregate)) .channel(outputchannel()) .get(); } @bean(name = "executor") public executor getexecutor() { threadpoolexecutor executor = (threadpoolexecutor) executors.newcachedthreadpool(); executor.setrejectedexecutionhandler(new threadpoolexecutor.callerrunspolicy()); return executor; } } //snippet calling service public inquiryresponse inquire(inquiryrequest request) { inputchannel.send(messagebuilder.withpayload(request).build()); message<?> msgresponse = outputchannel.receive(); inquiryresponse response = (inquiryresponse) msgresponse.getpayload(); transactionlogger.debug("correlationid + msgresponse.getheaders().get("correlationid")); transactionlogger.debug("inquireyservice inquire response = " + response.tostring()); return response; } //snippet aggregator @aggregator public <t> inquiryresponse aggregate(list<message> serviceresponses) { inquiryresponse response = new inquiryresponse(); serviceresponses.foreach(serviceresponse -> { object payload = serviceresponse.getpayload(); if (payload instanceof amatchresponse) { response.seta(((amatchresponse) payload).geta()); } else if (payload instanceof bvalueresponse) { response.setb(((bvalueresponse) payload).getb()); } else if (payload instanceof berror) { response.setb(new b().addberrorsitem((berror) payload)); } else if (payload instanceof aerror) { response.seta(new a().aerror((aerror) payload)); } else { transactionlogger.warn("unknown message type received. message not aggregated response. ||| model=" + payload.getclass().getname()); } }); transactionlogger.debug("ouraggregator.response = " + response.tostring()); return response; }
Comments
Post a Comment