2017-04-21 74 views
1

由于源数据集中的数据不兼容与我的目标数据集相比,我收到了来自时间和时间的错误。我想控制流水线根据错误类型确定的操作,可能会输出或丢弃这些微粒行,然后完成其他所有操作。那可能吗?此外,是否可以通过某种简单的方式从Data Factory中获取实际失败的行,而无需访问和搜索实际的源数据集?如何控制Azure数据工厂管道中的数据失败?

复制活动在Sink端遇到用户错误:ErrorCode = UserErrorInvalidDataValue,'Type = Microsoft.DataTransfer.Common.Shared.HybridDeliveryException,Message = Column'Timestamp'包含无效值'11667'。无法将'11667'转换为类型'DateTimeOffset',Source = Microsoft.DataTransfer.Common,''Type = System.FormatException,Message = String未被识别为有效的DateTime。,Source = mscorlib,'。

感谢

回答

3

我想你已经打了ADF中一个相当普遍的问题和局限性。尽管使用JSON定义的数据集允许ADF理解数据的结构,但仅此结构,编排工具无法做任何事情来转换或操作数据作为活动处理的一部分。

要直接回答你的问题,这当然是可能的。但是您需要分解C#并使用ADF的可扩展性功能在将它传递到最终目标之前处理坏行。

我建议您扩展您的数据工厂以包含一个自定义活动,您可以在其中构建一些较低级别的清理流程来转移所描述的错误行。

这是一种方法,我们经常拿,因为不是所有的数据被完美的(我想)和ETLELT不起作用。我更喜欢首字母缩写词ECLT。 'C'代表干净。或者清理,准备等。这当然适用于ADF,因为此服务没有自己的计算或SSIS样式的数据流引擎。

所以...

就如何做到这一点而言。首先,我建议您查看关于创建ADF自定义活动的博客文章。链接:

https://www.purplefrogsystems.com/paul/2016/11/creating-azure-data-factory-custom-activities/

然后从IDotNetActivity继承了你的C#类中做类似的下面。

public IDictionary<string, string> Execute(
     IEnumerable<LinkedService> linkedServices, 
     IEnumerable<Dataset> datasets, 
     Activity activity, 
     IActivityLogger logger) 
    { 

    //etc 

    using (StreamReader vReader = new StreamReader(YourSource)) 
     { 
      using (StreamWriter vWriter = new StreamWriter(YourDestination)) 
      { 
       while (!vReader.EndOfStream) 
       { 
       //data transform logic, if bad row etc 
       } 
      } 
     } 
    } 

你明白了。建立你自己的SSIS数据流!

然后将干净的行写出来作为输出数据集,它可以作为您下一次ADF活动的输入。可以使用多个管道,也可以作为单个管道中的链接活动。

这是您获得ADF处理当前服务产品中不良数据的唯一方法。

希望这会有所帮助

相关问题