2017-06-01 90 views
2

我试图编写一个函数(OnceAsync f),它确保在服务器(即多线程环境)上只运行一次异步函数。我认为这很容易,但它很快变得复杂(锁,忙等待!!)OnceAsync:一次运行f#异步函数一次

这是我的解决方案,但我认为它是过度设计的;一定会有更好的办法。这应该在FSI工作:

let locked_counter init = 
    let c = ref init 
    fun x -> lock c <| fun() -> 
     c := !c + x 
     !c 
let wait_until finished = async { 
    while not(finished()) do 
     do! Async.Sleep(1000) 
} 

let OnceAsync f = 
    // - ensure that the async function, f, is only called once 
    // - this function always returns the value, f() 
    let mutable res = None 
    let lock_inc = locked_counter 0 

    async { 
     let count = lock_inc 1 

     match res, count with 
     | None, 1 -> // 1st run 
      let! r = f 
      res <- Some r 
     | None, _ -> // nth run, wait for 1st run to finish 
      do! wait_until (fun() -> res.IsSome) 
     | _ ->()  // 1st run done, return result 

     return res.Value 
    } 

您可以使用此代码来测试是否OnceAsync是正确的:

let test() = 
    let mutable count = 0 

    let initUser id = async { 
     do! Async.Sleep 1000 // simulate work 
     count <- count + 1 
     return count 
    } 

    //let fmem1 = (initUser "1234") 
    let fmem1 = OnceAsync (initUser "1234") 

    async { 
     let ps = Seq.init 20 (fun i -> fmem1) 
     let! rs = ps |> Async.Parallel 
     printfn "rs = %A" rs  // outputs: [|1; 1; 1; 1; 1; ....; 1|] 
    } 

test() |> Async.Start 

回答

3

如果它适合,最简单的方法整体是使用Async.StartChild。与你的解决方案不同,即使结果从未被实际使用过,也会导致函数运行,例如,在Seq.init 0的情况下。

//let fmem1 = OnceAsync (initUser "1234") 

async { 
    let! fmem1 = Async.StartChild (initUser "1234") 
    let ps = Seq.init 20 (fun i -> fmem1) 
    let! rs = ps |> Async.Parallel 
    printfn "rs = %A" rs  // outputs: [|1; 1; 1; 1; 1; ....; 1|] 
} |> Async.RunSynchronously 

最相似的你最简单的方法是使用一个TaskCompletionSource如下:

let OnceAsync f = 
    let count = ref 0 
    let tcs = TaskCompletionSource<_>() 
    async { 
    if Interlocked.Increment(count) = 1 then 
     let! r = f 
     tcs.SetResult r 
    return! Async.AwaitTask tcs.Task 
    } 

一个功能更强大的方法是使用一个MailboxProcessor并将它后第一次运行缓存的结果,随后回应所有后续请求。

let OnceAsync f = 
    let handler (agent: MailboxProcessor<AsyncReplyChannel<_>>) = 
    let rec run resultOpt = 
     async { 
     let! chan = agent.Receive() 
     let! result = 
      match resultOpt with 
      | None -> f 
      | Some result -> async.Return result 
     chan.Reply result 
     return! run (Some result) 
     } 
    run None 
    let mbp = MailboxProcessor.Start handler 
    async { return! mbp.PostAndAsyncReply id } 
+1

不错!我希望摆脱锁定和等待,但这比我所拥有的要好得多。 – Ray

+1

@Ray我添加了一个'MailboxProcessor'示例,但是IIRC它确实在引擎盖下使用了锁,所以我不知道是否可以完全脱离它们。 –

+1

看起来我无法摆脱锁。 TaskCompletionSource解决方案起作用。虽然我不能使用Interlocked.Increment;我只会制作我自己的版本。 – Ray