Spark-sql java.net.NoRouteToHostException on cluster reboot

We had an EMR cluster reboot and suddenly hit this error. The error is not specific to EMR, so it is worth sharing.

Error:

Caused by: java.net.NoRouteToHostException: No route to host
    at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
    at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
    at org.apache.hadoop.net.SocketIOWithTimeout.connect(SocketIOWithTimeout.java:206)
    at org.apache.hadoop.net.NetUtils.connect(NetUtils.java:531)
    at org.apache.hadoop.net.NetUtils.connect(NetUtils.java:495)
    at org.apache.hadoop.ipc.Client$Connection.setupConnection(Client.java:614)
    at org.apache.hadoop.ipc.Client$Connection.setupIOstreams(Client.java:712)
    at org.apache.hadoop.ipc.Client$Connection.access$2900(Client.java:375)
    at org.apache.hadoop.ipc.Client.getConnection(Client.java:1528)
    at org.apache.hadoop.ipc.Client.call(Client.java:1451)
    … 56 more
java.net.NoRouteToHostException: …
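
The trace above fails inside a plain socket connect, so one quick way to narrow things down is to try the same TCP connection by hand from the affected node. The following is only a minimal sketch: the function name `probe`, the host name and the port are made up for illustration and do not come from the post. It assumes that the operating system reports the condition as EHOSTUNREACH, which is the error that usually surfaces in Java as NoRouteToHostException.

```python
import errno
import socket


def probe(host, port, timeout=3.0):
    """Attempt a TCP connection and return a short description of the outcome."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return "reachable"
    except socket.timeout:
        return "timed out"
    except ConnectionRefusedError:
        # The host answered, but nothing is listening on that port.
        return "connection refused"
    except OSError as exc:
        if exc.errno == errno.EHOSTUNREACH:
            return "no route to host"
        # Anything else (DNS failure, network down, ...) is reported verbatim.
        return f"failed: {exc}"


if __name__ == "__main__":
    # Hypothetical address: substitute the host and port the failing job
    # is actually configured to talk to.
    print(probe("namenode-host.example.internal", 8020))
```

A result of "no route to host" from the node that runs the job would suggest the problem sits at the network or address level rather than in Spark itself.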

Spark append mode for partitioned text file fails with SaveMode.Append – IOException File already Exists

Code-

dataDF.write.partitionBy("year", "month", "date").mode(SaveMode.Append).text("s3://data/test2/events/")

Error-

16/07/06 02:15:05 ERROR datasources.DynamicPartitionWriterContainer: Aborting task.
java.io.IOException: File already exists:s3://path/1839dd1ed38a.gz
    at com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem.create(S3NativeFileSystem.java:614)
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:913)
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:894)
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:791)
    at com.amazon.ws.emr.hadoop.fs.EmrFileSystem.create(EmrFileSystem.java:177)
    at org.apache.hadoop.mapreduce.lib.output.TextOutputFormat.getRecordWriter(TextOutputFormat.java:135)
    at org.apache.spark.sql.execution.datasources.text.TextOutputWriter.<init>(DefaultSource.scala:156)
    at org.apache.spark.sql.execution.datasources.text.TextRelation$$anon$1.newInstance(DefaultSource.scala:125)
    at org.apache.spark.sql.execution.datasources.BaseWriterContainer.newOutputWriter(WriterContainer.scala:129)
    at org.apache.spark.sql.execution.datasources.DynamicPartitionWriterContainer.newOutputWriter$1(WriterContainer.scala:424)
    at org.apache.spark.sql.execution.datasources.DynamicPartitionWriterContainer.writeRows(WriterContainer.scala:356)
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1$$anonfun$apply$mcV$sp$3.apply(InsertIntoHadoopFsRelation.scala:150)
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1$$anonfun$apply$mcV$sp$3.apply(InsertIntoHadoopFsRelation.scala:150)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
    at org.apache.spark.scheduler.Task.run(Task.scala:89)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
16/07/06 02:15:05 …

How to write gzip-compressed JSON from a Spark DataFrame

A compressed output format can be specified in Spark as:

conf = SparkConf()
conf.set("spark.hadoop.mapred.output.compress", "true")
conf.set("spark.hadoop.mapred.output.compression.codec", "org.apache.hadoop.io.compress.GzipCodec")
conf.set("spark.hadoop.mapred.output.compression.type", "BLOCK")

The same settings can be passed to the Spark shell:

$> spark-shell --conf spark.hadoop.mapred.output.compress=true --conf spark.hadoop.mapred.output.compression.codec=org.apache.hadoop.io.compress.GzipCodec --conf spark.hadoop.mapred.output.compression.type=BLOCK

The code for writing the JSON/text is the same as usual:

case class C(key: String, value: String)
val list = …
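
Because the same three settings are needed both in code and on the command line, it can help to keep them in one place. The sketch below is one possible way to do that in plain Python. The names `GZIP_OUTPUT_CONF`, `apply_to` and `shell_command` are invented for this example and are not part of Spark. `apply_to` only assumes that the object it receives has a `set(key, value)` method, as PySpark's `SparkConf` does.

```python
import shlex

# The three settings from the post, kept in a single dictionary.
GZIP_OUTPUT_CONF = {
    "spark.hadoop.mapred.output.compress": "true",
    "spark.hadoop.mapred.output.compression.codec":
        "org.apache.hadoop.io.compress.GzipCodec",
    "spark.hadoop.mapred.output.compression.type": "BLOCK",
}


def apply_to(conf, settings=GZIP_OUTPUT_CONF):
    """Call conf.set(key, value) for every setting and return conf.

    conf is expected to be a SparkConf, or anything with a compatible
    set(key, value) method.
    """
    for key, value in settings.items():
        conf.set(key, value)
    return conf


def shell_command(program="spark-shell", settings=GZIP_OUTPUT_CONF):
    """Render the settings as repeated --conf key=value arguments."""
    parts = [program]
    for key, value in settings.items():
        parts += ["--conf", f"{key}={value}"]
    # Quote each argument so the result can be pasted into a shell.
    return " ".join(shlex.quote(part) for part in parts)


if __name__ == "__main__":
    print(shell_command())
```

With PySpark installed, `apply_to(SparkConf())` would return a configuration carrying the same three values as the snippet above.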

Minimal Spark hello world

1. Build Sbt

Create a build.sbt file. It manages all the dependencies and settings that would otherwise have been in your pom file:

import AssemblyKeys._
import sbtassembly.Plugin._

name := "FeedSystem"

version := "1.0"

scalaVersion := "2.10.5"

organization := "com.snapdeal"

resolvers += "Typesafe Repo" at "http://repo.typesafe.com/typesafe/releases/"

libraryDependencies ++= Seq("org.apache.spark" % "spark-core_2.10" % "1.3.1" % "provided", "org.apache.spark" % "spark-mllib_2.10" …
