scala - spark onQueryProgress duplicate -


When monitoring Structured Streaming with a StreamingQueryListener, I found duplicate events in onQueryProgress.

  override def onQueryProgress(queryProgress: QueryProgressEvent): Unit = {
    // Only print batches that actually processed input rows
    if (queryProgress.progress.numInputRows != 0) {
      println("query made progress: " + queryProgress.progress)
    }
  }

The result:

  query made progress: {
    "id" : "e76a8789-738c-49f6-b7f4-d85356c28600",
    "runId" : "d8ce0fad-db38-4566-9198-90169efeb2d8",
    "name" : null,
    "timestamp" : "2017-08-15T07:28:27.077Z",
    "numInputRows" : 1,
    "processedRowsPerSecond" : 0.3050640634533252,
    "durationMs" : {
      "addBatch" : 2452,
      "getBatch" : 461,
      "queryPlanning" : 276,
      "triggerExecution" : 3278
    },
    "stateOperators" : [ ],
    "sources" : [ {
      "description" : "KafkaSource[Subscribe[test1]]",
      "startOffset" : {
        "test1" : {
          "0" : 19
        }
      },
      "endOffset" : {
        "test1" : {
          "0" : 20
        }
      },
      "numInputRows" : 1,
      "processedRowsPerSecond" : 0.3050640634533252
    } ],
    "sink" : {
      "description" : "org.apache.spark.sql.execution.streaming.ForeachSink@3ec8a100"
    }
  }
  query made progress: {
    "id" : "a5b1f905-5575-43a7-afe9-dead0e4de2a7",
    "runId" : "8caea640-8772-4aab-ab13-84c1e952fb77",
    "name" : null,
    "timestamp" : "2017-08-15T07:28:27.075Z",
    "numInputRows" : 1,
    "processedRowsPerSecond" : 0.272108843537415,
    "durationMs" : {
      "addBatch" : 2844,
      "getBatch" : 445,
      "queryPlanning" : 293,
      "triggerExecution" : 3672
    },
    "stateOperators" : [ ],
    "sources" : [ {
      "description" : "KafkaSource[Subscribe[test1]]",
      "startOffset" : {
        "test1" : {
          "0" : 19
        }
      },
      "endOffset" : {
        "test1" : {
          "0" : 20
        }
      },
      "numInputRows" : 1,
      "processedRowsPerSecond" : 0.272108843537415
    } ],
    "sink" : {
      "description" : "org.apache.spark.sql.execution.streaming.ForeachSink@6953f971"
    }
  }

Why do I get 2 different results when I send only 1 message?

PS:
1. My main problem is that I need to use Spark to calculate the data for every fixed 5-minute interval, like 00:00-00:05, 00:05-00:10, and so on. One day has 288 points to calculate.
2. So my idea is to use Structured Streaming to filter the particular data, store the data that is not filtered in a database, and next time read the database and Structured Streaming together.
3. So I need to listen to every batch to update the time at which I read the database.
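
To illustrate point 1, here is a minimal sketch of how a timestamp maps to one of the 288 five-minute slots in a day. The object and method names (FiveMinuteSlots, slotIndex, slotStart) are hypothetical and only serve this example; they are not part of Spark or of the original program.

```scala
import java.time.LocalTime

// Hypothetical helper, for illustration only.
object FiveMinuteSlots {
  val SlotMinutes = 5
  // 24 hours * 60 minutes / 5 minutes per slot
  val SlotsPerDay: Int = 24 * 60 / SlotMinutes

  // Index of the slot that contains time t (0 for 00:00-00:05, 1 for 00:05-00:10, ...)
  def slotIndex(t: LocalTime): Int =
    (t.getHour * 60 + t.getMinute) / SlotMinutes

  // Start time of the slot with the given index
  def slotStart(index: Int): LocalTime =
    LocalTime.MIDNIGHT.plusMinutes(index.toLong * SlotMinutes)
}
```

For example, FiveMinuteSlots.slotIndex(LocalTime.of(0, 7)) falls into slot 1, whose start is 00:05.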

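These 2 events come from different queries. You can see that the id and runId are different, and so are the sink instances (ForeachSink@3ec8a100 vs. ForeachSink@6953f971). A listener registered on the session receives progress events from every streaming query in it, so two queries reading the same Kafka topic will each report the same message.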
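
One way to make this visible is to print the query identifiers with each event. The following is a minimal sketch, assuming Spark 2.1 or later; the class name PerQueryProgressListener and the helper object ListenerSetup are hypothetical and not taken from the original code.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener.{
  QueryProgressEvent, QueryStartedEvent, QueryTerminatedEvent
}

// Hypothetical listener that tags every event with the query it belongs to.
class PerQueryProgressListener extends StreamingQueryListener {

  override def onQueryStarted(event: QueryStartedEvent): Unit =
    println(s"query started: id=${event.id} runId=${event.runId} name=${event.name}")

  override def onQueryProgress(event: QueryProgressEvent): Unit = {
    val p = event.progress
    // Skip empty batches, as in the question
    if (p.numInputRows != 0) {
      println(s"query made progress: id=${p.id} runId=${p.runId} name=${p.name} " +
        s"batchId=${p.batchId} numInputRows=${p.numInputRows}")
    }
  }

  override def onQueryTerminated(event: QueryTerminatedEvent): Unit =
    println(s"query terminated: id=${event.id} runId=${event.runId}")
}

object ListenerSetup {
  // Registers the listener and lists the queries currently running in the session.
  def register(spark: SparkSession): Unit = {
    spark.streams.addListener(new PerQueryProgressListener)
    spark.streams.active.foreach { q =>
      println(s"active query: id=${q.id} runId=${q.runId} name=${q.name}")
    }
  }
}
```

If more than one id shows up, more than one streaming query has been started. Giving each query a name with queryName(...) on the writer before start() should replace the null name in the output and make the events easier to tell apart.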

