2017-02-13 55 views
1

我有CSV格式的文件,其中包含与列“ID”,“时间戳”,“行动”,“价值”和“位置”的表。 我想一个函数应用于表中的每一行,我已经写在R上的代码如下:如何将函数应用于SparkR中的每一行?

user <- read.csv(file_path,sep = ";") 
num <- nrow(user) 
curLocation <- "1" 
for(i in 1:num) { 
    row <- user[i,] 
    if(user$action != "power") 
     curLocation <- row$value 
    user[i,"location"] <- curLocation 
} 

将R脚本正常工作,现在我想将其应用SparkR。但是,我无法直接访问SparkR中的第i行,并且找不到任何操作SparkR documentation中的每一行的函数。

我应以实现如在R脚本同样的效果使用哪种方法?

此外,作为@chateaur建议,我尝试使用dapply功能如下的代码:

curLocation <- "1" 
schema <- structType(structField("Sequence","integer"), structField("ID","integer"), structField("Timestamp","timestamp"), structField("Action","string"), structField("Value","string"), structField("Location","string")) 
setLocation <- function(row, curLoc) { 
    if(row$Action != "power|battery|level"){ 
     curLoc <- row$Value 
    } 
    row$Location <- curLoc 
} 
bw <- dapply(user, function(row) { setLocation(row, curLocation)}, schema) 
head(bw) 

然后,我得到了一个错误: error message

我抬头警告消息的条件具有长度> 1且仅第一个元素将被用来和我发现一些https://stackoverflow.com/a/29969702/4942713。这让我不知道在dapply功能参数是否代表我的数据帧,而不是一个单列的整个分区?可能功能不是一个理想的解决方案?

后来,我试图通过@chateaur作为建议修改功能。除了使用dapply的,我用dapplyCollect从而节省了我指定模式的努力。有用!

changeLocation <- function(partitionnedDf) { 
    nrows <- nrow(partitionnedDf) 
    curLocation <- "1" 
    for(i in 1:nrows){ 
     row <- partitionnedDf[i,] 
     if(row$action != "power") { 
      curLocation <- row$value 
     } 
    partitionnedDf[i,"location"] <- curLocation 
    } 
    partitionnedDf 
} 

bw <- dapplyCollect(user, changeLocation) 
+0

您可以使用sparklyr(相同的语法比dplyr ) –

+0

@DimitriPetrenko如果我需要使用SparkR,该怎么办? SparkR能达到这个效果吗? – Scorpion775

回答

2

Scorpion775,

你应该分享您的sparkR代码。不要忘记,R和sparkR中的数据操作方式不一样。

来源:http://spark.apache.org/docs/latest/sparkr.html

df <- read.df(csvPath, "csv", header = "true", inferSchema = "true", na.strings = "NA") 

然后你可以看一下dapply功能在这里:https://spark.apache.org/docs/2.1.0/api/R/dapply.html

这里是一个工作示例:

changeLocation <- function(partitionnedDf) { 
    nrows <- nrow(partitionnedDf) 
    curLocation <- as.integer(1) 

    # Loop over each row of the partitionned data frame 
    for(i in 1:nrows){ 
     row <- partitionnedDf[i,] 

     if(row[1] != "power") { 
      curLocation <- row[2] 
     } 
     partitionnedDf[i,3] <- curLocation 
    } 

    # Return modified data frame 
    partitionnedDf 
} 

# Load data 
df <- read.df("data.csv", "csv", header="false", inferSchema = "true") 

head(collect(df)) 

# Define schema of dataframe 
schema <- structType(structField("action", "string"), structField("value", "integer"), 
        structField("location", "integer")) 

# Change location of each row      
df2 <- dapply(df, changeLocation, schema) 

head(df2) 
+0

我接过一看dapply功能,并发现它是用于“应用** **功能的SparkDataFrame的每个分区”。根据我的理解,_partition_与_row_无关。我担心的是,我不知道如何编写**函数**以应用于SparkDataFrame。目前我只知道如何实现**函数**我想在R中但不在SparkR中。你能给我一些建议吗? – Scorpion775

+0

我不是一个火花专家,但我认为分区数据分散到整个集群中。你可以尝试一下上面的例子,告诉我它是否适合你的需要? – chateaur

+0

谢谢你的建议。我试图按照你的指示,但得到了一个错误,如问题所示。 – Scorpion775

相关问题