scala - Spark onQueryProgress duplicate
When monitoring Structured Streaming with a StreamingQueryListener, I found duplicate events in onQueryProgress.
override def onQueryProgress(queryProgress: QueryProgressEvent): Unit = {
  if (queryProgress.progress.numInputRows != 0) {
    println("Query made progress: " + queryProgress.progress)
  }
}
The result:
Query made progress: { "id" : "e76a8789-738c-49f6-b7f4-d85356c28600", "runId" : "d8ce0fad-db38-4566-9198-90169efeb2d8", "name" : null, "timestamp" : "2017-08-15T07:28:27.077Z", "numInputRows" : 1, "processedRowsPerSecond" : 0.3050640634533252, "durationMs" : { "addBatch" : 2452, "getBatch" : 461, "queryPlanning" : 276, "triggerExecution" : 3278 }, "stateOperators" : [ ], "sources" : [ { "description" : "KafkaSource[Subscribe[test1]]", "startOffset" : { "test1" : { "0" : 19 } }, "endOffset" : { "test1" : { "0" : 20 } }, "numInputRows" : 1, "processedRowsPerSecond" : 0.3050640634533252 } ], "sink" : { "description" : "org.apache.spark.sql.execution.streaming.ForeachSink@3ec8a100" } }

Query made progress: { "id" : "a5b1f905-5575-43a7-afe9-dead0e4de2a7", "runId" : "8caea640-8772-4aab-ab13-84c1e952fb77", "name" : null, "timestamp" : "2017-08-15T07:28:27.075Z", "numInputRows" : 1, "processedRowsPerSecond" : 0.272108843537415, "durationMs" : { "addBatch" : 2844, "getBatch" : 445, "queryPlanning" : 293, "triggerExecution" : 3672 }, "stateOperators" : [ ], "sources" : [ { "description" : "KafkaSource[Subscribe[test1]]", "startOffset" : { "test1" : { "0" : 19 } }, "endOffset" : { "test1" : { "0" : 20 } }, "numInputRows" : 1, "processedRowsPerSecond" : 0.272108843537415 } ], "sink" : { "description" : "org.apache.spark.sql.execution.streaming.ForeachSink@6953f971" } }
Why does sending 1 message produce 2 different results?
PS:
1. My main task is to use Spark to compute over the data in fixed 5-minute intervals, such as 00:00-00:05, 00:05-00:10, and so on, so one day has 288 points to compute.
2. So my idea is to use Structured Streaming to filter the particular data, store the data that is not filtered out in a database, and the next time read the database together with Structured Streaming.
3. So I need to listen for every batch update in order to time the database reads.
These 2 events are from different queries. You can see that the id and runId are different.
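As a rough illustration of the point above: a listener added through spark.streams.addListener receives progress events for every streaming query in that session, so one way to avoid the apparent duplicates is to compare the event's id against the id of the query you care about. The sketch below assumes that approach; the names ProgressFilter, SingleQueryListener, spark, and query are hypothetical and not from the original post.

```scala
import java.util.UUID
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener.{
  QueryProgressEvent, QueryStartedEvent, QueryTerminatedEvent
}

// Hypothetical helper: report only non-empty batches of the target query.
object ProgressFilter {
  def shouldReport(eventId: UUID, targetId: UUID, numInputRows: Long): Boolean =
    eventId == targetId && numInputRows != 0
}

// Hypothetical listener that ignores progress events from other queries.
class SingleQueryListener(targetId: UUID) extends StreamingQueryListener {
  override def onQueryStarted(event: QueryStartedEvent): Unit = ()

  override def onQueryProgress(event: QueryProgressEvent): Unit = {
    val p = event.progress
    if (ProgressFilter.shouldReport(p.id, targetId, p.numInputRows)) {
      // Printing id and runId makes it obvious which query produced the event.
      println(s"Query ${p.id} (run ${p.runId}) made progress: $p")
    }
  }

  override def onQueryTerminated(event: QueryTerminatedEvent): Unit = ()
}

// Usage, assuming `spark` is the SparkSession and `query` is the
// StreamingQuery returned by writeStream.start():
//   spark.streams.addListener(new SingleQueryListener(query.id))
```

If two queries were started on purpose, giving each one a name with queryName(...) on the writer should make the "name" field in the progress output non-null, which is another way to tell them apart.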