2009-09-30 77 views
8
 
(fileNameToCharStream "bigfile" 
|>> fuse [length; 
      splitBy (fun x -> x = ' ' || x = '\n') removeEmpty |>> length; 
      splitBy (fun x -> x = '\n') keepEmpty |>> length; 
     ]) 
    (*fuse "fuses" the three functions to run concurrently*) 
|> run 2 (*forces to run in parallel on two threads*) 
|> (fun [num_chars; num_words; num_lines] -> 
     printfn "%d %d %d" 
      num_chars num_words, num_lines)) 

我想使这段代码工作在以下方式: 将原始流拆分为两个正好在中间;然后 为每个半运行一个单独的计算,即 计算3件事:长度(即字符数), 字数,行数。 但是,我不想有一个问题,如果 我错误地分裂了一个字。这必须是 照顾。该文件只能读取一次。并行流水线

我该如何编程指定的功能和操作符>> >>? 这可能吗?

+0

这可能是因为美国还没有醒来,但悬而未决的是,您可能要查找的关键字“异步”获得更好的想法是什么是可能的。 – Benjol 2009-09-30 08:42:22

+0

你认为什么样的签名会融合,运行和| >>会有什么特点?例如,你的三元素列表变成了三元组? – Gabriel 2009-09-30 15:55:47

+0

对,我的意思是: |(fun [num_chars; num_words; num_lines] - > – 2009-09-30 20:27:06

回答

8

它看起来像你的要求很多。我会留给你弄清楚字符串操作,但是我会告诉你如何定义一个并行执行一系列操作的操作符。

第1步:写fuse功能

你的保险丝功能似乎映射单个输入使用多种功能,这是很容易写出如下:

//val fuse : seq<('a -> 'b)> -> 'a -> 'b list 
let fuse functionList input = [ for f in functionList -> f input] 

注意,所有的你的映射函数需要有相同的类型。

步骤2:定义操作者执行的功能并行

标准并行映射函数可以被写为如下:

//val pmap : ('a -> 'b) -> seq<'a> -> 'b array 
let pmap f l = 
    seq [for a in l -> async { return f a } ] 
    |> Async.Parallel 
    |> Async.RunSynchronously 

据我所知,Async.Parallel将并行执行异步操作,在任何给定时间执行的并行任务数量等于机器上的内核数量(如果我错了,有人可以纠正我)。因此,在双核心机器上,当调用此函数时,我们的机器上最多应该有2个线程运行。这是一件好事,因为我们不希望每个内核运行多个线程都会加速(实际上,额外的上下文切换可能会减慢速度)。

我们可以在pmapfuse来定义操作|>>

//val (|>>) : seq<'a> -> seq<('a -> 'b)> -> 'b list array 
let (|>>) input functionList = pmap (fuse functionList) input 

所以|>>运营商采取了一堆的输入,并将其使用大量不同输出的地图。到目前为止,如果我们把所有这些组合起来,我们得到以下(在FSI):

> let countOccurrences compareChar source = 
    source |> Seq.sumBy(fun c -> if c = compareChar then 1 else 0) 

let length (s : string) = s.Length 

let testData = "Juliet is awesome|Someone should give her a medal".Split('|') 
let testOutput = 
    testData 
    |>> [length; countOccurrences 'J'; countOccurrences 'o'];; 

val countOccurrences : 'a -> seq<'a> -> int 
val length : string -> int 
val testData : string [] = 
    [|"Juliet is awesome"; "Someone should give her a medal"|] 
val testOutput : int list array = [|[17; 1; 1]; [31; 0; 3]|] 

testOutput包含两个元素,两者都是并行计算。

步骤3:聚合元素成单个输出

好了,所以现在我们有了由我们的阵列中的每个元素表示的部分结果,我们希望给我们的部分结果合并到单个聚集体。我假设数组中的每个元素都应该合并为相同的函数,因为输入中的每个元素都具有相同的数据类型。

这里是一个非常丑陋的功能,我写了作业:

> let reduceMany f input = 
    input 
    |> Seq.reduce (fun acc x -> [for (a, b) in Seq.zip acc x -> f a b ]);; 

val reduceMany : ('a -> 'a -> 'a) -> seq<'a list> -> 'a list 

> reduceMany (+) testOutput;; 
val it : int list = [48; 1; 4] 

reduceMany采用N长度序列的序列,并返回一个正长数组作为输出。如果你能想出更好的方式来写这个功能,是我的客人:)

为上述解码输出:

  • 48 =我的两个输入字符串的长度之和。请注意,原始字符串是49个字符,但将其分割为“|”每个“|”吃了一个字符。
  • 1 =我输入中'J'的所有实例的总和
  • 4 =所有“O”实例的总和。

第4步:把一切融合在一起

let pmap f l = 
    seq [for a in l -> async { return f a } ] 
    |> Async.Parallel 
    |> Async.RunSynchronously 

let fuse functionList input = [ for f in functionList -> f input] 

let (|>>) input functionList = pmap (fuse functionList) input 

let reduceMany f input = 
    input 
    |> Seq.reduce (fun acc x -> [for (a, b) in Seq.zip acc x -> f a b ]) 

let countOccurrences compareChar source = 
    source |> Seq.sumBy(fun c -> if c = compareChar then 1 else 0) 

let length (s : string) = s.Length 

let testData = "Juliet is awesome|Someone should give her a medal".Split('|') 
let testOutput = 
    testData 
    |>> [length; countOccurrences 'J'; countOccurrences 'o'] 
    |> reduceMany (+)