I think I have a decent implementation, though I'm not certain I've used everything as it was initially intended. Any thoughts are appreciated.

module AsyncEx

open System

type Async<'a> with
  member this.ToObservable() = 
    { new IObservable<'a> with
        member x.Subscribe(o:IObserver<'a>) =
          Async.StartWithContinuations(this, o.OnNext, o.OnError, (fun c -> o.OnCompleted()))
          { new IDisposable with
              member x.Dispose() = Async.CancelDefaultToken() } }
By on 9/12/2010 5:26 PM ()

I guess it's a decent implementation, but I think there might be one problem:

An Async<'T> produces a result only once, so you might want to call OnCompleted right after OnNext, in case the observer needs to know that the sequence has ended.
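
To make that concrete, here is a minimal sketch of the same StartWithContinuations approach with OnCompleted moved into the success continuation. The name toObservableOnce and the per-subscription CancellationTokenSource are assumptions for illustration, not part of the original post; disposal of the token source is left out for brevity.

```fsharp
open System
open System.Threading

// Sketch only: emit the single result, then signal completion immediately.
let toObservableOnce (computation: Async<'T>) =
    { new IObservable<'T> with
        member x.Subscribe(observer: IObserver<'T>) =
            // Assumption: one token source per subscription, so that
            // unsubscribing does not cancel unrelated async work.
            let cts = new CancellationTokenSource()
            Async.StartWithContinuations(
                computation,
                (fun result ->
                    observer.OnNext result
                    observer.OnCompleted()),
                observer.OnError,
                // Cancellation means the subscriber went away, so stay silent.
                (fun (_: OperationCanceledException) -> ()),
                cts.Token)
            { new IDisposable with
                member x.Dispose() = cts.Cancel() } }
```

With this shape the cancellation continuation no longer reports completion, and Dispose only affects the subscription it belongs to rather than the default token.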

By on 9/12/2010 9:32 PM ()

Thanks, Chris. That's what I was afraid of. I don't want it to respond only once but wrap an Async as a true IObservable. The OnCompleted was really placed in the wrong part of StartWithContinuations anyway. Any thoughts as to how to achieve what I'm after?

Thanks,

Ryan

By on 9/12/2010 11:22 PM ()

You can wrap the Async in another Async:

let toObservable (a : Async<'T>) = 
    { new System.IObservable<'T> with
        member this.Subscribe(observer) = 
            if observer = null then nullArg "observer"

            let cts = new System.Threading.CancellationTokenSource()
            
            // CancellationTokenSource can hold a WaitHandle internally, so it should definitely be disposed.
            // Use the 'invoked' flag to make sure that only one of Cancel or Dispose is ever called, and only once.
            let invoked = ref 0
            let cancelOrDispose cancel  = 
                if System.Threading.Interlocked.CompareExchange(invoked, 1, 0) = 0 
                then 
                    if cancel then cts.Cancel() else cts.Dispose()

            let wrapper = async {
                try
                    try
                        let! r = a
                        observer.OnNext(r)
                        observer.OnCompleted()
                    with
                        e -> observer.OnError(e)
                finally
                    cancelOrDispose(false)
                } 
            Async.StartImmediate(computation = wrapper, cancellationToken = cts.Token)

            { new System.IDisposable with member this.Dispose() = cancelOrDispose(true) }
        }
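
A quick way to check the contract of a wrapper like this is to feed it one succeeding and one failing computation and record what the observer sees. The sketch below is hypothetical: recordingObserver and exercise are illustrative names, and the converter is passed in as a parameter so the block stands on its own.

```fsharp
open System
open System.Collections.Generic

// Hypothetical helper: records every notification as a string.
let recordingObserver (log: List<string>) =
    { new IObserver<int> with
        member x.OnNext(v) = log.Add(sprintf "OnNext %d" v)
        member x.OnError(e) = log.Add("OnError " + e.Message)
        member x.OnCompleted() = log.Add "OnCompleted" }

// Runs a converter against a succeeding and a failing computation
// and returns the notifications seen in each case.
let exercise (convert: Async<int> -> IObservable<int>) =
    let okLog = List<string>()
    let okObservable = convert (async { return 42 })
    let okSub = okObservable.Subscribe(recordingObserver okLog)
    // Disposing after the computation has finished should be harmless.
    okSub.Dispose()

    let failLog = List<string>()
    let failObservable = convert (async { return failwith "boom" })
    let failSub = failObservable.Subscribe(recordingObserver failLog)
    failSub.Dispose()

    List.ofSeq okLog, List.ofSeq failLog
```

Calling exercise toObservable should, if the wrapper behaves as intended, give a first log with a single OnNext followed by OnCompleted, and a second log containing only an OnError. Both test computations finish synchronously, so this does not exercise cancellation of work that is still in flight.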
By on 9/13/2010 4:18 AM ()