scala - Cannot extend Flink ProcessFunction


I upgraded from Flink 1.2 to Flink 1.3, and I am trying to update my ProcessFunction to work with 1.3. I have a function that extends the ProcessFunction class, but it throws a compile error saying that it is not overriding processElement and onTimer. Here is the code I have:

    class TimeoutStateFunction extends ProcessFunction[ObjectNode, (String, Long)] {

      lazy val state: ListState[CountWithTimestamp] = getRuntimeContext
        .getListState(new ListStateDescriptor[CountWithTimestamp]("mystate", classOf[CountWithTimestamp]))

      override def processElement(value: ObjectNode, ctx: Context, out: Collector[(String, Long)]): Unit = {
        // stuff here
      }

      override def onTimer(timestamp: Long, ctx: OnTimerContext, out: Collector[(String, Long)]): Unit = {
        // more stuff here
      }
    }

Here are the compile errors I am getting:

    Error:(8, 7) class TimeoutStateFunction needs to be abstract, since method processElement in class ProcessFunction of type (x$1: com.fasterxml.jackson.databind.node.ObjectNode, x$2: org.apache.flink.streaming.api.functions.ProcessFunction[com.fasterxml.jackson.databind.node.ObjectNode,(String, Long)]#Context, x$3: org.apache.flink.util.Collector[(String, Long)])Unit is not defined
    class TimeoutStateFunction extends ProcessFunction[ObjectNode, (String, Long)] {

    Error:(17, 18) method processElement overrides nothing.
    Note: the super classes of class TimeoutStateFunction contain the following, non final members named processElement:
    def processElement(x$1: com.fasterxml.jackson.databind.node.ObjectNode,x$2: org.apache.flink.streaming.api.functions.ProcessFunction[com.fasterxml.jackson.databind.node.ObjectNode,(String, Long)]#Context,x$3: org.apache.flink.util.Collector[(String, Long)]): Unit
      override def processElement(value: ObjectNode, ctx: Context, out: Collector[(String, Long)]): Unit = {

    Error:(36, 16) method onTimer overrides nothing.
    Note: the super classes of class TimeoutStateFunction contain the following, non final members named onTimer:
    def onTimer(x$1: Long,x$2: org.apache.flink.streaming.api.functions.ProcessFunction[com.fasterxml.jackson.databind.node.ObjectNode,(String, Long)]#OnTimerContext,x$3: org.apache.flink.util.Collector[(String, Long)]): Unit
      override def onTimer(timestamp: Long, ctx: OnTimerContext, out: Collector[(String, Long)]): Unit = {

I am using Scala 2.11 and Flink 1.3.2.

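The Context and OnTimerContext types depend on the ProcessFunction and its input and output types. The compiler expects them written as type projections, which is what the signatures in the error messages show (ProcessFunction[ObjectNode,(String, Long)]#Context and ProcessFunction[ObjectNode,(String, Long)]#OnTimerContext).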

So this should work:

    override def processElement(
        value: ObjectNode,
        ctx: ProcessFunction[ObjectNode, (String, Long)]#Context,
        out: Collector[(String, Long)])
      : Unit = {
      // stuff here
    }

    override def onTimer(
        timestamp: Long,
        ctx: ProcessFunction[ObjectNode, (String, Long)]#OnTimerContext,
        out: Collector[(String, Long)])
      : Unit = {
      // more stuff here
    }
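To see the mechanism without a Flink dependency, here is a minimal pure-Scala sketch. MiniProcessFunction, TagWithTime, Ctx and Demo are hypothetical names invented for illustration and are not part of Flink. The base class declares its context parameter as a type projection, which is how the expected signature appears in the compiler errors above. The subclass uses a local type alias to keep the override short; the same trick should carry over to a real ProcessFunction subclass.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for a class with a non-static inner class (not Flink's API).
abstract class MiniProcessFunction[I, O] {
  abstract class Context { def timestamp: Long }

  // The parameter type is a projection (Outer#Inner), mirroring the
  // signature that the compiler reports for ProcessFunction.
  def processElement(value: I, ctx: MiniProcessFunction[I, O]#Context, out: ArrayBuffer[O]): Unit
}

class TagWithTime extends MiniProcessFunction[String, (String, Long)] {
  // Alias for the projection; a bare `Context` here would mean this.Context,
  // which is a different type and would not match the inherited signature.
  type Ctx = MiniProcessFunction[String, (String, Long)]#Context

  override def processElement(value: String, ctx: Ctx, out: ArrayBuffer[(String, Long)]): Unit = {
    out += ((value, ctx.timestamp))
  }
}

object Demo {
  def run(): ArrayBuffer[(String, Long)] = {
    val fn = new TagWithTime
    // An instance of the inner class, tied to fn, with a fixed timestamp.
    val ctx = new fn.Context { def timestamp: Long = 42L }
    val out = ArrayBuffer.empty[(String, Long)]
    fn.processElement("a", ctx, out)
    out
  }
}
```

Replacing Ctx with a bare Context in TagWithTime should reproduce the same pair of errors as in the question ("needs to be abstract" and "overrides nothing").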
