2015-11-07 82 views
2

对于Spark和SparkR,我相当新,并且可能有一些基本问题。为什么Window函数(延迟)在SparkR中不起作用?

本练习的目的是在SparkR中实现窗口函数(lead,lag,rank等)。

我称为那里提到下面的链接和Databricks后但没有利用 -

SparkSQL - Lag function?

代码段我使用:

初始化sqlContext并注册该数据帧作为一个临时表使用 Registertemptable

output_data<-SparkR::sql(sqlContext, "select *,lag(type) over(partition by key order by key) as lag_type from input_data") 

错误我们面临的是:

failure: ``union'' expected but `(' found 

另一个建议,我发现是使用一个Hivecontext而非SQLcontext作为SQLcontext可能不允许所有功能。

在该方法中,初始化Hivecontext并试图运行HiveQL 做同样给了我们一个错误说:

cannot find table named input_table 

问:做我们需要运行类似registertemptable所以一些命令至于允许Hivecontext访问表?

saveastable可能是一个选项,但从我读到的内容来看,它将收集S3存储中的数据,而不是将其存储在群集的内存中。

希望对此有所帮助! 谢谢!

+1

人们似乎在这个问题上这么多的问题。他们并不都很清楚。你愿意重温你的问题吗? – eliasah

回答

1

让我们使用freeny数据集准备输入data.frame

ldf <- freeny 

# Extract year and quater 
ldf$yr <- as.integer(rownames(ldf)) 
ldf$qr <- as.integer(4 * (as.numeric(rownames(ldf)) - ldf$yr)) 

# Clean column names 
colnames(ldf) <- gsub("\\.", "_", colnames(ldf)) 

# Drop a couple of things so output fits nicely in the code box 
row.names(ldf) <- NULL 
ldf$market_potential <- NULL 

head(ldf) 


##   y lag_quarterly_revenue price_index income_level yr qr 
## 1 8.79236    8.79636  4.70997  5.82110 1962  1 
## 2 8.79137    8.79236  4.70217  5.82558 1962  2 
## 3 8.81486    8.79137  4.68944  5.83112 1962  3 
## 4 8.81301    8.81486  4.68558  5.84046 1963  0 
## 5 8.90751    8.81301  4.64019  5.85036 1963  1 
## 6 8.93673    8.90751  4.62553  5.86464 1963  2 

我发现另一项建议是使用Hivecontext而非SQLcontext作为SQLcontext可能不允许所有功能。

这是正确的,最先进的功能仅被HiveContext支持,而默认的是SQLContext。首先,你必须确保你的Spark版本已经用Hive支持构建。关于Spark downloads page可用的二进制文件是真的,但是如果您从源代码构建,则一定要使用-Phive标志。

hiveContext <- sparkRHive.init(sc) 
sdf <- createDataFrame(hiveContext, ldf) 
printSchema(sdf) 

## root 
## |-- y: double (nullable = true) 
## |-- lag_quarterly_revenue: double (nullable = true) 
## |-- price_index: double (nullable = true) 
## |-- income_level: double (nullable = true) 
## |-- yr: integer (nullable = true) 
## |-- qr: integer (nullable = true) 

初始化sqlContext和使用Registertemptable

这就是正确的,以及寄存器中的数据帧作为一个临时表。为了能够使用sql命令,您已注册一个表。

registerTempTable(sdf, "sdf") 

请记住,DataFrame绑定到已用于创建它的上下文。

head(tables(hiveContext)) 

## tableName isTemporary 
## 1  sdf  TRUE 

head(tables(sqlContext)) 

## [1] tableName isTemporary 
## <0 rows> (or 0-length row.names) 

最后例如查询:

query <- "SELECT yr, qr, y, lag_quarterly_revenue AS old_lag, 
      LAG(y) OVER (ORDER BY yr, qr) AS new_lag 
      FROM sdf" 

sql(hiveContext, query) 

##  yr qr  y old_lag new_lag 
## 1 1962 1 8.79236 8.79636  NA 
## 2 1962 2 8.79137 8.79236 8.79236 
## 3 1962 3 8.81486 8.79137 8.79137 
## 4 1963 0 8.81301 8.81486 8.81486 
## 5 1963 1 8.90751 8.81301 8.81301 
## 6 1963 2 8.93673 8.90751 8.90751 
+0

thanks @ zero323:问题在于将数据框绑定到SQL上下文,我们使用SQLcontext创建了数据框,但尝试使用Hivecontext滞后,显然无法引用创建的数据框!感谢您对此的及时回应! –

相关问题