2017-08-25 46 views
0

问题背景

我试图生成每个从实时视频流密钥事件项目的总体(线性)订单,其中的顺序是事件时间(源自活动按关键字进行的处理全排序事件有效载荷)。使用Apache梁

方法

我曾试图实现此使用流如下:

1)设置的非重叠序列的窗户,例如持续时间5分钟

2)建立一个允许迟到 - 这是很好丢弃晚期事件

3)设置的累加模式保留全部解雇窗格

4)使用“AfterwaterMark”触发

5)当处理触发窗格时,只考虑窗格是否是最后一个窗口

6)使用GroupBy.perKey确保此窗口中此密钥的所有事件将作为单个资源上的一个单元进行处理

尽管这种方法确保给定窗口内每个键的线性顺序,但它不能保证跨越多个窗口,例如,可能会出现一个事件窗口,该事件在与之前的窗口同时处理之后发生,如果第一个窗口失败并且必须重试,这很容易发生。

我正在考虑采用这种方法,可以首先处理实时流,以便按键对事件进行分区,并将它们写入由其窗口范围命名的文件。 由于光束处理的并行性,这些文件也将被无序生成。 然后,单个进程协调器可以将这些文件顺序地提交给批处理管道 - 只有在它接收到前一个文件并且其下游处理已成功完成时才提交下一个文件。

问题是,如果该时间窗口中至少有一个时间元素,Apache Beam将仅触发窗格。因此,如果事件中存在差距,那么可能会在生成的文件中存在空白 - 即丢失的文件。丢失文件的问题在于,协调批处理器无法区分时间窗口是否已经过去而没有数据,或者在出现故障时出现故障,在这种情况下,只有在文件最终到达时才能继续。

强制事件窗口触发的一种方法可能是以某种方式将虚拟事件添加到每个分区和时间窗口的流中。然而,这很难做...如果时间序列中存在很大的空白,那么如果这些虚拟事件发生在很久以后被事件包围,那么它们将被放弃为迟。

是否有其他方法可以确保触发每个可能的事件窗口,即使这会导致输出空文件?

正在通过实时流的关键码生成一个总体排序,这是Apache Beam的一个易于处理的问题?我应该考虑另一种方法吗?

回答

0

根据您对可管理的定义,当然可以通过Apache Beam中的事件时间戳对每个按键进行流排序。

下面是设计背后的考虑:

  1. 阿帕奇梁并不保证有序的运输,所以有管道内没有使用。所以我会假设你是这样做的,所以你可以写一个外部系统,只有能够处理的东西,如果他们按顺序。
  2. 如果一个事件有时间戳t,你永远不能确定没有早先的事件会到达,除非你等到t可以放下。

因此,这里是我们将如何做到这一点:

  1. 我们将编写一个全局窗口使用state和定时器(blog post still under review)一ParDo。这使它成为一个按键工作流程。
  2. 我们将缓存到达状态的元素。因此,您允许的迟到会影响您需要的数据结构的效率。你需要的是一个堆看,并弹出最小时间戳和元素;没有内置的堆状态,所以我将它写为ValueState
  3. 我们将设置一个事件时间定时器在不能再违反元素的时间戳时收到回调。

为了简洁起见,我将假定自定义的EventHeap数据结构。在实践中,你想把它分解成多个状态单元来最小化传输的数据。堆可能是对原始类型状态的合理补充。

我还会假设我们需要的所有编码器都已经注册并专注于状态和定时器逻辑。

new DoFn<KV<K, Event>, Void>() { 

    @StateId("heap") 
    private final StateSpec<ValueState<EventHeap>> heapSpec = StateSpecs.value(); 

    @TimerId("next") 
    private final TimerSpec nextTimerSpec = TimerSpec.timer(TimeDomain.EVENT_TIME); 

    @ProcessElement 
    public void process(
     ProcessContext ctx, 
     @StateId("heap") ValueState<EventHeap> heapState, 
     @TimerId("next") Timer nextTimer) { 
    EventHeap heap = firstNonNull(
     heapState.read(), 
     EventHeap.createForKey(ctx.element().getKey())); 
    heap.add(ctx.element().getValue()); 
    // When the watermark reaches this time, no more elements 
    // can show up that have earlier timestamps 
    nextTimer.set(heap.nextTimestamp().plus(allowedLateness); 
    } 

    @OnTimer("next") 
    public void onNextTimestamp(
     OnTimerContext ctx, 
     @StateId("heap") ValueState<EventHeap> heapState, 
     @TimerId("next") Timer nextTimer) { 
    EventHeap heap = heapState.read(); 
    // If the timer at time t was delivered the watermark must 
    // be strictly greater than t 
    while (!heap.nextTimestamp().isAfter(ctx.timestamp())) { 
     writeToExternalSystem(heap.pop()); 
    } 
    nextTimer.set(heap.nextTimestamp().plus(allowedLateness); 
    } 
} 

这应该有希望让你开始走向任何潜在的用例。

+0

这太好了。 使用OnTimerContext对象,可以使用OnTimer方法而不是ProcessElement方法(process(..))生成ParDo函数的输出吗? – mark

+0

是的,你也可以通过OnTimerContext输出。 –

+0

“为了简洁起见,我将假定一个自定义的EventHeap数据结构。实际上,您希望将其分解为多个状态单元,以最大限度地减少传输的数据。” 我不知道这将如何帮助。为了将每个新项目添加到堆中,您必须首先读取整个堆,然后将其写回。如何将事件的属性分成不同的值状态,避免读取和写入所有这些状态? – mark