
I have successfully installed Apache Spark and Hadoop on Ubuntu 12.04 (single-machine standalone mode) and run logistic regression with SparkR. It works when tested with a small csv dataset, but it fails for a larger dataset with 269369 rows: the logistic regression in SparkR throws an OutOfMemoryError. The code:

library(SparkR)
sc <- sparkR.init()
iterations <- as.integer(11)
D <- 540   # number of feature columns (the csv has 541 columns: label + 540 features)

# Parse a partition of csv lines into a single numeric matrix
readPartition <- function(part) {
  part <- strsplit(part, ",", fixed = TRUE)
  list(matrix(as.numeric(unlist(part)), ncol = length(part[[1]])))
}

# Random initial weight vector
w <- runif(n = D, min = -1, max = 1)

cat("Initial w: ", w, "\n")

# Compute logistic regression gradient for a matrix of data points 
gradient <- function(partition) { 
    partition <- partition[[1]]
    Y <- partition[, 1] # point labels (first column of input file) 

    X <- partition[, -1] # point coordinates 
    # For each point (x, y), compute gradient function 
    #print(w) 
    dot <- X %*% w  
    logit <- 1/(1 + exp(-Y * dot)) 
    grad <- t(X) %*% ((logit - 1) * Y) 
    list(grad) 
} 


for (i in 1:iterations) { 
    cat("On iteration ", i, "\n") 
    w <- w - reduce(lapplyPartition(points, gradient), "+") 
} 

> points <- cache(lapplyPartition(textFile(sc, "hdfs://localhost:54310/henry/cdata_mr.csv"), readPartition)) 

The error messages I get:

14/10/07 01:47:16 INFO FileInputFormat: Total input paths to process : 1 
14/10/07 01:47:28 WARN CacheManager: Not enough space to cache partition rdd_23_0 in memory! Free memory is 235841615 bytes. 
14/10/07 01:47:42 WARN CacheManager: Not enough space to cache partition rdd_23_1 in memory! Free memory is 236015334 bytes. 
14/10/07 01:47:55 WARN CacheManager: Not enough space to cache partition rdd_23_2 in memory! Free memory is 236015334 bytes. 
14/10/07 01:48:10 WARN CacheManager: Not enough space to cache partition rdd_23_3 in memory! Free memory is 236015334 bytes. 
14/10/07 01:48:29 ERROR Executor: Exception in task 0.0 in stage 13.0 (TID 17) 
java.lang.OutOfMemoryError: Java heap space 
    at edu.berkeley.cs.amplab.sparkr.RRDD$$anon$2.read(RRDD.scala:144) 
    at edu.berkeley.cs.amplab.sparkr.RRDD$$anon$2.<init>(RRDD.scala:156) 
    at edu.berkeley.cs.amplab.sparkr.RRDD.compute(RRDD.scala:129) 
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) 
    at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:61) 
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:227) 
    at edu.berkeley.cs.amplab.sparkr.RRDD.compute(RRDD.scala:120) 
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) 
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:229) 
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62) 
    at org.apache.spark.scheduler.Task.run(Task.scala:54) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1146) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) 
    at java.lang.Thread.run(Thread.java:701) 
14/10/07 01:48:29 ERROR ExecutorUncaughtExceptionHandler: Uncaught exception in thread Thread[Executor task launch worker-0,5,main] 

Dimensions of the data (sample):

data <- read.csv("/home/Henry/data.csv") 

dim(data) 

[1] 269369 541 

I have also tried hosting the same csv file on the local file system as well as on HDFS. Does it need more Hadoop DataNodes to store such a large dataset? If so, how should I set up a Spark/Hadoop cluster to get around this? (Or am I doing something wrong?)

Hint: I think increasing the Java and Spark heap space would let this run. I have tried many things without success. Does anyone know how to increase the heap space for both?

Answer


Can you try setting spark.executor.memory to a larger value, as documented here? As a back-of-the-envelope calculation, assuming each entry in the dataset takes 4 bytes, the whole file in memory would cost 269369 * 541 * 4 bytes ~= 560MB, which exceeds the default value of 512m for that parameter.
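As a quick sanity check, here is the same estimate in plain R (the 4 bytes per entry is just the assumption above; R itself stores numerics as 8-byte doubles, so the real in-memory footprint may be roughly double):

rows <- 269369
cols <- 541
rows * cols * 4 / 1024^2    # ~556 MB with 4-byte entries, already above the 512m default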

For example, you could try the following (assuming each worker node in the cluster has more than 1GB of memory available):

sc <- sparkR.init("local[2]", "SparkR", "/home/spark", 
        list(spark.executor.memory="1g"))
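
Note that the third argument to sparkR.init ("/home/spark" here) is the Spark home directory, so point it at your own installation; if I remember the old SparkR API correctly, additional Spark properties such as spark.executor.memory are passed as a list in the fourth argument (sparkEnvir), as shown above.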