Spark job blocked by updatedependencies -
i ran spark job in yarn, blocked bad network here exceptions:
「 :at sun.nio.ch.filedispatcherimpl.read0(native method) at sun.nio.ch.filedispatcherimpl.read(filedispatcherimpl.java:46) at sun.nio.ch.ioutil.readintonativebuffer(ioutil.java:223) at sun.nio.ch.ioutil.read(ioutil.java:197) at sun.nio.ch.sourcechannelimpl.read(sourcechannelimpl.java:167) at org.apache.spark.rpc.netty.nettyrpcenv$filedownloadchannel$$anonfun$1.apply$mci$sp(nettyrpcenv.scala:371) at org.apache.spark.rpc.netty.nettyrpcenv$filedownloadchannel$$anonfun$1.apply(nettyrpcenv.scala:371) at org.apache.spark.rpc.netty.nettyrpcenv$filedownloadchannel$$anonfun$1.apply(nettyrpcenv.scala:371) at scala.util.try$.apply(try.scala:192) at org.apache.spark.rpc.netty.nettyrpcenv$filedownloadchannel.read(nettyrpcenv.scala:371) at sun.nio.ch.channelinputstream.read(channelinputstream.java:65) at sun.nio.ch.channelinputstream.read(channelinputstream.java:109) at sun.nio.ch.channelinputstream.read(channelinputstream.java:103) at java.io.inputstream.read(inputstream.java:101) at org.apache.spark.util.utils$$anonfun$copystream$1.apply$mcj$sp(utils.scala:354) at org.apache.spark.util.utils$$anonfun$copystream$1.apply(utils.scala:322) at org.apache.spark.util.utils$$anonfun$copystream$1.apply(utils.scala:322) at org.apache.spark.util.utils$.trywithsafefinally(utils.scala:1303) at org.apache.spark.util.utils$.copystream(utils.scala:362) at org.apache.spark.util.utils$.downloadfile(utils.scala:509) at org.apache.spark.util.utils$.dofetchfile(utils.scala:639) at org.apache.spark.util.utils$.fetchfile(utils.scala:450) at org.apache.spark.executor.executor$$anonfun$org$apache$spark$executor$executor$$updatedependencies$5.apply(executor.scala:659) at org.apache.spark.executor.executor$$anonfun$org$apache$spark$executor$executor$$updatedependencies$5.apply(executor.scala:651) at scala.collection.traversablelike$withfilter$$anonfun$foreach$1.apply(traversablelike.scala:733) at scala.collection.mutable.hashmap$$anonfun$foreach$1.apply(hashmap.scala:99) at scala.collection.mutable.hashmap$$anonfun$foreach$1.apply(hashmap.scala:99) at scala.collection.mutable.hashtable$class.foreachentry(hashtable.scala:230) at scala.collection.mutable.hashmap.foreachentry(hashmap.scala:40) at scala.collection.mutable.hashmap.foreach(hashmap.scala:99) at scala.collection.traversablelike$withfilter.foreach(traversablelike.scala:732) at org.apache.spark.executor.executor.org$apache$spark$executor$executor$$updatedependencies(executor.scala:651) at org.apache.spark.executor.executor$taskrunner.run(executor.scala:297) at java.util.concurrent.threadpoolexecutor.runworker(threadpoolexecutor.java:1142) at java.util.concurrent.threadpoolexecutor$worker.run(threadpoolexecutor.java:617) at java.lang.thread.run(thread.java:748) 」 --------
as inputstream.read() block method. want add read timeout filedownloadchannel.
override def openchannel(uri: string): readablebytechannel = { val parseduri = new uri(uri) require(parseduri.gethost() != null, "host name must defined.") require(parseduri.getport() > 0, "port must defined.") require(parseduri.getpath() != null && parseduri.getpath().nonempty, "path must defined.") val pipe = pipe.open() val source = new filedownloadchannel(pipe.source()) try { val client = downloadclient(parseduri.gethost(), parseduri.getport()) val callback = new filedownloadcallback(pipe.sink(), source, client) client.stream(parseduri.getpath(), callback) } catch { case e: exception => pipe.sink().close() source.close() throw e } source }
any suggestions or ideas??
Comments
Post a Comment