Webフレームワーク内でasyncioを使用した並列処理を試みると`this event loop is already running`が発生する
処理の内容
Python 3.6.8 で asyncio と Requests で複数の HTTP リクエストを並列で送信しています。それぞれの完了を待ち合わせ、レスポンスの本文を結合する必要があります。
import asyncio, requests
# requestsを使用しHTTPリクエストを行うだけのコルーチン
async def coroutine(url):
try:
res = requests.get(url=url)
return res.json()
except:
return {}
# 複数のリクエストを並列で送信
loop = asyncio.get_event_loop()
results = loop.run_until_complete(asyncio.gather(
coroutine("https://public.bitbank.cc/btc_jpy/ticker"),
coroutine("https://www.btcbox.co.jp/api/v1/ticker/"),
coroutine("https://coincheck.com/api/ticker")
))
print(len(results)) # 3
上記のコードは上手く動きます。
しかし、これを Responder や AIOHTTP といった HTTP サーバーに持ち込むとイベントループの使用について問題が発生します。
Responderに取り込んだ場合
わたしは REST API の実装に取り組んでおり、複数の外部サーバからリソースを並列な方法で取得し、その全ての結果を結合して返す API を提供しようと Responder を使用し次のようなコードを書きました:
# api.py
import asyncio, responder, requests
async def coroutine(url):
try:
res = requests.get(url=url)
return res.json()
except:
return {}
api = responder.API()
@api.route("/tickers")
class tickers:
async def on_get(self, req, resp):
loop = asyncio.get_event_loop()
results = loop.run_until_complete(asyncio.gather(
coroutine("https://public.bitbank.cc/btc_jpy/ticker"),
coroutine("https://www.btcbox.co.jp/api/v1/ticker/"),
coroutine("https://coincheck.com/api/ticker")
))
body = {"results": results}
resp.media = body
if __name__ == "__main__":
api.run()
これを python api.py
で実行し、稼働したローカルサーバ http://localhost:5042/tickers
にアクセスすると次のようなエラーが発生します。
...
RuntimeError: this event loop is already running.
AIOHTTPの場合
AIOHTTP を使用しても同じように this event loop is already running.
が出力されます。
# app.py
import asyncio, requests
from aiohttp import web
async def coroutine(url):
try:
res = requests.get(url=url)
return res.json()
except:
return {}
routes = web.RouteTableDef()
@routes.get("/tickers")
async def tickers(request):
loop = asyncio.get_event_loop()
results = loop.run_until_complete(asyncio.gather(
coroutine("https://public.bitbank.cc/btc_jpy/ticker"),
coroutine("https://www.btcbox.co.jp/api/v1/ticker/"),
coroutine("https://coincheck.com/api/ticker")
))
body = { "results": results }
return web.json_response(body)
if __name__ == "__main__":
app = web.Application()
app.add_routes(routes)
web.run_app(app)
Responder や AIOHTTP のようなフレームワークを使用する場合、どのようにすれば内部の処理において並列処理(主に外部サーバへ HTTP リクエストを送信するため)を行えるでしょうか。