Reactive.Extensions の SubscribeOn の挙動がわかりません
C# の Reactive.Extensions について勉強中です。
環境は、.NET Core 2.1, C# 7.2 です。
スケジューラについて、挙動の確認をしていたのですが、スケジューラを指定した時の動作が想定しているものと異なるため戸惑っています。
まず1つ目のソースとして、スケジューラを指定しないものが以下です。
class Program
{
static void Main(string[] args)
{
Console.WriteLine($"Main Thread Id: {Thread.CurrentThread.ManagedThreadId}");
var a = new A();
a.Subscribe(onNext: x =>
{
Console.WriteLine($"Current Thread Id: {Thread.CurrentThread.ManagedThreadId}, Value:{x}");
});
a.Notify(1);
a.Notify(2);
a.Notify(3);
Console.ReadKey();
}
}
class A : IObservable<int>
{
readonly Subject<int> observers = new Subject<int>();
public IDisposable Subscribe(IObserver<int> observer)
{
return observers.Subscribe(observer);
}
public void Notify(int x) => observers.OnNext(x);
}
これは、以下のように出力されます。
Main Thread Id: 1
Current Thread Id: 1, Value:1
Current Thread Id: 1, Value:2
Current Thread Id: 1, Value:3
ここで、スケジューラとして ThreadPoolScheduler
を以下のように指定しました。
class Program
{
static void Main(string[] args)
{
Console.WriteLine($"Main Thread Id: {Thread.CurrentThread.ManagedThreadId}");
var a = new A();
// ↓ここです
a.SubscribeOn(ThreadPoolScheduler.Instance).Subscribe(onNext: x =>
{
Console.WriteLine($"Current Thread Id: {Thread.CurrentThread.ManagedThreadId}, Value:{x}");
});
a.Notify(1);
a.Notify(2);
a.Notify(3);
Console.ReadKey();
}
}
class A : IObservable<int>
{
readonly Subject<int> observers = new Subject<int>();
public IDisposable Subscribe(IObserver<int> observer)
{
return observers.Subscribe(observer);
}
public void Notify(int x) => observers.OnNext(x);
}
すると、出力結果は
Main Thread Id: 1
となりました。
期待していた動作としては、
Main Thread Id: 1
Current Thread Id: X, Value:1
Current Thread Id: X, Value:2
Current Thread Id: X, Value:3
だったのですが、ThreadPoolScheduler
を指定することで何も実行されなくなるのが何故かわかりません。
どなたか、ご教授いただければ幸いです。
よろしくお願い致します。