java - Hadoop Writables NotSerializableException with the Apache Spark Java API
My Spark Java application throws a NotSerializableException on Hadoop Writables:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;

public final class MyApp {
    public static void main(String[] args) throws Exception {
        if (args.length < 1) {
            System.err.println("Usage: MyApp <file>");
            System.exit(1);
        }
        SparkConf sparkConf = new SparkConf().setAppName("MyApp").setMaster("local");
        JavaSparkContext ctx = new JavaSparkContext(sparkConf);
        Configuration conf = new Configuration();
        // Read the file via the new Hadoop API; keys are byte offsets, values are lines
        JavaPairRDD<LongWritable, Text> lines = ctx.newAPIHadoopFile(
                args[0], TextInputFormat.class, LongWritable.class, Text.class, conf);
        System.out.println(lines.collect().toString());
        ctx.stop();
    }
}
It fails with the following stack trace:
java.io.NotSerializableException: org.apache.hadoop.io.LongWritable
Serialization stack:
    - object not serializable (class: org.apache.hadoop.io.LongWritable, value: 15227295)
    - field (class: scala.Tuple2, name: _1, type: class java.lang.Object)
    - object (class scala.Tuple2, (15227295,))
    - element of array (index: 0)
    - array (class [Lscala.Tuple2;, size 1153163)
    at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:38)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:80)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.lang.Thread.run(Unknown Source)
15/04/26 16:05:05 ERROR TaskSetManager: Task 0.0 in stage 0.0 (TID 0) had a not serializable result: org.apache.hadoop.io.LongWritable
Serialization stack:
    - object not serializable (class: org.apache.hadoop.io.LongWritable, value: 15227295)
    - field (class: scala.Tuple2, name: _1, type: class java.lang.Object)
    - object (class scala.Tuple2, (15227295,))
    - element of array (index: 0)
    - array (class [Lscala.Tuple2;, size 1153163); not retrying
15/04/26 16:05:05 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool
15/04/26 16:05:05 INFO TaskSchedulerImpl: Cancelling stage 0
15/04/26 16:05:05 INFO DAGScheduler: Job 0 failed: collect at Parser2.java:60, took 0.460181 s
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0.0 in stage 0.0 (TID 0) had a not serializable result: org.apache.hadoop.io.LongWritable
In a Spark Scala program, registering the Hadoop Writables as below works fine:
sparkConf.registerKryoClasses(Array(
  classOf[org.apache.hadoop.io.LongWritable],
  classOf[org.apache.hadoop.io.Text]))
Apparently that approach doesn't work with the Apache Spark Java API:
sparkconf.set("spark.serializer", "org.apache.spark.serializer.kryoserializer"); sparkconf.set("spark.kryo.registrator", longwritable.class.getname());
which throws:
Exception in thread "main" org.apache.spark.SparkException: Failed to register classes with Kryo
    at org.apache.spark.serializer.KryoSerializer.newKryo(KryoSerializer.scala:101)
    at org.apache.spark.serializer.KryoSerializerInstance.<init>(KryoSerializer.scala:153)
    at org.apache.spark.serializer.KryoSerializer.newInstance(KryoSerializer.scala:115)
    at org.apache.spark.broadcast.TorrentBroadcast$.blockifyObject(TorrentBroadcast.scala:200)
    at org.apache.spark.broadcast.TorrentBroadcast.writeBlocks(TorrentBroadcast.scala:101)
    at org.apache.spark.broadcast.TorrentBroadcast.<init>(TorrentBroadcast.scala:84)
    at org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:34)
    at org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:29)
    at org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:62)
    at org.apache.spark.SparkContext.broadcast(SparkContext.scala:1051)
    at org.apache.spark.rdd.NewHadoopRDD.<init>(NewHadoopRDD.scala:77)
    at org.apache.spark.SparkContext.newAPIHadoopFile(SparkContext.scala:848)
    at org.apache.spark.api.java.JavaSparkContext.newAPIHadoopFile(JavaSparkContext.scala:488)
    at com.nsn.pmparser.Parser2.main(Parser2.java:56)
Caused by: java.lang.ClassCastException: org.apache.hadoop.io.LongWritable cannot be cast to org.apache.spark.serializer.KryoRegistrator
    at org.apache.spark.serializer.KryoSerializer$$anonfun$newKryo$3.apply(KryoSerializer.scala:97)
    at org.apache.spark.serializer.KryoSerializer$$anonfun$newKryo$3.apply(KryoSerializer.scala:97)
    at scala.Option.map(Option.scala:145)
    at org.apache.spark.serializer.KryoSerializer.newKryo(KryoSerializer.scala:97)
    ... 13 more
How do I fix the NotSerializableException on Hadoop Writables with the Apache Spark Java API?
As of Spark v1.4.0, you can use the Java API to register classes to be serialized with Kryo: https://spark.apache.org/docs/latest/api/java/org/apache/spark/SparkConf.html#registerKryoClasses(java.lang.Class[]) by passing in an array of Class objects, each of which can be obtained using http://docs.oracle.com/javase/7/docs/api/java/lang/Class.html#forName(java.lang.String)
For example:
new SparkConf().registerKryoClasses(new Class<?>[]{
    Class.forName("org.apache.hadoop.io.LongWritable"),
    Class.forName("org.apache.hadoop.io.Text")
});
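If the Writable classes are on your compile-time classpath anyway, you can skip Class.forName and pass the class literals directly. A minimal sketch of that variant (the KryoSetup class name is just for illustration):

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.spark.SparkConf;

public final class KryoSetup {
    public static SparkConf conf() {
        // registerKryoClasses also switches spark.serializer to KryoSerializer,
        // so no separate sparkConf.set("spark.serializer", ...) is needed
        return new SparkConf()
                .setAppName("MyApp")
                .setMaster("local")
                .registerKryoClasses(new Class<?>[]{LongWritable.class, Text.class});
    }
}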
Hope this helps.
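One more note on the ClassCastException in the question: spark.kryo.registrator expects the fully qualified name of a class implementing org.apache.spark.serializer.KryoRegistrator, not the name of the class you want registered. A sketch of that alternative route (WritableRegistrator is an illustrative name, not an existing class):

import com.esotericsoftware.kryo.Kryo;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.spark.serializer.KryoRegistrator;

// Registers the Hadoop Writables with Kryo. Spark instantiates this class
// by name via spark.kryo.registrator, so it needs a public no-arg constructor.
public class WritableRegistrator implements KryoRegistrator {
    @Override
    public void registerClasses(Kryo kryo) {
        kryo.register(LongWritable.class);
        kryo.register(Text.class);
    }
}

and then:

sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
sparkConf.set("spark.kryo.registrator", WritableRegistrator.class.getName());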