2016-09-22 78 views
23

我有一个存储过程需要插入大量记录。现在有没有办法把数据并行插入到3个表中?也就是说,如何并行(parallel)地向三个不同的表插入数据:

  • 第一张表插入100万条记录。
  • 第二张表插入150万条记录。
  • 第三张表插入50万(500K)条记录。

据我所知,存储过程中的插入是一个接一个依次执行的,前一个完成后才会开始下一个。

那么我该如何实现并行加载?

+2

简单的解决方案:为每个插入创建一个单独的proc,创建作业并将它们从最初的proc中启动。 – Serg

+0

如果能给出一个例子会更好。也许可以把3个插入语句包装在一个事务中? – shadow

+3

根据你实际想要达到什么,这也可以使用SSIS包完成。并行运行几个步骤在那里很常见。 – Jens

回答

16

T-SQL批处理中的语句是同步执行的。要从存储过程异步执行多个语句,需要使用多个并发数据库连接。请注意,异步执行的棘手之处不仅在于确定所有任务何时完成,还在于确定它们是成功还是失败。

方法1:SSIS包

创建SSIS包来并行执行3个SQL语句。在SQL 2012和更高版本中,使用SSIS目录存储过程运行该包。在SQL 2012之前,您需要为该包创建一个SQL代理作业并使用sp_start_job启动。

您需要检查SSIS执行状态或SQL代理作业状态以确定完成和成功/失败结果。

方法2:PowerShell和SQL代理

执行一个SQL代理作业,由它运行PowerShell脚本;该脚本使用PowerShell后台作业(Start-Job命令)并行执行各个查询。脚本可以返回一个退出码,成功为零,失败为非零,以便SQL Agent可以确定它是否成功。检查SQL代理作业状态以确定完成以及成功/失败结果。

方法3:多个SQL代理作业

同时执行多个SQL Agent作业,每个作业都有一个包含导入脚本的T-SQL作业步骤。检查每个作业的SQL Agent作业状态以确定完成以及成功/失败结果。

方法4:Service Broker(服务代理) 使用队列激活的存储过程并行执行导入脚本。如果您以前没有使用过Service Broker,它可能相当晦涩难懂,因此遵循经过验证的模式很重要。我已经包含了一个示例来帮助您开始(在SQL 2012之前的版本中,请将THROW替换为RAISERROR)。数据库必须启用Service Broker;它默认是启用的,但在还原或附加数据库后会被关闭。

--switch to the database that will host the Service Broker objects and procs below
USE YourDatabase;
GO

--Activated procedure: executed automatically when request messages are waiting
--on dbo.TSqlExecutionQueue. Each loop iteration receives one message, and for a
--TSqlExecutionRequest runs the contained T-SQL script with sp_executesql and
--sends a TSqlExecutionResult message back on the same conversation.
--The procedure exits when no message arrives within the RECEIVE timeout.
CREATE PROC dbo.ExecuteTSqlTask
AS
SET NOCOUNT ON;

DECLARE
      @TSqlJobConversationHandle uniqueidentifier = NEWID()
    , @TSqlExecutionRequestMessage xml
    , @TSqlExecutionResultMessage xml
    , @TSqlExecutionResult varchar(10)
    , @TSqlExecutionResultDetails nvarchar(MAX)
    , @TSqlScript nvarchar(MAX)
    , @TSqlTaskName sysname
    , @RowsAffected int
    , @TranCountBeforeScript int
    , @message_type_name sysname;

WHILE 1 = 1
BEGIN

    --get the next task to execute (wait up to 1000 ms for a message)
    WAITFOR (
        RECEIVE TOP (1)
              @TSqlJobConversationHandle = conversation_handle
            , @TSqlExecutionRequestMessage = CAST(message_body AS xml)
            , @message_type_name = message_type_name
        FROM dbo.TSqlExecutionQueue
        ), TIMEOUT 1000;

    IF @@ROWCOUNT = 0
    BEGIN
        --no work to do - exit
        BREAK;
    END;

    IF @message_type_name = N'TSqlExecutionRequest'
    BEGIN

        --get task name and script
        SELECT
              @TSqlTaskName = @TSqlExecutionRequestMessage.value('(/TSqlTaskName)[1]', 'sysname')
            , @TSqlScript = @TSqlExecutionRequestMessage.value('(/TSqlScript)[1]', 'nvarchar(MAX)');

        --remember the transaction nesting level so a transaction leaked by the script can be detected
        SET @TranCountBeforeScript = @@TRANCOUNT;

        --execute script
        BEGIN TRY
            EXEC sp_executesql @TSqlScript;
            SET @RowsAffected = @@ROWCOUNT;
            SET @TSqlExecutionResult = 'Completed';
            SET @TSqlExecutionResultDetails = CAST(@RowsAffected as varchar(10)) + ' rows affected';
        END TRY
        BEGIN CATCH
            --capture the error details first; they are reported back to the initiator
            SET @TSqlExecutionResult = 'Erred';
            SET @TSqlExecutionResultDetails =
                  'Msg ' + CAST(ERROR_NUMBER() AS varchar(10))
                + ', Level ' + CAST(ERROR_SEVERITY() AS varchar(2))
                + ', State ' + CAST(ERROR_STATE() AS varchar(10))
                + ', Line ' + CAST(ERROR_LINE() AS varchar(10))
                + ': ' + ERROR_MESSAGE();

            --a failed script may leave a transaction open or doomed; roll it back so the
            --result SEND below is neither blocked by an uncommittable transaction nor
            --undone later, which would leave the initiator waiting for a result forever
            IF XACT_STATE() = -1 OR @@TRANCOUNT > @TranCountBeforeScript
            BEGIN
                ROLLBACK TRANSACTION;
            END;
        END CATCH;

        --send execution result back to initiator
        SET @TSqlExecutionResultMessage = '<TSqlTaskName /><TSqlExecutionResult /><TSqlExecutionResultDetails />';
        SET @TSqlExecutionResultMessage.modify('insert text {sql:variable("@TSqlTaskName")} into (/TSqlTaskName)[1] ');
        SET @TSqlExecutionResultMessage.modify('insert text {sql:variable("@TSqlExecutionResult")} into (/TSqlExecutionResult)[1] ');
        SET @TSqlExecutionResultMessage.modify('insert text {sql:variable("@TSqlExecutionResultDetails")} into (/TSqlExecutionResultDetails)[1] ');
        SEND ON CONVERSATION @TSqlJobConversationHandle
            MESSAGE TYPE TSqlExecutionResult
            (@TSqlExecutionResultMessage);

    END
    ELSE
    BEGIN
        IF @message_type_name = N'TSqlJobComplete'
        BEGIN
            --service has ended conversation so we're not going to get any more execution requests
            END CONVERSATION @TSqlJobConversationHandle;
        END
        ELSE
        BEGIN
            --any other message type is unexpected here: end the dialog with an error and report it
            END CONVERSATION @TSqlJobConversationHandle WITH ERROR = 1 DESCRIPTION = 'Unexpected message type received by ExecuteTSqlTask';
            RAISERROR('Unexpected message type received (%s) by ExecuteTSqlTask', 16, 1, @message_type_name);
        END;
    END;
END;
GO

--message types exchanged between the job (initiator) and executor (target) services
CREATE MESSAGE TYPE TSqlExecutionRequest VALIDATION = WELL_FORMED_XML;
CREATE MESSAGE TYPE TSqlExecutionResult VALIDATION = WELL_FORMED_XML;
CREATE MESSAGE TYPE TSqlJobComplete VALIDATION = WELL_FORMED_XML;

--contract: which side of the conversation may send which message type
CREATE CONTRACT TSqlExecutionContract (
    TSqlExecutionRequest SENT BY INITIATOR,
    TSqlJobComplete SENT BY INITIATOR,
    TSqlExecutionResult SENT BY TARGET
);

--queue holding the results returned to the initiating procedure
CREATE QUEUE dbo.TSqlResultQueue;

--queue holding the execution requests; activation starts dbo.ExecuteTSqlTask for waiting messages
CREATE QUEUE dbo.TSqlExecutionQueue
WITH
    STATUS = ON,
    ACTIVATION (
        STATUS = ON,
        PROCEDURE_NAME = dbo.ExecuteTSqlTask,
        MAX_QUEUE_READERS = 3, --max number of concurrent activated proc instances
        EXECUTE AS OWNER
    );

--services bind each queue to the contract
CREATE SERVICE TSqlJobService ON QUEUE dbo.TSqlResultQueue ([TSqlExecutionContract]);
CREATE SERVICE TSqlExecutorService ON QUEUE dbo.TSqlExecutionQueue ([TSqlExecutionContract]);
GO

--Runs the import scripts in parallel through Service Broker.
--Steps:
--  1. load one row per import task into a tracking table variable
--  2. begin a dialog from TSqlJobService to TSqlExecutorService and send one
--     TSqlExecutionRequest message per task
--  3. receive TSqlExecutionResult messages until every task has reported back,
--     counting completed and erred tasks
--  4. send TSqlJobComplete and end the conversation once the EndDialog message arrives
--Errors are re-thrown to the caller (THROW requires SQL Server 2012 or later).
--Variable names are cased consistently so the procedure also compiles on
--instances with a case-sensitive collation.
CREATE PROC dbo.ExecuteParallelImportScripts
AS
SET NOCOUNT ON;

DECLARE
      @TSqlJobConversationHandle uniqueidentifier
    , @TSqlExecutionRequestMessage xml
    , @TSqlExecutionResultMessage xml
    , @TSqlExecutionResult varchar(10)
    , @TSqlExecutionResultDetails nvarchar(MAX)
    , @TSqlTaskName sysname
    , @CompletedCount int = 0
    , @ErredCount int = 0
    , @message_type_name sysname;

--tracking table: one row per task that has not reported a result yet
DECLARE @TSqlTask TABLE(
      TSqlTaskName sysname NOT NULL PRIMARY KEY
    , TSqlScript nvarchar(MAX) NOT NULL
    );

BEGIN TRY

    --insert a row for each import task
    INSERT INTO @TSqlTask(TSqlTaskName, TSqlScript)
        VALUES(N'ImportScript1', N'INSERT INTO dbo.Table1 SELECT * FROM dbo.Table1Staging;');
    INSERT INTO @TSqlTask(TSqlTaskName, TSqlScript)
        VALUES(N'ImportScript2', N'INSERT INTO dbo.Table2 SELECT * FROM dbo.Table2Staging;');
    INSERT INTO @TSqlTask(TSqlTaskName, TSqlScript)
        VALUES(N'ImportScript3', N'INSERT INTO dbo.Table3 SELECT * FROM dbo.Table3Staging;');

    --start a conversation for this import process
    BEGIN DIALOG CONVERSATION @TSqlJobConversationHandle
        FROM SERVICE TSqlJobService
        TO SERVICE 'TSqlExecutorService', 'CURRENT DATABASE'
        ON CONTRACT TSqlExecutionContract
        WITH ENCRYPTION = OFF;

    --send import tasks to executor service for parallel execution
    --(each task row is serialized as <TSqlTaskName>...</TSqlTaskName><TSqlScript>...</TSqlScript>)
    DECLARE JobTasks CURSOR LOCAL FAST_FORWARD FOR
        SELECT (SELECT TSqlTaskName, TSqlScript
            FROM @TSqlTask AS task
            WHERE task.TSqlTaskName = job.TSqlTaskName
            FOR XML PATH(''), TYPE) AS TSqlExecutionRequest
        FROM @TSqlTask AS job;
    OPEN JobTasks;
    WHILE 1 = 1
    BEGIN
        FETCH NEXT FROM JobTasks INTO @TSqlExecutionRequestMessage;
        IF @@FETCH_STATUS = -1 BREAK;
        SEND ON CONVERSATION @TSqlJobConversationHandle
            MESSAGE TYPE TSqlExecutionRequest
            (@TSqlExecutionRequestMessage);
    END;
    CLOSE JobTasks;
    DEALLOCATE JobTasks;

    --get each parallel task execution result until all are complete
    WHILE 1 = 1
    BEGIN

        --get next task result (wait up to 1000 ms, then poll again)
        WAITFOR (
            RECEIVE TOP (1)
                  @TSqlExecutionResultMessage = CAST(message_body AS xml)
                , @message_type_name = message_type_name
            FROM dbo.TSqlResultQueue
            WHERE conversation_handle = @TSqlJobConversationHandle
            ), TIMEOUT 1000;

        IF @@ROWCOUNT <> 0
        BEGIN

            IF @message_type_name = N'TSqlExecutionResult'
            BEGIN

                --get result of import script execution
                SELECT
                      @TSqlTaskName = @TSqlExecutionResultMessage.value('(/TSqlTaskName)[1]', 'sysname')
                    , @TSqlExecutionResult = @TSqlExecutionResultMessage.value('(/TSqlExecutionResult)[1]', 'varchar(10)')
                    , @TSqlExecutionResultDetails = COALESCE(@TSqlExecutionResultMessage.value('(/TSqlExecutionResultDetails)[1]', 'nvarchar(MAX)'), N'');
                --severity 0 with NOWAIT: informational progress message sent to the client immediately
                RAISERROR('Import task %s %s: %s', 0, 0, @TSqlTaskName, @TSqlExecutionResult, @TSqlExecutionResultDetails) WITH NOWAIT;
                IF @TSqlExecutionResult = 'Completed'
                BEGIN
                    SET @CompletedCount += 1;
                END
                ELSE
                BEGIN
                    SET @ErredCount += 1;
                END;

                --remove task from tracking table after completion
                DELETE FROM @TSqlTask
                WHERE TSqlTaskName = @TSqlTaskName;

                IF NOT EXISTS(SELECT 1 FROM @TSqlTask)
                BEGIN
                    --all tasks are done - send TSqlJobComplete message to instruct executor service to end conversation
                    SEND ON CONVERSATION @TSqlJobConversationHandle
                        MESSAGE TYPE TSqlJobComplete;
                END
            END
            ELSE
            BEGIN
                IF @message_type_name = N'http://schemas.microsoft.com/SQL/ServiceBroker/EndDialog'
                BEGIN
                    --executor service has ended conversation so we're done
                    END CONVERSATION @TSqlJobConversationHandle;
                    BREAK;
                END
                ELSE
                BEGIN
                    --any other message type is unexpected: end the dialog with an error and raise to the CATCH block
                    END CONVERSATION @TSqlJobConversationHandle WITH ERROR = 1 DESCRIPTION = 'Unexpected message type received by ExecuteParallelImportScripts';
                    RAISERROR('Unexpected message type received (%s) by ExecuteParallelImportScripts', 16, 1, @message_type_name);
                END;
            END
        END;
    END;
    RAISERROR('Import processing completed. CompletedCount=%d, ErredCount=%d.', 0, 0, @CompletedCount, @ErredCount);
END TRY
BEGIN CATCH
    --re-raise the original error to the caller
    THROW;
END CATCH;
GO

--run the parallel import; the procedure reports per-task results as informational messages
EXECUTE dbo.ExecuteParallelImportScripts;
GO
-3

所有三个表的结构和内容是否相同? 如果是这样,使用事务/合并复制

另外,在第一个表创建触发器插入到第二个和第三个表

2

假设你希望所有插入都使用相同的插入日期值,可以像下面这样定义一个日期参数,并将其设置为当前日期。

--capture the current date once so the same value can be passed to every insert
DECLARE @InsertDate date = GETDATE();

然后,将插入日期参数​​传递到插入存储过程并相应地更新此存储过程以使用该输入。这将确保相同的插入日期值将用于所有插入。

--pass the shared insert date to the insert procedure
EXECUTE dbo.InsertTables123 @p1 = @InsertDate;

如果需要使用当前日期以外的日期,也可以手动为@InsertDate输入参数赋值。

9

你可以尝试像下面这样创建三个作业,并行执行插入脚本:

--Create SQL Agent job "Job1" with a single T-SQL step for the first table's insert.
--sp_add_job returns the new job's id in @jobId; it is then passed to
--sp_add_jobstep and sp_add_jobserver to attach the step and the target server.
DECLARE @jobId BINARY(16) 
EXEC msdb.dbo.sp_add_job @job_name=N'Job1', 
     @enabled=1, 
     @description=N'No description available.', 
     @job_id = @jobId OUTPUT 

--on success (action 1) quit reporting success; on failure (action 2) quit reporting failure
EXEC msdb.dbo.sp_add_jobstep @job_id=@jobId, @step_name=N'Insert into First Table', 
     @step_id=1, 
     @cmdexec_success_code=0, 
     @on_success_action=1, 
     @on_success_step_id=0, 
     @on_fail_action=2, 
     @on_fail_step_id=0, 
     @retry_attempts=0, 
     @retry_interval=0, 
     @os_run_priority=0, @subsystem=N'TSQL', 
     @command=N'--Insert script for first table', 
     @database_name=N'Test', 
     @flags=0 

--target the job at the local server so SQL Agent can run it
EXEC msdb.dbo.sp_add_jobserver @job_id = @jobId, @server_name = N'(local)' 
GO 

--Create SQL Agent job "Job2" with a single T-SQL step for the second table's insert.
--sp_add_job returns the new job's id in @jobId; it is then passed to
--sp_add_jobstep and sp_add_jobserver to attach the step and the target server.
DECLARE @jobId BINARY(16) 
EXEC msdb.dbo.sp_add_job @job_name=N'Job2', 
     @enabled=1, 
     @description=N'No description available.', 
     @job_id = @jobId OUTPUT 

--on success (action 1) quit reporting success; on failure (action 2) quit reporting failure
EXEC msdb.dbo.sp_add_jobstep @job_id=@jobId, @step_name=N'Insert into second Table', 
     @step_id=1, 
     @cmdexec_success_code=0, 
     @on_success_action=1, 
     @on_success_step_id=0, 
     @on_fail_action=2, 
     @on_fail_step_id=0, 
     @retry_attempts=0, 
     @retry_interval=0, 
     @os_run_priority=0, @subsystem=N'TSQL', 
     @command=N'--Insert script for second table', 
     @database_name=N'Test', 
     @flags=0 

--target the job at the local server so SQL Agent can run it
EXEC msdb.dbo.sp_add_jobserver @job_id = @jobId, @server_name = N'(local)' 
GO 

--Create SQL Agent job "Job3" with a single T-SQL step for the third table's insert.
--sp_add_job returns the new job's id in @jobId; it is then passed to
--sp_add_jobstep and sp_add_jobserver to attach the step and the target server.
DECLARE @jobId BINARY(16) 
EXEC msdb.dbo.sp_add_job @job_name=N'Job3', 
     @enabled=1, 
     @description=N'No description available.', 
     @job_id = @jobId OUTPUT 

--on success (action 1) quit reporting success; on failure (action 2) quit reporting failure
EXEC msdb.dbo.sp_add_jobstep @job_id=@jobId, @step_name=N'Insert into Third Table', 
     @step_id=1, 
     @cmdexec_success_code=0, 
     @on_success_action=1, 
     @on_success_step_id=0, 
     @on_fail_action=2, 
     @on_fail_step_id=0, 
     @retry_attempts=0, 
     @retry_interval=0, 
     @os_run_priority=0, @subsystem=N'TSQL', 
     @command=N'--Insert script for third table', 
     @database_name=N'Test', 
     @flags=0 

--target the job at the local server so SQL Agent can run it
EXEC msdb.dbo.sp_add_jobserver @job_id = @jobId, @server_name = N'(local)' 
GO 

--start the three jobs one after another; all will execute in parallel
EXECUTE msdb.dbo.sp_start_job @job_name = N'Job1';
EXECUTE msdb.dbo.sp_start_job @job_name = N'Job2';
EXECUTE msdb.dbo.sp_start_job @job_name = N'Job3';
+0

写得好先生。 –

1

对于您的过程,我假设您有tableName和文件位置作为参数。

如果您有一个包含300万条记录的大文件,需要先把该文件拆分为3个小文件,然后打开3个SQL Server命令行控制台,并在每个控制台中调用该过程,这样插入就会并行进行。或者,如果您会SQL之外的其他编程语言,也可以使用多个线程来调用该过程。

相关问题