node.jsで使える非同期コントロールフローライブラリ nue 0.4.0リリース

0.4.0をリリースしました。npm install nue でインストールできます。

CHANGELOG
  • 新機能 - nue.parallelでstep関数を並列実行できるようになりました。
  • 新機能 - step関数の中で、this.forEach関数を使って配列の要素ごとに関数を並列実行できるようになりました。
  • 新機能 - step関数の中で、this.exec関数を使って関数を非同期実行できるようになりました。
  • 新機能 - step関数の中で、this.history配列から実行済みのstep関数の情報(flow名、step名、flow内におけるindex)を取得できるようになりました(これは実験的な機能です)。


今回は、新機能満載でかなりパワーアップしました。まあ、その分コード数も増えてしまって、379行になっちゃいましたけど。一時期、800行あったからそれに比べればだいぶましなんですが、これ以上は増やしたくないところです。

さて、機能の解説です。

nue.parallel

これは、id:koichikさんのflowlessのpar関数に触発されて作ったものです。このエントリに載っている例と同様のことが、nueでは次のように実現できるようになりました。

var flow = require('nue').flow;
var parallel = require('nue').parallel;

var myFlow = flow('main')(
  function one() { this.next(); },
  function two() { this.next(); },
  parallel('par1')(                        // この辺に注目!
    flow('par1-1')(
      function three() { this.next(); },
      function four() { this.next(); }
    ),
    flow('par1-2')(
      function five() { this.next(); },
      function six() { this.next(); }
    )
  ),
  function seven() { this.next(); },
  function eight() { this.next(); },
  function allDone() {
    if (this.err) throw this.err;
    console.log(this.history);
    this.next();
  }
);

myFlow();


one-twoと直列に実行してparallelで分岐し、'par1-1'と`par1-2`のflowが並列で走ります。'par1-1'と`par1-2`のflowの中ではそれぞれ、three-four、five-sixが直列で実行されます。'par1-1'と`par1-2`のflowの両方が終了すると、sever-eight-allDoneが直列で実行されます。


allDown関数の中でthis.historyをコンソールに出力していますが、this.historyには実行履歴が入っています。ある関数が呼び出されたかどうかをチェックするためにいろんなところにconsole.logを埋め込むことに疲れたので、自動でトレースすることにしました。this.historyは一応実験的な機能です(保持する情報の種類や量についてはどうしようか悩み中)。出力結果は次のようになります。

[ { flowName: 'main', stepName: 'one', stepIndex: 0 },
  { flowName: 'main', stepName: 'two', stepIndex: 1 },
  { flowName: 'main', stepName: 'par1', stepIndex: 2 },
  { flowName: 'par1', stepName: 'par1-1', stepIndex: 0 },
  { flowName: 'par1-1', stepName: 'three', stepIndex: 0 },
  { flowName: 'par1-1', stepName: 'four', stepIndex: 1 },
  { flowName: 'par1', stepName: 'par1-1_end', stepIndex: 1 },
  { flowName: 'par1', stepName: 'par1-2', stepIndex: 0 },
  { flowName: 'par1-2', stepName: 'five', stepIndex: 0 },
  { flowName: 'par1-2', stepName: 'six', stepIndex: 1 },
  { flowName: 'par1', stepName: 'par1-2_end', stepIndex: 1 },
  { flowName: 'main', stepName: 'seven', stepIndex: 3 },
  { flowName: 'main', stepName: 'eight', stepIndex: 4 },
  { flowName: 'main', stepName: 'allDone', stepIndex: 5 } ]


並列に実行されたかどうかは、この出力内容からはわからないのですが、すべてのstepが実行されたことがわかります(並びは実行順、より正確には実行が開始された順です)。並列に実行されたかどうかは、threeの関数の中で実行をsetTimeoutなどを使って遅延させることで確認できます。threeの関数のみを次のように書き換えます。

      function three() { setTimeout(this.async(), 10); },


これで再度実行すると、

[ { flowName: 'main', stepName: 'one', stepIndex: 0 },
  { flowName: 'main', stepName: 'two', stepIndex: 1 },
  { flowName: 'main', stepName: 'par1', stepIndex: 2 },
  { flowName: 'par1', stepName: 'par1-1', stepIndex: 0 },
  { flowName: 'par1-1', stepName: 'three', stepIndex: 0 },
  { flowName: 'par1', stepName: 'par1-2', stepIndex: 0 },
  { flowName: 'par1-2', stepName: 'five', stepIndex: 0 },
  { flowName: 'par1-2', stepName: 'six', stepIndex: 1 },
  { flowName: 'par1', stepName: 'par1-2_end', stepIndex: 1 },
  { flowName: 'par1-1', stepName: 'four', stepIndex: 1 },
  { flowName: 'par1', stepName: 'par1-1_end', stepIndex: 1 },
  { flowName: 'main', stepName: 'seven', stepIndex: 3 },
  { flowName: 'main', stepName: 'eight', stepIndex: 4 },
  { flowName: 'main', stepName: 'allDone', stepIndex: 5 } ]


fourの実行が、fiveやsixよりも遅くなっていることからthree-fourのflowとfive-sixのflowは並列で実行されていることがわかります。

this.forEach

これもflowlessのmap/runMapに触発されました。配列の要素をループで処理しますが、全部を一度に処理するのではなく、小分けして、process.nextTickを挟みながら処理します。

var flow = require('nue').flow;
var fs = require('fs');

var myFlow = flow('myFlow')(
  function readFiles() {
    var files = ['file1', 'file2', 'file3', 'file4', 'file5'];
    this.forEach(files, function (file) {                     // この辺に注目!
      fs.readFile(file, 'utf8', this.async());
    });
  },
  function end() {
    if (this.err) throw this.err;
    console.log(this.args.join(''));
    this.next();
  }
);

myFlow();


デフォルトでは10要素単位にprocess.nextTickを挟みますが、this.forEach関数にconcurrencyのサイズを設定できます。次の例では3を設定しています。

    this.forEach(3)(files, function (file) {
      fs.readFile(file, 'utf8', this.async());
    });


これで、大きな配列も怖くない(ブロックしない)!


ちなみに、forEachへのfunctionのパラメータはECMAScript 5のArrayのforEachと同じです。2番目のパラメータが配列の要素のindex、3番目のパラメータがオリジナルの配列です。

    this.forEach(3)(files, function (file, index, originalFiles) {
      fs.readFile(file, 'utf8', this.async());
    });

this.exec

step関数の中から別のflow(や関数)を非同期で呼び出します。

var flow = require('nue').flow;
var fs = require('fs');

var subFlow = flow('subFlow')(
  function readFile(file) {
    fs.readFile(file, 'utf8', this.async());
  }
);

var mainFlow = flow('mainFlow')(
  function start() {
    this.exec(subFlow, 'file1', this.async()); // この辺に注目!
    this.exec(subFlow, 'file2', this.async()); // この辺に注目!
    this.exec(subFlow, 'file3', this.async()); // この辺に注目!
  },
  function end(data1, data2, data3) {
    if (this.err) throw this.err;
    console.log(data1 + data2 + data3);
    console.log('done');
    this.next();
  }
);

mainFlow();


これまでも、flow(sub-flow1, sub-flow2)() とflowを入れ子にすれば直列的に組み合わせできたのですが、任意のstep関数の中から呼び出すのは簡単ではありませんでした。


実は、nueは、並列処理の方法を2種類持っています。粗粒度な方法と細粒度な方法と便宜的に呼ぶことにします。祖粒度は方法とはnue.parallelを使う方法です。nue.parallelは、大局的?な観点から「処理の流れを並列化」する場合に使います。言い換えるとモジュール(関数)の「構造」に焦点を当てます。もう一つの細粒度の方法とは、上述の例のようにあるstep関数の中でthis.exec(…, this.async())したり、fs.readFile(…, this.async())をしたりと「(非同期)APIの実行を並列化」する場合です。this.forEachの中でthis.async()を呼ぶのも後者に分類されます。こちらはモジュール(関数)の「呼び出し方」に焦点を当てます。


この2つの並列化の方法を持つのは、nueの強みなんじゃないかなーと思います。モジュールの「構造」と「呼び出し方」の両面に並列化の要素があり(静的と動的と言ってもいいかも)、その両方に別々の方法でアプローチできます。もっと簡単に言えば、並列処理の観点からモジュールを定義しやすい/呼び出しやすいと言えるんじゃないかと思います。