I don't entirely follow what you are trying to compute on the data. Would you be able to post the code for the sequential version of your algorithm and also explain why you want to parallelize it? In general it doesn't make much sense to ask "how do I parallelize this?" without giving some understanding of where the computational and data bottlenecks are for your specific application. For example, take dmp's solution: whether it is acceptable depends in part on the extrinsic characteristics of s. If s is a terabyte data source being streamed from a probe on Mars, then this solution fails on the first line:

Seq.length s

By on 9/25/2008 8:31 AM ()

The sequential solution is as easy as:

let z = Seq.map2 f x y 

I like your Mars probe metaphor. Yes, x and y are infinite sequences, and dmp's solution would get stuck on the first line:

Seq.length s

What I'm looking for is something that would look like this:

let z = Seq.parallelMap2 f x y

and the way I envisioned this working is the following.

The function would internally extract chunks of data from the sequences and send them off to workers for processing (chunk size defined in config). As soon as the workers finish, the results would come back to the function, ready to be returned by the enumerator's next MoveNext call.

Cheers,

E.

By on 9/25/2008 11:19 AM ()
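
The chunked scheme described in the post above could be sketched roughly as follows in current F#. This is only an illustration under some assumptions: parallelMap2 and its chunkSize parameter are hypothetical names (nothing like this ships in FSharp.Core), it uses Array.Parallel.map rather than async workflows, and it does not overlap computing the next chunk with consuming the current one.

```fsharp
/// Hypothetical helper: lazily maps f over two (possibly infinite) sequences,
/// evaluating one chunk of paired elements in parallel at a time.
let parallelMap2 (chunkSize: int) (f: 'a -> 'b -> 'c) (xs: seq<'a>) (ys: seq<'b>) : seq<'c> =
    Seq.zip xs ys
    |> Seq.chunkBySize chunkSize   // lazy: a chunk is only pulled when the consumer asks for it
    |> Seq.collect (fun chunk ->
        // Array.Parallel.map keeps the element order within the chunk
        chunk |> Array.Parallel.map (fun (x, y) -> f x y))

// Usage on two infinite sequences: only the first two chunks are ever evaluated.
let xs = Seq.initInfinite id
let ys = Seq.initInfinite (fun i -> i * 10)
let firstFive = parallelMap2 4 (+) xs ys |> Seq.take 5 |> Seq.toList
// firstFive = [0; 11; 22; 33; 44]
```

Because nothing calls Seq.length, the consumer decides how far the inputs are read. A version with real look-ahead would need to start the next chunk before the current one has been fully consumed.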

Ok. I don't think there is a way to do this with F# async workflows without implementing all the parallel look-ahead evaluation stuff yourself.

However, you should be able to easily implement this function using PLINQ, although I have not tried.

open System.Linq
let parallelMap2 f x y = ParallelEnumerable.Select(ParallelEnumerable.AsParallel(Seq.zip x y), fun (x, y) -> f x y)
By on 9/25/2008 12:56 PM ()

hi,

and how would you do a simple List.map and List.fold_left pattern with PLINQ (in F#)?

For example, if I want to square each list item and compute the average of the squares.

Regards Steffen

By on 10/23/2008 3:43 AM ()

Ok, I found it:

let sq x = x*x
let (data:IParallelEnumerable<int>) = [1..100].AsParallel()
let avg =
  ParallelEnumerable.Select(data, sq)
   |> ParallelEnumerable.Average
By on 10/23/2008 6:05 AM ()

I am confused.
Do you want to compute the sum of an infinite sequence of vectors, compute the sum of a fixed number of "infinite vectors", or compute the sum of an infinite sequence of infinite vectors?
In other words, is the type of your input a sequence of tuples, a tuple of sequences, or a sequence of sequences?

By on 9/25/2008 12:17 PM ()

Here is a simple replacement for Seq.map that does the same thing, but in parallel:

let map_parallel func items =
    let tasks =
        seq {
            for i in items -> async {
                return (func i)
            }
        }
    Async.Run (Async.Parallel tasks)

Hope this helps.

By on 9/25/2008 3:53 AM ()

I don't think that works because it would give me an async task for every seq.

What I need is something that, given:

[ [1;2;3;4] ; [5;6;7;8] ]

would split the work for two cores into two async tasks like:

Task A [ [1;2] ; [5;6] ]

and

Task B [ [3;4] ; [7;8] ]

and then join the results back into:

Result [6;8;10;12] 

Any ideas on an elegant solution for this?

Cheers,

E.

By on 9/25/2008 6:20 AM ()
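
One way to picture the split asked for above, as a sketch in current F#: each async task sums a block of columns, and the partial results are concatenated in order. The name sumColumnsInBlocks is hypothetical, and the code assumes the rows are non-empty arrays of equal length.

```fsharp
/// Hypothetical helper: sums corresponding elements of equal-length rows,
/// giving each async task one contiguous block of columns.
let sumColumnsInBlocks (blocks: int) (rows: int[][]) : int[] =
    let width = rows.[0].Length                  // assumes at least one non-empty row
    let blockSize = (width + blocks - 1) / blocks
    [| 0 .. blockSize .. width - 1 |]            // start column of each block
    |> Array.map (fun start ->
        async {
            let stop = min (start + blockSize) width
            // Sum every column in this block across all rows.
            return [| for col in start .. stop - 1 ->
                        rows |> Array.sumBy (fun row -> row.[col]) |]
        })
    |> Async.Parallel                            // results come back in task order
    |> Async.RunSynchronously
    |> Array.concat

let result = sumColumnsInBlocks 2 [| [| 1; 2; 3; 4 |]; [| 5; 6; 7; 8 |] |]
// result = [|6; 8; 10; 12|]
```

With two blocks, the first task handles columns [1;2] and [5;6] and the second handles [3;4] and [7;8], matching Task A and Task B in the post.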

This code runs one async task per inner sequence using Async.Generate. Each task iterates its sequence and adds every element to the accumulated sum for that position. The final sums are then returned as a sequence.

let s = seq [seq [1;2;3;4]; seq [5;6;7;8]]
let sums s =
    let outerLen = Seq.length s
    let innerLen = Seq.length (Seq.hd s)
    // One shared accumulator cell per column.
    let accums = Array.init innerLen (fun _ -> ref 0)
    // Task seqNum adds each element of inner sequence seqNum to its column's accumulator.
    let asyncGen seqNum =
        async {
            do (Seq.nth seqNum s) |> Seq.iteri (fun i v -> System.Threading.Interlocked.Add(accums.[i], v) |> ignore)
        }
    Async.Run(Async.Generate(outerLen, asyncGen)) |> ignore
    Seq.init_finite accums.Length (fun i -> !accums.[i])

> sums s;;
val it : seq<int> = seq [6; 8; 10; 12]
By on 9/25/2008 7:52 AM ()

What you're trying to do is actually quite tricky, the problem being that two separate threads can't contribute to one collection, which makes aggregating the results difficult.

There are a couple of approaches you can take (sorry, I don't have time to write examples ATM):

Create a sequence of workflows to execute your tasks for each sequence

Run them in Parallel

Aggregate the results

-or-

Create separate workflows for each sequence

As each result is calculated pass it to a mailbox workflow to handle the results.

The first approach has the advantage of preserving order which could be important.

Cheers,

Rob

By on 9/25/2008 7:33 AM ()
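
The second approach in the post above (one workflow per sequence, with results passed to a mailbox) might look something like this in current F#. It is a sketch only: Msg and sumWithMailbox are hypothetical names, the rows are assumed to be non-empty arrays of equal length, and the agent is never shut down.

```fsharp
// Messages understood by the aggregating agent (hypothetical type).
type Msg =
    | Add of index: int * value: int
    | Get of AsyncReplyChannel<int[]>

let sumWithMailbox (rows: int[][]) : int[] =
    let width = rows.[0].Length          // assumes at least one non-empty row
    let agent =
        MailboxProcessor.Start(fun inbox ->
            // Only the agent touches this array, so no locking is needed.
            let sums = Array.zeroCreate<int> width
            let rec loop () =
                async {
                    let! msg = inbox.Receive()
                    match msg with
                    | Add (i, v) ->
                        sums.[i] <- sums.[i] + v
                        return! loop ()
                    | Get reply ->
                        reply.Reply(Array.copy sums)
                        return! loop ()
                }
            loop ())
    // One workflow per row; each posts its elements to the agent.
    rows
    |> Array.map (fun row -> async { row |> Array.iteri (fun i v -> agent.Post(Add (i, v))) })
    |> Async.Parallel
    |> Async.RunSynchronously
    |> ignore
    // All Add messages were queued before this Get, so the reply holds the final sums.
    agent.PostAndReply(fun ch -> Get ch)

let total = sumWithMailbox [| [| 1; 2; 3; 4 |]; [| 5; 6; 7; 8 |] |]
// total = [|6; 8; 10; 12|]
```

Since addition is commutative, the order in which the workflows post does not affect the sums; for an order-sensitive aggregation the first approach would be the safer choice.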