ストリーミングデータを受信するごとに都度処理する方法について
ストリーミングデータが提供される API を利用する際に、サーバーからのレスポンスが届くたびに、リアルタイムにできるだけ近い形で処理したいと考えています。
以下のような状況において、もしより適切なアプローチがありましたらご教示いただけましたら幸いです。
状況
サーバーからは何かしらの event が発生するごとにレスポンスが、また 15 秒ごとに接続を維持するための heartbeat メッセージが届きます。
現在は requests
を使って以下のように書いています。Python のバージョンは 3.4 です。
# stream=True を指定してリクエストを投げる
res = requests.get(url, headers=HEADERS, params=payload, stream=True)
# iter_lines() で 1 行ずつ処理する
for line in res.iter_lines(chunk_size=64):
line = line.decode('utf-8')
do something...
問題点1
当初 iter_lines()
を chunk_size
を指定せずに使っていたところ、デフォルトだと chunk_size=512
となっており、heartbeat メッセージのみの場合は 5,6 件ずつまとまって処理されていました。
chunk_size
を小さくすることで処理の間隔をある程度は短縮できたと思いますが、レスポンス 1 件ごとに都度処理というところまでは至っておりません。
問題点2
また、特定の種類のレスポンスについては I/O 待ちが発生する処理を行うため、その間ブロックされてしまう点も気になっています。
非同期処理?
非同期処理を扱うライブラリかフレームワークを利用することで解決できるのではと思い、asyncio
, aiohttp
, Twisted
などについてドキュメント、サンプルコードを読み自分なりに調べてみて、恐らく asyncio
を使えばいいのではないかと思っているのですが、その見立てが正しいのか、また、私のケースではどのように書いたらよいのかがわかりません。
asyncio
のこのあたりが関係するのではないかと思うのですが、よく理解できておりません。
http://docs.python.jp/3/library/asyncio-stream.html
- 非同期処理、
asyncio
で解決する、という考え方は適切でしょうか? - どのように書いたらよいでしょうか?
上記 2 点について可能な範囲で教えていただけませんでしょうか。