2017-08-08 63 views
3

编程指南指出,结构化流式传输可以使用适当的源/接收器确保一次端到端的语义。结构化流式传输:水印与准确一次语义

但是,我不明白这是如何工作,当工作崩溃,我们有一个水印应用。

下面是我目前如何想象它的工作原理的一个例子,请纠正我误解任何一点。提前致谢!

实施例:

火花工作:计算在每个1小时窗口#事件,1小时水印。

消息:

  • A - 时间戳10.00-16.00
  • 乙 - 时间戳上午10点10
  • Ç - 时间戳上午10点二十○
  • X - 时间戳下午12点
  • ý - 时间戳下午12时五十
  • Z - 时间戳8pm

我们开始工作,从源头读取A,B,C,并在上午10:30崩溃,然后再将它们写入我们的接收器。

在下午6点,作业返回并知道使用保存的检查点/ WAL重新处理A,B,C。 10-11am窗口的最终计数为3。

接下来,由于它们属于不同的分区,它将并行读取来自Kafka,X,Y,Z的新消息。 Z首先被处理,所以最大事件时间戳被设置为晚上8点。当作业读取X和Y时,它们现在位于水印的后面(下午8点--1小时= 7点),因此它们会作为旧数据丢弃。最后的计数是8点至9点的1点,并且该工作在12点至1点的窗口中没有报告任何内容。我们已经失去了对数据X和Y.

---结束例子---

是这种情况是否准确? 如果是这样,当从kafka-> spark正常流动时,1小时水印可能足以处理延迟/乱序数据,但当火花作业停机/ kafka连接长时间丢失时,不会处理延迟/乱序数据。避免数据丢失的唯一方法是使用水印的时间比您期望的工作时间更长吗?

+0

据我的理解,Spark会通过水印字段对传入数据进行排序,所以Z会被最后看到。 – nonsleepr

+0

有没有参考?据我所知,Spark会并行读取来自不同Kafka分区的数据,它只能在一个分区内对数据进行串行处理。 –

+0

我上面的假设是错误的:被我的排序的(比例更复杂的)作业的执行计划所误导。 – nonsleepr

回答

3

水印在minibatch期间是固定值。在你的例子中,由于X,Y和Z在同一个小批中处理,因此用于这些记录的水印将是上午9:20。在完成之后,minibatch水印将更新至晚上7点。

下面从design doc的报价为特征SPARK-18124它实现水印功能:

要计算我们基于触发器执行的降边界,我们必须做到以下几点。

  • 在每次触发,而汇总的数据,我们也扫描的事件时间的触发数据
  • 触发完成后的最高值,计算水印=触发前MAX(活动时间,最多不超过活动时间触发) - 门

大概模拟会比较说明:

import org.apache.hadoop.fs.Path 
import java.sql.Timestamp 
import org.apache.spark.sql.types._ 
import org.apache.spark.sql.streaming.ProcessingTime 

val dir = new Path("/tmp/test-structured-streaming") 
val fs = dir.getFileSystem(sc.hadoopConfiguration) 
fs.mkdirs(dir) 

val schema = StructType(StructField("vilue", StringType) :: 
         StructField("timestamp", TimestampType) :: 
         Nil) 

val eventStream = spark 
    .readStream 
    .option("sep", ";") 
    .option("header", "false") 
    .schema(schema) 
    .csv(dir.toString) 

// Watermarked aggregation 
val eventsCount = eventStream 
    .withWatermark("timestamp", "1 hour") 
    .groupBy(window($"timestamp", "1 hour")) 
    .count 

def writeFile(path: Path, data: String) { 
    val file = fs.create(path) 
    file.writeUTF(data) 
    file.close() 
} 

// Debug query 
val query = eventsCount.writeStream 
    .format("console") 
    .outputMode("complete") 
    .option("truncate", "false") 
    .trigger(ProcessingTime("5 seconds")) 
    .start() 

writeFile(new Path(dir, "file1"), """ 
    |A;2017-08-09 10:00:00 
    |B;2017-08-09 10:10:00 
    |C;2017-08-09 10:20:00""".stripMargin) 

query.processAllAvailable() 
val lp1 = query.lastProgress 

// ------------------------------------------- 
// Batch: 0 
// ------------------------------------------- 
// +---------------------------------------------+-----+ 
// |window          |count| 
// +---------------------------------------------+-----+ 
// |[2017-08-09 10:00:00.0,2017-08-09 11:00:00.0]|3 | 
// +---------------------------------------------+-----+ 

// lp1: org.apache.spark.sql.streaming.StreamingQueryProgress = 
// { 
// ... 
// "numInputRows" : 3, 
// "eventTime" : { 
//  "avg" : "2017-08-09T10:10:00.000Z", 
//  "max" : "2017-08-09T10:20:00.000Z", 
//  "min" : "2017-08-09T10:00:00.000Z", 
//  "watermark" : "1970-01-01T00:00:00.000Z" 
// }, 
// ... 
// } 


writeFile(new Path(dir, "file2"), """ 
    |Z;2017-08-09 20:00:00 
    |X;2017-08-09 12:00:00 
    |Y;2017-08-09 12:50:00""".stripMargin) 

query.processAllAvailable() 
val lp2 = query.lastProgress 

// ------------------------------------------- 
// Batch: 1 
// ------------------------------------------- 
// +---------------------------------------------+-----+ 
// |window          |count| 
// +---------------------------------------------+-----+ 
// |[2017-08-09 10:00:00.0,2017-08-09 11:00:00.0]|3 | 
// |[2017-08-09 12:00:00.0,2017-08-09 13:00:00.0]|2 | 
// |[2017-08-09 20:00:00.0,2017-08-09 21:00:00.0]|1 | 
// +---------------------------------------------+-----+ 

// lp2: org.apache.spark.sql.streaming.StreamingQueryProgress = 
// { 
// ... 
// "numInputRows" : 3, 
// "eventTime" : { 
//  "avg" : "2017-08-09T14:56:40.000Z", 
//  "max" : "2017-08-09T20:00:00.000Z", 
//  "min" : "2017-08-09T12:00:00.000Z", 
//  "watermark" : "2017-08-09T09:20:00.000Z" 
// }, 
// "stateOperators" : [ { 
//  "numRowsTotal" : 3, 
//  "numRowsUpdated" : 2 
// } ], 
// ... 
// } 

writeFile(new Path(dir, "file3"), "") 

query.processAllAvailable() 
val lp3 = query.lastProgress 

// ------------------------------------------- 
// Batch: 2 
// ------------------------------------------- 
// +---------------------------------------------+-----+ 
// |window          |count| 
// +---------------------------------------------+-----+ 
// |[2017-08-09 10:00:00.0,2017-08-09 11:00:00.0]|3 | 
// |[2017-08-09 12:00:00.0,2017-08-09 13:00:00.0]|2 | 
// |[2017-08-09 20:00:00.0,2017-08-09 21:00:00.0]|1 | 
// +---------------------------------------------+-----+ 

// lp3: org.apache.spark.sql.streaming.StreamingQueryProgress = 
// { 
// ... 
// "numInputRows" : 0, 
// "eventTime" : { 
//  "watermark" : "2017-08-09T19:00:00.000Z" 
// }, 
// "stateOperators" : [ ], 
// ... 
// } 

query.stop() 
fs.delete(dir, true) 

公告批0如何以水印1970-01-01 00:00:00开始,而批1以水印2017-08-09 09:20:00开始(批0的最大事件时间减1小时)。第2批,虽然是空的,但使用水印2017-08-09 19:00:00

+0

这是超级有用的,谢谢你的详细答案! –

1

Z先被处理,所以最大事件时间戳被设置为晚上8点。

这是正确的。尽管可以首先计算Z,但是从当前查询迭代中的最大时间戳中减去水印。这意味着08:00 PM将被设置为我们减去水印时间的时间,意思是12:00和12:50将被丢弃。

From the documentation

用于T开始在特定时间窗口,发动机将保持状态并允许迟数据来更新状态,直到(由发动机看出最大事件时间 - 延迟阈值> T)


会以避免数据丢失的唯一选择比你期望这份工作不断往下走

使用水印长

不一定。让我们假设你设置了每个Kafka查询到100个项目的最大数据量。如果您读小批量,并且您从每个分区连续读取,则每个批次的每个最大时间戳可能不是代理中最新消息的最大时间,这意味着您不会丢失这些消息。

+0

对不起,我不确定我是否理解第一点; 'withWatermark' [文档](https://spark.apache.org/docs/2.2.0/api/java/org/apache/spark/sql/Dataset.html#withWatermark-java.lang.String-java。 lang.String-)声明最大事件时间在当前查询的所有分区中设置。在这种情况下,我们不应该在20:00减去水印,因此12:50和12:00将被丢弃? 第二点,这是真的,节流将有助于缓解这个问题。它仍然看起来有点冒险,不管每批是否都适合水印。 –

+0

@RayJ水印在minibatch期间是一个固定值。在你的例子中,由于X,Y和Z在同一个小批中处理,所以他们使用的水印将是上午9:20。在完成之后,minibatch水印将更新至晚上7点。 – nonsleepr

+0

@RayJ活动的最长时间是下午12点10分,而不是晚上8点。为什么它会从一个不是最大的值中减去它? –