0

我正在使用Azure Data Factory来定期将数据从MySQL导入到Azure SQL数据仓库。如何使用Azure Data Factory将数据从MySQL逐步导入Azure数据仓库?

数据通过Azure存储帐户上的临时blob存储,但是当我运行管道时,它失败了,因为它无法将blob文本分离回列。管道试图插入目标的每一行都将成为一个长字符串,其中包含由“⯑”字符分隔的所有列值。

我之前使用过数据工厂,没有尝试增量机制,它工作正常。我没有看到会导致这种行为的原因,但我可能错过了一些东西。

我附上描述管道的JSON,附带一些小的命名更改,请让我知道是否看到任何可以解释这一点的内容。

谢谢!

编辑:添加异常消息:

Failed execution Database operation failed. Error message from database execution : ErrorCode=FailedDbOperation,'Type=Microsoft.DataTransfer.Com‌​mon.Shared.HybridDel‌​iveryException,Messa‌​ge=Error happened when loading data into SQL Data Warehouse.,Source=Microsoft.DataTransfer.ClientLibrary,''Typ‌​e=System.Data.SqlCli‌​ent.SqlException,Mes‌​sage=Query aborted-- the maximum reject threshold (0 rows) was reached while reading from an external source: 1 rows rejected out of total 1 rows processed. (/f4ae80d1-4560-4af9-9e74-05de941725ac/Data.8665812f-fba1-40‌​7a-9e04-2ee5f3ca5a7e‌​.txt) Column ordinal: 27, Expected data type: VARCHAR(45) collate SQL_Latin1_General_CP1_CI_AS, Offending value:* ROW OF VALUES * (Tokenization failed), Error: Not enough columns in this line.,},],'.

{ 
"name": "CopyPipeline-move_incremental_test", 
"properties": { 
    "activities": [ 
     { 
      "type": "Copy", 
      "typeProperties": { 
       "source": { 
        "type": "RelationalSource", 
        "query": "$$Text.Format('select * from [table] where InsertTime >= \\'{0:yyyy-MM-dd HH:mm}\\' AND InsertTime < \\'{1:yyyy-MM-dd HH:mm}\\'', WindowStart, WindowEnd)" 
       }, 
       "sink": { 
        "type": "SqlDWSink", 
        "sqlWriterCleanupScript": "$$Text.Format('delete [schema].[table] where [InsertTime] >= \\'{0:yyyy-MM-dd HH:mm}\\' AND [InsertTime] <\\'{1:yyyy-MM-dd HH:mm}\\'', WindowStart, WindowEnd)", 
        "allowPolyBase": true, 
        "polyBaseSettings": { 
         "rejectType": "Value", 
         "rejectValue": 0, 
         "useTypeDefault": true 
        }, 
        "writeBatchSize": 0, 
        "writeBatchTimeout": "00:00:00" 
       }, 
       "translator": { 
        "type": "TabularTranslator", 
        "columnMappings": "column1:column1,column2:column2,column3:column3" 
       }, 
       "enableStaging": true, 
       "stagingSettings": { 
        "linkedServiceName": "StagingStorage-somename", 
        "path": "somepath" 
       } 
      }, 
      "inputs": [ 
       { 
        "name": "InputDataset-input" 
       } 
      ], 
      "outputs": [ 
       { 
        "name": "OutputDataset-output" 
       } 
      ], 
      "policy": { 
       "timeout": "1.00:00:00", 
       "concurrency": 10, 
       "style": "StartOfInterval", 
       "retry": 3, 
       "longRetry": 0, 
       "longRetryInterval": "00:00:00" 
      }, 
      "scheduler": { 
       "frequency": "Hour", 
       "interval": 1 
      }, 
      "name": "Activity-0-_Custom query_->[schema]_[table]" 
     } 
    ], 
    "start": "2017-06-01T05:29:12.567Z", 
    "end": "2099-12-30T22:00:00Z", 
    "isPaused": false, 
    "hubName": "datafactory_hub", 
    "pipelineMode": "Scheduled" 
} 

}

+0

你能提供更多的步骤和例外吗? –

+0

执行失败 数据库操作失败。来自数据库执行的错误消息:ErrorCode = FailedDbOperation,'Type = Microsoft.DataTransfer.Common.Shared.HybridDeliveryException,Message =将数据加载到SQL数据仓库时发生错误,Source = Microsoft.DataTransfer.ClientLibrary,'Type = System。 Data.SqlClient。SqlException,Message = Query中止 - 从外部源读取时达到最大拒绝阈值(0行):在处理的总共1行中被拒绝1行。 (/f4ae80d1-4560-4af9-9e74-05de941725ac/Data.8665812f-fba1-407a-9e04-2ee5f3ca5a7e.txt) – PandaZ

+0

列序号:27,预期的数据类型:VARCHAR(45)collat​​e SQL_Latin1_General_CP1_CI_AS,出错值:*过多的字符FOR RESPONSE *(Tokenization失败),错误:此行中没有足够的列。,},],'。 – PandaZ

回答

1

这听起来像你做什么是对的,但形成不良的数据(通病,无UTF-8编码)所以ADF无法根据需要分析结构。当我遇到这种情况时,我经常需要向管道添加一个自定义活动,以清理和准备数据,以便通过下游活动以结构化方式使用它。不幸的是,在解决方案的开发中这是一个很大的开销,并且需要编写一个C#类来处理数据转换。

还记得ADF没有自己的计算机,它只调用其他服务,所以你还需要一个Azure Batch服务来执行编译代码。

可悲的是,这里没有任何魔法修复。 Azure非常适合提取并加载完美结构化的数据,但在现实世界中,我们需要其他服务来完成“转换”或“清理”的含义,因此我们需要一个可以ETL或我更喜欢ECTL的管道。

下面是关于创建ADF自定义活动,让你开始一个链接:https://www.purplefrogsystems.com/paul/2016/11/creating-azure-data-factory-custom-activities/

希望这有助于。

1

我一直在使用数据工厂v.2使用staging(这意味着Polybase)从Azure sql db导入到Azure DWH时使用相同的消息。我已经了解到,Polybase会失败并显示与错误的数据类型有关的错误消息等。我收到的消息与here提到的消息非常相似,即使我没有直接从SQL使用Polybase,而是通过Data Factory。

无论如何,我的解决方案是避免为十进制或数字类型的列的NULL值,例如, ISNULL(mynumericCol,0)as mynumericCol。

相关问题