Here is a version using Async.FromContinuations.

However, this is NOT an answer to my problem, because it does not scale. The code may be useful to someone, so posting it here.

The reason this is not an answer is because System.Net.NetworkInformation.Ping does not scale: it appears to use one thread per Ping and quite a bit of memory (likely due to thread stack space). Attempting to ping an entire class-B network will run out of memory and use 100's of threads, whereas the (broken) code above using raw sockets (appears) to use only a few threads and a couple 100Mb -- though this may be incorrect, since that code does not completely work.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
type System.Net.NetworkInformation.Ping with
    member this.AsyncPing (address:IPAddress) : Async =
        let pingAsync =
            Async.FromContinuations (fun (cont, econt, ccont) ->
                    let userToken = new obj()
                    let rec handler = 
                            PingCompletedEventHandler (fun _ args ->
                                if userToken = args.UserState then
                                    this.PingCompleted.RemoveHandler(handler)
                                    if args.Cancelled then
                                        ccont (new OperationCanceledException()) 
                                    elif args.Error <> null then
                                        econt args.Error
                                    else
                                        cont args.Reply)
                    this.PingCompleted.AddHandler(handler)
                    this.SendAsync(address, 1000, userToken)
                )
        async { 
            use! _holder = Async.OnCancel(fun _ -> this.SendAsyncCancel())
            return! pingAsync
        }

let AsyncPingTest() =
    let pings =
        seq {
            for net in 0..255 do
                for node in 0..255 do
                    let ip = IPAddress.Parse( sprintf "192.168.%d.%d" net node )
                    let ping = new Ping()
                    yield ping.AsyncPing( ip )
            }
    pings
    |> Async.Parallel
    |> Async.RunSynchronously
    |> Seq.iter ( fun result ->
                      printfn "%A" result )
By on 4/3/2010 9:13 PM ()

Your AsyncPing will wait until it actually receives the data. Since it doesn't receive anything, it will expectedly wait forever. You can use :

1
2
3
4
let result =
  let timeout = 3000
  let comp = socket.AsyncReceiveFrom( inbuffer, 0, inbuffer.Length, SocketFlags.None, epr )
  Async.RunSynchronously(comp, timeout)

This will raise a timeout exception if the data is not received soon enough.

Hope this helps

By on 4/4/2010 8:56 AM ()

Your AsyncPing will wait until it actually receives the data. Since it doesn't receive anything, it will expectedly wait forever.

Good suggestion on using Async.RunSynchronously with a timeout, but I think that applies to the entire parallel operation: when running 1000's of pings at once, they would all need to complete within that time otherwise an exception would be thrown.

I'm actually looking for the reason my AsyncReceiveFrom (or the underlying BCL BeginReceiveFrom) does not honor the timeout, whereas a call to the synchronous BCL ReceiveFrom DOES honor the timeout. On the surface it appears to be a BCL bug, but more likely it's something in my code.

Still looking for help on this one...

By on 4/4/2010 9:25 AM ()

I'm actually looking for the reason my AsyncReceiveFrom (or the underlying BCL BeginReceiveFrom) does not honor the timeout, whereas a call to the synchronous BCL ReceiveFrom DOES honor the timeout. On the surface it appears to be a BCL bug, but more likely it's something in my code.

The docs clearly state that the timeout only applies to the sync versions:

[link:msdn.microsoft.com]

By on 4/4/2010 10:56 AM ()

The docs clearly state that the timeout only applies to the sync versions

That's a shame... complicates the code quite a bit. Will post if I find a graceful solution.

By on 4/4/2010 2:00 PM ()

After some thought, came up with the following. This code adds an AsyncReceiveEx member to Socket, which includes a timeout value. It hides the details of the watchdog timer inside the receive method... very tidy and self contained. Now THIS is what I was looking for!

See the complete async ping example, further below.

Not sure if the locks are necessary, but better safe than sorry...

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
    type System.Net.Sockets.Socket with
        member this.AsyncSend( buffer, offset, size, socketFlags, err ) =
            Async.FromBeginEnd( buffer, offset, size, socketFlags, err,
                                this.BeginSend,
                                this.EndSend,
                                this.Close )

        member this.AsyncReceive( buffer, offset, size, socketFlags, err ) =
            Async.FromBeginEnd( buffer, offset, size, socketFlags, err,
                                this.BeginReceive,
                                this.EndReceive,
                                this.Close )

        member this.AsyncReceiveEx( buffer, offset, size, socketFlags, err, (timeoutMS:int) ) =
            async {
                let timedOut = ref false
                let completed = ref false
                let timer = new System.Timers.Timer( double(timeoutMS), AutoReset=false )
                timer.Elapsed.Add( fun _ ->
                    lock timedOut (fun () ->
                        timedOut := true
                        if not !completed
                        then this.Close()
                        )
                    )
                let complete() =
                    lock timedOut (fun () ->
                        timer.Stop()
                        timer.Dispose()
                        completed := true
                        )
                return! Async.FromBeginEnd( buffer, offset, size, socketFlags, err,
                                    (fun (b,o,s,sf,e,st,uo) ->
                                        let result = this.BeginReceive(b,o,s,sf,e,st,uo)
                                        timer.Start()
                                        result
                                    ),
                                    (fun result ->
                                        complete()
                                        if !timedOut
                                        then err := SocketError.TimedOut; 0
                                        else this.EndReceive( result, err )
                                    ),
                                    (fun () ->
                                        complete()
                                        this.Close()
                                        )
                                    )
                }

Here is a complete Ping example. To avoid running out of source ports and to prevent getting too many replies at once, it scans one class-c subnet at a time.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
    module Ping

    open System
    open System.Net
    open System.Net.Sockets
    open System.Threading

    //---- ICMP Packet Classes

    type IcmpMessage (t : byte) =
        let mutable m_type = t
        let mutable m_code = 0uy
        let mutable m_checksum = 0us

        member this.Type
            with get() = m_type

        member this.Code
            with get() = m_code

        member this.Checksum = m_checksum

        abstract Bytes : byte array
        
        default this.Bytes
            with get() =
                [|
                    m_type
                    m_code
                    byte(m_checksum)
                    byte(m_checksum >>> 8)
                |]

        member this.GetChecksum() =
            let mutable sum = 0ul
            let bytes = this.Bytes
            let mutable i = 0

            // Sum up uint16s
            while i < bytes.Length - 1 do
                sum <- sum + uint32(BitConverter.ToUInt16( bytes, i ))
                i <- i + 2

            // Add in last byte, if an odd size buffer
            if i <> bytes.Length then
                sum <- sum + uint32(bytes.[i])
            
            // Shuffle the bits
            sum <- (sum >>> 16) + (sum &&& 0xFFFFul)
            sum <- sum + (sum >>> 16)
            sum <- ~~~sum
            uint16(sum)

        member this.UpdateChecksum() =
            m_checksum <- this.GetChecksum()


    type InformationMessage (t : byte) =
        inherit IcmpMessage(t)

        let mutable m_identifier = 0us
        let mutable m_sequenceNumber = 0us

        member this.Identifier = m_identifier
        member this.SequenceNumber = m_sequenceNumber

        override this.Bytes
            with get() =
                Array.append (base.Bytes)
                             [|
                                byte(m_identifier)
                                byte(m_identifier >>> 8)
                                byte(m_sequenceNumber)
                                byte(m_sequenceNumber >>> 8)
                             |]

    type EchoMessage() =
        inherit InformationMessage( 8uy )
        let mutable m_data = Array.create 32 32uy
        do base.UpdateChecksum()

        member this.Data
            with get()  = m_data
            and  set(d) = m_data <- d
                          this.UpdateChecksum()

        override this.Bytes
            with get() =
                Array.append (base.Bytes)
                             (this.Data)

    //---- Synchronous Ping

    let Ping (host : IPAddress, timeout : int ) =
        let mutable ep = new IPEndPoint( host, 0 )
        let socket = new Socket( AddressFamily.InterNetwork, SocketType.Raw, ProtocolType.Icmp )
        socket.SetSocketOption( SocketOptionLevel.Socket, SocketOptionName.SendTimeout, timeout )
        socket.SetSocketOption( SocketOptionLevel.Socket, SocketOptionName.ReceiveTimeout, timeout )
        let packet = EchoMessage()
        let mutable buffer = packet.Bytes

        try
            if socket.SendTo( buffer, ep ) <= 0 then
                raise (SocketException())
            buffer <- Array.create (buffer.Length + 20) 0uy

            let mutable epr = ep :> EndPoint
            if socket.ReceiveFrom( buffer, &epr ) <= 0 then
                raise (SocketException())
        finally
            socket.Close()

        buffer

    //---- Entensions to the F# Async class to allow up to 5 paramters (not just 3)

    type Async with
        static member FromBeginEnd(arg1,arg2,arg3,arg4,beginAction,endAction,?cancelAction): Async<'T> =
            Async.FromBeginEnd((fun (iar,state) -> beginAction(arg1,arg2,arg3,arg4,iar,state)), endAction, ?cancelAction=cancelAction)
        static member FromBeginEnd(arg1,arg2,arg3,arg4,arg5,beginAction,endAction,?cancelAction): Async<'T> =
            Async.FromBeginEnd((fun (iar,state) -> beginAction(arg1,arg2,arg3,arg4,arg5,iar,state)), endAction, ?cancelAction=cancelAction)

    //---- Extensions to the Socket class to provide async SendTo and ReceiveFrom

    type System.Net.Sockets.Socket with

        member this.AsyncSend( buffer, offset, size, socketFlags, err ) =
            Async.FromBeginEnd( buffer, offset, size, socketFlags, err,
                                this.BeginSend,
                                this.EndSend,
                                this.Close )

        member this.AsyncReceive( buffer, offset, size, socketFlags, err ) =
            Async.FromBeginEnd( buffer, offset, size, socketFlags, err,
                                this.BeginReceive,
                                this.EndReceive,
                                this.Close )

        member this.AsyncReceiveEx( buffer, offset, size, socketFlags, err, (timeoutMS:int) ) =
            async {
                let timedOut = ref false
                let completed = ref false
                let timer = new System.Timers.Timer( double(timeoutMS), AutoReset=false )
                timer.Elapsed.Add( fun _ ->
                    lock timedOut (fun () ->
                        timedOut := true
                        if not !completed
                        then this.Close()
                        )
                    )
                let complete() =
                    lock timedOut (fun () ->
                        timer.Stop()
                        timer.Dispose()
                        completed := true
                        )
                return! Async.FromBeginEnd( buffer, offset, size, socketFlags, err,
                                    (fun (b,o,s,sf,e,st,uo) ->
                                        let result = this.BeginReceive(b,o,s,sf,e,st,uo)
                                        timer.Start()
                                        result
                                    ),
                                    (fun result ->
                                        complete()
                                        if !timedOut
                                        then err := SocketError.TimedOut; 0
                                        else this.EndReceive( result, err )
                                    ),
                                    (fun () ->
                                        complete()
                                        this.Close()
                                        )
                                    )
                }


    //---- Asynchronous Ping

    let AsyncPing (ip : IPAddress, timeout : int ) =  
        async {
            use socket = new Socket( AddressFamily.InterNetwork, SocketType.Raw, ProtocolType.Icmp )
            socket.Connect( IPEndPoint( ip, 0 ) )

            let pingTime = System.Diagnostics.Stopwatch()
            let packet = EchoMessage()
            let outbuffer = packet.Bytes
            let err = ref (SocketError())

            let isAlive = ref false
            try
                pingTime.Start()
                let! result = socket.AsyncSend( outbuffer, 0, outbuffer.Length, SocketFlags.None, err )
                pingTime.Stop()

                if result <= 0 then
                    raise (SocketException(int(!err)))
                
                let inbuffer = Array.create (outbuffer.Length + 256) 0uy 

                pingTime.Start()
                let! reply = socket.AsyncReceiveEx( inbuffer, 0, inbuffer.Length, SocketFlags.None, err, timeout )
                pingTime.Stop()

                if result <= 0 && not (!err = SocketError.TimedOut) then
                    raise (SocketException(int(!err)))

                isAlive := not (!err = SocketError.TimedOut)
                              && inbuffer.[25] = 0uy // Type 0 = echo reply (redundent? necessary?)
                              && inbuffer.[26] = 0uy // Code 0 = echo reply (redundent? necessary?)
            finally
                socket.Close()
            
            return (ip, pingTime.Elapsed, !isAlive )
        }

    let main() =
        let pings net =
            seq {
                for node in 0..255 do
                    let ip = IPAddress.Parse( sprintf "192.168.%d.%d" net node )
                    yield Ping.AsyncPing( ip, 1000 )
                }

        for net in 0..255 do
            pings net
            |> Async.Parallel
            |> Async.RunSynchronously
            |> Seq.filter ( fun (_,_,alive) -> alive )
            |> Seq.iter ( fun (ip, time, alive) ->
                              printfn "%A %dms" ip time.Milliseconds)
 
    main()
    System.Console.ReadKey() |> ignore
By on 4/7/2010 9:16 AM ()

Nice solution – but I’m wondering why you are switching from connectionless UDP to connection full TCP only in your latest example?

By on 6/13/2010 3:19 AM ()
IntelliFactory Offices Copyright (c) 2011-2012 IntelliFactory. All rights reserved.
Home | Products | Consulting | Trainings | Blogs | Jobs | Contact Us | Terms of Use | Privacy Policy | Cookie Policy
Built with WebSharper