2010-04-03 71 views
4

使用F#中的原始套接字编写异步Ping以使用尽可能少的线程启用并行请求。不使用“System.Net.NetworkInformation.Ping”,因为它似乎为每个请求分配一个线程。我也对使用F#异步工作流感兴趣。如何检测使用异步Socket.BeginReceive时的超时?

下面的同步版本在目标主机不存在或没有响应时能够正确超时,但异步版本会挂起。当主机确实有响应时,两者都能正常工作。不知道这是 .NET 的问题,还是 F# 的问题……

任何想法?

(注:该进程必须以管理员身份运行,才能访问原始套接字)

这将引发超时:

// Synchronous variant: per the surrounding text, this call raises on timeout
// when the target host does not answer.
let result = Ping.Ping (IPAddress.Parse("192.168.33.22"), 1000) 

然而,这种挂起:

// Asynchronous variant: per the surrounding text, this never returns when the
// target host does not answer.
let result = Ping.AsyncPing (IPAddress.Parse("192.168.33.22"), 1000) 
      |> Async.RunSynchronously 

下面的代码...

module Ping 

open System 
open System.Net 
open System.Net.Sockets 
open System.Threading 

//---- ICMP Packet Classes 

/// Base class for ICMP messages. Holds the 4-byte header (type, code,
/// checksum) and the checksum arithmetic that derived messages reuse through
/// the virtual `Bytes` property.
type IcmpMessage (t : byte) =
    let m_type = t
    let m_code = 0uy
    let mutable m_checksum = 0us

    /// ICMP type byte supplied to the constructor.
    member this.Type = m_type

    /// ICMP code byte (always 0 in this implementation).
    member this.Code = m_code

    /// Checksum value currently stored in the header.
    member this.Checksum = m_checksum

    /// Serialized form of the message. Derived classes override this to
    /// append their own fields after the header returned by the base.
    abstract Bytes : byte array

    default this.Bytes =
        [|
            m_type
            m_code
            // checksum is written low byte first, matching the 16-bit reads
            // done by BitConverter.ToUInt16 in GetChecksum
            byte(m_checksum)
            byte(m_checksum >>> 8)
        |]

    /// Computes the 16-bit one's-complement checksum over the current
    /// `Bytes`, *including* whatever checksum value is stored right now.
    /// For a message whose stored checksum is already correct this returns 0.
    member this.GetChecksum() =
        let bytes = this.Bytes
        let mutable sum = 0ul
        let mutable i = 0

        // Sum up uint16s
        while i < bytes.Length - 1 do
            sum <- sum + uint32(BitConverter.ToUInt16(bytes, i))
            i <- i + 2

        // Add in last byte, if an odd size buffer
        if i <> bytes.Length then
            sum <- sum + uint32(bytes.[i])

        // Fold the carries back into the low 16 bits, then complement
        sum <- (sum >>> 16) + (sum &&& 0xFFFFul)
        sum <- sum + (sum >>> 16)
        uint16(~~~sum)

    /// Recomputes and stores the checksum for the current `Bytes`.
    /// The stored checksum is zeroed first: the sum must be taken with the
    /// checksum field at 0, otherwise a second call (for example after the
    /// payload changed) would fold the stale checksum into the new one.
    member this.UpdateChecksum() =
        m_checksum <- 0us
        m_checksum <- this.GetChecksum()


/// ICMP message that carries an identifier and a sequence number directly
/// after the base header. Both values start at 0 and are exposed read-only.
type InformationMessage (t : byte) =
    inherit IcmpMessage(t)

    let mutable m_identifier = 0us
    let mutable m_sequenceNumber = 0us

    /// Identifier field (currently always 0).
    member this.Identifier = m_identifier

    /// Sequence number field (currently always 0).
    member this.SequenceNumber = m_sequenceNumber

    /// Base header bytes followed by identifier and sequence number,
    /// each written low byte first.
    override this.Bytes =
        let header = base.Bytes
        let idAndSequence =
            [|
                byte(m_identifier)
                byte(m_identifier >>> 8)
                byte(m_sequenceNumber)
                byte(m_sequenceNumber >>> 8)
            |]
        Array.concat [ header; idAndSequence ]

/// Echo message built with ICMP type byte 8 and a payload that defaults to
/// 32 bytes of value 32. The checksum is computed once on construction and
/// again whenever the payload is replaced.
type EchoMessage() =
    inherit InformationMessage(8uy)

    let mutable payload : byte array = Array.replicate 32 32uy
    do base.UpdateChecksum()

    /// Payload bytes appended after the header fields. Assigning a new
    /// array triggers a checksum update.
    member this.Data
        with get() = payload
        and set(value) =
            payload <- value
            this.UpdateChecksum()

    /// Inherited header/identifier/sequence bytes followed by the payload.
    override this.Bytes =
        let header = base.Bytes
        Array.concat [ header; this.Data ]

//---- Synchronous Ping 

/// Synchronous ping: sends one EchoMessage to `host` over a raw ICMP socket
/// and blocks until a datagram is received.
///
/// `timeout` is applied as both the send and the receive timeout of the
/// socket. Returns the receive buffer (request length + 20 bytes; the extra
/// room is presumably for the IPv4 header that precedes the ICMP reply).
/// Raises SocketException when fewer than one byte is sent or received, and
/// lets any exception thrown by the socket calls propagate.
let Ping (host : IPAddress, timeout : int) =
    let ep = IPEndPoint(host, 0)
    // `use` releases the socket on every exit path, including a failure while
    // the timeouts are being configured (the original created the socket
    // outside its try/finally, so such a failure leaked it).
    use socket = new Socket(AddressFamily.InterNetwork, SocketType.Raw, ProtocolType.Icmp)
    socket.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.SendTimeout, timeout)
    socket.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.ReceiveTimeout, timeout)

    let packet = EchoMessage()
    let outbuffer = packet.Bytes
    if socket.SendTo(outbuffer, ep) <= 0 then
        raise (SocketException())

    let inbuffer = Array.create (outbuffer.Length + 20) 0uy
    // ReceiveFrom takes the endpoint by reference, hence the mutable local
    let mutable epr = ep :> EndPoint
    if socket.ReceiveFrom(inbuffer, &epr) <= 0 then
        raise (SocketException())

    inbuffer

//---- Extensions to the F# Async class to allow up to 5 parameters (not just 3)

/// Additional Async.FromBeginEnd overloads for Begin/End pairs whose Begin
/// method takes four or five leading arguments before the callback and state.
type Async with 
    /// Four-argument form: captures arg1..arg4 in a closure and forwards to the
    /// Async.FromBeginEnd overload whose begin function takes (callback, state).
    /// cancelAction is optional and passed through unchanged.
    static member FromBeginEnd(arg1,arg2,arg3,arg4,beginAction,endAction,?cancelAction): Async<'T> = 
     Async.FromBeginEnd((fun (iar,state) -> beginAction(arg1,arg2,arg3,arg4,iar,state)), endAction, ?cancelAction=cancelAction) 
    /// Five-argument form: same as above with one more captured argument.
    static member FromBeginEnd(arg1,arg2,arg3,arg4,arg5,beginAction,endAction,?cancelAction): Async<'T> = 
     Async.FromBeginEnd((fun (iar,state) -> beginAction(arg1,arg2,arg3,arg4,arg5,iar,state)), endAction, ?cancelAction=cancelAction) 

//---- Extensions to the Socket class to provide async SendTo and ReceiveFrom 

/// Async-workflow wrappers around the Socket BeginSendTo/EndSendTo and
/// BeginReceiveFrom/EndReceiveFrom pairs.
type System.Net.Sockets.Socket with 

    /// Wraps BeginSendTo/EndSendTo as an Async using the five-argument
    /// Async.FromBeginEnd form. No cancelAction is supplied.
    member this.AsyncSendTo(buffer, offset, size, socketFlags, remoteEP) = 
     Async.FromBeginEnd(buffer, offset, size, socketFlags, remoteEP, 
          this.BeginSendTo, 
          this.EndSendTo) 
    /// Wraps BeginReceiveFrom/EndReceiveFrom as an Async. The same remoteEP
    /// value is handed to both the Begin and the End call. No cancelAction is
    /// supplied, so nothing in this member aborts a receive that never completes.
    member this.AsyncReceiveFrom(buffer, offset, size, socketFlags, remoteEP) = 
     Async.FromBeginEnd(buffer, offset, size, socketFlags, remoteEP, 
          this.BeginReceiveFrom, 
          (fun asyncResult -> this.EndReceiveFrom(asyncResult, remoteEP))) 

//---- Asynchronous Ping 

/// Asynchronous ping: sends one EchoMessage to `host` over a raw ICMP socket
/// and awaits a datagram, returning the receive buffer (request length + 256).
///
/// `timeout` is applied as the socket's send and receive timeout options.
/// NOTE(review): the surrounding discussion reports that this version hangs
/// when the host never answers, i.e. those options do not seem to bound the
/// Begin/End based receive used here -- confirm against the Socket docs.
/// Raises SocketException when fewer than one byte is sent or received.
let AsyncPing (host : IPAddress, timeout : int) =
    async {
        let target = IPEndPoint(host, 0)
        use socket = new Socket(AddressFamily.InterNetwork, SocketType.Raw, ProtocolType.Icmp)
        socket.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.SendTimeout, timeout)
        socket.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.ReceiveTimeout, timeout)

        let request = EchoMessage().Bytes

        try
            let! sentCount = socket.AsyncSendTo(request, 0, request.Length, SocketFlags.None, target)
            if sentCount <= 0 then
                raise (SocketException())

            // the receive wrapper expects the endpoint in a ref cell
            let sender = ref (target :> EndPoint)
            let response = Array.zeroCreate<byte> (request.Length + 256)
            let! receivedCount = socket.AsyncReceiveFrom(response, 0, response.Length, SocketFlags.None, sender)
            if receivedCount <= 0 then
                raise (SocketException())
            return response
        finally
            socket.Close()
    }
+0

有要重新发明System.Net.NetworkInformation.Ping.SendAsync()什么特别的原因?它已经支持超时。 – 2010-04-03 17:32:20

+0

SendAsync/SendToAsync与上面的AsyncSendTo不一样...前者不与F#异步工作流集成(极大地简化了编写异步代码)。 – 2010-04-03 18:12:02

+0

呃,重点是你不必写它。 – 2010-04-03 18:44:15

回答

2

经过一番思考,提出了以下几点。此代码将一个AsyncReceiveEx成员添加到Socket,其中包含一个超时值。它在接收方法中隐藏了看门狗定时器的细节......非常整齐和独立。现在,这就是我正在寻找的!

请参阅下面的完整异步ping示例。

不知道,如果锁是必要的,但有备无患......

/// Async-workflow wrappers around Socket.BeginSend/EndSend and
/// BeginReceive/EndReceive, plus a receive variant with a watchdog timeout.
/// All three close the socket when the workflow is cancelled.
type System.Net.Sockets.Socket with
    /// Wraps BeginSend/EndSend as an Async; `err` receives the SocketError.
    /// Cancelling the workflow closes the socket.
    member this.AsyncSend(buffer, offset, size, socketFlags, err) =
        Async.FromBeginEnd(
            buffer, offset, size, socketFlags, err,
            this.BeginSend,
            this.EndSend,
            this.Close)

    /// Wraps BeginReceive/EndReceive as an Async; `err` receives the
    /// SocketError. Cancelling the workflow closes the socket.
    member this.AsyncReceive(buffer, offset, size, socketFlags, err) =
        Async.FromBeginEnd(
            buffer, offset, size, socketFlags, err,
            this.BeginReceive,
            this.EndReceive,
            this.Close)

    /// Like AsyncReceive, but gives up after `timeoutMS` milliseconds.
    ///
    /// A one-shot timer closes the socket when it elapses; the pending receive
    /// then completes and this member returns 0 with `err` set to
    /// SocketError.TimedOut instead of calling EndReceive. Otherwise the
    /// result of EndReceive is returned. Timer and flags are created inside
    /// the async block, so every run of the returned workflow gets its own.
    member this.AsyncReceiveEx(buffer, offset, size, socketFlags, err, (timeoutMS:int)) =
        async {
            let timedOut = ref false
            let completed = ref false
            let timer = new System.Timers.Timer(double(timeoutMS), AutoReset=false)
            timer.Elapsed.Add(fun _ ->
                lock timedOut (fun () ->
                    // Elapsed can still be raised after Stop(); once the
                    // receive has completed it must not be reported as a
                    // timeout, so the flag is only set while still pending.
                    if not !completed then
                        timedOut := true
                        this.Close()))
            // Marks the receive as finished and releases the timer. Safe to
            // call more than once (end action and cancel action both use it).
            let complete() =
                lock timedOut (fun () ->
                    if not !completed then
                        completed := true
                        timer.Stop()
                        timer.Dispose())
            return!
                Async.FromBeginEnd(
                    buffer, offset, size, socketFlags, err,
                    (fun (b,o,s,sf,e,st,uo) ->
                        let result = this.BeginReceive(b,o,s,sf,e,st,uo)
                        // The receive may already have completed (and disposed
                        // the timer) before we get here; starting a disposed
                        // timer would throw ObjectDisposedException.
                        lock timedOut (fun () ->
                            if not !completed then timer.Start())
                        result),
                    (fun result ->
                        complete()
                        if !timedOut then
                            err := SocketError.TimedOut
                            0
                        else
                            this.EndReceive(result, err)),
                    (fun () ->
                        complete()
                        this.Close()))
        }

下面是一个完整的 Ping 示例。为了避免耗尽源端口,并防止一次收到太多的回复,它每次只扫描一个 C 类子网。

module Ping 

open System 
open System.Net 
open System.Net.Sockets 
open System.Threading 

//---- ICMP Packet Classes 

/// Base class for ICMP messages. Holds the 4-byte header (type, code,
/// checksum) and the checksum arithmetic that derived messages reuse through
/// the virtual `Bytes` property.
type IcmpMessage (t : byte) =
    let m_type = t
    let m_code = 0uy
    let mutable m_checksum = 0us

    /// ICMP type byte supplied to the constructor.
    member this.Type = m_type

    /// ICMP code byte (always 0 in this implementation).
    member this.Code = m_code

    /// Checksum value currently stored in the header.
    member this.Checksum = m_checksum

    /// Serialized form of the message. Derived classes override this to
    /// append their own fields after the header returned by the base.
    abstract Bytes : byte array

    default this.Bytes =
        [|
            m_type
            m_code
            // checksum is written low byte first, matching the 16-bit reads
            // done by BitConverter.ToUInt16 in GetChecksum
            byte(m_checksum)
            byte(m_checksum >>> 8)
        |]

    /// Computes the 16-bit one's-complement checksum over the current
    /// `Bytes`, *including* whatever checksum value is stored right now.
    /// For a message whose stored checksum is already correct this returns 0.
    member this.GetChecksum() =
        let bytes = this.Bytes
        let mutable sum = 0ul
        let mutable i = 0

        // Sum up uint16s
        while i < bytes.Length - 1 do
            sum <- sum + uint32(BitConverter.ToUInt16(bytes, i))
            i <- i + 2

        // Add in last byte, if an odd size buffer
        if i <> bytes.Length then
            sum <- sum + uint32(bytes.[i])

        // Fold the carries back into the low 16 bits, then complement
        sum <- (sum >>> 16) + (sum &&& 0xFFFFul)
        sum <- sum + (sum >>> 16)
        uint16(~~~sum)

    /// Recomputes and stores the checksum for the current `Bytes`.
    /// The stored checksum is zeroed first: the sum must be taken with the
    /// checksum field at 0, otherwise a second call (for example after the
    /// payload changed) would fold the stale checksum into the new one.
    member this.UpdateChecksum() =
        m_checksum <- 0us
        m_checksum <- this.GetChecksum()


/// ICMP message that carries an identifier and a sequence number directly
/// after the base header. Both values start at 0 and are exposed read-only.
type InformationMessage (t : byte) =
    inherit IcmpMessage(t)

    let mutable m_identifier = 0us
    let mutable m_sequenceNumber = 0us

    /// Identifier field (currently always 0).
    member this.Identifier = m_identifier

    /// Sequence number field (currently always 0).
    member this.SequenceNumber = m_sequenceNumber

    /// Base header bytes followed by identifier and sequence number,
    /// each written low byte first.
    override this.Bytes =
        let header = base.Bytes
        let idAndSequence =
            [|
                byte(m_identifier)
                byte(m_identifier >>> 8)
                byte(m_sequenceNumber)
                byte(m_sequenceNumber >>> 8)
            |]
        Array.concat [ header; idAndSequence ]

/// Echo message built with ICMP type byte 8 and a payload that defaults to
/// 32 bytes of value 32. The checksum is computed once on construction and
/// again whenever the payload is replaced.
type EchoMessage() =
    inherit InformationMessage(8uy)

    let mutable payload : byte array = Array.replicate 32 32uy
    do base.UpdateChecksum()

    /// Payload bytes appended after the header fields. Assigning a new
    /// array triggers a checksum update.
    member this.Data
        with get() = payload
        and set(value) =
            payload <- value
            this.UpdateChecksum()

    /// Inherited header/identifier/sequence bytes followed by the payload.
    override this.Bytes =
        let header = base.Bytes
        Array.concat [ header; this.Data ]

//---- Extensions to the F# Async class to allow up to 5 parameters (not just 3)

/// Additional Async.FromBeginEnd overloads for Begin/End pairs whose Begin
/// method takes four or five leading arguments before the callback and state.
type Async with 
    /// Four-argument form: captures arg1..arg4 in a closure and forwards to the
    /// Async.FromBeginEnd overload whose begin function takes (callback, state).
    /// cancelAction is optional and passed through unchanged.
    static member FromBeginEnd(arg1,arg2,arg3,arg4,beginAction,endAction,?cancelAction): Async<'T> = 
     Async.FromBeginEnd((fun (iar,state) -> beginAction(arg1,arg2,arg3,arg4,iar,state)), endAction, ?cancelAction=cancelAction) 
    /// Five-argument form: same as above with one more captured argument.
    static member FromBeginEnd(arg1,arg2,arg3,arg4,arg5,beginAction,endAction,?cancelAction): Async<'T> = 
     Async.FromBeginEnd((fun (iar,state) -> beginAction(arg1,arg2,arg3,arg4,arg5,iar,state)), endAction, ?cancelAction=cancelAction) 

//---- Extensions to the Socket class to provide async SendTo and ReceiveFrom 

/// Async-workflow wrappers around Socket.BeginSend/EndSend and
/// BeginReceive/EndReceive, plus a receive variant with a watchdog timeout.
/// All three close the socket when the workflow is cancelled.
type System.Net.Sockets.Socket with

    /// Wraps BeginSend/EndSend as an Async; `err` receives the SocketError.
    /// Cancelling the workflow closes the socket.
    member this.AsyncSend(buffer, offset, size, socketFlags, err) =
        Async.FromBeginEnd(
            buffer, offset, size, socketFlags, err,
            this.BeginSend,
            this.EndSend,
            this.Close)

    /// Wraps BeginReceive/EndReceive as an Async; `err` receives the
    /// SocketError. Cancelling the workflow closes the socket.
    member this.AsyncReceive(buffer, offset, size, socketFlags, err) =
        Async.FromBeginEnd(
            buffer, offset, size, socketFlags, err,
            this.BeginReceive,
            this.EndReceive,
            this.Close)

    /// Like AsyncReceive, but gives up after `timeoutMS` milliseconds.
    ///
    /// A one-shot timer closes the socket when it elapses; the pending receive
    /// then completes and this member returns 0 with `err` set to
    /// SocketError.TimedOut instead of calling EndReceive. Otherwise the
    /// result of EndReceive is returned. Timer and flags are created inside
    /// the async block, so every run of the returned workflow gets its own.
    member this.AsyncReceiveEx(buffer, offset, size, socketFlags, err, (timeoutMS:int)) =
        async {
            let timedOut = ref false
            let completed = ref false
            let timer = new System.Timers.Timer(double(timeoutMS), AutoReset=false)
            timer.Elapsed.Add(fun _ ->
                lock timedOut (fun () ->
                    // Elapsed can still be raised after Stop(); once the
                    // receive has completed it must not be reported as a
                    // timeout, so the flag is only set while still pending.
                    if not !completed then
                        timedOut := true
                        this.Close()))
            // Marks the receive as finished and releases the timer. Safe to
            // call more than once (end action and cancel action both use it).
            let complete() =
                lock timedOut (fun () ->
                    if not !completed then
                        completed := true
                        timer.Stop()
                        timer.Dispose())
            return!
                Async.FromBeginEnd(
                    buffer, offset, size, socketFlags, err,
                    (fun (b,o,s,sf,e,st,uo) ->
                        let result = this.BeginReceive(b,o,s,sf,e,st,uo)
                        // The receive may already have completed (and disposed
                        // the timer) before we get here; starting a disposed
                        // timer would throw ObjectDisposedException.
                        lock timedOut (fun () ->
                            if not !completed then timer.Start())
                        result),
                    (fun result ->
                        complete()
                        if !timedOut then
                            err := SocketError.TimedOut
                            0
                        else
                            this.EndReceive(result, err)),
                    (fun () ->
                        complete()
                        this.Close()))
        }

//---- Asynchronous Ping 

/// Asynchronously pings `ip` once over a raw ICMP socket.
///
/// Returns a triple `(ip, elapsed, isAlive)`: the address that was pinged,
/// the stopwatch time accumulated across the send and the receive, and
/// whether an ICMP echo reply arrived before `timeout` milliseconds.
/// A timeout is reported as `isAlive = false`, not as an exception.
/// Raises SocketException (carrying the reported SocketError) when the send
/// or the receive fails for any reason other than the timeout.
let AsyncPing (ip : IPAddress, timeout : int) =
    async {
        use socket = new Socket(AddressFamily.InterNetwork, SocketType.Raw, ProtocolType.Icmp)
        socket.Connect(IPEndPoint(ip, 0))

        let pingTime = System.Diagnostics.Stopwatch()
        let packet = EchoMessage()
        let outbuffer = packet.Bytes
        let err = ref (SocketError())

        let isAlive = ref false
        try
            pingTime.Start()
            let! result = socket.AsyncSend(outbuffer, 0, outbuffer.Length, SocketFlags.None, err)
            pingTime.Stop()

            if result <= 0 then
                raise (SocketException(int(!err)))

            let inbuffer = Array.create (outbuffer.Length + 256) 0uy

            pingTime.Start()
            let! reply = socket.AsyncReceiveEx(inbuffer, 0, inbuffer.Length, SocketFlags.None, err, timeout)
            pingTime.Stop()

            let timedOut = (!err = SocketError.TimedOut)

            // Check the *receive* count here; the original re-tested the send
            // count, so a failed receive was never reported.
            if reply <= 0 && not timedOut then
                raise (SocketException(int(!err)))

            // The datagram starts with the IPv4 header; its length in bytes is
            // the low nibble of the first byte times 4. The ICMP type and code
            // are the first two bytes after it. (The original looked at fixed
            // offsets 25/26, which are identifier/sequence bytes and always 0.)
            let ipHeaderLength = int (inbuffer.[0] &&& 0x0Fuy) * 4
            isAlive :=
                not timedOut
                && reply >= ipHeaderLength + 2
                && inbuffer.[ipHeaderLength] = 0uy      // Type 0 = echo reply
                && inbuffer.[ipHeaderLength + 1] = 0uy  // Code 0 = echo reply
        finally
            socket.Close()

        return (ip, pingTime.Elapsed, !isAlive)
    }

/// Scans 192.168.0.0 - 192.168.255.255 one /24 subnet at a time: all 256
/// addresses of a subnet are pinged in parallel (1000 ms timeout each) and
/// every host that answered is printed with its round-trip time.
let main() =
    // One AsyncPing workflow per node of the given third octet.
    let pings net =
        seq {
            for node in 0..255 do
                let ip = IPAddress.Parse(sprintf "192.168.%d.%d" net node)
                yield Ping.AsyncPing(ip, 1000)
        }

    for net in 0..255 do
        pings net
        |> Async.Parallel
        |> Async.RunSynchronously
        |> Seq.filter (fun (_,_,alive) -> alive)
        |> Seq.iter (fun (ip, time, _) ->
            // TotalMilliseconds is the whole duration; Milliseconds is only
            // the 0-999 component and would wrap for anything >= 1 s.
            printfn "%A %dms" ip (int time.TotalMilliseconds))

main()
// keep the console window open until a key is pressed
System.Console.ReadKey() |> ignore
+0

这现在非常接近你想要的。但是这仍然有一个问题,请参阅我的最新答案。 – Brian 2010-04-07 16:43:48

+0

更新了代码以反映您的修复。谢谢! – 2010-04-07 18:07:32

1

有两点……

首先,可以将.NET模式调整为F#异步。 FSharp.Core库为WebClient执行此操作;我想你可以在这里使用相同的模式。这里的Web客户端代码

type System.Net.WebClient with
    /// Adapts the event-based DownloadStringAsync / DownloadStringCompleted
    /// pattern to an F# Async. Completes with the downloaded string, fails
    /// with the event's Error, or is cancelled when the event reports
    /// Cancelled. Cancelling the workflow calls CancelAsync on the client.
    member this.AsyncDownloadString (address:Uri) : Async<string> =
        let downloadAsync =
            Async.FromContinuations (fun (cont, econt, ccont) ->
                // unique token so that only the completion of *this* request
                // is handled
                let userToken = new obj()
                let rec handler =
                    System.Net.DownloadStringCompletedEventHandler (fun _ args ->
                        if userToken = args.UserState then
                            this.DownloadStringCompleted.RemoveHandler(handler)
                            if args.Cancelled then
                                ccont (new OperationCanceledException())
                            elif args.Error <> null then
                                econt args.Error
                            else
                                cont args.Result)
                this.DownloadStringCompleted.AddHandler(handler)
                this.DownloadStringAsync(address, userToken))
        // This block is the member's result, so it must sit at the same
        // indentation as `let downloadAsync`, not inside that binding's body.
        async {
            use! _holder = Async.OnCancel(fun _ -> this.CancelAsync())
            return! downloadAsync
        }

我想你可以用 SendAsync/SendAsyncCancel/PingCompleted 做同样的事情(我还没有仔细考虑过)。

二,将您的方法命名为AsyncPing,而不是PingAsync。 F#异步方法名为AsyncFoo,而具有事件模式的方法名为FooAsync

我没仔细看过你的代码,试图找出错误可能在哪里。

+0

将PingAsync重命名为AsyncPing。如果时间允许,我会研究其他想法,看看它是否克服了我的超时问题。 – 2010-04-03 18:37:36

+0

试图将其封装在Async.FromContinuations中,但它仍然存在创建数百个线程的问题,否则将无法扩展(在并行ping B类时耗尽内存)。代码发布在一个单独的答案,以防有人发现它的使用... – 2010-04-03 23:11:07

0

这是一个使用Async.FromContinuations的版本。

但是,这不是我的问题的答案,因为它不能缩放。该代码可能对某人有用,所以在此张贴。

它之所以不是答案,是因为 System.Net.NetworkInformation.Ping 似乎为每个 Ping 使用一个线程,并占用相当多的内存(可能是线程栈空间造成的)。尝试 ping 整个 B 类网络会耗尽内存并使用数百个线程,而使用原始套接字的代码只使用少量线程,内存占用不到 10MB。

type System.Net.NetworkInformation.Ping with
    /// Adapts the event-based SendAsync / PingCompleted pattern to an F# Async.
    ///
    /// `address` is the host to ping. `timeout` is the optional reply timeout
    /// in milliseconds passed to SendAsync; it defaults to 1000, the value
    /// that was previously hard-coded. Completes with the PingReply, fails
    /// with the event's Error, or is cancelled when the event reports
    /// Cancelled. Cancelling the workflow calls SendAsyncCancel.
    member this.AsyncPing (address:IPAddress, ?timeout:int) : Async<PingReply> =
        let timeoutMS = defaultArg timeout 1000
        let pingAsync =
            Async.FromContinuations (fun (cont, econt, ccont) ->
                // unique token so that only the completion of *this* request
                // is handled
                let userToken = new obj()
                let rec handler =
                    PingCompletedEventHandler (fun _ args ->
                        if userToken = args.UserState then
                            this.PingCompleted.RemoveHandler(handler)
                            if args.Cancelled then
                                ccont (new OperationCanceledException())
                            elif args.Error <> null then
                                econt args.Error
                            else
                                cont args.Reply)
                this.PingCompleted.AddHandler(handler)
                this.SendAsync(address, timeoutMS, userToken))
        async {
            use! _holder = Async.OnCancel(fun _ -> this.SendAsyncCancel())
            return! pingAsync
        }

/// Pings every address in 192.168.0.0 - 192.168.255.255 in parallel using the
/// Ping.AsyncPing extension and prints each result.
let AsyncPingTest() =
    let pings =
        seq {
            for net in 0..255 do
                for node in 0..255 do
                    let ip = IPAddress.Parse(sprintf "192.168.%d.%d" net node)
                    // Ping is IDisposable; scoping it with `use` inside the
                    // workflow releases it as soon as its own request is done
                    // instead of leaving 65536 undisposed instances behind.
                    yield async {
                        use ping = new Ping()
                        return! ping.AsyncPing(ip)
                    }
        }
    pings
    |> Async.Parallel
    |> Async.RunSynchronously
    |> Seq.iter (fun result -> printfn "%A" result)
0

编辑:代码更改为工作版本。

James,我修改了你的代码,它看起来可以和你的版本一样工作,但是使用MailboxProcessor作为超时处理引擎。代码速度比您的版本低4倍,但使用的内存减少了1.5倍。

/// Pings `host` using a MailboxProcessor as the timeout mechanism and prints
/// "<host>: ok" or "<host>: failed". The returned workflow yields unit.
///
/// The agent is created and started when AsyncPing is applied (outside the
/// returned async). It opens a raw ICMP socket, then waits for one message
/// (a reply channel) before sending the echo and awaiting the response; the
/// caller posts that message with PostAndAsyncReply and `timeout`.
/// NOTE(review): when the caller times out, nothing in this function closes
/// the agent's socket while its receive is still pending -- looks like the
/// socket and the pending receive can leak in that case; confirm.
let AsyncPing (host: IPAddress) timeout = 
    let guard = 
     MailboxProcessor<AsyncReplyChannel<Socket*byte array>>.Start(
      fun inbox -> 
      async { 
       try 
        let socket = new Socket(AddressFamily.InterNetwork, SocketType.Raw, ProtocolType.Icmp) 
        try 
         let ep = IPEndPoint(host, 0) 
         let packet = EchoMessage() 
         let outbuffer = packet.Bytes 
         // block until the caller posts its reply channel
         let! reply = inbox.Receive() 
         let! result = socket.AsyncSendTo(outbuffer, 0, outbuffer.Length, SocketFlags.None, ep) 
         if result <= 0 then 
          raise (SocketException()) 
         let epr = ref (ep :> EndPoint) 
         let inbuffer = Array.create (outbuffer.Length + 256) 0uy 
         let! result = socket.AsyncReceiveFrom(inbuffer, 0, inbuffer.Length, SocketFlags.None, epr) 
         if result <= 0 then 
          raise (SocketException()) 
         // hand the socket and the receive buffer back to the caller
         reply.Reply(socket,inbuffer) 
         return() 
        finally 
         // runs once the agent body finishes, i.e. after the reply or an exception
         socket.Close() 
       finally 
        // outer finally is a no-op
        () 
      }) 
    async { 
     try 
      //#1: blocks thread and as result have large memory footprint and too many threads to use 
      //let socket,r = guard.PostAndReply(id,timeout=timeout) 

      //#2: suggested by Dmitry Lomov 
      let! socket,r = guard.PostAndAsyncReply(id,timeout=timeout) 
      printfn "%A: ok" host 
      // the agent's finally also closes this same socket
      socket.Close() 
     with 
      // any failure, including the reply timeout, is reported as "failed"
      _ -> 
       printfn "%A: failed" host 
       () 
     } 

//test it 
//timeout is ms interval 
//i.e. 10000 is equal to 10s 
/// Runs AsyncPing against 192.168.1.1 - 192.168.254.254 (octets 1..254 in
/// both positions) in parallel and waits for all of them to finish.
/// `timeout` is forwarded unchanged to every AsyncPing call.
let AsyncPingTest timeout =
    let jobs =
        seq {
            for net in 1..254 do
                for node in 1..254 do
                    let address = sprintf "192.168.%d.%d" net node
                    yield AsyncPing (IPAddress.Parse address) timeout
        }
    Async.RunSynchronously (Async.Parallel jobs)
+0

我很确定这个将泄漏一个套接字和一个待处理的异步接收。使用邮箱是一个好主意,但它需要一个额外的命令来发送一个Close到套接字来解除悬而未决的I/O。看到我接受的答案是另一种方法。 – 2010-04-07 21:44:11

+0

@James:我发现泄漏,现在它也可以工作:)您的解决方案速度可达4倍。 – ssp 2010-04-22 09:04:56

7

詹姆斯,你自己采纳的答案有一个问题,我想指出来。你只分配了一个计时器,这使得 AsyncReceiveEx 返回的异步对象成为有状态的、只能运行一次的对象。下面是我精简出来的一个类似例子:

// Begin / End / Cancel functions derived from Async.Sleep; they stand in for
// a socket's BeginReceive / EndReceive in this reduced example.
let b,e,c = Async.AsBeginEnd(Async.Sleep) 

/// Reduced reproduction of the timeout wrapper. This version is intentionally
/// left broken: the timer and both flags are created when AsyncReceiveEx is
/// called, outside the returned Async, so every run of that Async shares them.
type Example() = 
    member this.Close() =() 
    member this.AsyncReceiveEx(sleepTime, (timeoutMS:int)) = 
     // created once per call to AsyncReceiveEx, not once per run
     let timedOut = ref false 
     let completed = ref false 
     let timer = new System.Timers.Timer(double(timeoutMS), AutoReset=false) 
     timer.Elapsed.Add(fun _ -> 
      lock timedOut (fun() -> 
       timedOut := true 
       if not !completed 
       then this.Close() 
       ) 
      ) 
     // stops and disposes the shared timer
     let complete() = 
      lock timedOut (fun() -> 
       timer.Stop() 
       timer.Dispose() 
       completed := true 
       ) 
     Async.FromBeginEnd(sleepTime, 
          (fun st -> 
           let result = b(st) 
           // on a second run this is the timer already disposed by complete()
           timer.Start() 
           result 
          ), 
          (fun result -> 
           complete() 
           if !timedOut 
           then printfn "err";() 
           else e(result) 
          ), 
          (fun() -> 
           complete() 
           this.Close() 
           ) 
          ) 

let ex = new Example() 
let a = ex.AsyncReceiveEx(3000, 1000) 
Async.RunSynchronously a 
printfn "ok..." 
// below throws ODE, because only allocated one Timer 
Async.RunSynchronously a 

理想情况下,AsyncReceiveEx 返回的异步对象每次运行都应该有相同的行为,这意味着每次“运行”都需要有自己的计时器和自己的一组 ref 标志。这很容易修复,如下所示:

// Begin / End / Cancel functions derived from Async.Sleep; they stand in for
// a socket's BeginReceive / EndReceive in this reduced example.
let b,e,c = Async.AsBeginEnd(Async.Sleep) 

/// Corrected variant of the reduced example: the body is wrapped in
/// async { ... }, so the timer and both flags are created each time the
/// returned Async is run instead of once per AsyncReceiveEx call.
type Example() = 
    member this.Close() =() 
    member this.AsyncReceiveEx(sleepTime, (timeoutMS:int)) = 
     async { 
     // fresh state for every run of the workflow
     let timedOut = ref false 
     let completed = ref false 
     let timer = new System.Timers.Timer(double(timeoutMS), AutoReset=false) 
     timer.Elapsed.Add(fun _ -> 
      lock timedOut (fun() -> 
       timedOut := true 
       if not !completed 
       then this.Close() 
       ) 
      ) 
     // stops and disposes this run's timer
     let complete() = 
      lock timedOut (fun() -> 
       timer.Stop() 
       timer.Dispose() 
       completed := true 
       ) 
     return! Async.FromBeginEnd(sleepTime, 
          (fun st -> 
           let result = b(st) 
           timer.Start() 
           result 
          ), 
          (fun result -> 
           complete() 
           if !timedOut 
           then printfn "err";() 
           else e(result) 
          ), 
          (fun() -> 
           complete() 
           this.Close() 
           ) 
          ) 
     } 
let ex = new Example() 
let a = ex.AsyncReceiveEx(3000, 1000) 
Async.RunSynchronously a 
printfn "ok..." 
// second run of the same Async value; it builds its own timer
Async.RunSynchronously a 

唯一的变化是把 AsyncReceiveEx 的方法体放进 async { ... } 中,并把最后一行改为 return!。

+0

很好抓,谢谢!答案已更新以反映此修复程序。 – 2010-04-07 17:47:04