0

我与室壁运动分析实验和已经解决了与它的许多问题,但实际上坚持了以下内容:计算设备秒使用室壁运动分析

其实,我有记录的流当设备被打开,反映

device_id | timestamp | reading 1 | 2011/09/01 22:30 | 1 1 | 2011/09/01 23:00 | 0 1 | 2011/09/02 03:30 | 1 1 | 2011/09/02 03:31 | 0

  • 有关在reading场我使用1和0关:像关闭。

我试图做到的是创建一个重定向几秒钟的设备已经每5分钟窗口到另一个流上的数字看上去就像一个泵:

device_id | timestamp | reading 1 | 2011/09/01 22:35 | 300 1 | 2011/09/01 22:40 | 300 1 | 2011/09/01 22:45 | 300 1 | 2011/09/01 22:50 | 300 1 | 2011/09/01 22:55 | 300 1 | 2011/09/01 23:00 | 300 1 | 2011/09/01 23:05 | 0 1 | 2011/09/01 23:10 | 0 ...

不知道这是Kinesis Analytics可以完成的事情,我实际上可以做一个查询SQL表的工作,但我坚持的是流数据。

回答

1

这是可能的Drools Kinesis Analytics(亚马逊托管服务):

类型:

package com.text; 

import java.util.Deque; 

declare EventA 
    @role(event) 
    id: int; 
    timestamp: long; 
    on: boolean; 

    //not part of the message 
    seen: boolean; 
end 

declare Session 
    id: int @key; 
    events: Deque; 
end 

declare Report 
    id: int @key; 
    timestamp: long @key; 
    onInLast5Mins: int; 
end 

规则:

package com.text; 

import java.util.Deque; 
import java.util.ArrayDeque; 

declare enum Constants 

    // 20 seconds - faster to test 
    WINDOW_SIZE(20*1000); 

    value: int; 
end 

rule "Reporter" 
    // 20 seconds - faster to test 
    timer(cron:0/20 * * ? * * *) 
when 
    $s: Session() 
then 
    long now = System.currentTimeMillis(); 

    int on = 0; //how long was on 
    int off = 0; //how long was off 
    int toPersist = 0; //last interesting event 

    for (EventA a : (Deque<EventA>)$s.getEvents()) { 
     toPersist ++; 
     boolean stop = false; 
     // time elapsed since the reading till now 
     int delta = (int)(now - a.getTimestamp()); 
     if (delta >= Constants.WINDOW_SIZE.getValue()) { 
      delta = Constants.WINDOW_SIZE.getValue(); 
      stop = true; 
     } 

     // remove time already counted 
     delta -= (on+off); 
     if (a.isOn()) 
      on += delta; 
     else 
      off += delta; 

     if (stop) 
      break; 
    } 

    int toRemove = $s.getEvents().size() - toPersist; 
    while (toRemove > 0) { 
     // this event is out of window of interest - delete 
     delete($s.getEvents().removeLast()); 
     toRemove --; 
    } 

    insertLogical(new Report($s.getId(), now, on)); 
end 

rule "SessionCreate" 
when 
    // for every new EventA 
    EventA(!seen, $id: id) from entry-point events 
    // check there is no session 
    not (exists(Session(id == $id))) 
then 
    insert(new Session($id, new ArrayDeque())); 
end 

rule "SessionJoin" 
when 
    // for every new EventA 
    $a : EventA(!seen) from entry-point events 
    // get event's session 
    $g: Session(id == $a.id) 
then 
    $g.getEvents().push($a); 
    modify($a) { 
     setSeen(true), 
     setTimestamp(System.currentTimeMillis()) 
    }; 
end 
+0

我正在检查Drools,看起来很酷,但现在想用SQL来实现它。 – codeadict

0

为此,您可以使用SQL与Stride HTTP API。您可以将持续SQL查询网络链接在一起,订阅更改流,以及在发生这种情况时想要采取某种任意操作时发送实时webhook。有关详细信息,请参阅Stride API docs