2013-03-26 54 views
3

我尝试了很多与无扩展和现在我试图让我在其中可以排队程序和任何方式我希望同时能够执行它们的系统向用户发送通知。拍摄动作<T>的通用调度与无扩展

我现在有我的数据库访问封装暴露添加用户的方法UserAccess类中。在那个方法中,我想排列一个将用户添加到数据库的操作。所以我做了一个T类的JobProcessor,它公开了一个方法QueueJob(Action)并让我的用户实现了这个类。我的问题是我看不到如何从Observable的OnNext方法内调用Action,因为该操作需要用户参数。

我的攻角必须是错误的,都必须有我的设计的把握问题。例如,我知道我应该以某种方式将我的用户传递给QueueJob过程,但我不知道如何以干净的方式进行操作。

public class UserAccess : JobProcessor<User> 
    { 
     public void AddUser(User user) 
     { 
      QueueJob(usr => 
        { 
         using (var db = new CenterPlaceModelContainer()) 
         { 
          db.Users.Add(usr); 
         } 

        }); 
     [...] 

    public abstract class JobProcessor<T> 
    { 
     // Either Subject<T> or Subject<Action<T>> 
     private Subject<Action<T>> JobSubject = new Subject<Action<T>>(); 

     public JobProcessor() 
     { 
      JobSubject 
      /* Insert Rx Operators Here */ 
      .Subscribe(OnJobNext, OnJobError, OnJobComplete); 
     } 

     private void OnJobNext(Action<T> action) 
     { 
      // ??? 
     } 

     private void OnJobError(Exception exception) 
     { 

     } 

     private void OnJobComplete() 
     { 

     } 

     public void QueueJob(Action<T> action) 
     { 
      JobSubject.OnNext(action); 
     } 
    } 

编辑1:

我试图QueueJob的签名更改为

QueueJob(T entity, Action<T> action) 

现在我能做的

QueueJob(user, usr => { ... }); 

,但它似乎不是很直观。我还没有看到你通过实体和动作的许多框架。因此,我可能不需要JobProcessor。

编辑2: 我改变了我的JobProcessor的主题类型,主题,完全移除吨。由于我不需要在程序中包含用户,因为我可以在外部引用它。现在唯一的问题是,如果用户传递给QueueJob的动作在Action执行的实际时间之间发生变化,那么用户将获得修改后的信息。不受欢迎,但我想我会继续寻找解决方案。

我的代码现在(用于缓冲样品):

public abstract class JobProcessor 
{ 
    public Subject<Action> JobSubject = new Subject<Action>(); 

    public JobProcessor() 
    { 
     JobSubject 
      .Buffer(3) 
      .Subscribe(OnJobNext, OnJobError, OnJobComplete); 
    } 

    private void OnJobNext(IList<Action> actionsList) 
    { 
     foreach (var element in actionsList) 
     { 
      element(); 
     } 
    } 

    private void OnJobError(Exception exception) 
    { 

    } 

    private void OnJobComplete() 
    { 

    } 

    public void QueueJob(Action action) 
    { 
     JobSubject.OnNext(action); 
    } 
} 

回答

1

我不坦率地知道什么是你的'目标”在这里 - 但我认为你得到它向后有点...

通常受试者通过财产暴露如
IObservable<Action<T>> NewJob {get{return _subject;}}
...什么的。 (题目变成可观察的 - 主题在本质上是双 - 为什么它的具体 - 还有一点点争议 - 但良好的周边等播放)

而你只需要调用OnNext从类中 - 像你这样。

但是,您通常不会自己订阅观察值
......您让外部用户通过“挂钩”进入您的财产并定义订阅 - 这会在他们到达时获取新项目。

这是简化的,当然,也有很多情况下,很多用途,但是这可能帮助我希望

+0

事实上,我想我可能需要一些指导,因此我的问题在这里。不过,我希望有一个我可以处理的行为队列。可能是每秒一次或者仅仅累计10秒的动作,而他们只处理其中的一半,而忽略其余部分。我还希望能够通知何时正在处理某个操作,并将其通知给谁订阅。这就是为什么我想到用OnNext和User信息的序列。 – 2013-03-26 01:13:59

+0

......你在一个很好的轨道上,只是重新组织一下 - 然后我不知道问一些具体的问题,这对我来说太笼统了。在'Throttle' - 你可以用它积累更多或类似的东西(这里有几种方法) – NSGaga 2013-03-26 01:21:04

+0

@Lououx我想我明白你的意思 - 我会尝试稍后在 – NSGaga 2013-03-26 14:51:18

1

我最初的反应是的IObservable通常是最适合用于创建不变的数据结构序列,而不是法球/代表/动作。

接下来我会建议,如果你想'排程'行动以队列方式处理,那么在Rx中的IScheduler实现看起来是一个完美的合适!

另外,如果你真的想创建一个ProduceConsumer队列,那么我不认为Rx实际上是最适合这个的。即如果你将一堆消息放入一个队列中,然后让一些消费者阅读这些消息并处理它们,我会寻找不同的框架。

+0

上添加一个实现。同意在生产环境中不使用Rx只是为了使用Rx,但因为我真的很喜欢试验,所以我认为Rx对于表示可以以不同速率播放的执行序列来说是一个有趣的想法。就像在数据库中创建语句一样。我正在尝试去一个非常通用的课程,但我想我迷失在这个过程和乐趣中。 – 2013-03-27 00:07:37

+0

很酷。这绝对是学习Rx的最好方式,通过让你的手变脏,尝试一些代码,看看哪些是有效的,哪些不行。好东西。 – 2013-03-27 10:45:36

1

首先,我必须同意Lee和NSGaga的意见,你可以可能不想这样做 - 还有其他模式可以与生产者/消费者队列更加一致(我认为)你正试图在这里完成。

这就是说,既然我无法抗拒挑战......只需稍作调整,您就可以消除您的“我该通过什么行动?”的直接问题。仅通过捕捉传递的用户参数,并使其成为一个直线上升的Action - 这里的一些修改代码:

public class UserAccess : JobProcessor 
{ 
    public void AddUser(User user) 
    { 
     QueueJob(() => 
       { 
        using (var db = new CenterPlaceModelContainer()) 
        { 
         db.Users.Add(user); 
        } 

       }); 
    [...] 

public abstract class JobProcessor 
{ 
    // Subject<Action> 
    private Subject<Action> JobSubject = new Subject<Action>(); 

    public JobProcessor() 
    { 
     JobSubject 
     /* Insert Rx Operators Here */ 
     .Subscribe(OnJobNext, OnJobError, OnJobComplete); 
    } 

    private void OnJobNext(Action action) 
    { 
     // Log something saying "Yo, I'm executing an action" here? 
     action(); 
    } 

    private void OnJobError(Exception exception) 
    { 
     // Log something saying "Yo, something broke" here? 
    } 

    private void OnJobComplete() 
    { 
     // Log something saying "Yo, we shut down" here? 
    } 

    public void QueueJob(Action action) 
    { 
     JobSubject.OnNext(action); 
    } 
} 
+0

是的捕获更有意义(以前并不真正工作imo) - 但它也引发了更多的问题:)。我宁愿排队用户 - 那么你会得到一个用户列表,并对其进行一次Db扫描。还有一些其他的东西。这是一个很好的例子/问题,虽然@Lououx - 这是一个很好的时间来迎接挑战:)。 – NSGaga 2013-03-26 14:33:45

0

我完成我的设计,我发现我喜欢的东西。如果其他人需要它,这里是代码。

public class JobProcessor<T> : IDisposable where T : new() 
{ 
    private ISubject<Action<T>> jobsProcessor = new Subject<Action<T>>(); 

    private IDisposable disposer; 

    private T _jobProvider = new T(); 

    public JobProcessor(Func<ISubject<Action<T>>, IObservable<IEnumerable<Action<T>>>> initializer) 
    { 
     Console.WriteLine("Entering JobProcessor Constructor"); 

     disposer = initializer(jobsProcessor) 
      .Subscribe(OnJobsNext, OnJobsError, OnJobsComplete); 

     Console.WriteLine("Leaving JobProcessor Constructor"); 
    } 

    private void OnJobsNext(IEnumerable<Action<T>> actions) 
    { 
     Debug.WriteLine("Entering OnJobsNext"); 

     foreach (var action in actions) 
     { 
      action(_jobProvider); 
     } 

     Debug.WriteLine("Leaving OnJobsNext"); 
    } 

    private void OnJobsError(Exception ex) 
    { 
     Debug.WriteLine("Entering OnJobsError"); 

     Debug.WriteLine(ex.Message); 

     Debug.WriteLine("Leaving OnJobsError"); 
    } 

    private void OnJobsComplete() 
    { 
     Debug.WriteLine("Entering OnJobsComplete"); 

     Debug.WriteLine("Leaving OnJobsComplete"); 
    } 

    public void QueueJob(Action<T> action) 
    { 
     Debug.WriteLine("Entering QueueJobs"); 

     jobsProcessor.OnNext(action); 

     Debug.WriteLine("Leaving QueueJobs"); 
    } 

    public void Dispose() 
    { 
     disposer.Dispose(); 
    } 
} 

我选择了一个通用make来支持架构层在那里我可以并发的层,在那里我可以选择多快或多慢,我的执行可以使用JobProcessor。 JobProcessor构造函数接受一个Func用于在代码中的其他位置声明Observable序列,并生成一个处理器,按照该序列描述的顺序执行作业。 OnNext接受IEnumerable>来支持像.Buffer(3)这样的序列,它可以同时返回一批操作。的缺点是,创建一个序列在时间返回单个动作时,我需要这样做

var x = new JobProcessor<DatabaseAccess<User>>(subject => subject.Select(action => action.Yield())); 

T的收率()扩展methof返回单个元件的枚举。我在这里发现它Passing a single item as IEnumerable<T>