2
我试图编写一个函数(OnceAsync f),它确保在服务器(即多线程环境)上只运行一次异步函数。我认为这很容易,但它很快变得复杂(锁,忙等待!!)OnceAsync:一次运行f#异步函数一次
这是我的解决方案,但我认为它是过度设计的;一定会有更好的办法。这应该在FSI工作:
let locked_counter init =
let c = ref init
fun x -> lock c <| fun() ->
c := !c + x
!c
let wait_until finished = async {
while not(finished()) do
do! Async.Sleep(1000)
}
let OnceAsync f =
// - ensure that the async function, f, is only called once
// - this function always returns the value, f()
let mutable res = None
let lock_inc = locked_counter 0
async {
let count = lock_inc 1
match res, count with
| None, 1 -> // 1st run
let! r = f
res <- Some r
| None, _ -> // nth run, wait for 1st run to finish
do! wait_until (fun() -> res.IsSome)
| _ ->() // 1st run done, return result
return res.Value
}
您可以使用此代码来测试是否OnceAsync是正确的:
let test() =
let mutable count = 0
let initUser id = async {
do! Async.Sleep 1000 // simulate work
count <- count + 1
return count
}
//let fmem1 = (initUser "1234")
let fmem1 = OnceAsync (initUser "1234")
async {
let ps = Seq.init 20 (fun i -> fmem1)
let! rs = ps |> Async.Parallel
printfn "rs = %A" rs // outputs: [|1; 1; 1; 1; 1; ....; 1|]
}
test() |> Async.Start
不错!我希望摆脱锁定和等待,但这比我所拥有的要好得多。 – Ray
@Ray我添加了一个'MailboxProcessor'示例,但是IIRC它确实在引擎盖下使用了锁,所以我不知道是否可以完全脱离它们。 –
看起来我无法摆脱锁。 TaskCompletionSource解决方案起作用。虽然我不能使用Interlocked.Increment;我只会制作我自己的版本。 – Ray