Task Parallel Libraryで並列処理とか待ち合わせとか

ジョブを並列処理するプログラムを書こうかなと思って.NET4のTPL(とTask Parallel Library)を調査中。TaskCreationOptions.AttachedToParentとTaskContinuationOptions.AttachedToParentを使うと結構簡単に書ける気がして試してみました。

次のプログラムは、XMLをパースして処理をシーケンシャルに実行したり並列に実行したりするサンプルプログラムです。Taskの戻り値はintにしていますが特に使っていません。あと例外処理とかは何にもしてません。F#で書いてみました。

open System
open System.Threading
open System.Threading.Tasks;
open System.Xml.Linq

let xname = XName.Get

let createAttachedTask (funs: (unit -> int) list) =
match funs with
| [] ->
new
Task<int>(fun _ -> 0)
| h :: t ->
let
root = new Task<int>(h, TaskCreationOptions.AttachedToParent)
t |> List.fold (
fun (antecedent:Task<int>) f ->
let
continueFunc = Func<Task<int>, int>(fun _ -> f())
let options = TaskContinuationOptions.AttachedToParent
antecedent.ContinueWith<int>(continueFunc, options)) root |> ignore
root

let createJobFun (jobElement:XElement) =
fun () ->
Console.WriteLine("job task. id={0}, threadId={1}", jobElement.Attribute(xname "id"), Thread.CurrentThread.ManagedThreadId)
let random = Random()
Thread.Sleep(random.Next(500))
0

let rec createForkFun (forkElement:XElement) =
fun () ->
Console.WriteLine("fork task. id={0}, threadId={1}", forkElement.Attribute(xname "id"), Thread.CurrentThread.ManagedThreadId)
forkElement.Elements()
|> Seq.choose (fun element ->
if
element.Name = (xname "parallel") then
Some (createParallelFun element)
else
None )
|> Seq.iter (fun f ->
Task<int>.Factory.StartNew(Func<int>(fun () -> f()), TaskCreationOptions.AttachedToParent) |> ignore)
0

and createParallelFun (parallelElement:XElement) =
fun () ->
Console.WriteLine("parallel task. id={0}, threadId={1}", parallelElement.Attribute(xname "id"), Thread.CurrentThread.ManagedThreadId)
let funs =
parallelElement.Elements()
|> Seq.choose (fun element ->
if
element.Name = (xname "job") then
Some (createJobFun element)
elif element.Name = (xname "fork") then
Some (createForkFun element)
else
None )
|> Seq.toList
let task = createAttachedTask(funs)
task.Start()
0

let createJobNetFun (jobNetElement:XElement) =
fun () ->
Console.WriteLine("jobnet task. threadId={0}", Thread.CurrentThread.ManagedThreadId);
let funs =
jobNetElement.Elements()
|> Seq.choose (fun element ->
if
element.Name = (xname "job") then
Some (createJobFun element)
elif element.Name = (xname "fork") then
Some (createForkFun element)
else
None )
|> Seq.toList
let task = createAttachedTask(funs)
task.Start()
0

let main =
let jobNetElement = XElement.Parse "
<jobNet>
<job id='1'/>
<job id='2'/>
<fork id='3'>
<parallel id='3-1'>
<job id='3-1-1'/>
<job id='3-1-2'/>
</parallel>
<parallel id='3-2'>
<job id='3-2-1'/>
</parallel>
</fork>
<job id='4'/>
</jobNet>
"
Console.WriteLine("main. threadId={0}", Thread.CurrentThread.ManagedThreadId)
let jobNetTask = new Task<int>(fun () -> createJobNetFun(jobNetElement)())
jobNetTask.RunSynchronously()

Console.ReadKey()


これを実行するとコンソールに次のように出力されます。

main. threadId=1
jobnet task. threadId=1
job task. id=id="1", threadId=3
job task. id=id="2", threadId=4
fork task. id=id="3", threadId=3
parallel task. id=id="3-1", threadId=4
parallel task. id=id="3-2", threadId=3
job task. id=id="3-1-1", threadId=4
job task. id=id="3-2-1", threadId=3
job task. id=id="3-1-2", threadId=4
job task. id=id="4", threadId=3

XMLの各要素に対応するタスクを動作させているのですが、parallel要素は並列に、その他はシーケンシャルに動作させるというのが意図です。ちゃんと動いているようです。


Pro .NET 4 Parallel Programming in C# (Expert's Voice in .NET)

Pro .NET 4 Parallel Programming in C# (Expert's Voice in .NET)

自分は今この本読んでいますが、わかりやすくてお勧めです。英語は簡単な単語が多く、サンプルコードが多めです。