看来,锁在我的代码下面不起作用。为什么我的锁不会阻塞?
这里有什么,我试图做一些背景:
我有一个库存管理系统,该系统具有以下功能:
- 接收订单
- 分配现有库存来履行订单
- 如果无法填写订单,则需要延期交货
- 尝试在添加新库存时填写缺货订单
- 允许取消订单。
因此,假设有100个订单处于缺货状态,那么一批库存即可满足所有100个缺货订单。当我的Warehousing类收到货物(下面未显示)时,将调用下面定义的AllocateOutstandingRequests方法。最终,我们在下面的AllocateInventory方法的第一个foreach循环中结束,它遍历100个订单,试图为每个订单分配库存。现在,假设我的消息传递类(部分定义如下)接收到该列表的订单号2的CancelOrder,则循环在项80上。 CancelOrder例程在Allocation类中调用我的AddCancel例程,并且应该锁定在锁上(_cancelsLock),但它不会。我使用timestamps和threadId跟踪了所有这些代码,并且我甚至在循环中放置了3秒的延迟,试图为延期交付的项目分配库存。我可以在跟踪中看到循环完全按照预期工作,每次分配之间有3秒钟,但是我还可以看到AddCancel调用立即返回到Messaging类,没有任何延迟;我预计会有延迟,它应该阻止,直到所有100个订单被分配并锁定释放。我究竟做错了什么?似乎AddCancel中的锁并没有做任何事情!
我已经在代码中添加了日志的样子,以及日志文件中的实际日志条目。可以看出,Messaging.ReceiveInventory [threadId 38]接收库存并调用Allocation.AllocateOutstandingRequests,进而启动任务[threadId 26]:
2014-02-28 17:00:08,871 [38] INFO - 由用户处理传入ReceiveDrugs请求:WELLDYNERX \ privera
2014年2月28日17:00:08871 [38] INFO - NDC 00002323230加到库存请求者110
2014年2月28日17:00 :08,871 [26]信息 - ...分配未决请求
2014-02-28 17:00:08,887 [26]信息 - 试图分配100个未完成请求
2014年2月28日17:00:08934 [26] INFO - 延期交货RequestUID 100689分配
2014年2月28日17:00:23934 [26] INFO - 延期交货RequestUID 100690分配
2014-02-28 17:00:25,293 [42] INFO - 处理传入的CancelNotification; UID:100689,来自PRIVERA for RequestorUID:110
2014-02-28 17:00:25,309 [42] INFO - 取消通知UID:100689,处理。
2014年2月28日17:00:39012 [26] INFO - 延期交货RequestUID 100691分配
2014年2月28日17:00:54012 [26] INFO - 延期交货RequestUID 100692分配
配置类别:
public static class Allocation
{
public static void AllocateOutstandingRequests()
{
var factory = new TaskFactory(_orderedTaskScheduler);
TaskScheduler.UnobservedTaskException += OrderedTaskScheduler_UnobservedTaskException;
factory.StartNew(() =>
{
Trace.TraceInformation("...Allocating outstanding requests");
List<QueueingRequest> backorderedRequests = InventoryDao.GetBackorderedRequests();
List<AllocationRequest> backorderedRequestsAllocated =
AllocateInventory(backorderedRequests.OrderBy(r => r.RequestUID).ToList());
SendAllocationResponses(backorderedRequestsAllocated);
Trace.TraceInformation("Completed allocating outstanding requests...");
});
}
static List<AllocationRequest> AllocateInventory(List<QueueingRequest> outstandingRequests)
{
List<AllocationRequest> allocatedBackorderedRequests = new List<AllocationRequest>();
lock (_cancelsLock)
{
Trace.TraceInformation(string.Format("Attempting to allocate {0} outstanding requests", outstandingRequests.Count));
foreach (QueueingRequest queuedRequest in outstandingRequests)
{
if (_cancels.Contains(queuedRequest.RequestUID)) continue;
AllocationRequest allocationRequest = new AllocationRequest(queuedRequest);
if (AllocateOrder(allocationRequest))
{
Trace.TraceInformation(string.Format("Backordered RequestUID {0} Allocated", queuedRequest.RequestUID));
allocatedBackorderedRequests.Add(allocationRequest);
}
for (int iSleepAlot = 0; iSleepAlot < 5; iSleepAlot++)
System.Threading.Thread.Sleep(3000);
}
// Check to see if a CancelOrder came thru for backordered requests
// that the code above allocated inventory for.
foreach (int requestUID in allocatedBackorderedRequests.Select(r => r.RequestUID))
{
if (_cancels.Contains(requestUID))
_cancels.Remove(requestUID);
}
}
return allocatedBackorderedRequests;
}
static bool AllocateOrder(AllocationRequest request)
{
bool inventoryAllocated = false;
try
{
if (InventoryDao.SaveAllocation(request))
inventoryAllocated = Warehousing.AllocateDrugs(request.RequestorUID, request.Items);
}
catch (RequestAlreadyAllocatedException ex)
{
inventoryAllocated = true;
}
catch (Exception ex)
{
Trace.TraceError(ex.ToString());
throw;
}
return inventoryAllocated;
}
public static bool AddCancel(int requestUID)
{
bool requestStatusChangedToAllocated = false;
_cancels.Add(requestUID);
// block until backordered requests are allocated.
lock (_cancelsLock)
{
requestStatusChangedToAllocated = !_cancels.Contains(requestUID);
if (!requestStatusChangedToAllocated)
_cancels.Remove(requestUID);
}
return requestStatusChangedToAllocated;
}
static readonly TaskScheduler _orderedTaskScheduler = new LimitedConcurrencyLevelTaskScheduler(1);
static readonly List<int> _cancels = new List<int>();
static readonly object _cancelsLock = new object();
}
消息类:
public static class Messaging
{
public static void CancelOrder(CancelNotification notification)
{
Trace.TraceInformation(string.Format("Processing incoming CancelNotification; UID:{0}, from {1} for RequestorUID:{2}",
notification.RequestUID,
notification.User,
notification.RequestorUID));
// This is a blocking call which returns after all backordered requests are processed.
// The call may change the status from backordered to allocated, in which case, we'll
// have to DeAllocateDrugs in the services cache
bool requestStatusChangedToAllocated = Allocation.AddCancel(notification.RequestUID);
// do some work
Trace.TraceInformation(string.Format("CancelNotification for UID:{0}, processed.", notification.RequestUID));
}
public static List<string> ReceiveInventory(List<ReceivedInventory> received, string user, string comment)
{
Trace.TraceInformation(string.Format("Processing incoming ReceiveDrugs request by User:{0}", user));
foreach (ReceivedInventory inventory in received)
{
// do some work
Trace.TraceInformation(string.Format("NDC {0} added to inventory for requestor {1}", drugInventory.NDC, inventory.RequestorUID));
}
// re-evaluate allocations after inventory is loaded
Allocation.AllocateOutstandingRequests();
}
}
一般 - 不好的风格。使用您自己的任务计划程序并一次使用1个任务 - 当您获得大量订单时,比锁定更有效。在消息传递环境中进行串行处理的服务器要好得多,以确保只有一名工作人员处理消息。 – TomTom
何时/何处是分配的优秀请求()被调用? –
我看不到任何新的线程/任务正在创建....我看到.StartNew()但没有我在哪里看到它叫.. –