Hi, I am following the Python instructions from
https://github.com/yahoo/CaffeOnSpark/wiki/GetStarted_python
and trying to use the Python APIs to train models. I launch pyspark with the following example command:
pushd ${CAFFE_ON_SPARK}/data/
unzip ${CAFFE_ON_SPARK}/caffe-grid/target/caffeonsparkpythonapi.zip
IPYTHON=1 pyspark --master yarn \
    --num-executors 1 \
    --driver-library-path "${CAFFE_ON_SPARK}/caffe-grid/target/caffe-grid-0.1-SNAPSHOT-jar-with-dependencies.jar" \
    --driver-class-path "${CAFFE_ON_SPARK}/caffe-grid/target/caffe-grid-0.1-SNAPSHOT-jar-with-dependencies.jar" \
    --conf spark.cores.max=1 \
    --conf spark.driver.extraLibraryPath="${LD_LIBRARY_PATH}" \
    --conf spark.executorEnv.LD_LIBRARY_PATH="${LD_LIBRARY_PATH}" \
    --py-files ${CAFFE_ON_SPARK}/caffe-grid/target/caffeonsparkpythonapi.zip \
    --files ${CAFFE_ON_SPARK}/data/caffe/_caffe.so \
    --jars "${CAFFE_ON_SPARK}/caffe-grid/target/caffe-grid-0.1-SNAPSHOT-jar-with-dependencies.jar"
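Before launching, I sanity-check that the artifacts referenced in the command exist and that LD_LIBRARY_PATH is set. A minimal sketch, using the same paths as above:

import os

# Check the jar, the python API zip, and _caffe.so under ${CAFFE_ON_SPARK}.
caffe_on_spark = os.environ.get('CAFFE_ON_SPARK', '')
for rel in ['caffe-grid/target/caffe-grid-0.1-SNAPSHOT-jar-with-dependencies.jar',
            'caffe-grid/target/caffeonsparkpythonapi.zip',
            'data/caffe/_caffe.so']:
    path = os.path.join(caffe_on_spark, rel)
    print(path, '->', 'exists' if os.path.exists(path) else 'MISSING')
print('LD_LIBRARY_PATH =', os.environ.get('LD_LIBRARY_PATH', '(not set)'))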
Then I run the example below in the pyspark shell; an error appears on the last line:
from pyspark import SparkConf,SparkContext
from com.yahoo.ml.caffe.RegisterContext import registerContext,registerSQLContext
from com.yahoo.ml.caffe.CaffeOnSpark import CaffeOnSpark
from com.yahoo.ml.caffe.Config import Config
from com.yahoo.ml.caffe.DataSource import DataSource
from pyspark.mllib.linalg import Vectors
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.classification import LogisticRegressionWithLBFGS
registerContext(sc)
registerSQLContext(sqlContext)
cos=CaffeOnSpark(sc,sqlContext)
cfg=Config(sc)
cfg.protoFile='/Users/afeng/dev/ml/CaffeOnSpark/data/lenet_memory_solver.prototxt'
cfg.modelPath = 'file:/tmp/lenet.model'
cfg.devices = 1
cfg.isFeature=True
cfg.label='label'
cfg.features=['ip1']
cfg.outputFormat = 'json'
cfg.clusterSize = 1
cfg.lmdb_partitions=cfg.clusterSize
# Train
dl_train_source = DataSource(sc).getSource(cfg, True)
cos.train(dl_train_source)    # <-- the error occurs on this call
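For reference, before the failing call I can confirm that the solver prototxt set in cfg.protoFile actually exists on the driver; a quick sketch using the path from my cfg above:

import os

# Sanity check: the solver prototxt from cfg.protoFile must be readable on the driver.
solver = '/Users/afeng/dev/ml/CaffeOnSpark/data/lenet_memory_solver.prototxt'
print(solver, '->', 'exists' if os.path.exists(solver) else 'MISSING')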
The error message is:
In [41]: cos.train(dl_train_source)
16/04/27 10:44:34 INFO spark.SparkContext: Starting job: collect at CaffeOnSpark.scala:127
16/04/27 10:44:34 INFO scheduler.DAGScheduler: Got job 4 (collect at CaffeOnSpark.scala:127) with 1 output partitions
16/04/27 10:44:34 INFO scheduler.DAGScheduler: Final stage: ResultStage 4 (collect at CaffeOnSpark.scala:127)
16/04/27 10:44:34 INFO scheduler.DAGScheduler: Parents of final stage: List()
16/04/27 10:44:34 INFO scheduler.DAGScheduler: Missing parents: List()
16/04/27 10:44:34 INFO scheduler.DAGScheduler: Submitting ResultStage 4 (MapPartitionsRDD[14] at map at CaffeOnSpark.scala:116), which has no missing parents
16/04/27 10:44:34 INFO storage.MemoryStore: Block broadcast_5 stored as values in memory (estimated size 3.2 KB, free 23.9 KB)
16/04/27 10:44:34 INFO storage.MemoryStore: Block broadcast_5_piece0 stored as bytes in memory (estimated size 2.1 KB, free 25.9 KB)
16/04/27 10:44:34 INFO storage.BlockManagerInfo: Added broadcast_5_piece0 in memory on 10.110.53.146:59213 (size: 2.1 KB, free: 511.5 MB)
16/04/27 10:44:34 INFO spark.SparkContext: Created broadcast 5 from broadcast at DAGScheduler.scala:1006
16/04/27 10:44:34 INFO scheduler.DAGScheduler: Submitting 1 missing tasks from ResultStage 4 (MapPartitionsRDD[14] at map at CaffeOnSpark.scala:116)
16/04/27 10:44:34 INFO cluster.YarnScheduler: Adding task set 4.0 with 1 tasks
16/04/27 10:44:34 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 4.0 (TID 10, sweet, partition 0,PROCESS_LOCAL, 2169 bytes)
16/04/27 10:44:34 INFO storage.BlockManagerInfo: Added broadcast_5_piece0 in memory on sweet:46000 (size: 2.1 KB, free: 511.5 MB)
16/04/27 10:44:34 INFO scheduler.DAGScheduler: ResultStage 4 (collect at CaffeOnSpark.scala:127) finished in 0.084 s
16/04/27 10:44:34 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 4.0 (TID 10) in 84 ms on sweet (1/1)
16/04/27 10:44:34 INFO cluster.YarnScheduler: Removed TaskSet 4.0, whose tasks have all completed, from pool
16/04/27 10:44:34 INFO scheduler.DAGScheduler: Job 4 finished: collect at CaffeOnSpark.scala:127, took 0.092871 s
16/04/27 10:44:34 INFO caffe.CaffeOnSpark: rank = 0, address = null, hostname = sweet
16/04/27 10:44:34 INFO caffe.CaffeOnSpark: rank 0:sweet
16/04/27 10:44:34 INFO storage.MemoryStore: Block broadcast_6 stored as values in memory (estimated size 112.0 B, free 26.0 KB)
16/04/27 10:44:34 INFO storage.MemoryStore: Block broadcast_6_piece0 stored as bytes in memory (estimated size 221.0 B, free 26.3 KB)
16/04/27 10:44:34 INFO storage.BlockManagerInfo: Added broadcast_6_piece0 in memory on 10.110.53.146:59213 (size: 221.0 B, free: 511.5 MB)
16/04/27 10:44:34 INFO spark.SparkContext: Created broadcast 6 from broadcast at CaffeOnSpark.scala:146
16/04/27 10:44:34 INFO spark.SparkContext: Starting job: collect at CaffeOnSpark.scala:155
16/04/27 10:44:34 INFO scheduler.DAGScheduler: Got job 5 (collect at CaffeOnSpark.scala:155) with 1 output partitions
16/04/27 10:44:34 INFO scheduler.DAGScheduler: Final stage: ResultStage 5 (collect at CaffeOnSpark.scala:155)
16/04/27 10:44:34 INFO scheduler.DAGScheduler: Parents of final stage: List()
16/04/27 10:44:34 INFO scheduler.DAGScheduler: Missing parents: List()
16/04/27 10:44:34 INFO scheduler.DAGScheduler: Submitting ResultStage 5 (MapPartitionsRDD[16] at map at CaffeOnSpark.scala:149), which has no missing parents
16/04/27 10:44:34 INFO storage.MemoryStore: Block broadcast_7 stored as values in memory (estimated size 2.6 KB, free 28.9 KB)
16/04/27 10:44:34 INFO storage.MemoryStore: Block broadcast_7_piece0 stored as bytes in memory (estimated size 1597.0 B, free 30.4 KB)
16/04/27 10:44:34 INFO storage.BlockManagerInfo: Added broadcast_7_piece0 in memory on 10.110.53.146:59213 (size: 1597.0 B, free: 511.5 MB)
16/04/27 10:44:34 INFO spark.SparkContext: Created broadcast 7 from broadcast at DAGScheduler.scala:1006
16/04/27 10:44:34 INFO scheduler.DAGScheduler: Submitting 1 missing tasks from ResultStage 5 (MapPartitionsRDD[16] at map at CaffeOnSpark.scala:149)
16/04/27 10:44:34 INFO cluster.YarnScheduler: Adding task set 5.0 with 1 tasks
16/04/27 10:44:34 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 5.0 (TID 11, sweet, partition 0,PROCESS_LOCAL, 2169 bytes)
16/04/27 10:44:34 INFO storage.BlockManagerInfo: Added broadcast_7_piece0 in memory on sweet:46000 (size: 1597.0 B, free: 511.5 MB)
16/04/27 10:44:34 INFO storage.BlockManagerInfo: Added broadcast_6_piece0 in memory on sweet:46000 (size: 221.0 B, free: 511.5 MB)
16/04/27 10:44:34 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 5.0 (TID 11) in 48 ms on sweet (1/1)
16/04/27 10:44:34 INFO scheduler.DAGScheduler: ResultStage 5 (collect at CaffeOnSpark.scala:155) finished in 0.049 s
16/04/27 10:44:34 INFO cluster.YarnScheduler: Removed TaskSet 5.0, whose tasks have all completed, from pool
16/04/27 10:44:34 INFO scheduler.DAGScheduler: Job 5 finished: collect at CaffeOnSpark.scala:155, took 0.058122 s
16/04/27 10:44:34 INFO caffe.LmdbRDD: local LMDB path:/home/atlas/work/caffe_spark/CaffeOnSpark-master/data/mnist_train_lmdb
16/04/27 10:44:34 INFO caffe.LmdbRDD: 1 LMDB RDD partitions
16/04/27 10:44:34 INFO spark.SparkContext: Starting job: reduce at CaffeOnSpark.scala:205
16/04/27 10:44:34 INFO scheduler.DAGScheduler: Got job 6 (reduce at CaffeOnSpark.scala:205) with 1 output partitions
16/04/27 10:44:34 INFO scheduler.DAGScheduler: Final stage: ResultStage 6 (reduce at CaffeOnSpark.scala:205)
16/04/27 10:44:34 INFO scheduler.DAGScheduler: Parents of final stage: List()
16/04/27 10:44:34 INFO scheduler.DAGScheduler: Missing parents: List()
16/04/27 10:44:34 INFO scheduler.DAGScheduler: Submitting ResultStage 6 (MapPartitionsRDD[17] at mapPartitions at CaffeOnSpark.scala:190), which has no missing parents
16/04/27 10:44:34 INFO storage.MemoryStore: Block broadcast_8 stored as values in memory (estimated size 3.4 KB, free 33.8 KB)
16/04/27 10:44:34 INFO storage.MemoryStore: Block broadcast_8_piece0 stored as bytes in memory (estimated size 2.2 KB, free 35.9 KB)
16/04/27 10:44:34 INFO storage.BlockManagerInfo: Added broadcast_8_piece0 in memory on 10.110.53.146:59213 (size: 2.2 KB, free: 511.5 MB)
16/04/27 10:44:34 INFO spark.SparkContext: Created broadcast 8 from broadcast at DAGScheduler.scala:1006
16/04/27 10:44:34 INFO scheduler.DAGScheduler: Submitting 1 missing tasks from ResultStage 6 (MapPartitionsRDD[17] at mapPartitions at CaffeOnSpark.scala:190)
16/04/27 10:44:34 INFO cluster.YarnScheduler: Adding task set 6.0 with 1 tasks
16/04/27 10:44:34 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 6.0 (TID 12, sweet, partition 0,PROCESS_LOCAL, 1992 bytes)
16/04/27 10:44:34 INFO storage.BlockManagerInfo: Added broadcast_8_piece0 in memory on sweet:46000 (size: 2.2 KB, free: 511.5 MB)
16/04/27 10:44:34 INFO storage.BlockManagerInfo: Added rdd_12_0 on disk on sweet:46000 (size: 26.0 B)
16/04/27 10:44:34 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 6.0 (TID 12, sweet): java.lang.UnsupportedOperationException: empty.reduceLeft
at scala.collection.TraversableOnce$class.reduceLeft(TraversableOnce.scala:167)
at scala.collection.AbstractIterator.reduceLeft(Iterator.scala:1157)
at scala.collection.TraversableOnce$class.reduce(TraversableOnce.scala:195)
at scala.collection.AbstractIterator.reduce(Iterator.scala:1157)
at com.yahoo.ml.caffe.CaffeOnSpark$$anonfun$7.apply(CaffeOnSpark.scala:199)
at com.yahoo.ml.caffe.CaffeOnSpark$$anonfun$7.apply(CaffeOnSpark.scala:191)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$20.apply(RDD.scala:710)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$20.apply(RDD.scala:710)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
16/04/27 10:44:34 INFO scheduler.TaskSetManager: Starting task 0.1 in stage 6.0 (TID 13, sweet, partition 0,PROCESS_LOCAL, 1992 bytes)
16/04/27 10:44:34 INFO scheduler.TaskSetManager: Lost task 0.1 in stage 6.0 (TID 13) on executor sweet: java.lang.UnsupportedOperationException (empty.reduceLeft) [duplicate 1]
16/04/27 10:44:34 INFO scheduler.TaskSetManager: Starting task 0.2 in stage 6.0 (TID 14, sweet, partition 0,PROCESS_LOCAL, 1992 bytes)
16/04/27 10:44:34 INFO scheduler.TaskSetManager: Lost task 0.2 in stage 6.0 (TID 14) on executor sweet: java.lang.UnsupportedOperationException (empty.reduceLeft) [duplicate 2]
16/04/27 10:44:34 INFO scheduler.TaskSetManager: Starting task 0.3 in stage 6.0 (TID 15, sweet, partition 0,PROCESS_LOCAL, 1992 bytes)
16/04/27 10:44:34 INFO scheduler.TaskSetManager: Lost task 0.3 in stage 6.0 (TID 15) on executor sweet: java.lang.UnsupportedOperationException (empty.reduceLeft) [duplicate 3]
16/04/27 10:44:34 ERROR scheduler.TaskSetManager: Task 0 in stage 6.0 failed 4 times; aborting job
16/04/27 10:44:34 INFO cluster.YarnScheduler: Removed TaskSet 6.0, whose tasks have all completed, from pool
16/04/27 10:44:34 INFO cluster.YarnScheduler: Cancelling stage 6
16/04/27 10:44:34 INFO scheduler.DAGScheduler: ResultStage 6 (reduce at CaffeOnSpark.scala:205) failed in 0.117 s
16/04/27 10:44:34 INFO scheduler.DAGScheduler: Job 6 failed: reduce at CaffeOnSpark.scala:205, took 0.124712 s
Py4JJavaError Traceback (most recent call last)
in ()
----> 1 cos.train(dl_train_source)
/home/atlas/work/caffe_spark/CaffeOnSpark-master/data/com/yahoo/ml/caffe/CaffeOnSpark.py in train(self, train_source)
29 :param DataSource: the source for training data
30 """
---> 31 self.__dict__.get('cos').train(train_source)
32
33 def test(self,test_source):
/home/atlas/work/caffe_spark/CaffeOnSpark-master/data/com/yahoo/ml/caffe/ConversionUtil.py in __call__(self, *_args)
814 for i in self.syms:
815 try:
--> 816 return callJavaMethod(i,self.javaInstance,self._evalDefaults(),self.mirror,*_args)
817 except Py4JJavaError:
818 raise
/home/atlas/work/caffe_spark/CaffeOnSpark-master/data/com/yahoo/ml/caffe/ConversionUtil.py in callJavaMethod(sym, javaInstance, defaults, mirror, *_args)
617 return javaInstance(*__getConvertedTuple(args,sym,defaults,mirror))
618 else:
--> 619 return toPython(javaInstance.__getattr__(name)(*__getConvertedTuple(args,sym,defaults,mirror)))
620 #It is good for debugging to know whether the argument conversion was successful.
621 #If it was, a Py4JJavaError may be raised from the Java code.
/home/atlas/work/caffe_spark/3rdparty/spark-1.6.0-bin-hadoop2.6/python/lib/py4j-0.9-src.zip/py4j/java_gateway.py in __call__(self, *args)
811 answer = self.gateway_client.send_command(command)
812 return_value = get_return_value(
--> 813 answer, self.gateway_client, self.target_id, self.name)
814
815 for temp_arg in temp_args:
/home/atlas/work/caffe_spark/3rdparty/spark-1.6.0-bin-hadoop2.6/python/pyspark/sql/utils.pyc in deco(*a, **kw)
43 def deco(*a, **kw):
44 try:
---> 45 return f(*a, **kw)
46 except py4j.protocol.Py4JJavaError as e:
47 s = e.java_exception.toString()
/home/atlas/work/caffe_spark/3rdparty/spark-1.6.0-bin-hadoop2.6/python/lib/py4j-0.9-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
306 raise Py4JJavaError(
307 "An error occurred while calling {0}{1}{2}.\n".
--> 308 format(target_id, ".", name), value)
309 else:
310 raise Py4JError(
Py4JJavaError: An error occurred while calling o2122.train.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 6.0 failed 4 times, most recent failure: Lost task 0.3 in stage 6.0 (TID 15, sweet): java.lang.UnsupportedOperationException: empty.reduceLeft
at scala.collection.TraversableOnce$class.reduceLeft(TraversableOnce.scala:167)
at scala.collection.AbstractIterator.reduceLeft(Iterator.scala:1157)
at scala.collection.TraversableOnce$class.reduce(TraversableOnce.scala:195)
at scala.collection.AbstractIterator.reduce(Iterator.scala:1157)
at com.yahoo.ml.caffe.CaffeOnSpark$$anonfun$7.apply(CaffeOnSpark.scala:199)
at com.yahoo.ml.caffe.CaffeOnSpark$$anonfun$7.apply(CaffeOnSpark.scala:191)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$20.apply(RDD.scala:710)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$20.apply(RDD.scala:710)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1431)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1419)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1418)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1418)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
at scala.Option.foreach(Option.scala:236)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:799)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1640)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1599)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1588)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:620)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1832)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1952)
at org.apache.spark.rdd.RDD$$anonfun$reduce$1.apply(RDD.scala:1025)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
at org.apache.spark.rdd.RDD.reduce(RDD.scala:1007)
at com.yahoo.ml.caffe.CaffeOnSpark.train(CaffeOnSpark.scala:205)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:381)
at py4j.Gateway.invoke(Gateway.java:259)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:209)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.UnsupportedOperationException: empty.reduceLeft
at scala.collection.TraversableOnce$class.reduceLeft(TraversableOnce.scala:167)
at scala.collection.AbstractIterator.reduceLeft(Iterator.scala:1157)
at scala.collection.TraversableOnce$class.reduce(TraversableOnce.scala:195)
at scala.collection.AbstractIterator.reduce(Iterator.scala:1157)
at com.yahoo.ml.caffe.CaffeOnSpark$$anonfun$7.apply(CaffeOnSpark.scala:199)
at com.yahoo.ml.caffe.CaffeOnSpark$$anonfun$7.apply(CaffeOnSpark.scala:191)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$20.apply(RDD.scala:710)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$20.apply(RDD.scala:710)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
... 1 more
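From searching around, empty.reduceLeft from RDD.reduce usually means the partition being reduced was empty, so I suspect the executor read zero samples from the training LMDB (the log above shows it resolved to /home/atlas/work/caffe_spark/CaffeOnSpark-master/data/mnist_train_lmdb with 1 partition). A minimal check of that theory, assuming the py-lmdb package is available on the driver:

import lmdb  # assumption: the py-lmdb package is installed

# Open the local LMDB path reported in the log; zero entries (or a missing
# directory) would mean an empty training partition, matching empty.reduceLeft.
env = lmdb.open('/home/atlas/work/caffe_spark/CaffeOnSpark-master/data/mnist_train_lmdb',
                readonly=True, lock=False)
with env.begin() as txn:
    print('training LMDB entries:', txn.stat()['entries'])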
Could you please help me figure out what is happening here?