Hi,
if you need to handle only a reasonably small (and limited) number of events, then it should be quite easy to extend the implementation of Async.AwaitObservable (in fact, I think that some my code sample includes a version taking three events).

Anyway, a general approach is to combine the events in advance using functions from the "Observable" module. This is quite similar to what AwaitObservable does internally. When you write this yourself, it is probably a better idea to define your own discriminated union (to represent different cases) instead of the generic Choice to make code more readable. I would probably write something like this:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
 
type FileSystemEvent =
  | Deleted of FileSystemEventArgs
  | Created of FileSystemEventArgs
  | Renamed of RenamedEventArgs

let rec loop() = async {
    let! args = 
      // Create a list of 'Event<FileSystemEvent>' events that 
      // represent all different events that you want to handle
      [ Observable.map Deleted fileSystemWatcher.Deleted;
        Observable.map Renamed fileSystemWatcher.Renamed;
        Observable.map Created fileSystemWatcher.Created ]
      // Combine them into a single event that occurs 
      // whenever any of the events occur
      |> List.reduce Observable.merge
      // Wait for the first one using AwaitObservable 
      |> Async.AwaitObservable
       
    // Implement different handling for different events
    match args with
    | Deleted(args) ->
        printfn "Deleted: %s" args.FullPath
        return! loop()  
    | Renamed(args) ->
        printfn "Renamed: %s - %s" args.OldFullPath args.FullPath
        return! loop() 
    | Created(args) ->
        printfn "Created: %s" args.FullPath
        return! loop() }  

In some sense, this code is one step between event-based solution and an agent. The type FileSystemEvent represents the type of messages that your agent (if you were using them) would handle, but the code is simpler. There are two important notes:

  • Can the file system events occur in parallel - e.g. next one is triggered while you're still processing an earlier event? If yes, then you'll need to use agents - the code above works only for synchronized events (e.g. through user interface message loop).
  • You need to use Observable module & AwaitObservable in this case (instead of Event versions), because this is exactly the case where Event causes memory leaks.
By on 5/12/2011 4:08 PM ()

After discussing this with Brian McNamara, I figured out that there is an alternative way to write this. I don't think it is particularly readable (because it is a bit more tricky), so you may still prefer the version with discriminated union.

The trick is to return asynchronous workflows that represent handlers of events instead of discriminated union values:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
 
let rec loop() = async {
    let! op = 
      [ fileSystemWatcher.Deleted |> Observable.map (fun args -> async {
          printfn "Deleted: %s" args.FullPath
          return! loop() })
        fileSystemWatcher.Renamed |> Observable.map (fun args -> async {
          printfn "Renamed: %s - %s" args.OldFullPath args.FullPath
          return! loop() })
        fileSystemWatcher.Created |> Observable.map (fun args -> async {
          printfn "Created: %s" args.FullPath
          return! loop() }) ]
      |> List.reduce Observable.merge
      |> Async.AwaitObservable
    return! op }

The "AwaitObservable" method waits for the first event and then returns the workflow (created in the map function) that should be executed to handle the event. The workflow is executed using "return!"

By on 5/12/2011 4:31 PM ()

Hey, thanks for the explanations and alternatives, and Brian too!

I'm not sure if I can appreciate the trickiness mentioned in the second example as to me it looks like a cleaner approach where there's an array of asynchronous workflows and no "artificial" discriminated union. :-) As far as I can currently understand, an appropriate asynchronous workflow will be activated corresponding to a fired event . It doesn't look like there should be any problems, they are indepedent of each other. This construct does make me wonder, however, how is one supposed to catch and recover from exceptions? I tried to introduce Async.Catch to various places and then to get my hand on the Choice tuple, but in vain, I got various kinds of compiler errors.

In my demonstration, the events can fire simultaneously (it's a console program). I infer from the example code that the synhronize function will handle this correctly, as it grabs the appropriate context and uses it. In any event, I think I need to go with agents but that's all right, I've thought about getting them to this demo of mine too. A question before sweating many nightly hours, is there some obstacles on using agents and the second approach? My current understanding is that I should start a recursive loop in the Start function, handle the events and post some preprocessed information to another agent for even further processing (and I think I'd like to go with a design leaning toward massive processing, maybe not this big yet). Something akin to tomasp's two part series about agents and blockinqueues.

By on 5/18/2011 1:34 PM ()

To continue with my own ideas, this is how one should take care of exceptions, I think

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
let rec loop() = async {
        try
            let! op =
              [ fileSystemWatcher.Deleted |> Observable.map(fun args -> async {
                    try
                      printfn "Deleted: %s" args.FullPath
                      return! loop() 
                    with
                    | ex -> printfn "%s" (ex.Message)
                })
                fileSystemWatcher.Renamed |> Observable.map(fun args -> async {

                  printfn "Renamed: %s - %s" args.OldFullPath args.FullPath
                  return! loop() })
                fileSystemWatcher.Created |> Observable.map(fun args -> async {
                  printfn "Created: %s" args.FullPath
                  return! loop() })
                fileSystemWatcher.Changed |> Observable.map(fun args -> async {
                  printfn "Changed: %s" args.FullPath
                  return! loop() })

              ]
              |> List.reduce Observable.merge
              |> Async.AwaitObservable
            return! op
        with
        | ex -> printfn "%s" (ex.Message);
    }

though I'm not quite sure if the outermost exception handling block is needed
or even unbeneficial (I've included an exception handling block only on the
first workflow). I have somewhat trouble to visualize the situation in
terms I can relate to, i.e. the "C languages" like C#, C++ or Java. Maybe
it's time to consider literature studies also. :-)

By on 5/19/2011 1:28 PM ()
IntelliFactory Offices Copyright (c) 2011-2012 IntelliFactory. All rights reserved.
Home | Products | Consulting | Trainings | Blogs | Jobs | Contact Us | Terms of Use | Privacy Policy | Cookie Policy
Built with WebSharper