How to execute a Spark 2 action using Oozie 4.3 in AWS EMR
I'm using AWS EMR 5.7.0, with Oozie version 4.3.0 and Spark version 2.1.1.
I have a simple Spark program written in Scala. It works fine when executed from the shell using spark-submit. But when I try to execute the program using an Oozie Spark action, I get errors.
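For reference, the shell invocation that works looks roughly like this (the exact flags are an assumption on my part; only the class and jar names come from the workflow below):

spark-submit \
    --master yarn \
    --deploy-mode cluster \
    --class testpackage.testobj \
    /home/hadoop/oozie-test.jar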
job.properties:
namenode=hdfs://ip-xx-xx-xx-xx.ec2.internal:8020
jobtracker=ip-xx-xx-xx-xx.ec2.internal:8032
master=local
oozie.use.system.libpath=true
oozie.wf.application.path=hdfs://ip-xx-xx-xx-xx.ec2.internal:8020/test-artifacts/
oozie.action.sharelib.for.spark=spark2
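Since oozie.action.sharelib.for.spark points at a sharelib named spark2, that directory has to exist in the Oozie sharelib and be populated with the Spark 2 jars. A minimal sketch of the setup along the lines of the Hortonworks guide, assuming EMR's default jar location and with lib_<timestamp> as a placeholder for the actual sharelib directory name on the cluster:

# The lib_<timestamp> directory name varies per cluster; substitute your own.
hdfs dfs -mkdir -p /user/oozie/share/lib/lib_<timestamp>/spark2
# Copy the Spark 2 jars from the EMR master node (default location on EMR 5.x).
hdfs dfs -put /usr/lib/spark/jars/* /user/oozie/share/lib/lib_<timestamp>/spark2/
# Reuse the Oozie Spark sharelib jar from the existing spark directory.
hdfs dfs -cp /user/oozie/share/lib/lib_<timestamp>/spark/oozie-sharelib-spark*.jar /user/oozie/share/lib/lib_<timestamp>/spark2/
# Tell Oozie to pick up the new sharelib, then verify it is visible.
oozie admin -oozie http://ip-xx-xx-xx-xx.ec2.internal:11000/oozie -sharelibupdate
oozie admin -oozie http://ip-xx-xx-xx-xx.ec2.internal:11000/oozie -shareliblist spark2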
workflow.xml:
<?xml version="1.0" encoding="UTF-8"?>
<workflow-app xmlns="uri:oozie:workflow:0.5" name="test program">
    <start to="spark-node" />
    <action name="spark-node">
        <spark xmlns="uri:oozie:spark-action:0.2">
            <job-tracker>${jobtracker}</job-tracker>
            <name-node>${namenode}</name-node>
            <master>${master}</master>
            <name>spark on oozie - test job</name>
            <class>testpackage.testobj</class>
            <jar>/home/hadoop/oozie-test.jar</jar>
        </spark>
        <ok to="end" />
        <error to="fail" />
    </action>
    <kill name="fail">
        <message>Workflow failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
    </kill>
    <end name="end" />
</workflow-app>
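One thing I noticed: <jar> points at a local path (/home/hadoop/oozie-test.jar), but the Oozie launcher can run on any cluster node, so the application jar is usually kept in HDFS, for example in a lib/ directory next to workflow.xml, which Oozie adds to the action classpath automatically. A sketch of that, assuming the application path from job.properties:

hdfs dfs -mkdir -p /test-artifacts/lib
hdfs dfs -put /home/hadoop/oozie-test.jar /test-artifacts/lib/
# Then reference it in workflow.xml with an HDFS path, e.g.
#   <jar>${namenode}/test-artifacts/lib/oozie-test.jar</jar>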
workflow.xml is kept in HDFS and job.properties on the master node. When the Oozie job is executed using the command "oozie job -oozie http://ip-xx-xx-xx-xx.ec2.internal:11000/oozie -config job.properties -run", a map-reduce program is started. The MapReduce job then fails with the errors below, without ever starting the Spark job.
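To dig into the launcher failure, the Oozie job info and log subcommands are useful (the job ID below is a placeholder):

# Show the workflow status and the ID of each action.
oozie job -oozie http://ip-xx-xx-xx-xx.ec2.internal:11000/oozie -info <job-id>
# Dump the launcher log, which contains the stack traces shown below.
oozie job -oozie http://ip-xx-xx-xx-xx.ec2.internal:11000/oozie -log <job-id>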
1) With master=yarn-cluster and mode=cluster, I get the following exception:
Log file: /mnt/yarn/usercache/hadoop/appcache/application_1502719828530_0011/container_1502719828530_0011_01_000001/spark-oozie-job_1502719828530_0011.log not present. Therefore no Hadoop job IDs found.
Failing Oozie Launcher, Main class [org.apache.oozie.action.hadoop.SparkMain], main() threw exception, null
java.lang.NullPointerException
    at java.io.File.<init>(File.java:277)
    at org.apache.spark.deploy.yarn.Client.addDistributedUri$1(Client.scala:416)
    at org.apache.spark.deploy.yarn.Client.org$apache$spark$deploy$yarn$Client$$distribute$1(Client.scala:454)
    at org.apache.spark.deploy.yarn.Client$$anonfun$prepareLocalResources$11$$anonfun$apply$6.apply(Client.scala:580)
    at org.apache.spark.deploy.yarn.Client$$anonfun$prepareLocalResources$11$$anonfun$apply$6.apply(Client.scala:579)
    at scala.collection.mutable.ArraySeq.foreach(ArraySeq.scala:74)
    at org.apache.spark.deploy.yarn.Client$$anonfun$prepareLocalResources$11.apply(Client.scala:579)
    at org.apache.spark.deploy.yarn.Client$$anonfun$prepareLocalResources$11.apply(Client.scala:578)
    at scala.collection.immutable.List.foreach(List.scala:381)
    at org.apache.spark.deploy.yarn.Client.prepareLocalResources(Client.scala:578)
    at org.apache.spark.deploy.yarn.Client.createContainerLaunchContext(Client.scala:814)
    at org.apache.spark.deploy.yarn.Client.submitApplication(Client.scala:169)
    at org.apache.spark.deploy.yarn.Client.run(Client.scala:1091)
    at org.apache.spark.deploy.yarn.Client$.main(Client.scala:1150)
    at org.apache.spark.deploy.yarn.Client.main(Client.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:755)
    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180)
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:119)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
    at org.apache.oozie.action.hadoop.SparkMain.runSpark(SparkMain.java:340)
    at org.apache.oozie.action.hadoop.SparkMain.run(SparkMain.java:259)
    at org.apache.oozie.action.hadoop.LauncherMain.run(LauncherMain.java:60)
    at org.apache.oozie.action.hadoop.SparkMain.main(SparkMain.java:80)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.oozie.action.hadoop.LauncherMapper.map(LauncherMapper.java:234)
    at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:54)
    at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:455)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:344)
    at org.apache.hadoop.mapred.LocalContainerLauncher$EventHandler.runSubtask(LocalContainerLauncher.java:380)
    at org.apache.hadoop.mapred.LocalContainerLauncher$EventHandler.runTask(LocalContainerLauncher.java:301)
    at org.apache.hadoop.mapred.LocalContainerLauncher$EventHandler.access$200(LocalContainerLauncher.java:187)
    at org.apache.hadoop.mapred.LocalContainerLauncher$EventHandler$1.run(LocalContainerLauncher.java:230)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
2) With master=yarn, master=local[*], master=local[1], or master=local, and mode=client, I get the error below:
Log file: /mnt/yarn/usercache/hadoop/appcache/application_1502719828530_0013/container_1502719828530_0013_01_000001/spark-oozie-job_1502719828530_0013.log not present. Therefore no Hadoop job IDs found.
Failing Oozie Launcher, Main class [org.apache.oozie.action.hadoop.SparkMain], main() threw exception, No FileSystem for scheme: org.apache.spark
java.io.IOException: No FileSystem for scheme: org.apache.spark
    at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2708)
    at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2715)
    at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:93)
    at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2751)
    at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2733)
    at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:377)
    at org.apache.spark.deploy.SparkSubmit$.downloadFile(SparkSubmit.scala:865)
    at org.apache.spark.deploy.SparkSubmit$$anonfun$downloadFileList$2.apply(SparkSubmit.scala:850)
    at org.apache.spark.deploy.SparkSubmit$$anonfun$downloadFileList$2.apply(SparkSubmit.scala:850)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
    at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186)
    at org.apache.spark.deploy.SparkSubmit$.downloadFileList(SparkSubmit.scala:850)
    at org.apache.spark.deploy.SparkSubmit$$anonfun$prepareSubmitEnvironment$2.apply(SparkSubmit.scala:317)
    at org.apache.spark.deploy.SparkSubmit$$anonfun$prepareSubmitEnvironment$2.apply(SparkSubmit.scala:317)
    at scala.Option.map(Option.scala:146)
    at org.apache.spark.deploy.SparkSubmit$.prepareSubmitEnvironment(SparkSubmit.scala:317)
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:153)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:119)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
    at org.apache.oozie.action.hadoop.SparkMain.runSpark(SparkMain.java:340)
    at org.apache.oozie.action.hadoop.SparkMain.run(SparkMain.java:259)
    at org.apache.oozie.action.hadoop.LauncherMain.run(LauncherMain.java:60)
    at org.apache.oozie.action.hadoop.SparkMain.main(SparkMain.java:80)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.oozie.action.hadoop.LauncherMapper.map(LauncherMapper.java:234)
    at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:54)
    at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:455)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:344)
    at org.apache.hadoop.mapred.LocalContainerLauncher$EventHandler.runSubtask(LocalContainerLauncher.java:380)
    at org.apache.hadoop.mapred.LocalContainerLauncher$EventHandler.runTask(LocalContainerLauncher.java:301)
    at org.apache.hadoop.mapred.LocalContainerLauncher$EventHandler.access$200(LocalContainerLauncher.java:187)
    at org.apache.hadoop.mapred.LocalContainerLauncher$EventHandler$1.run(LocalContainerLauncher.java:230)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
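For what it's worth, the "No FileSystem for scheme: org.apache.spark" message suggests SparkSubmit is treating a Spark class name from the action's generated file list as a file path. One workaround I've seen suggested, but have not verified (the sharelib path is an assumption, and lib_TIMESTAMP is a placeholder for the actual directory name), is to point spark.yarn.jars explicitly at the spark2 sharelib via <spark-opts> inside the <spark> element of the action:

<!-- fragment of the <spark> action, added after <jar>; unverified workaround -->
<spark-opts>--conf spark.yarn.jars=hdfs://ip-xx-xx-xx-xx.ec2.internal:8020/user/oozie/share/lib/lib_TIMESTAMP/spark2/*</spark-opts>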
===============================
From https://issues.apache.org/jira/plugins/servlet/mobile#issue/OOZIE-2767, it seems the Spark 2 action is not yet supported by Oozie.
But based on https://docs.hortonworks.com/HDPDocuments/HDP2/HDP-2.6.1/bk_spark-component-guide/content/ch_oozie-spark-action.html, there seem to be workarounds.
Along with the Hortonworks link, I've also followed the steps mentioned at https://aws.amazon.com/blogs/big-data/use-apache-oozie-workflows-to-automate-apache-spark-jobs-and-more-on-amazon-emr/
But no luck so far. I couldn't find any documentation that confirms whether Oozie + Spark 2 is supported or not. If this has worked for anyone, please provide detailed steps on how to make Oozie + Spark 2 work in AWS EMR.