Do you have a reply channel in the post code? Do all lines of code in the handler have replies?

To me, it sounds like it's waiting for a reply that isn't coming. But, that's just a guess. If you replace PostAndReply with TryPostAndReply, does it still give you the error?

By on 1/12/2010 8:53 AM ()

Thanks - my implementation uses the approach shown by Matthew Podwysocki in [link:codebetter.com] which as I read it provides all that is needed. (of course, I'm still trying to get everything straight in my head, so I may not be following it properly - particularly since it doesn't work.[:)])

Thanks to Brian's direction, I can now try to provide my code:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
 

//RequestGate manages a semaphore.   The Semaphore is released when the acquiring object is disposed of
//no explicit release is required.
type RequestGate(n:int) =
    let semaphore = new Semaphore(initialCount=n, maximumCount=n)
      member x.AcquireAsync(?timeout) = 
 async {let! ok = Async.AwaitWaitHandle(semaphore,?millisecondsTimeout = timeout)
    if ok then
    return
    {new System.IDisposable with
    member x.Dispose() = 
          semaphore.Release() |> ignore }
    else
     log.Error "Semaphore failure"
       return! failwith "couldn't acquire a semphore" 
} 


      let pageRequestGate = new RequestGate(8)

 

// Curry the arguments
       let curry4 f  x y z r  = f ( x, y, z, r )
       let curry3 f  x y z   = f ( x, y, z )

// Asynchronous post
       let (<--) (m:_ MailboxProcessor) msg = m.Post msg

// Post and reply operator
       let (<->) (m:_ MailboxProcessor) msg = m.TryPostAndReply(fun replyChannel -> msg replyChannel)


//PipelineHandler implements a Mailbox processor that initiates a new thread for each URL to 
//be processed.   It runs under the control of a semaphore (pageRequestGate) to limit the number of 
//pages being processed at one time. 
//runPipeline is a dependency injection for XYZZYAsyncPipelineManager
       type PipelineHandler (runPipeline : IDisposable -> string -> string -> MarketType -> PipelineWebProcessResult list ) =
       let rec runPipe =
       MailboxProcessor.Start(fun inbox ->
       let rec waitForMsg () =
       async {use! pageLimit = pageRequestGate.AcquireAsync() 
       let! msg = inbox.Receive()                           
       let! _ = 
       async{
       match msg with
         | CallMsg (uri, parentUri, market, replyChannel) -> 
         let urlList = runPipeline (pageLimit : IDisposable) (uri : string) (parentUri : string) (market : MarketType)
        try
        replyChannel.Reply(urlList)
        with
        | _ as e -> Console.WriteLine e.Message
        Console.WriteLine ("Call with Reply: " + uri)
        | CallMsgNoReply (uri, parentUri, market) -> 
        runPipeline (pageLimit : IDisposable) (uri : string) (parentUri : string) (market : MarketType)
        |> ignore
        Console.WriteLine ("Call without Reply: " + uri) 
        | _ -> failwith "Invalid Message"
        return! waitForMsg()
      } |> Async.StartChild
       return! waitForMsg() 
      }
      waitForMsg ())
        member x.ProcessPageWithReply (uri : string) (parentUri : string) (market : MarketType) =
       runPipe <-> curry4 CallMsg uri parentUri market
       member x.ProcessPageWithoutReply (uri : string) (parentUri : string) (market : MarketType) =
       runPipe <-- curry3 CallMsgNoReply uri parentUri market

The calling code looks like:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
 

let documentPipelineHandler = new PipelineHandler (XYZZYAsyncPipelineManager)

let rec processDocTree (uri : string) (parentUri : string) (market : MarketType) =
    if (uri.ToLower().EndsWith(".doc")) then
        documentPipelineHandler.ProcessPageWithoutReply uri parentUri market 
         |> ignore
    else
        let resList = documentPipelineHandler.ProcessPageWithReply uri parentUri market
        match resList with
                | Some(rList) ->
                    for res in rList do
                        match res with
                            | Page (x, y, z) -> 
                                processDocTree x y z
                            | EmptyPage -> ()
                | None -> ()

Thanks,

--Don

By on 1/12/2010 12:31 PM ()
IntelliFactory Offices Copyright (c) 2011-2012 IntelliFactory. All rights reserved.
Home | Products | Consulting | Trainings | Blogs | Jobs | Contact Us | Terms of Use | Privacy Policy | Cookie Policy
Built with WebSharper