scala - Cannot extend Flink ProcessFunction
I upgraded from Flink 1.2 to Flink 1.3, and I am trying to update my ProcessFunction to work with 1.3. I have a function that extends the ProcessFunction class, but it throws a compile error saying that I am not overriding processElement and onTimer. Here is the code I have:
    class TimeoutStateFunction extends ProcessFunction[ObjectNode, (String, Long)] {

      lazy val state: ListState[CountWithTimestamp] = getRuntimeContext
        .getListState(new ListStateDescriptor[CountWithTimestamp]("mystate", classOf[CountWithTimestamp]))

      override def processElement(value: ObjectNode, ctx: Context, out: Collector[(String, Long)]): Unit = {
        // stuff here
      }

      override def onTimer(timestamp: Long, ctx: OnTimerContext, out: Collector[(String, Long)]): Unit = {
        // more stuff here
      }
    }

Here are the compile errors I am getting:
    Error:(8, 7) class TimeoutStateFunction needs to be abstract, since method processElement in class ProcessFunction of type (x$1: com.fasterxml.jackson.databind.node.ObjectNode, x$2: org.apache.flink.streaming.api.functions.ProcessFunction[com.fasterxml.jackson.databind.node.ObjectNode,(String, Long)]#Context, x$3: org.apache.flink.util.Collector[(String, Long)])Unit is not defined
    class TimeoutStateFunction extends ProcessFunction[ObjectNode, (String, Long)] {

    Error:(17, 18) method processElement overrides nothing.
    Note: the super classes of class TimeoutStateFunction contain the following, non final members named processElement:
    def processElement(x$1: com.fasterxml.jackson.databind.node.ObjectNode,x$2: org.apache.flink.streaming.api.functions.ProcessFunction[com.fasterxml.jackson.databind.node.ObjectNode,(String, Long)]#Context,x$3: org.apache.flink.util.Collector[(String, Long)]): Unit
    override def processElement(value: ObjectNode, ctx: Context, out: Collector[(String, Long)]): Unit = {

    Error:(36, 16) method onTimer overrides nothing.
    Note: the super classes of class TimeoutStateFunction contain the following, non final members named onTimer:
    def onTimer(x$1: Long,x$2: org.apache.flink.streaming.api.functions.ProcessFunction[com.fasterxml.jackson.databind.node.ObjectNode,(String, Long)]#OnTimerContext,x$3: org.apache.flink.util.Collector[(String, Long)]): Unit
    override def onTimer(timestamp: Long, ctx: OnTimerContext, out: Collector[(String, Long)]): Unit = {

I am using Scala 2.11 and Flink 1.3.2.
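Context and OnTimerContext are inner classes of ProcessFunction, so their types depend on the ProcessFunction and its input and output types. An unqualified Context inside your class is not the same type as the ProcessFunction[ObjectNode, (String, Long)]#Context that the inherited method signatures expect, which is why the compiler reports that nothing is overridden.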
So this should work:
    override def processElement(
        value: ObjectNode,
        ctx: ProcessFunction[ObjectNode, (String, Long)]#Context,
        out: Collector[(String, Long)]): Unit = {
      // stuff here
    }

    override def onTimer(
        timestamp: Long,
        ctx: ProcessFunction[ObjectNode, (String, Long)]#OnTimerContext,
        out: Collector[(String, Long)]): Unit = {
      // more stuff here
    }
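To see why the qualified form is needed, here is a minimal sketch with no Flink dependency. Processor, Doubler, and ProjectionDemo are hypothetical stand-ins, not Flink classes. The base method is declared with the Outer#Inner type projection, which is the shape shown in the compile errors above.

```scala
// Hypothetical stand-in for a base class with an inner Context class.
// This is NOT Flink's API; it only mimics the shape of the signatures.
abstract class Processor[I, O] {
  // Inner class: every Processor instance gets its own path-dependent Context type.
  abstract class Context { def timestamp: Long }

  // The parameter is declared as a type projection (Outer#Inner), which is
  // the form the compile errors in the question report for ProcessFunction.
  def process(value: I, ctx: Processor[I, O]#Context): O
}

class Doubler extends Processor[Int, Long] {
  // Writing `ctx: Context` here would mean `this.Context`, a different type,
  // so the signature would not match the inherited abstract method.
  override def process(value: Int, ctx: Processor[Int, Long]#Context): Long =
    value * 2L + ctx.timestamp
}

object ProjectionDemo {
  def run(): Long = {
    val d = new Doubler
    // d.Context is a subtype of Processor[Int, Long]#Context, so it is accepted.
    val ctx = new d.Context { val timestamp = 10L }
    d.process(16, ctx)
  }
}
```

Calling ProjectionDemo.run() passes a d.Context where the projection type is expected. That works because the projection covers the Context of any Processor[Int, Long] instance, while an unqualified Context in the subclass covers only this instance's.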