我想有满足以下要求的自定义线程池:自定义线程池支持异步操作
- 房地产线程根据池容量预分配。实际的工作是免费使用标准的.NET线程池,如果需要产生并发任务。
- 池必须能够返回空闲线程的数量。返回的数字可能小于空闲线程的实际数量,但不得更大。当然,数字越准确越好。
- 对池的排队工作应返回相应的
Task
,这应该与基于任务的API很好地匹配。 - NEW最大作业能力(或并行度)应该可以动态调整。试图减少容量不一定立即生效,但增加它应该立即生效。
的第一个项目的基本原理描述如下:
- 机器是不应该被同时运行的大于N的工作项目更多,其中N是比较小 - 10和30 之间
- 工作从数据库中提取,如果提取了K个项目,那么我们要确保有K个空闲线程立即开始工作。工作从数据库中获取但仍然等待下一个可用线程的情况是不可接受的。
最后一项还解释了空闲线程数的原因 - 我将从数据库中获取许多工作项。它还解释了为什么报告的空闲线程数不能高于实际的线程数 - 否则我可能会获取更多可以立即启动的工作。
不管怎么说,这是我实现一个小程序来测试它(BJE代表后台作业引擎)一起:
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
namespace TaskStartLatency
{
public class BJEThreadPool
{
private sealed class InternalTaskScheduler : TaskScheduler
{
private int m_idleThreadCount;
private readonly BlockingCollection<Task> m_bus;
public InternalTaskScheduler(int threadCount, BlockingCollection<Task> bus)
{
m_idleThreadCount = threadCount;
m_bus = bus;
}
public void RunInline(Task task)
{
Interlocked.Decrement(ref m_idleThreadCount);
try
{
TryExecuteTask(task);
}
catch
{
// The action is responsible itself for the error handling, for the time being...
}
Interlocked.Increment(ref m_idleThreadCount);
}
public int IdleThreadCount
{
get { return m_idleThreadCount; }
}
#region Overrides of TaskScheduler
protected override void QueueTask(Task task)
{
m_bus.Add(task);
}
protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
{
return TryExecuteTask(task);
}
protected override IEnumerable<Task> GetScheduledTasks()
{
throw new NotSupportedException();
}
#endregion
public void DecrementIdleThreadCount()
{
Interlocked.Decrement(ref m_idleThreadCount);
}
}
private class ThreadContext
{
private readonly InternalTaskScheduler m_ts;
private readonly BlockingCollection<Task> m_bus;
private readonly CancellationTokenSource m_cts;
public readonly Thread Thread;
public ThreadContext(string name, InternalTaskScheduler ts, BlockingCollection<Task> bus, CancellationTokenSource cts)
{
m_ts = ts;
m_bus = bus;
m_cts = cts;
Thread = new Thread(Start)
{
IsBackground = true,
Name = name
};
Thread.Start();
}
private void Start()
{
try
{
foreach (var task in m_bus.GetConsumingEnumerable(m_cts.Token))
{
m_ts.RunInline(task);
}
}
catch (OperationCanceledException)
{
}
m_ts.DecrementIdleThreadCount();
}
}
private readonly InternalTaskScheduler m_ts;
private readonly CancellationTokenSource m_cts = new CancellationTokenSource();
private readonly BlockingCollection<Task> m_bus = new BlockingCollection<Task>();
private readonly List<ThreadContext> m_threadCtxs = new List<ThreadContext>();
public BJEThreadPool(int threadCount)
{
m_ts = new InternalTaskScheduler(threadCount, m_bus);
for (int i = 0; i < threadCount; ++i)
{
m_threadCtxs.Add(new ThreadContext("BJE Thread " + i, m_ts, m_bus, m_cts));
}
}
public void Terminate()
{
m_cts.Cancel();
foreach (var t in m_threadCtxs)
{
t.Thread.Join();
}
}
public Task Run(Action<CancellationToken> action)
{
return Task.Factory.StartNew(() => action(m_cts.Token), m_cts.Token, TaskCreationOptions.DenyChildAttach, m_ts);
}
public Task Run(Action action)
{
return Task.Factory.StartNew(action, m_cts.Token, TaskCreationOptions.DenyChildAttach, m_ts);
}
public int IdleThreadCount
{
get { return m_ts.IdleThreadCount; }
}
}
class Program
{
static void Main()
{
const int THREAD_COUNT = 32;
var pool = new BJEThreadPool(THREAD_COUNT);
var tcs = new TaskCompletionSource<bool>();
var tasks = new List<Task>();
var allRunning = new CountdownEvent(THREAD_COUNT);
for (int i = pool.IdleThreadCount; i > 0; --i)
{
var index = i;
tasks.Add(pool.Run(cancellationToken =>
{
Console.WriteLine("Started action " + index);
allRunning.Signal();
tcs.Task.Wait(cancellationToken);
Console.WriteLine(" Ended action " + index);
}));
}
Console.WriteLine("pool.IdleThreadCount = " + pool.IdleThreadCount);
allRunning.Wait();
Debug.Assert(pool.IdleThreadCount == 0);
int expectedIdleThreadCount = THREAD_COUNT;
Console.WriteLine("Press [c]ancel, [e]rror, [a]bort or any other key");
switch (Console.ReadKey().KeyChar)
{
case 'c':
Console.WriteLine("Cancel All");
tcs.TrySetCanceled();
break;
case 'e':
Console.WriteLine("Error All");
tcs.TrySetException(new Exception("Failed"));
break;
case 'a':
Console.WriteLine("Abort All");
pool.Terminate();
expectedIdleThreadCount = 0;
break;
default:
Console.WriteLine("Done All");
tcs.TrySetResult(true);
break;
}
try
{
Task.WaitAll(tasks.ToArray());
}
catch (AggregateException exc)
{
Console.WriteLine(exc.Flatten().InnerException.Message);
}
Debug.Assert(pool.IdleThreadCount == expectedIdleThreadCount);
pool.Terminate();
Console.WriteLine("Press any key");
Console.ReadKey();
}
}
}
这是一个非常简单的实现,它似乎是工作。但是,有一个问题 - BJEThreadPool.Run
方法不接受异步方法。即我实现不允许我添加以下重载:
public Task Run(Func<CancellationToken, Task> action)
{
return Task.Factory.StartNew(() => action(m_cts.Token), m_cts.Token, TaskCreationOptions.DenyChildAttach, m_ts).Unwrap();
}
public Task Run(Func<Task> action)
{
return Task.Factory.StartNew(action, m_cts.Token, TaskCreationOptions.DenyChildAttach, m_ts).Unwrap();
}
我InternalTaskScheduler.RunInline
使用在这种情况下不能正常工作模式。
所以,我的问题是如何添加对异步工作项目的支持?只要在文章开头提到的要求得到遵守,我就可以改变整个设计。
编辑
我想澄清所需的池的intented使用。请注意以下代码:
if (pool.IdleThreadCount == 0)
{
return;
}
foreach (var jobData in FetchFromDB(pool.IdleThreadCount))
{
pool.Run(CreateJobAction(jobData));
}
注:
- 的代码将被定期运行,例如每1分钟。
- 该代码将由多台机器同时运行,观察同一个数据库。
FetchFromDB
将使用Using SQL Server as a DB queue with multiple clients中描述的技术从数据库中原子获取并锁定工作。CreateJobAction
将调用jobData
(工作代码)所示的代码,并在该代码完成时关闭工作。工作代码超出了我的控制范围,它可能几乎是任何东西 - 重大的CPU绑定代码或轻量级异步IO绑定代码,严重写入的同步IO绑定代码或混合的所有代码。它可以运行几分钟,它可以运行几个小时。关闭工作是我的代码,它会通过异步的IO绑定代码。因此,返回的作业操作的签名是异步方法的签名。
第2项强调了正确识别空闲线程数量的重要性。如果有900个未决工作项目和10个代理机器,我不能允许代理获取300个工作项目并将它们排列在线程池中。为什么?因为,代理最不可能同时运行300个工作项目。它会运行一些,果然,但其他人将在线程池工作队列中等待。假设它将运行100并让200等待(即使100可能远远超过)。这可以使用3个满载代理和7个闲置代理。但900个中只有300个工作项目正在同时处理!
我的目标是最大限度地扩大可用代理人之间的工作传播。理想情况下,我应该评估代理的负载和待定工作的“沉重程度”,但这是一项艰巨的任务,并为未来的版本保留。现在,我希望为每个代理分配最大的工作能力,并且意图提供在不重新启动代理的情况下动态增加/减少它的方法。
接下来的观察。这项工作可能需要很长时间才能运行,并且可能都是同步代码。据我所知,利用线程池线程进行这种工作是不可取的。
EDIT 2
有一份声明中说TaskScheduler
只为CPU密集型工作。但是如果我不知道工作的性质呢?我的意思是它是一个通用的后台作业引擎,它运行数千种不同的作业。我没有办法告诉“该作业是CPU绑定的”,“这是同步IO绑定”,而另一个是异步IO绑定。我希望我能,但我不能。
编辑3
最后,我不使用SemaphoreSlim
,但我也不使用TaskScheduler
- 它终于流下我的厚头骨,这是unappropriate和完全错误的,再加上它使代码过于复杂。
不过,我没有看到SemaphoreSlim
是如何。建议模式:
public async Task Enqueue(Func<Task> taskGenerator)
{
await semaphore.WaitAsync();
try
{
await taskGenerator();
}
finally
{
semaphore.Release();
}
}
预计taskGenerator或者是异步IO绑定代码,否则会打开一个新线程。但是,我无法确定要执行的工作是否是另一个工作。另外,正如我从SemaphoreSlim.WaitAsync continuation code得知的,如果信号量被解锁,WaitAsync()之后的代码将在同一个线程上运行,这对我来说不是很好。
无论如何,下面是我的实现,以防万一谁想到。不幸的是,我还没有理解如何动态减少池线程数,但这是另一个问题的主题。
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
namespace TaskStartLatency
{
public interface IBJEThreadPool
{
void SetThreadCount(int threadCount);
void Terminate();
Task Run(Action action);
Task Run(Action<CancellationToken> action);
Task Run(Func<Task> action);
Task Run(Func<CancellationToken, Task> action);
int IdleThreadCount { get; }
}
public class BJEThreadPool : IBJEThreadPool
{
private interface IActionContext
{
Task Run(CancellationToken ct);
TaskCompletionSource<object> TaskCompletionSource { get; }
}
private class ActionContext : IActionContext
{
private readonly Action m_action;
public ActionContext(Action action)
{
m_action = action;
TaskCompletionSource = new TaskCompletionSource<object>();
}
#region Implementation of IActionContext
public Task Run(CancellationToken ct)
{
m_action();
return null;
}
public TaskCompletionSource<object> TaskCompletionSource { get; private set; }
#endregion
}
private class CancellableActionContext : IActionContext
{
private readonly Action<CancellationToken> m_action;
public CancellableActionContext(Action<CancellationToken> action)
{
m_action = action;
TaskCompletionSource = new TaskCompletionSource<object>();
}
#region Implementation of IActionContext
public Task Run(CancellationToken ct)
{
m_action(ct);
return null;
}
public TaskCompletionSource<object> TaskCompletionSource { get; private set; }
#endregion
}
private class AsyncActionContext : IActionContext
{
private readonly Func<Task> m_action;
public AsyncActionContext(Func<Task> action)
{
m_action = action;
TaskCompletionSource = new TaskCompletionSource<object>();
}
#region Implementation of IActionContext
public Task Run(CancellationToken ct)
{
return m_action();
}
public TaskCompletionSource<object> TaskCompletionSource { get; private set; }
#endregion
}
private class AsyncCancellableActionContext : IActionContext
{
private readonly Func<CancellationToken, Task> m_action;
public AsyncCancellableActionContext(Func<CancellationToken, Task> action)
{
m_action = action;
TaskCompletionSource = new TaskCompletionSource<object>();
}
#region Implementation of IActionContext
public Task Run(CancellationToken ct)
{
return m_action(ct);
}
public TaskCompletionSource<object> TaskCompletionSource { get; private set; }
#endregion
}
private readonly CancellationTokenSource m_ctsTerminateAll = new CancellationTokenSource();
private readonly BlockingCollection<IActionContext> m_bus = new BlockingCollection<IActionContext>();
private readonly LinkedList<Thread> m_threads = new LinkedList<Thread>();
private int m_idleThreadCount;
private static int s_threadCount;
public BJEThreadPool(int threadCount)
{
ReserveAdditionalThreads(threadCount);
}
private void ReserveAdditionalThreads(int n)
{
for (int i = 0; i < n; ++i)
{
var index = Interlocked.Increment(ref s_threadCount) - 1;
var t = new Thread(Start)
{
IsBackground = true,
Name = "BJE Thread " + index
};
Interlocked.Increment(ref m_idleThreadCount);
t.Start();
m_threads.AddLast(t);
}
}
private void Start()
{
try
{
foreach (var actionContext in m_bus.GetConsumingEnumerable(m_ctsTerminateAll.Token))
{
RunWork(actionContext).Wait();
}
}
catch (OperationCanceledException)
{
}
catch
{
// Should never happen - log the error
}
Interlocked.Decrement(ref m_idleThreadCount);
}
private async Task RunWork(IActionContext actionContext)
{
Interlocked.Decrement(ref m_idleThreadCount);
try
{
var task = actionContext.Run(m_ctsTerminateAll.Token);
if (task != null)
{
await task;
}
actionContext.TaskCompletionSource.SetResult(null);
}
catch (OperationCanceledException)
{
actionContext.TaskCompletionSource.TrySetCanceled();
}
catch (Exception exc)
{
actionContext.TaskCompletionSource.TrySetException(exc);
}
Interlocked.Increment(ref m_idleThreadCount);
}
private Task PostWork(IActionContext actionContext)
{
m_bus.Add(actionContext);
return actionContext.TaskCompletionSource.Task;
}
#region Implementation of IBJEThreadPool
public void SetThreadCount(int threadCount)
{
if (threadCount > m_threads.Count)
{
ReserveAdditionalThreads(threadCount - m_threads.Count);
}
else if (threadCount < m_threads.Count)
{
throw new NotSupportedException();
}
}
public void Terminate()
{
m_ctsTerminateAll.Cancel();
foreach (var t in m_threads)
{
t.Join();
}
}
public Task Run(Action action)
{
return PostWork(new ActionContext(action));
}
public Task Run(Action<CancellationToken> action)
{
return PostWork(new CancellableActionContext(action));
}
public Task Run(Func<Task> action)
{
return PostWork(new AsyncActionContext(action));
}
public Task Run(Func<CancellationToken, Task> action)
{
return PostWork(new AsyncCancellableActionContext(action));
}
public int IdleThreadCount
{
get { return m_idleThreadCount; }
}
#endregion
}
public static class Extensions
{
public static Task WithCancellation(this Task task, CancellationToken token)
{
return task.ContinueWith(t => t.GetAwaiter().GetResult(), token);
}
}
class Program
{
static void Main()
{
const int THREAD_COUNT = 16;
var pool = new BJEThreadPool(THREAD_COUNT);
var tcs = new TaskCompletionSource<bool>();
var tasks = new List<Task>();
var allRunning = new CountdownEvent(THREAD_COUNT);
for (int i = pool.IdleThreadCount; i > 0; --i)
{
var index = i;
tasks.Add(pool.Run(async ct =>
{
Console.WriteLine("Started action " + index);
allRunning.Signal();
await tcs.Task.WithCancellation(ct);
Console.WriteLine(" Ended action " + index);
}));
}
Console.WriteLine("pool.IdleThreadCount = " + pool.IdleThreadCount);
allRunning.Wait();
Debug.Assert(pool.IdleThreadCount == 0);
int expectedIdleThreadCount = THREAD_COUNT;
Console.WriteLine("Press [c]ancel, [e]rror, [a]bort or any other key");
switch (Console.ReadKey().KeyChar)
{
case 'c':
Console.WriteLine("ancel All");
tcs.TrySetCanceled();
break;
case 'e':
Console.WriteLine("rror All");
tcs.TrySetException(new Exception("Failed"));
break;
case 'a':
Console.WriteLine("bort All");
pool.Terminate();
expectedIdleThreadCount = 0;
break;
default:
Console.WriteLine("Done All");
tcs.TrySetResult(true);
break;
}
try
{
Task.WaitAll(tasks.ToArray());
}
catch (AggregateException exc)
{
Console.WriteLine(exc.Flatten().InnerException.Message);
}
Debug.Assert(pool.IdleThreadCount == expectedIdleThreadCount);
pool.Terminate();
Console.WriteLine("Press any key");
Console.ReadKey();
}
}
}
你会如何总结你的问题?比如'机器不应该同时运行超过N个工作项目'? – 2014-09-02 21:33:27
我会说目前的标题相当准确 - 我有一个不支持异步工作项的自定义线程池,问题是如何修改它以支持它们。 – mark 2014-09-02 21:36:30
由于许多'异步'操作实际上并不使用新线程来运行你的问题有点奇怪... – 2014-09-02 21:40:52