我有一系列正在由流分析作业处理的动态数据。有几个可以明确查询的统一属性,但在查询时大部分有效负载是未知类型。我的目标是获取这个未知数据(记录)并将所有属性提升为写入Azure Table的结果查询中的顶级字段。从记录到顶级提升已知属性导致流分析查询
我能够扁平化记录的属性,它总是作为子对象添加到查询中。 GetRecordProperties()
没有帮助,因为我不想为每个属性返回一个单独的记录。
我的查询看起来是这样的:
WITH
[custom_events_temp] AS
(
SELECT
[magellan].[context].[data].[eventTime] as [event_time],
[flat_event].ArrayValue.name as [event_name],
udf.FlattenCustomDimensions([magellan].[context].[custom].[dimensions]) as [flat_custom_dim]
FROM [Magellan--AI-CustomEvents] magellan
TIMESTAMP BY [magellan].[context].[data].[eventTime]
CROSS APPLY GetElements([magellan].[event]) as [flat_event]
),
-- create table with extracted webhook data
[all_webhooks] AS
(
SELECT
[flat_custom_dim].[hook_event_source] as PartitionKey,
udf.CreateGuid('') as RowKey,
-- event data
[custom_events_temp].[event_time],
[custom_events_temp].[flat_custom_dim].[hook_event_name] as [event_name],
-- webhook payload data
udf.FlattenWebhookPayload(udf.ExtractJsonWebhookPayload([custom_events_temp].[flat_custom_dim].[webhook_payload])) AS [payload]
FROM [custom_events_temp]
)
SELECT * INTO [TrashTableOut] FROM [all_webhooks]
,所得记录我得到是这样的。这个想法是将嵌套对象中的所有内容都嵌套在一起,因此每个属性在Azure表中都有自己的列。
{
"partitionkey": "zzzzzzzzz",
"rowkey": "8beeb783-b07f-8a98-ef56-71c43378a5fc",
"event_time": "2017-10-15T05:37:06.3240000Z",
"event_name": "subscriber.updated_lead_score",
"payload": {
"event": "subscriber.updated_custom_field",
"data.subscriber.id": "...",
"occurred_at": "2017-10-15T05:36:57.000Z",
"data.account_id": "11111",
"data.subscriber.status": "active",
"data.subscriber.custom_fields.coupon": "xxxxxxx",
"data.subscriber.custom_fields.coupon_discounted_price": "11111",
"data.subscriber.custom_fields.coupon_pre_discount_price": "11111",
"data.subscriber.custom_fields.name": "John Doe",
"data.subscriber.custom_fields.first_name": "John",
"data.subscriber.custom_fields.ip_address": "0.0.0.0",
"data.subscriber.tags": "tag1,tag2,tag3",
"data.subscriber.time_zone": "Europe/Berlin",
"data.subscriber.utc_offset": 120,
"data.subscriber.created_at": "2017-03-27T18:19:35.000Z"
}
}
这可能吗? UDF FlattenCustomDimensions
接收一系列项目并将它们作为属性公开。 UDF ExtractJsonWebhookPayload
采用字符串&将其转换为JSON,而UDF FlattenWebhookPayload
采用复杂的JSON对象&创建在结果中的对象中看到的点语法。
我的最终目标是获得一个结果集,看起来像:
{
"partitionkey": "zzzzzzzzz",
"rowkey": "8beeb783-b07f-8a98-ef56-71c43378a5fc",
"event_time": "2017-10-15T05:37:06.3240000Z",
"event_name": "subscriber.updated_lead_score",
"payload.event": "subscriber.updated_custom_field",
"payload.data.subscriber.id": "...",
"payload.occurred_at": "2017-10-15T05:36:57.000Z",
"payload.data.account_id": "11111",
"payload.data.subscriber.status": "active",
"payload.data.subscriber.custom_fields.coupon": "xxxxxxx",
"payload.data.subscriber.custom_fields.coupon_discounted_price": "11111",
"payload.data.subscriber.custom_fields.coupon_pre_discount_price": "11111",
"payload.data.subscriber.custom_fields.name": "John Doe",
"payload.data.subscriber.custom_fields.first_name": "John",
"payload.data.subscriber.custom_fields.ip_address": "0.0.0.0",
"payload.data.subscriber.tags": "tag1,tag2,tag3",
"payload.data.subscriber.time_zone": "Europe/Berlin",
"payload.data.subscriber.utc_offset": 120,
"payload.data.subscriber.created_at": "2017-03-27T18:19:35.000Z"
}
除非有人有更好的主意/选项。
,如果你知道所有的列名,它可能可以写一个查询来促进所有嵌套的字段。但是这个查询将会很大并且很难改变。 你有没有考虑过使用[javascript UDF](https://docs.microsoft.com/en-us/azure/stream-analytics/stream-analytics-javascript-user-defined-functions)?它会更清洁,你将不得不将整个有效载荷传递到UDF的单个字段中。 –
嗯,是的,点是我不知道领域。通过使用点语法的直接查询,了解字段名称很容易解决。 –