node.jsで使える非同期コントロールフローライブラリ nue 0.5.0 リリース

0.5.0をリリースしました。npm install nue でインストールできます。

CHANGELOG
  • 新機能 - nue.as関数を導入し、非同期コールバックの引数を次の関数の引数にマッピングできるようにしました。
  • 変更 - step関数のコンテキストにおいて、this.asyncが引数のマッピング定義を受け取り、非同期コールバックの引数を次の関数へ渡せるようにしました。
  • 変更 - step関数のコンテキストにおいて、this.forEach関数を削除し、this.asyncEach関数を追加しました。(単なるリネームではなくシグネチャや仕様が変わっています)
  • 変更 - step関数のコンテキストにおいて、this.argsプロパティを削除しました(Array.prototype.slice.call(arguments)を使ってください)。


コード行数はついに450行。

nue.as関数と引数のマッピング定義

引数のマッピング定義とは、コールバックの引数の何番目を次の関数に渡すかを示すものです。この何番目かを指定するものがas関数です。
たとえば、fs.readFileのコールバックの引数はerrとdataの2つです(fs.readFile)。この場合、errのindexは0、dataのindexは1です。したがって、dataを次の関数に渡したい場合は次のように呼び出します。

this.async(as(1))


もっと具体的な使用例を見てみましょう。2つのファイルを非同期に読んで連結して表示します。ちなみに、file1のデータはFILE1、file2のデータはFILE2というテキストです。

var flow = require('nue').flow;
var as = require('nue').as;
var fs = require('fs');

var myFlow = flow('myFlow')(
  function readFiles(file1, file2) {
    fs.readFile(file1, 'utf8', this.async(as(1))); // ここに注目!
    fs.readFile(file2, 'utf8', this.async(as(1)));
  },
  function end(data1, data2) {
    if (this.err) throw this.err;
    console.log(data1 + data2); // FILE1FILE2
    console.log('done');
    this.next();
  }
);

myFlow('file1', 'file2');


マッピング定義をオブジェクトにすることで、非同期コールバックで受け取る引数に名前をつけたり、任意の値を渡したりできます。たとえば、上記の例を少し変更して、ファイル名とファイルの内容をまとめて次の関数へ渡せます。

var flow = require('nue').flow;
var as = require('nue').as;
var fs = require('fs');

var myFlow = flow('myFlow')(
  function readFiles(file1, file2) {
    fs.readFile(file1, 'utf8', this.async({name: file1, data: as(1)})); // ここに注目
    fs.readFile(file2, 'utf8', this.async({name: file2, data: as(1)}));
  },
  function end(f1, f2) {
    if (this.err) throw this.err;
    console.log(f1.name + ' and ' + f2.name + ' have been read.'); // file1 and file2 have been read.
    console.log(f1.data + f2.data); // FILE1FILE2
    console.log('done');
    this.next();
  }
);

myFlow('file1', 'file2');


当然、非同期コールバックが複数の引数を持つ場合(たとえばchild_process.exec)、マッピングできる引数の数には制限はありません。つまり次のようことは可能です。

this.async({stdout: as(1), stderr: as(2)})


nueは、コールバックの第1引数はエラーを表すオブジェクトだと想定し、エラーかどうかのチェックを行います。しかし、コールバックすべての第1引数がエラーを表すわけではないので、マッピング定義でas(0)が指定された場合は、第1引数はエラーを表すオブジェクトではないと判断し、エラーチェックをスキップします。


ところで、なんで名前が as なのかですが、これは前置詞のasです。at か of か map(これは前置詞じゃないけど) か迷ったんですが、どれもピンとこない(自分の英語力では)。とにかく、2文字程度の短い単語でこの文脈に即していれば何でもいいのですが、とりあえず as にしています。ArgumenS の as なんだといういい訳もあったりします。。。よい名前募集中です!

this.asyncEach 関数

以前のバージョンのthis.forEachでは、forEachの中で呼び出した非同期呼び出しの数と次の関数への引数の数が対応していました。要するに、10回ループしてループごとに非同期呼び出しをした場合、次の関数への引数は10個になっていました。そして、そのようなforEachが2つ存在する場合、引数の数は20個になりますが、次の関数では、10個ずつグループ化して受け取る方法がありませんでした。

そこで、次のように記述することで、asyncEachの中で実行された非同期呼び出しの結果をグループ化できるようにしました。非同期コールバックの引数のマッピング方法はthis.asyncと同じです。

var flow = require('nue').flow;
var as = require('nue').as;
var fs = require('fs');

var myFlow = flow('myFlow')(
  function readFiles(files) {
    this.asyncEach(files, function (file, group) {
      fs.readFile(file, 'utf8', group.async({name: file, data: as(1)})); // ここに注目
    });
  },
  function end(files) { // 配列で受け取る
    if (this.err) throw this.err;
    var names = files.map(function (f) { return f.name; });
    var contents = files.map(function (f) { return f.data});
    console.log(names.join(' and ') + ' have been read.'); // file1 and file2 have been read.
    console.log(contents.join('')); // FILE1FILE2
    this.next();
  }
);

myFlow(['file1', 'file2']);

node-seqのサンプルの書き換え 再び

http://d.hatena.ne.jp/taedium/20120223/p2 で行ったnode-seqのサンプルと比較ですが、nue 0.5.0版ではどうなるか試してみました(前回比較時のnueのバージョンは0.2.0)。違いがこまかすぎて伝わらない気がしますが、ずいぶん自然に(きれいに)書けるようになりました。

stat_all.js
var flow = require('nue').flow;
var as = require('nue').as;
var fs = require('fs');

flow('stat_all')(
  function readdir() {
    fs.readdir(__dirname, this.async(as(1)));
  },
  function stat(files) {
    this.asyncEach(files, function (file, group) {
      fs.stat(__dirname + '/' + file, group.async({name: file, stats: as(1)}))
    });
  },
  function end(files) {
    if (this.err) throw this.err;
    var sizes = files.reduce(function (sizes, file) {
      sizes[file.name] = file.stats.size;
      return sizes;
    }, {});
    console.dir(sizes);
    this.next();
  }
)();
parseq.js
var flow = require('nue').flow;
var as = require('nue').as;
var fs = require('fs');
var exec = require('child_process').exec;

flow('parseq')(
  function whoami() {
    exec('whoami', this.async(as(1)));
  },
  function parallel(who) {
    exec('groups ' + who, this.async(as(1)));
    fs.readFile(__filename, 'ascii', this.async(as(1)));
  },
  function end(groups, src) {
    if (this.err) throw this.err;
    console.log('Groups: ' + groups.trim());
    console.log('This file has ' + src.length + ' bytes');
    this.next();
  }
)();
join.js
var flow = require('nue').flow;
var as = require('nue').as;

flow('join')(
  function parallel() {
    setTimeout(this.async({val: 'a'}), 300);
    setTimeout(this.async({val: 'b'}), 200);
    setTimeout(this.async({val: 'c'}), 100);
  },
  function end(a, b, c) {
    if (this.err) throw this.err;
    console.dir([ a.val, b.val, c.val ]);
    this.next();
  }
)();