我想知道Spark将在哪些情况下作为UDAF函数的一部分执行合并。什么时候合并发生在用户定义的聚合函数中Spark中的UDAF
动机: 我在Spark项目的窗口中使用了很多UDAF函数。我经常想回答这样一个问题:
信用卡交易在同一个国家与30天窗口中的当前交易进行了多少次?
该窗口将从当前事务开始,但不会将其包括在计数中。它需要通过当前交易的价值来了解过去30天内哪个国家/地区的数量。
val rollingWindow = Window
.partitionBy(partitionByColumn)
.orderBy(orderByColumn.desc)
.rangeBetween(0, windowSize)
df.withColumn(
outputColumnName,
customUDAF(inputColumn, orderByColumn).over(rollingWindow))
我写了我的customUDAF来做计数。我总是使用.orderBy(orderByColumn.desc)
并感谢.desc
当前交易在计算期间在窗口中首先显示。
UDAF函数需要实现merge
函数,该函数在并行计算中合并两个中间聚合缓冲区。如果发生任何合并,我的current transaction
对于不同的缓冲区可能会不同,并且UDAF的结果将不正确。
我写了一个UDAF函数来计算我的数据集合并的数量,并且只保留窗口中的第一个事务与当前事务进行比较。
class FirstUDAF() extends UserDefinedAggregateFunction {
def inputSchema = new StructType().add("x", StringType)
.add("y", StringType)
def bufferSchema = new StructType()
.add("first", StringType)
.add("numMerge", IntegerType)
def dataType = new StructType()
.add("firstCode", StringType)
.add("numMerge", IntegerType)
def deterministic = true
def initialize(buffer: MutableAggregationBuffer) = {
buffer(0) = ""
buffer(1) = 1
}
def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
if (buffer.getString(0) == "")
buffer(0) = input.getString(0)
}
def merge(buffer1: MutableAggregationBuffer, buffer2: Row) = {
buffer1(1) = buffer1.getInt(1) + buffer2.getInt(1)
}
def evaluate(buffer: Row) = buffer
}
当我在本地主用16个CPU与火花2.0.1运行它,也有从未在窗口中的任何兼并和第一笔交易始终是当前事务。这就是我要的。在不久的将来,我将在x100更大的数据集和真正的分布式Spark群集上运行我的代码,并想知道是否可以在那里发生合并。
问题:
- 在何种情况下/ conditons兼并发生在UDAF?
- 使用orderBy执行Windows有没有兼并?
- 可以告诉Spark不要做兼并吗?
谢谢您的澄清。我接受你的答案。对于你的最后一点,我不知道我明白我该怎么做。你能否详细说明一下?你如何通过窗口聚合?我按用户进行分区,按日期排序并计算窗口中发生的当前事务的国家(当前相对于窗口,例如sql中的current_row)的次数。对于每个交易这个国家是不同的。 –