以下のコードは、「source2 は source1 の周期内で一度だけ購読される」という意図で書きました。

var Rx = require('rx');

var source1 = Rx.Observable
  .interval(500)
;
var source2 = Rx.Observable
  .interval(100)
  .pausable()
;

source1.subscribe(
  function(callCount) {
    source2.resume();
    console.log('source1:', callCount);
  },
  function (err) {
    console.log('Error(source1): ' + err);
  }
);
source2.subscribe(
  function(callCount) {
    source2.pause();
    console.log('source2:', callCount);
  },
  function (err) {
    console.log('Error(source2): ' + err);
  }
);

source2.resume();

私は以下の様な出力を期待していましたが:

source2: 0
source1: 0
source2: 4
source1: 1
source2: 9
source1: 2
source2: 14
source1: 3
source2: 19
source1: 4
source2: 24
..

実際の出力はこのようになりました:

source2: 0
source1: 0
source2: 0
source1: 1
source2: 0
source1: 2
source2: 0
source1: 3
source2: 0
source1: 4
source2: 0
..

どのようにすれば、正しく pausable で監視制御ができますでしょうか。

実行環境は以下です:

  • Mac OSX
  • Node.js 0.12.4
  • RxJS 2.5.3

なお、最終的な目的は、いわゆるゲーム的な「フレーム内で最初の 1 つだけキー入力を受け付ける」というもので、source2 は process.stdin からのキー入力を受け付ける予定です。