SignalR(.NET Client)を使用して、イベントの通知とデータの取得を行うプログラムを作りました。
やりたいことはとりあえず出来たのですが、あまり処理を綺麗に書けませんでした。
マルチスレッドの問題やDisposable漏れなどに考慮して、もっと処理をスマートに書けるものでしょうか?
Rxなどを使うともっと綺麗に書けたりするものなのかもしれませんが、自分の力では、これをどうRxで記述すれば良いかイメージができませんでした。

なにかアイディアがあれば力をお貸しください。

■やりたいこと

  • サーバ側は一定間隔でクライアントにデータの発生を通知する。
  • データはキューに保持する。

  • クライアントはデータ発生のイベントを受信すると、サーバからデータ取得を行う。

  • データ発生のイベントだけでなく、接続直後にもデータの取得を行う。
  • データ取得は、サーバのキューが空になるまで続けて呼び出す。
  • クライアントが接続に失敗した場合、一定時間後に再接続を試みる。
  • クライアントは任意の時点で開始/停止ができる。

■クライアント/サーバ共通ソース

public class Data
{
    public int Id { get; set; }
    public string Text { get; set; }
}

■サーバ側ソース

// Hub
[HubName("test")]
public class TestHub : Hub
{
    // データ取得処理
    public Data Query()
    {
        return Service.Deque();
    }
}

// 通知およびキューの管理、事前にStartしておく
public static class Service
{
    private static readonly ConcurrentQueue<Data> queue = new ConcurrentQueue<Data>();

    private static int nextId = 1;

    private static Timer timer;

    // 一定期間毎にキューにデータを入れて、クライアントに通知
    public static void Start(IHubConnectionContext<dynamic> clients)
    {
        timer = new Timer(state =>
        {
            nextId++;
            queue.Enqueue(new Data { Id = nextId, Text = String.Format("Data-{0}", nextId) });

            clients.All.Update();
        }, null, 3000, 3000);
    }

    public static void Stop()
    {
        timer.Dispose();
    }

    public static Data Deque()
    {
        Data data;
        queue.TryFetchAndAction(out data);
        return data;
    }
}

■クライアント側ソース(もっとスマートにしたい部分)

public class Client
{
    // hub操作用の同期オブジェクト
    private readonly object sync = new object();

    // 実行状態、Closed時の再接続と、データ取得の連続処理中断に使用
    private readonly ManualResetEvent running = new ManualResetEvent(false);

    // 再接続用のタイマー
    private Timer timer;

    private HubConnection hub;

    private IHubProxy proxy;

    private void Start()
    {
        if (hub != null)
        {
            return;
        }

        // 実行状態に移行
        running.Set();

        hub = new HubConnection("http://127.0.0.1:10080");
        hub.Closed += () =>
        {
            // 旧タイマーを片付け
            var disposable = Interlocked.Exchange(ref timer, null);
            if (disposable != null)
            {
                disposable.Dispose();
            }

            if (running.WaitOne(0))
            {
                // 一定時間後に再接続の試行
                Interlocked.Exchange(ref timer, new Timer(state =>
                {
                    lock (sync)
                    {
                        if (hub != null)
                        {
                            ConnectHubAsync();
                        }
                    }
                }, null, 5000, 0));
            }
        };

        proxy = hub.CreateHubProxy("test");
        proxy.On("Update", FetchAndAction);

        ConnectHubAsync();
    }

    // 接続と、接続とのデータ取得
    private void ConnectHubAsync()
    {
        hub.Start().ContinueWith(t =>
        {
            if (hub.State == ConnectionState.Connected)
            {
                FetchAndAction();
            }
        });
    }

    // データ取得の連続処理
    private async void FetchAndAction()
    {
        while (running.WaitOne(0))
        {
            Data data;
            try
            {
                data = await proxy.Invoke<Data>("Query");
            }
            catch (Exception ex)
            {
                // Invoke中にHubConnection.Stop()するとInvalidOperationException発生
                break;
            }

            // もうデータがない
            if (data == null)
            {
                break;
            }

            // ここでDataを元に処理を行う
        }
    }

    private void Stop()
    {
        if (hub == null)
        {
            return;
        }

        // 停止状態に移行
        running.Reset();

        lock (sync)
        {
            var disposable = Interlocked.Exchange(ref timer, null);
            if (disposable != null)
            {
                disposable.Dispose();
            }

            hub.Stop();
            hub = null;
        }
    }
}