2017-06-04 116 views
1

我有我需要处理的项目编号列表。一个项目可能有大约8000个项目,我需要获取项目中每个项目的数据,然后将这些数据推送到服务器列表中。任何人都可以告诉我以下...BroadcastBlock缺失项目

1)我有1000个项目在iR但只有998被写入服务器。通过使用broadCastBlock,我的物品是否松动? 2)我是否正确地等待所有actionBlocks? 3)如何使数据库调用异步?

这里是数据库代码

public MemcachedDTO GetIR(MemcachedDTO dtoItem) 
    { 

     string[] Tables = new string[] { "iowa", "la" }; 
     using (SqlConnection connection = new SqlConnection(ConfigurationManager.ConnectionStrings["test"].ConnectionString)) 
     { 
      using (SqlCommand command = new SqlCommand("test", connection)) 
      { 
       DataSet Result = new DataSet(); 
       command.CommandType = CommandType.StoredProcedure; 

       command.Parameters.Add("@ProjectId", SqlDbType.VarChar); 
       command.Parameters["@ProjectId"].Value = dtoItem.ProjectId; 


       connection.Open(); 
       Result.EnforceConstraints = false; 
       Result.Load(command.ExecuteReader(CommandBehavior.CloseConnection), LoadOption.OverwriteChanges, Tables); 
       dtoItem.test = Result; 
      } 
     } 
     return dtoItem; 
    } 

更新: 我的代码更新到下面。它只是在我运行它时挂起,只写入1/4的数据到服务器?你能让我知道我做错了什么吗?

 public static ITargetBlock<T> CreateGuaranteedBroadcastBlock<T>(IEnumerable<ITargetBlock<T>> targets, DataflowBlockOptions options) 
    { 
     var targetsList = targets.ToList(); 

     var block = new ActionBlock<T>(
      async item => 
      { 
       foreach (var target in targetsList) 
       { 
        await target.SendAsync(item); 
       } 
      }, new ExecutionDataflowBlockOptions 
      { 
       CancellationToken = options.CancellationToken 
      }); 

     block.Completion.ContinueWith(task => 
     { 
      foreach (var target in targetsList) 
      { 
       if (task.Exception != null) 
        target.Fault(task.Exception); 
       else 
        target.Complete(); 
      } 
     }); 

     return block; 
    } 

    [HttpGet] 
    public async Task< HttpResponseMessage> ReloadItem(string projectQuery) 
    { 
     try 
     { 

      var linkCompletion = new ExecutionDataflowBlockOptions 
      { 
       MaxDegreeOfParallelism = 2 
      }; 
      var cts = new CancellationTokenSource(); 
      var dbOptions = new DataflowBlockOptions { CancellationToken = cts.Token }; 


      IList<string> projectIds = projectQuery.Split(',').ToList(); 
      IEnumerable<string> serverList = ConfigurationManager.AppSettings["ServerList"].Split(',').Cast<string>(); 

      var iR = new TransformBlock<MemcachedDTO, MemcachedDTO>(
       dto => dto.GetIR(dto), new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 3 }); 

      List<ActionBlock<MemcachedDTO>> actionList = new List<ActionBlock<MemcachedDTO>>(); 


      List<MemcachedDTO> dtoList = new List<MemcachedDTO>(); 

      foreach (string pid in projectIds) 
      { 
       IList<MemcachedDTO> dtoTemp = new List<MemcachedDTO>(); 
       dtoTemp = MemcachedDTO.GetItemIdsByProject(pid); 
       dtoList.AddRange(dtoTemp); 
      } 
      foreach (string s in serverList) 
      { 
       var action = new ActionBlock<MemcachedDTO>(
       async dto => await PostEachServerAsync(dto, s, "setitemcache")); 
       actionList.Add(action); 
      } 
      var bBlock = CreateGuaranteedBroadcastBlock(actionList, dbOptions); 

      foreach (MemcachedDTO d in dtoList) 
      { 
       await iR.SendAsync(d); 
      } 

      iR.Complete(); 
      iR.LinkTo(bBlock); 
      await Task.WhenAll(actionList.Select(action => action.Completion).ToList()); 

      return Request.CreateResponse(HttpStatusCode.OK, new { message = projectIds.ToString() + " reload success" }); 
     } 
     catch (Exception ex) 
     { 
      return Request.CreateResponse(HttpStatusCode.InternalServerError, new { message = ex.Message.ToString() }); 
     } 
    } 

回答

0

1)我在IR 1000个项目,但只有998被写入到服务器。通过使用broadCastBlock,我的物品是否松动?

是的,在下面的代码中您将BoundedCapacity设置为1,如果在任何时候您的BroadcastBlock无法传递一件物品,它将丢弃它。此外BroadcastBlock只会传播Completion到一个TargetBlock,在这里不要使用PropagateCompletion=true。如果您想要完成所有块,则需要手动处理Completion。这可以通过设置BroadcastBlock上的ContinueWithCompletion传递给所有连接的目标来完成。

var action = new ActionBlock<MemcachedDTO>(dto => PostEachServerAsync(dto, s, "set"), new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 3, BoundedCapacity = 1 }); 
broadcast.LinkTo(action, linkCompletion); 
actionList.Add(action); 

选项:取而代之的是BroadcastBlock的使用正确界定BufferBlock。当您的下游块绑定到一个项目时,它们将无法接收其他项目,直到它们完成处理。这将允许BufferBlock将其物品提供给另一个可能空闲的ActionBlock

当您将项目添加到节流流程中时,即流程BoundedCapacity小于Unbounded。您需要使用SendAsync方法或至少处理Post的退货。我建议你只需使用SendAsync

foreach (MemcachedDTO d in dtoList) 
{ 
    await iR.SendAsync(d); 
} 

这将迫使你的方法签名成为:

public async Task<HttpResponseMessage> ReloadItem(string projectQuery) 

2)我是否正确做等待有关的所有actionBlocks?

以前的变化将使你失去阻塞Wait呼叫赞成的await Task.WhenAlll

iR.Complete(); 
actionList.ForEach(x => x.Completion.Wait()); 

To: 

iR.Complete(); 
await bufferBlock.Completion.ContinueWith(tsk => actionList.ForEach(x => x.Complete()); 
await Task.WhenAll(actionList.Select(action => action.Completion).ToList()); 

3)如何使数据库调用异步?

我要离开这个打开,因为它应该是无关的TPL-Dataflow一个单独的问题,但在短期使用async API来访问你的数据库和async将通过您的代码库自然生长。 This should get you started

BufferBlock VS BroadcastBlock

重新阅读您的previous question和答案来自@VMAtm后。看来你想每个项目发送到全部五台服务器,在这种情况下,你将需要一个BroadcastBlock。您可以使用BufferBlock将消息相对均匀地分发到各自可处理消息的灵活服务器池。无论如何,您仍然需要通过等待BroadcastBlock的完成来控制传播完成和故障到所有连接的ActionBlocks

以防止BroadcastBlock丢弃的消息

一般来说,你两个选项,设置您的ActionBlocks相绑定,这是他们的默认值:

new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 3, BoundedCapacity = Unbounded }); 

或者广播消息你的自我从任何各种各样的你自己的建筑。 @ i3arnon的Here is an example implementationAnd another来自@svick

+0

感谢您的详细回复@JSteward。我已经在上面的帖子中更新了我的代码,并且仍然有问题...应用程序没有完成(我删除了PropagateCompletion),只写了1/4的记录。你能否指点我正确的方向?谢谢 – klkj898

+0

跳出来的第一件事是:在将所有数据发送到流中之前,您没有将“TransformBlock”链接到“BroadcastBlock”。当你修复那部分会发生什么? – JSteward

+0

另一种可能性是'PostEachServerAsync'未被等待,如果它正在运行'async',那么它将被视为火并遗忘,并且可能无法在流程完成时完成。 – JSteward