2009-05-30 55 views
4

我是一名机械工程研究生,我的顾问刚要我为我们的一个传感器项目编写数据可视化工具。由于是夏天,他希望我可以有一些乐趣,所以我认为现在是学习科学计算语言的好时机,所以我继续前进,并且投入到F#中。你如何在F#中解决这个问题? (高频传感器数据)

由于我是新的函数式编程范例,我在构建我的程序时遇到了一些困难,尤其是考虑到可以轻松地将FO中的OO/FP结合起来。我的任务如下:

  1. 我们有几十个传感器不断报告数据(每隔几秒钟一次)。
  2. 我需要同时连接所有传感器,创建每个传感器输出的内存时间序列,然后实时计算这些时间序列的各种统计数据。
  3. 每隔几个小时,我需要将数据刷新到二进制文件中以用于日志记录。

我应该如何设计我的应用程序?我想过这样的事情: 1.我打算连接到每个传感器以开始接收数据,然后将这些数据转储到消息队列中。 2.我会有一个事件驱动的处理函数接收队列中的数据。当收到数据时,会确定数据来自哪个传感器,然后将数据放入相应传感器的时间序列对象中。 3.每当传感器数据时间序列对象被添加时,我都可以触发事件并让我的统计功能为传感器收集新数据。

显然我需要在这个应用程序中保持某种状态。所以我会添加下面的可变数据结构。我会使用一个通用的.NET可调整大小的List来存储我的时间序列数据,并实现一个新的派生方法来触发数据添加事件。我可以在一个Dictionary中存储sensorid和实际时间序列容器之间的映射(当数据从队列中弹出时,我可以读取sensorid字段,获取该sensorid的时间序列容器,然后轻松添加新数据)。我还可以有第二个字典来存储sensorid和各个时间序列之间的映射,这些时间序列包含感应时间序列的统计信息)。当添加主传感器时间序列时,它会触发一个事件来调用所有统计函数,以便在新数据上自行运行,并将其信息存储在该传感器ID的适当字典中。

我还没有想过如何保存数据,但我想我可以用数据写出二进制文件。

任何意见,想法或参考表示赞赏。

谢谢:)

+0

传感器阵列的接口是什么样的?你会收到一个输入的传感器事件流,还是来自每个传感器的流?我认为这是即时的(例如显示最后一分钟或某些类似的滑动窗口移动平均值),所以您需要不断更新计算的统计数据? – Brian 2009-05-30 00:19:23

+0

传感器时间序列对象(可调整大小的数组) - 它只是带有onadd事件的List。不,我从每个传感器获取流,这就是为什么我要说我需要汇总这些数据并将其放在队列中以便于消耗/处理。是的,可视化是实时的,计算的统计数据也需要实时更新。 – 2009-05-30 00:35:02

回答

2

我对此可能完全错误,并且它不是您的问题的答案,所以请将意见投给我,但是我对F#感兴趣的一年就是没有那种感觉在科学/技术环境(我知道oneexception)中有很多使用F#的经验的人(或'在这里')有很多人。

这可能是因为微软把他们的努力集中在了金融界的F#外展上 - 相当不幸回顾的时机!

这并不意味着你不会为你的具体问题得到很好的答案,你只是不可能得到“哦,是的,我上周做了类似的事情,注意到这个问题”等等...

撇开,你知道units of measure in F#?非常有趣的功能与任何技术相关。

5

我建议对执行F#中的新项目,直到你的语言得到了更好的处理,否则你会刚刚结束了在F#的语法编写C#代码。至少在工作中的项目中,最好使用你知道的工具而不是使用公司的资金来试验新技术。

但是,既然您问了,我会使用mailbox processor作为所有传感器输入的线程安全消息处理队列。消息队列可以重新计算收到的每条消息的统计信息。关于我的头顶,我正在考虑这样的设置:

type SensorMsg<'a, 'b> = 
    | Fetch of 'a AsyncReplyChannel 
    | Post of 'b 
    | Die 

type SensorMessageQueue<'a, 'b>(emptyStats : 'a, compute : 'a -> 'b -> 'a) = 
    let queue = MailboxProcessor.Start(fun inbox -> 
      let rec loop stats = 
       async { 
        let! msg = inbox.Receive() 
        match msg with 
        | Die -> return() 
        | Post(x) -> return! loop (compute stats x) 
        | Fetch(x) -> x.Reply(stats); return! loop stats 
       } 
      loop emptyStats 
     ) 

    member this.Post(x) = queue.Post(Post(x)) 
    member this.Fetch() = queue.PostAndReply(fun replyChannel -> Fetch(replyChannel)) 
    member this.Die() = queue.Post(Die) 

类似这样的东西可以保存传感器的实时运行统计信息。例如,可以说,我想后的东西,将举行一个正在运行的平均值:

let averager = 
    SensorMessageQueue(
      (0, 0), (* initial state of sum, total *) 
      (fun (sum, total) input -> sum + input, total + 1) 
     ) 

averager.Post(75) 
averager.Post(90) 
averager.Post(80) 
let x = averager.Fetch() (* returns (245, 3) *) 
averager.Post(100) 
let y = averager.Fetch() (* returns (345, 4) *) 

像这样的设置是比较容易的工作,线程安全的,并且不使用可变的状态(全“状态”的存在在封闭的论点中)。如果你仔细想想,这基本上是一个荣耀的seq.unfold,只有使用邮箱处理器才能实现。这可能是矫枉过正,可能是正确的,或者可能正是你的项目所需要的,这取决于你的要求。