「RxJS」初心者入門 – JavaScriptの非同期処理の常識を変えるライブラリ


「RxJS」初心者入門 – JavaScriptの非同期処理の常識を変えるライブラリ

こんにちは、王です。

みなさん、「RxJS」をご存知でしょうか?

すごく大雑把に言うと、RxJSとは非同期処理(マウスクリックなどのイベントベースの処理も含める)をするときに超絶便利なライブラリです!

イメージしやすいように「便利」とは言いましたが、決して「便利」という言葉ではおさまらないくらいのインパクトがあると思います。ちょっとした「イノベーション」に近い感覚です。

今回は、RxJSを全く知らない初心者でもとっつきやすいように説明してみたいと思います。少し記事が長くなりますが、最後までお読みいただければ幸いです!

目次

ReactiveXについて

「ReactiveX (Reactive Extensions)」(以下「RX」)というC#発のプロジェクトがありますが、RxJSはそのJS版に当たります。2009年に始動したプロジェクトで、なんと開発元はMicrosoft! これだけで、このライブラリの堅牢性と信頼性の高さがうかがえます。

今では、「Java」「JavaScript」「Scala」「C++」「Ruby」「Python」「Groovy」「JRuby」「Kotlin」など、さまざまな言語の移植版が開発されています。

非同期処理の問題点

非同期処理あるいはイベントベースのプログラムは、実行時間が保証されないため、大量に用いるとプログラムが錯綜してくる場合が多いです。
さらに依存関係にある非同期処理の場合、ネストが深まり、いわゆる「コールバック地獄」になりがちでコードの可読性が著しく下がってしまいます。何がどうなっているのかわからなくなってしまうだけでなく、エラーハンドリングもしづらくなります

とは言え、非同期処理を必要とする時代の風向きは依然として変わる気配がないようです……。むしろどんどん重要性が増しているように思えます。

そんな厄介な非同期処理をどうにかすっきりさせようと、「Async.js」のようなライブラリが出てきたり、「Promises/A+」という仕様が考案されたり(すでにネイティブでもサポートされました)で、非同期処理の諸問題がある程度改善されてきました!

数多くある非同期処理を援助するライブラリの中でも、RxJSは独自の思想を持った異色の存在です。他のライブラリと比べ物にならないほど柔軟に扱うことができて、昨今もてはやされているReactive-Functional-Programmingとも親和性が高く、これからの時代にマッチしたライブラリと言えるでしょう。

RXと「Observerパターン」

公式サイトでは以下のように記述されています。

ReactiveXは「Observer パターン」、「Iterator パターン」、「関数型プログラミング」のアイディアをふんだんに取り入れたライブラリです。

RXの根底には「Observerパターン」が根付いています。RXはObserverパターンの考え方を取り入れつつ、イベントやデータをストリームとして扱えるような仕組みを導入することによって、プログラミングの柔軟性と可読性を飛躍的に上昇させました。

さすがに冒頭でこんな抽象的なことを言われてもピンとこないと思いますが、今はあまり気にする必要はありません。とりあえず、このまま読み進めてみてください。説明や実例を見ながら、徐々に全体像が見えてくると思います。そうなってからまた読み返していただければと思います。

そもそも「Observerパターン」と「Iteratorパターン」を知らないのであれば、以下の記事のご一読をおすすめします。

ウォーミングアップ

いきなり核の部分に触れる前に、まずはRXの世界観を少しだけ体感していただきたいです。ということで、手を動かしてみよう!

イベントを配列のように操る?

まずは下記のコードを見てください。

[1,2,3,4,5,6,7,8].filter(function(num){
    return num%2;
});

// => [1, 3, 5, 7]

これは、配列のfilterメソッドを使って1から8までの奇数のみを選び出した例です。簡単ですよね?

では、RxJSを使って同じ操作を配列の代わりに「イベント」に対してやってみます。

まずは下のデモをいじってみてください。
ボタンをクリックしても何も出てきませんが、「alt」キーを押しながらクリックするとコンソールのほうにログが吐き出されるようになっています。

JS Bin

コードはこちらです↓

var btnClicks = Rx.Observable.fromEvent($('#btn'), "click");

btnClicks
    .filter(function (value) {
        return value.altKey;
    })
    .subscribe(function () {
        console.log('Altキーを押しながらクリックしたね!');
    });

配列と同じ名前のfilter()メソッドを使っているのが、なんとなくわかると思います。

では、「イベントは配列に似ているよね!」と言ったら、どう思いますか?
もしかしたら「えっ?全く別ものなのでは?」と思うかもしれません。

「イベント」という言葉だけにフォーカスしてしまうと、確かに配列とは別物です。ところが、違う視点から見てみるとイベントとは時間軸上に点在する離散的な「点」ですよね。タイムライン上にある「個々のイベントの集合体」が配列に似ているということです。

events-timeline

「うん、言われてみれば確かにそうですね、だから何が嬉しいの?

一言で言うと「配列のようにイベントを操作することができる」のが嬉しいですね。
配列には filter()map()forEach()reduce()find() など、配列の中のデータを加工するメソッドがたくさんありますよね? それらはすべて「イベント」に適用できるところが大きな可能性を秘めているんです。

さっきの例でやったことは、ボタンの「クリックイベントの配列」に対して、filter()メソッドを適用し、その結果、クリック時に「altが押されているイベント」だけが残った新しい「イベントの配列」が生成されます。
そして出来上がった新しい「イベントの配列」を一個ずつ取り出すためには、subscribe(購読)を行う必要がありますので、subscribeメソッドを使い、値を出力しました。

ここまで説明したら、filter()の使い方がわかる方なら、もう十分理解できたのではないかと思います。ですが、もっと直観的に理解してもらえるように説明用のプログラムを作りました。ポチポチしてみてください。

JS Bin

大まかなプロセスは以下の通り。

イベントが発生 —> filter()を通り —> テストに合格したイベントでできた新しいイベントの配列の誕生 —> 購読者に通知する

Stream(ストリーム)

ここまで「イベントの配列」と連発してきましたが、厳密には「配列」ではないのは明白です。わかりやすいように、あえて「イベントの配列」という言い方をしてきました。

実は、ここでいう「イベントの配列」にはもっとかっこいい名前があります。それが「Stream(ストリーム)」です。

「Stream」とは「水の流れ・小川」という意味ですから、「タイムライン」という「川」が流れていて、時折ドンブラコ〜ドンブラコ〜と「イベント(データ・値)」が流れてきます〜といったイメージです。

まさにストリームです!

stream

Rxでは、しばしばObservable/Observable-Sequence(後述)と呼ぶことがありますが、やはり「ストリーム」のほうがわかりやすいので、以降「ストリーム」と記述させていただきます。

Rxではストリームを作成するメソッドはたくさん用意されています。

  • Create — Observerの「OnNext/onError/onCompleted」メソッドを用いて、一からプログラミング的にObservableを作成。
  • Defer — Observerが購読する時点で動的にObservableを生成。
  • Empty/Never/Throw — 限定された挙動を持つObservableを作成。
  • From — 配列などの他のデータ型をObservableに変換する。
  • Interval — 指定された時間間隔で値を発行するObservableを作成。
  • Just — 引数を1つだけ取って、取った引数を唯一の値にするObservableを作成。
  • Range — 一連の整数を生成するObservable。
  • Repeat — ある値あるいは一連の値を繰り返し発行するObservable。
  • Start — 関数の戻り値を値とするObservableを作成。
  • Timer — 指定した遅延時間後に値を生成する。周期の指定がある場合は一定周期で値を発行する。

ストリームを操作する「オペレーター」

まだ「filter」しか触れてませんが、「map」「reduce」「merge」「concat」「zip」など、RxJSには他にもこういったストリームを操作するメソッドがたくさん用意されています。こういったストリームに対して何らかの操作を行うメソッドのことを「オペレーター」と言います。

ストリーム(Observable)を作って「オペレーター」をどんどんかけて、出来上がったストリームを購読(subscribe)する、というのが通常の使い方の流れです。
例えば、先ほどの例ですと、Rx.Observable.fromEvent()の戻り値がObservableです。このObservableに対してオペレーター(filter)をかけて、最後にsubscribeで値を受け取るようにしています。

詳しくは、以下のオペレーターのカテゴリ別レファレンスをご参考ください。

Operators by Categories:

オペレーションチェーン

配列ではこんな感じで「メソッドチェーン」ができて、簡潔でわかりやすい記述が可能です。

[1,2,3,4,5,6,7,8]
.filter(function(num){
    return num%2;
})
.map(function(num){
    return num*num;
});

// => [1, 9, 25, 49]

もちろんRxJSでもできます! 以下のように。

Rx.Observable.from([1, 2, 3, 4, 5, 6, 7, 8]) // 配列をストリーム(Observable)に変換
    .filter(function (num) { //戻り値:Observable
        return num % 2;
    }).map(function (num) { // 戻り値:Observable
        return num * num;
    }).forEach(function (num) {  // `forEach`は`subscribe`のエイリアスです。戻り値:Diposable
        return console.log(num);
    });
    
// => 1
// => 9
// => 25
// => 49

もちろん、配列ではないので、1個ずつ順番に値が出力されます。

下記は図解です。

array_filter_map

最後の「」はストリームの「終点」を表しています。
このようなグラフを「Marble-Diagrams」と呼びます。オペーレーターを理解するのにとても役に立ちます。

このように、オペレーターの戻り値が「Observable」になっていますので、チェーンすることができます。

では、「delayWithSelector」オペーレーターを使って、それぞれの値を500ミリ秒ごとに吐き出すように変えましょう!

// `subscribe()`に渡すためのobserverを作成する。
var observer = Rx.Observer.create(function (num) {
    // 新しい値がプッシュされたときに
    return console.log("onNext: " + num);
}, function (error) {
    // エラーが起きたときに
    return console.log("onError: " + error);
}, function () {
    // 全ての値をプッシュし、ストリームが終了したときに
    return console.log('onCompleted');
});

Rx.Observable.from([1, 2, 3, 4, 5, 6, 7, 8])
    // 一要素ごとに500ミリ秒ずらす
    .delayWithSelector(function (num) {
        return Rx.Observable.timer(num * 500);
    }).filter(function (num) {
        return num % 2;
    }).map(function (num) {
        return num * num;
    }).subscribe(observer);
    

// => onNext: 1
// => onNext: 9
// => onNext: 25
// => onNext: 49
// => onCompleted

JS Bin

今までは説明しませんでしたが、subscribe()には2種類の引数の渡し方があります。

1つ目は直接関数オブジェクトを渡す。

var onNext = function(){}; // プッシュ時のコールバック
var onError = function(){}; // エラー時のコールバック
var onCompleted = function(){}; // 完了時のコールバック

o.subscribe( onNext, onError, onCompleted );

2つ目はobserverオブジェクトを渡す(上のサンプルはこちらのほうです)。

o.subscribe(observer);

コールバックは3つありますが、いずれも省略できます。1つ目の渡し方は内部的にはやはりobserverオブジェクトを作っています。

ちなみに、コールバックの名前が「onNext」になっているのはイテレータパターンの名残です(Observableは一種のコレクションですから)。

ObserverとObservable

そろそろObserverとObservableの関係性が見えてきたのではないでしょうか?
さらに理解を深めるために、実際にObservableとObserverを作ってみましょう!

下記のサンプルコードでは、Rx.Observable.create() を使って、Observableオブジェクトを生成し、source変数に格納します。create() に渡したコールバック関数には observer が渡され、関数内で observer.onNext() を使って、observer に対して値を発行するようになっています。
そして、戻り値が関数になっていて、この関数の中には、のちのち不要になったObservableを「破棄する」ときに行いたい処理を記述し、subscription(Diposable型)の dispose() が呼ばれたときに実行されます。

実行すると、500ミリ秒ずつログが出力され、5秒後にログの出力が止まります。

var source = Rx.Observable.create(function (observer) {
    // `onNext`を使って、`num`を500ミリ秒ずつobserverにプッシュする
    var num = 0;
    var id = setInterval(function () {
        observer.onNext(num++);
    }, 500);

    setTimeout(function () {
        // 10秒後に、「ストリームが完了した」合図を送る
        observer.onCompleted();
    }, 10000);

    // もちろん、尻ぬぐいの手段を提供しないとダメです
    return function () {
        console.log('disposed');
        clearInterval(id);
    };
});

var subscription = source.subscribe(
    function (x) {
        console.log('onNext: ' + x);
    },
    function (e) {
        console.log('onError: ' + e.message);
    },
    function () {
        console.log('onCompleted');
    });

setTimeout(function () {
    subscription.dispose();
}, 5000);

// => onNext: 0
// => onNext: 1
// => onNext: 2
// => onNext: 3
// => onNext: 4
// => onNext: 5
// => onNext: 6
// => onNext: 7
// => onNext: 8
// => disposed

JS Bin

上記サンプルコードでは「onNext」しか使っていませんが、状況に応じて以下の3つのメソッドを使って、「Next/Error/Completed」シグナルを発生させることで購読者に通知を行います。

  • onNext:値を発行する通知
  • onError:エラーが起きた際の通知
  • onCompleted:ストリーミングが完了した通知(止めるまで終わらないストリーミングもあります)

Cold Observable と Hot Observable

実は、Observableには「Cold」「Hot」の2種類があります。
本セクションでは、この2種類のObservableの違いと使い分けについて説明します。

cool_vs_hot

Cold Observable

以下の例では、同じ Observable(source) に対して、時間をずらして2回 subscribe() しています。ログを見てみると、各Observerは新しいシーケンスから値を受け取っていることに気づくはずです。

個々のObserverが「自分だけの」Observableから値を受け取っているのです。

実例

var source = Rx.Observable.interval(1000),
    subscription1 = source.subscribe(
        function (x) {
            console.log('Observer 1: onNext: ' + x);
        }
    ),
    subscription2;

setTimeout(function () {
    subscription2 = source.subscribe(
        function (x) {
            console.log('Observer 2: onNext: ' + x);
        }
    );
}, 2000);

setTimeout(function () {
    subscription1.dispose();
    subscription2.dispose();
}, 5000);

// => Observer 1: onNext: 0
// => Observer 1: onNext: 1
// => Observer 1: onNext: 2
// => Observer 2: onNext: 0
// => Observer 1: onNext: 3
// => Observer 2: onNext: 1

Hot Observable

以下の例では、publish() を使って、Cold Observable(source)をHot Observable(hot)に変換し、時間をずらして2回 subscribe() しています。
ログからわかるように、Cold Observableと違い、各ObserverはHot Observable(hot)から最新の値を受け取ります。そして、connect() を使って、ソースとなるObservable(source)からプッシュしてきた値をさらに自身(hot)が持つobserverたちにプッシュするようにします。

「Hot Observable」は、複数のObserverに対して同じタイミングで同じ値を発行する「1対マルチ」なスタイルのObservableです。このような挙動は後ほどで紹介するSubjectによって実現しています。

実例

// Observableを作成
var source = Rx.Observable.interval(1000);

// Hot Observableに変換
var hot = source.publish();

// この時点ではまだ値はプッシュされない
var subscription1 = hot.subscribe(
    function (x) {
        console.log('Observer 1: onNext: %s', x);
    }
);

console.log('Current Time after 1st subscription: ' + Date.now());

// 3秒後に……
setTimeout(function () {
    // `connect()`を使って、`source`に接続する
    // これで、source から受け取った値がhotのobserverたちにプッシュされるようになる。
    hot.connect();

    console.log('Current Time after connect: ' + Date.now());

    // さらに3秒後に……
    setTimeout(function () {
        console.log('Current Time after 2nd subscription: ' + Date.now());
        var subscription2 = hot.subscribe(
            function (x) {
                console.log('Observer 2: onNext: %s', x);
            }
        );
    }, 3000);
}, 3000);


// => Current Time after 1st subscription: 1425834043641
// => Current Time after connect: 1425834046647
// => Observer 1: onNext: 0
// => Observer 1: onNext: 1
// => Current Time after 2nd subscription: 1425834049649
// => Observer 1: onNext: 2
// => Observer 2: onNext: 2
// => Observer 1: onNext: 3
// => Observer 2: onNext: 3
// => Observer 1: onNext: 4
// => Observer 2: onNext: 4
// => Observer 1: onNext: 5
// => Observer 2: onNext: 5
// => Observer 1: onNext: 6
// => Observer 2: onNext: 6

Subject について

Subjectは、RXを使う上でもう1つの重要なクラスです。Subjectクラスは、ObservableとObserverの両方を継承しています。
つまり、SubjectはObservableでありながら、Observerでもあるということです。

ObserverとObservableの顔を併せ持つSubject

以下の例では、Subjectを1つ作って、このsubjectに対して、subscribeします(Observableの特性)。そして、同じsubjectを使って、自身に登録してあるobserverに対して値をプッシュします(observerの特性)。

そうすることで、「発行(publish)」と「購読(subscribe)」を1つのオブジェクトに集約することができます。

var subject = new Rx.Subject();

var subscription = subject.subscribe(
    function (x) { console.log('onNext: ' + x); },
    function (e) { console.log('onError: ' + e.message); },
    function () { console.log('onCompleted'); });

subject.onNext(1);
// => onNext: 1

subject.onNext(2);
// => onNext: 2

subject.onCompleted();
// => onCompleted

subscription.dispose();

「ブロードキャスタート」としてのSubject

Subjectの用途の1つとして、「ブロードキャスト」が挙げられます。SubjectはObservableと同様に、subscribe()インタフェースを持っています。
ただ、Subjectのsubscribe()は、schedulerを気にしていません。このため、同じObservableに対して、何回もsubscribe()するよりも、subjectに置き換えた方がパフォーマンスがいいということです。

// 1秒間隔で値を発行するObservable
var source = Rx.Observable.interval(1000);

var subject = new Rx.Subject();

// ObserverとしてObservableである`source`に渡す
var subSource = source.subscribe(subject);

// ブロードキャスト先、その1
var subSubject1 = subject.subscribe(
    function (x) { console.log('Value published to observer #1: ' + x); },
    function (e) { console.log('onError: ' + e.message); },
    function () { console.log('onCompleted'); });

// ブロードキャスト先、その2
var subSubject2 = subject.subscribe(
    function (x) { console.log('Value published to observer #2: ' + x); },
    function (e) { console.log('onError: ' + e.message); },
    function () { console.log('onCompleted'); });

setTimeout(function () {
    // 5秒後に終了させる
    subject.onCompleted();
    subSubject1.dispose();
    subSubject2.dispose();
}, 5000);

// => Value published to observer #1: 0
// => Value published to observer #2: 0
// => Value published to observer #1: 1
// => Value published to observer #2: 1
// => Value published to observer #1: 2
// => Value published to observer #2: 2
// => Value published to observer #1: 3
// => Value published to observer #2: 3
// => onCompleted
// => onCompleted

「プロキシ」としてのSubject

source を購読しつつ、自分のニーズで合わせて自ら値を発行することができます。

上記のサンプルコードの最後に下記を追加して確認しましょう。

setTimeout(function () {
    // 2秒後にsubjectの購読者に対して値を発行する
    subject.onNext('from SUBJECT');
}, 2000);

// => Value published to observer #1: 0
// => Value published to observer #2: 0
// => Value published to observer #1: from SUBJECT
// => Value published to observer #2: from SUBJECT
// => Value published to observer #1: 1
// => Value published to observer #2: 1
// => Value published to observer #1: 2
// => Value published to observer #2: 2
// => Value published to observer #1: 3
// => Value published to observer #2: 3
// => onCompleted
// => onCompleted

Scheduler について

Rxには「Scheduler」というクラスがあります。
「いつ購読が始まるか」「いつ値が発行されるか」を決めるために用いられます。普通に利用している分には特に気にする必要はないのですが、一応簡単に説明しておきます。

下記のコードを見てください。

// Observableを作成
var source = Rx.Observable.create(function (observer) {
    console.log('subscribe function');

    var i = 0;
    while (i++ < 3) {
        observer.onNext(i);
    }
    observer.onCompleted();
});

// source = source.subscribeOn(Rx.Scheduler.timeout);
// source = source.observeOn(Rx.Scheduler.timeout);

console.log('in-between');

source.subscribe(
    function (num) {
        console.log('onNext: ' + num);
    },
    null,
    function () {
        console.log('completed!');
    });

console.log('EOF');


// => in-between
// => subscribe function
// => onNext: 1
// => onNext: 2
// => onNext: 3
// => completed!
// => EOF

出力の順番は読者の予想通りだ思います。図解でもう一度見てみましょう。

call-flow

ところが、 source = source.subscribeOn(Rx.Scheduler.timeout); のコメントを外して、購読タスクを「timeoutスケジュール」上で実行するように指定しますと……。

// => in-between
// => EOF
// => subscribe function
// => onNext: 1
// => onNext: 2
// => onNext: 3
// => completed!

なぜこうなったかと言うと、subscribeの処理を別スレッド(厳密に言うと次のイベントループ)に回したからです。
JavaScriptはシングルスレッドですが、setTimeout(fn,0) で擬似的にマルチスレッドっぽい挙動にすることができます。「timeoutスケジューラ」とは言っても、実際はランタイムによって利用している関数が違ってきます。

利用する関数の優先順位は以下の通り。

  • setImmediate
  • nextTick
  • postMessage
  • MessageChannel
  • script readystatechanged
  • setTimeout

例えばNode.jsなら、「setTimeout」ではなく「setImmediate」or「nextTick」が使われます。主な目的は、重い処理する際にUIをブロックすることを避けるためです。

では、source = source.subscribeOn(Rx.Scheduler.timeout); を再びコメントアウトして、今度は source = source.observeOn(Rx.Scheduler.timeout); をオンにしてみると……。

// => in-between
// => subscribe function
// => EOF
// => onNext: 1
// => onNext: 2
// => onNext: 3
// => completed!

原理は全く一緒です。今度は onNext の処理を一旦キューに入れて、次のイベントループでキューから呼び出します。

Schedulerの種類

Schedulerの種類ですが、シングルスレッドのJavaScriptは今のところ以下の3つくらいです。

  • timeoutScheduler : タスクを次のイベントループで実行するようにするスケジュール(非同期処理)。
  • immediateScheduler : その場でタスクを実行する(同期処理)。
  • currentThreadScheduler : immediateSchedulerと同様、その場で実行される。ただし、再帰的な呼び出しの場合はtimeoutSchedulerの挙動になる。

実例

RxJSの基礎は以上です。オペーレーターの数は殺人的に多いのですが、基本的な考え方さえ押さえておけば、あとは楽です。

では、つまらない理論はここまでにして、実例でおさらいも兼ねて情報整理しておきましょう。
実例では、過度な説明を控えておりますが、オペーレーターの数は山ほどあります。公式ドキュメントに図解と詳細な解説があるので、オペーレーターの使い方がよく分からなければ、公式ページのほうで確認してみてください!

実例1. 「ドラッグ・アンド・ドロップ」

いわゆる「ドラッグ・アンド・ドロップ」の実装ですね。

RxJS

(function() {
    var $box, box_width, mousedown_events, mousemove_events, mouseup_events, source;

    $box = $('#box');
    mouseup_events = Rx.Observable.fromEvent($box, 'mouseup');
    mousemove_events = Rx.Observable.fromEvent(document, 'mousemove');
    mousedown_events = Rx.Observable.fromEvent($box, 'mousedown');

    source = mousedown_events.flatMap(function(event) {
        var start_left, start_pageX, start_pageY, start_top;
        start_pageX = event.pageX;
        start_pageY = event.pageY;
        start_left = parseInt($box.css('left'));
        start_top = parseInt($box.css('top'));
        $box.addClass('hovering');
        return mousemove_events.map(function(e) {
            return {
                left: start_left + (e.pageX - start_pageX),
                top: start_top + (e.pageY - start_pageY)
            };
        }).takeUntil(mouseup_events);
    });

    mouseup_events.subscribe(function() {
        $box.removeClass('hovering');
    });

    source.subscribe(function(pos) {
        TweenLite.set($box, {
            left: pos.left,
            top: pos.top
        });
    });

})();

使ったオペーレーター

解説

flatMapを使って、「mousedownイベント」を「mouseupするまで続くmousemoveイベント」に変換しています。

実例2. 「簡易データバインディング」

RxJS

(function () {
    var $color, $combined, $h1, $size, $text, bind, color, size, text;

    $h1 = $('h1');
    $text = $('.text>input');
    $size = $('.size>input');
    $color = $('.color>input');
    $combined = $('#combined');

    text = new Rx.BehaviorSubject($text.val());
    size = new Rx.BehaviorSubject($size.val());
    color = new Rx.BehaviorSubject($color.val());

    text.subscribe(function (val) {
        $h1.text(val);
    });

    size.subscribe(function (val) {
        $h1.css('font-size', val + 'px');
    });

    color.subscribe(function (val) {
        $h1.css('color', val);
    });

    bind = function (eType, elem, subject) {
        Rx.Observable.fromEvent(elem, eType).subscribe(function (e) {
            subject.onNext(e.target.value);
        });
    };

    text.combineLatest(size, color, function (text, size, color) {
        return "text: " + text + "<br>Size: " + size + "px<br>Color: " + color;
    }).subscribe(function (val) {
        return $combined.html(val);
    });

    bind('keyup', $text, text);
    bind('keyup change', $size, size);
    bind('change', $color, color);

})();

使ったオペーレーター

解説

一番最近に発行した値だけを取得するBehaviorSubjectを利用してUIを更新しています。jQueryでは同じことをより簡単にできますが、ミソは combineLatest を使ってBehaviorSubjectたちの値をまとめて受け取り、設定するところですね。

実例3. 「ゲームのコマンド」

JS Bin


(function () {
    var $ken, $stage, createCommand, keydowns, keys, skill;

    keys = {
        left: 37,
        right: 39,
        up: 38,
        down: 40,
        a: 65,
        s: 83
    };

    keydowns = Rx.Observable.fromEvent(document, 'keydown');

    /**
     * コマンドを作るヘルパー関数
     * @param {Array} combination_keysコマンドを構成するキーを順番に入れた配列
     * @param {Number} timeoutキーとキーの間のタイムアウト(ミリ秒)、指定した時間が過ぎたら失敗と見なす。
     * @param {Function} callbackコマンドが成功に押された時に実行するコールバック関数
     */
    createCommand = function (combination_keys, timeout, callback) {
        var get_source, watch;

        get_source = function () {
            var source;
            source = Rx.Observable.empty();

            // `source.concat()`を使って、順番にコマンドの各キーを繋げる
            combination_keys.forEach(function (keyCode, index) {
                var this_key;
                this_key = keydowns.filter(function (e) {
                    var is_correct;
                    is_correct = e.keyCode === keyCode;
                    if (is_correct === false) {
                        throw Error('incorrect key pressed');
                    } else {
                        return is_correct;
                    }
                }).take(1);
                if (index > 0) {
                    // 最初のキーにはタイムアウトを設定しない
                    this_key = this_key.timeout(timeout);
                }
                source = source.concat(this_key);
            });
            return source;
        };

        watch = function () {
            var source;
            source = get_source();
            source.subscribe(function () {
                console.log('correct');
            }, function (e) {
                console.log(e.message);
                watch();
            }, function () {
                console.log('completed');
                watch();
                callback();
            });
        };
        watch();
    };


    // アニメーションの処理
    $ken = $('.ken');
    $stage = $('.stage');
    skill = {
        hadoken: function () {
            var $fireball, tl;
            tl = new TimelineLite();
            $fireball = $('<div class="fireball"></div>');
            $ken.addClass('hadoken');
            $stage.append($fireball);

            tl.add(function () {
                $fireball.addClass('moving').animate({
                    left: '+=250'
                }, 3000, 'linear');
            }, 0.33).add(function () {
                $ken.removeClass('hadoken');
                console.log('yes');
            }, 0.5).add(function () {
                $fireball.addClass('explode');
            }, 3.3);
        },
        senpukyaku: function () {
            var tl;
            $ken.addClass('tatsumaki');
            tl = new TimelineLite();
            tl.add((function () {
                $ken.removeClass('tatsumaki');
            }), 2);
        }
    };


    // コマンドを設定する
    createCommand([keys.left, keys.down, keys.right, keys.a], 500, skill.hadoken);
    createCommand([keys.right, keys.down, keys.left, keys.s], 500, skill.senpukyaku);

})();

使ったオペーレーター

解説

一見難しそうに見えますが、大事なのは createCommand という関数だけです。
やってることは、filter で特定のキーだけをtake(1) を使って1回だけキャッチする。キャッチしたイベントに timeout を設定して、concat で順番に連結しています。

競合ライブラリ

今のところ、競合と言えるライブラリは「Bacon.js」くらいでしょう。RxJSと非常に似ています。
実を言うと、私は Bacon.jsをきっかけにRxJSを知りました……。
正直、使いやすさと用語の分かりやすさ共にRxJSより優れていると思います。歴史も結構あって、おすすめです。RxJSが理解できれば、すんなりと移行できると思います。

下記は各ライブラリの比較です。参考にさせていただきました。

参考:ECMAScript7を見据えた、JavaScript(TypeScript)で使えるFRPライブラリの比較調査
http://qiita.com/kondei/items/17e5d4867a0652911e52

なぜ今までRxJSの存在に気づかなかったのか?

なんとGithubでの公式ページのコミットを覗いてみたら、JS版の最初のコミットは2012年の7月です! ということは、実際に開発に着手したのはもっと早い段階です(少なくとも2010年あたりからだと思います)。

誠に面目ないのですが、私がまだこの業界に入っていないときから既に存在していたのにも関わらず、今更その存在を知りました……。それで、なぜ今までRxJSの存在を知らなかったのか、どうしても腑に落ちなかったので、その原因を自分なりに考えてみました。

情報が少なかった

何よりも情報が少なかったです。書籍はともかく、今ググっても情報はそんなに出てきません。

.NETで開発している人からしたら、とっくに常識になっているのかもしれません。しかし、C#以外の言語ではどうも情報が少ないように思います。

下記は関数型プログラミング支援のライブラリである「underscore.js」との月間検索ボリュームの比較です。依然として検索ボリュームが少ないです。

adwords

時代性がなかった

RxJSは その名の通り、Reactive Programmingをサポートするライブラリです。しかし、そもそもReactive Programmingという概念が注目を集め始めたのはここ最近の話です。

参考:注目を集めるリアクティブプログラミング
http://www.infoq.com/jp/news/2013/09/reactive-programming-emerging

Reactive Programmingの概念自体はそんなに新しいものではないですが、恐らく「馴染みがない+その時代でのニーズがない」ことが原因で、長らく水面下で進行していました。最近になって、やっと日の目を見ることができた感じですかね。

下記はGoogleトレンドで、「reactive programming」「RxJS」「Reactive Extensions」の検索ボリュームを調べた結果です。

trend

Reactive Extensions と Reactive Programmingというキーワードはほぼ同時期に関心が高まった背景事情が見て取れると思います。そして、Reactive Programmingが引き金となって、2012年頃から現在までRxJSが伸び続けています。

学習コストがかかる

上記の2点から「認知度が低い」ことは容易に想像できます。そのため「学習コストがかかる」という結論が自然に導き出されます。

学習コストもかかるし、実際に役に立つのかどうかも不明瞭……。こんなシチュエーションに遭遇したら、大抵の人は勉強するのをやめるでしょう。

これから流行りそうな予感

近年はWebでもリッチなアプリケーション・サービスが求められるようになってきました。Webと言えばJavaScript、JavaScriptと言えばイベント駆動、非同期処理
そう、実にRxJSと相性がいいのです。「リアクティブなアプリケーション」は1つの傾向として、今後徐々にWeb開発業界のビッグキーワードになっていくでしょう。
そんな中で、RxJSおよび類似ライブラリもこの流れに乗っかって、少しずつ人目に触れるようになるのではと予測しています。

まとめ

今回の記事で「入門」と題したのは、私自身もRxJS初心者だからです(触り始めたのは数週間前です)。Reactive Programmingについてあれこれ調べていたのがきっかけでした。

私は結構大きな衝撃を受けたので、もっとたくさんの人に知っていただきたく、RxJSの普及を願って今回この記事を書きました。
説明不足や突っ込みどころもあると思うので、どうぞご教示いただければ幸いです!

参考資料

【JSライブラリを使ってみた】

Pixi.jsを初めて触って簡単にアニメーションを作成してみた話(その1)

p5.jsでProcessingのようにリッチなWeb表現を作るチュートリアル

デザイナー・ノンプログラマにおすすめしたいThree.jsのカンタン3D体験

Box2DWeb jsを使って物理演算をする方法

ブラウザ対応が簡単に!「modernizr.js」の機能と利用方法

王
この記事を書いた人

バックエンドエンジニア

2012年入社

この記事を読んだ人におすすめ