2017-09-13

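Azure Stream Analytics: compacting sensor data hourly

How can I collect sensor events into hourly documents that contain an array holding a subset of the original messages' fields?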

The input events have the following format:

{"plantId": "Plant A", "machineId" : "M001", "sensorId": "S001", "unit": "kg", "time": "2017-09-05T22:00:14.9410000Z", "value": 1234.56} 

{"plantId": "Plant A", "machineId" : "M001", "sensorId": "S001", "unit": "kg", "time": "2017-09-05T22:00:19.5410000Z", "value": 1334.76} 

... 

I would like to get the following output for each sensor for each hour:

{"plantId": "Plant A", "machineId" : "M001", "sensorId": "S001", "unit": "kg",
    "from" : "2017-09-05T22:00:14.9410000Z", "to" : "2017-09-05T22:59:55.5410000Z",
    "data": [
    {"time": "2017-09-05T22:01:14.9410000Z", "value": 1234.56},
    {"time": "2017-09-05T22:01:19.5410000Z", "value": 1334.76},
    ....
    ]
}

I created the following query:

SELECT PlantId, MachineId, SensorId, Unit, 
     MIN(Time) AS From, MAX(Time) AS To, 
     Collect() AS Data 
INTO CosmosDBOutput 
FROM SensorsInput TIMESTAMP BY CAST(time as datetime) 
GROUP BY PlantId, MachineId, SensorId, Unit, TumblingWindow(hour,1) 

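The problem is that Collect() returns the complete array of all original events, but I only want the time and value fields.

How can I reduce the Collect() result to just these fields?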

Answer


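Based on your description, I suggest you consider using JavaScript user-defined functions (UDFs).

You can define a custom function that removes the fields you do not need from each collected event.

For more details, you can follow the steps below:

1. Create a UDF: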


2. Add the following code to the function:

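// UDF that strips the grouping fields from each collected event,
// leaving only the remaining fields (time and value).
function main(InputJSON) {
    // Declare i locally so it does not become an implicit global.
    for (var i = 0; i < InputJSON.length; i++) {
        delete InputJSON[i].plantId;
        delete InputJSON[i].machineId;
        delete InputJSON[i].sensorId;
        delete InputJSON[i].unit;
    }
    return InputJSON;
}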
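
A variant of the same idea, as a minimal sketch: instead of deleting the unwanted fields one by one, build new objects that contain only the fields to keep, so that any field added to the input later does not leak into the output. It assumes the collected records carry the lowercase `time` and `value` fields shown in the question. The `sample` and `reduced` names are only for a local check in plain JavaScript and are not part of the UDF.

```javascript
// Hypothetical variant of the UDF above: keep only `time` and `value`
// from each collected event instead of deleting the other fields.
function main(collected) {
    var result = [];
    for (var i = 0; i < collected.length; i++) {
        result.push({
            time: collected[i].time,
            value: collected[i].value
        });
    }
    return result;
}

// Local check with two events shaped like the input in the question.
var sample = [
    {plantId: "Plant A", machineId: "M001", sensorId: "S001", unit: "kg",
     time: "2017-09-05T22:00:14.9410000Z", value: 1234.56},
    {plantId: "Plant A", machineId: "M001", sensorId: "S001", unit: "kg",
     time: "2017-09-05T22:00:19.5410000Z", value: 1334.76}
];
var reduced = main(sample);
console.log(JSON.stringify(reduced, null, 2));
```

Unlike the delete-based version, this one leaves the input array untouched and returns a new array.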

3. Change the query:

Note: replace UDF.remove with the name of your own UDF (UDF.yourUDFname).

SELECT 
    PlantId, MachineId, SensorId, Unit,UDF.remove(Collect()) AS Data,min(time) as fromdate,max(time) as todate 
INTO 
    [YourOutputAlias] 
FROM 
    [YourInputAlias] TIMESTAMP BY time 
GROUP BY PlantId, MachineId, SensorId, Unit, TumblingWindow(hour,1) 

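
To make the intended shape of the output concrete, here is a rough plain-JavaScript simulation of what the query computes. It is not Stream Analytics code: the function name `compactHourly` is hypothetical, the property names `fromdate`, `todate` and `data` simply mirror the aliases in the query, and the hourly window is approximated by truncating the timestamp to the hour. Real tumbling windows treat an event that falls exactly on the hour boundary differently, so take this only as an illustration.

```javascript
// Plain-JavaScript simulation (not Stream Analytics code) of the query:
// one document per sensor per clock hour, with only time and value kept.
function compactHourly(events) {
    var groups = {};
    events.forEach(function (e) {
        // Approximate TumblingWindow(hour, 1) by truncating the ISO
        // timestamp to the hour, e.g. "2017-09-05T22".
        var hour = e.time.slice(0, 13);
        var key = [e.plantId, e.machineId, e.sensorId, e.unit, hour].join("|");
        if (!groups[key]) {
            groups[key] = {
                plantId: e.plantId, machineId: e.machineId,
                sensorId: e.sensorId, unit: e.unit,
                fromdate: e.time, todate: e.time, data: []
            };
        }
        var g = groups[key];
        // UTC ISO 8601 strings in the same format compare chronologically.
        if (e.time < g.fromdate) { g.fromdate = e.time; }
        if (e.time > g.todate) { g.todate = e.time; }
        // Same reduction the UDF performs on the Collect() result.
        g.data.push({time: e.time, value: e.value});
    });
    return Object.keys(groups).map(function (k) { return groups[k]; });
}

// Three made-up events: two in the 22:00 hour, one in the 23:00 hour.
var events = [
    {plantId: "Plant A", machineId: "M001", sensorId: "S001", unit: "kg",
     time: "2017-09-05T22:00:14.9410000Z", value: 1234.56},
    {plantId: "Plant A", machineId: "M001", sensorId: "S001", unit: "kg",
     time: "2017-09-05T22:00:19.5410000Z", value: 1334.76},
    {plantId: "Plant A", machineId: "M001", sensorId: "S001", unit: "kg",
     time: "2017-09-05T23:05:00.0000000Z", value: 1400.1}
];
var docs = compactHourly(events);
console.log(JSON.stringify(docs, null, 2));
```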


Thanks. I had already been considering a similar UDF-based solution, but I was hoping to find a "pure SQL" solution without a UDF.