I think there's a "white lie" in your question; I don't think your async snippet can lose events unless one of the following holds: Process() fires events, Process() pumps messages, or Process() is async and yields the GUI thread. (But I might be wrong.)

In any case, I can imagine wrapping an event stream in a queue so you never lose a message. Here is my attempt, which has undergone little testing and analysis, but which I think might be right.

The NonBlockingEventQueue class takes an event, which it immediately subscribes to. Every event it hears goes in a queue. You can fetch elements via TryGetAsync, or unsubscribe via Dispose(). Everything (constructor, TryGetAsync, Dispose) must be called on the UI thread.

type NonBlockingEventQueue<'Del,'Args when 'Del : delegate<'Args,unit> and 'Del :> System.Delegate>(e:IEvent<'Del,'Args>) =
	let mutable q = new System.Collections.Generic.Queue<_>()
	let signal = new Event<unit>()
	let unsub = Observable.subscribe (fun x -> q.Enqueue(x); signal.Trigger()) e
	let mutable finished = false
	member this.TryGetAsync() =
		async {
			if q.Count <> 0 then
				let r = q.Dequeue()
				return Some(r)
			elif finished then
				return None
			else
				do! Async.AwaitEvent signal.Publish 
				return! this.TryGetAsync()
		}
	interface System.IDisposable with
		member this.Dispose() =
			if not finished then
				finished <- true
				unsub.Dispose()

open System.Windows.Forms 
[<System.STAThread>]
do 
	let form = new Form(Visible = true)
	let q = new NonBlockingEventQueue<_,_>(form.MouseMove)
	async {
		while true do
			let! mm = q.TryGetAsync()
			match mm with
			| Some(mm) -> printfn "%d %d" mm.X mm.Y 
			| None -> printfn "no more"
	} |> Async.StartImmediate         
	async {
		do! Async.Sleep(5000)
		(q :> System.IDisposable).Dispose()
	} |> Async.StartImmediate         

	Application.Run(form)
By on 9/22/2010 12:36 PM ()

On closer inspection, it looks to me like NonBlockingEventQueue in its current form doesn't actually solve anything. System.Collections.Generic.Queue isn't thread-safe, so the event "e" under observation must also fire only on the UI thread. Additionally, it only listens for signal while Async.AwaitEvent is running, so even with a thread-safe queue you'd sometimes miss an event's arrival and stay dormant (although when the next event arrived you'd wake up and notice both), which could lead to deadlock in some algorithms. Again, this isn't a problem when all code runs on the UI thread, since nobody can fire events while TryGetAsync is still processing. But in the general case (with the caveats noted in Brian's post) you don't see missing events at all when all code runs on the UI thread. So you can only use NonBlockingEventQueue in its current form in scenarios where it isn't really needed.
The first issue is simple to fix using standard techniques (SynchronizationContext or locks). The second issue requires using a signal which remains signalled until it gets noticed/handled: a ManualResetEvent is suitable. Here's one example implementation, which may have subtle concurrency bugs that I haven't noticed. It also has a wart in that it requires the SynchronizationContext to be passed in:

open System
open System.Threading

type CaptureEvents<'Delegate, 'Args> when 'Delegate :> Delegate and 'Delegate : delegate<'Args, unit> (event : IEvent<'Delegate,'Args>, ctx : SynchronizationContext) =
	let events = System.Collections.ObjectModel.Collection<'Args>()
	let moreEventsAvailable = new ManualResetEvent(false)
	let w = event.Subscribe(fun (args : _) -> 
			ctx.Post((fun _ ->
				events.Add(args)
				moreEventsAvailable.Set() |> ignore)
				, null)
			
		)
	member this.GetEvents() : Async<'Args list> =
		async {
			let! ready = Async.AwaitWaitHandle moreEventsAvailable
			let ret = ref []
			ctx.Send((fun _ ->
						ret := Seq.toList events
						events.Clear()
						moreEventsAvailable.Reset() |> ignore;
				), null)
			return !ret
		}
	interface IDisposable with
		member this.Dispose() =
			do
				w.Dispose()
				moreEventsAvailable.Dispose()

type FooEventArgs(n : int) =
	inherit EventArgs()
	member this.Num = n

type FooHandler = delegate of obj * FooEventArgs -> unit

type Foo() =
	let e = Event<FooHandler, FooEventArgs>()
	[<CLIEvent>]
	member this.EventOfInterest = e.Publish
	member this.Provide i = e.Trigger(this, FooEventArgs(i))

let someObject = Foo()
let ctx = SynchronizationContext.Current // capture the GUI thread Sync context

async { 
	use e = new CaptureEvents<_,_>(someObject.EventOfInterest, ctx)
	while true do 
		let! events = e.GetEvents()
		for n in events do
			printfn "%d" n.Num
} |> Async.Start

for i in 1..10 do
	someObject.Provide i // Usually will only print "1"
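
The "remains signalled until noticed" property that the ManualResetEvent provides can be seen in isolation. This is only a minimal sketch, separate from CaptureEvents; the name latch and the timeout values are made up for illustration:

```fsharp
open System.Threading

// Hypothetical stand-in for moreEventsAvailable above.
let latch = new ManualResetEvent(false)

// Signal while nobody is waiting; the handle stays set.
latch.Set() |> ignore

// A waiter that arrives later still observes the signal.
// The 1000 ms timeout is only a safety net for the sketch.
let noticed = Async.AwaitWaitHandle(latch, 1000) |> Async.RunSynchronously

// After Reset, a new wait times out because nothing has signalled since.
latch.Reset() |> ignore
let noticedAgain = Async.AwaitWaitHandle(latch, 100) |> Async.RunSynchronously

printfn "%b %b" noticed noticedAgain // prints "true false"
```

Contrast this with Async.AwaitEvent, which only hears a trigger that happens while it is subscribed.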

Lossless event streams are tricky enough IMHO that it might make sense to include an implementation in the standard library. For now I should probably stick to MailboxProcessor<'T> for my GUI control logic.
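
For what it's worth, here is a rough sketch of the MailboxProcessor approach, not a tested design. The names source, received and finished are made up for the example, and a plain Event<int> stands in for the real event. The handler is attached once, up front, and the mailbox buffers every posted message until the consumer asks for it:

```fsharp
open System.Threading

// Hypothetical event source, standing in for Foo.EventOfInterest.
let source = Event<int>()

// Where the consumer records what it saw (written only by the agent).
let received = System.Collections.Generic.List<int>()
let finished = new ManualResetEventSlim(false)

// The agent pulls messages one at a time; anything posted while it is
// busy simply waits in the mailbox queue.
let agent =
    MailboxProcessor<int>.Start(fun inbox ->
        async {
            for _i in 1 .. 10 do
                let! n = inbox.Receive()
                received.Add n
            finished.Set()
        })

// Subscribe once; Post does not block the thread that fires the event.
source.Publish.Add(fun n -> agent.Post n)

for i in 1 .. 10 do
    source.Trigger i

finished.Wait()
printfn "%A" (List.ofSeq received)
```

Because the subscription is in place before the first Trigger and never goes away between messages, there is no window in which an event can slip past, unlike the repeated Async.AwaitEvent loop.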
-Max

By on 9/22/2010 2:31 PM ()

Ok, yeah, I assumed you were talking about GUI events. If you're talking about events from other threads, then this is a thornier problem, and MailboxProcessor would be my go-to solution.

let serializeEventsToGUI<'Del,'Args when 'Del : delegate<'Args,unit> and 'Del :> System.Delegate>(e:IEvent<'Del,'Args>) =
	let guiEv = new Event<'Args>()
	let ctxt = System.Threading.SynchronizationContext.Current 
	let mbox = new MailboxProcessor<'Args>(fun inbox ->
		async { 
			while true do
				let! msg = inbox.Receive()
				do! Async.SwitchToContext(ctxt)
				guiEv.Trigger(msg)
		})
	mbox.Start()
	e.Add(fun args -> mbox.Post(args))  // note, does not unsubscribe
	guiEv.Publish 

open System.Windows.Forms 
[<System.STAThread>]
do 
	let form = new Form(Visible = true)
	// trigger crazy events from crazy threads
	let ev = new Event<int*int>()
	let safeGuiEvent = serializeEventsToGUI(ev.Publish)
	for t in 1..5 do
		async { 
			do! Async.Sleep 100
			for x in 1..5 do 
				ev.Trigger(t,x)
				do! Async.Sleep 10
		} |> Async.Start 
	// pull all events in GUI, none are lost
	let count = ref 0
	async {
		while true do
			let! r = Async.AwaitEvent safeGuiEvent
			incr count
			printfn "%A - got %d msgs so far" r !count
	} |> Async.StartImmediate 
	Application.Run(form)
By on 9/22/2010 3:17 PM ()

Hi Brian,
First off, thanks for the example. It is shorter than the version I came up with and I am still trying to digest the differences.
Second, you can lose events even if Process() is printfn, no async stuff required.

open System
type FooEventArgs(n : int) =
    inherit EventArgs()
    member this.Num = n

type FooHandler = delegate of obj * FooEventArgs -> unit

type Foo() =
    let e = Event<FooHandler,FooEventArgs>()
    [<CLIEvent>]
    member this.EventOfInterest = e.Publish
    member this.Provide i = e.Trigger(this, FooEventArgs(i))

let someObject = Foo()

async {
    while true do
        let! result = Async.AwaitEvent someObject.EventOfInterest
        printfn "%d" result.Num
} |> Async.Start

for i in 1..10 do
    someObject.Provide i // Usually will only print "1"

-Max

By on 9/22/2010 1:25 PM ()