async def
await
async with
async for
@asyncio.coroutine
import asyncio
@asyncio.coroutine
def hello():
return 'Hello, world!'
loop = asyncio.get_event_loop()
message = loop.run_until_complete(hello())
print(message)
loop.close()
async def
import asyncio
async def hello():
return 'Hello, world!'
loop = asyncio.get_event_loop()
message = loop.run_until_complete(hello())
print(message)
loop.close()
@asyncio.coroutine + yield from
import asyncio
from aiohttp import client
@asyncio.coroutine
def fetch_page(url):
response = yield from client.get(url)
return (yield from response.text())
loop = asyncio.get_event_loop()
content = loop.run_until_complete(fetch_page('http://fi.pycon.org/'))
print(content)
loop.close()
async def + await
import asyncio
from aiohttp import client
async def fetch_page(url):
response = await client.get(url)
return await response.text()
loop = asyncio.get_event_loop()
content = loop.run_until_complete(fetch_page('http://fi.pycon.org/'))
print(content)
loop.close()
with (yield from ...)
import asyncio
import sqlalchemy as sa
from aiopg.sa import create_engine
@asyncio.coroutine
def count_data(dsn):
engine = yield from create_engine(dsn)
with (yield from engine) as conn:
query = sa.select(...).count()
return (yield from conn.scalar(query))
loop = asyncio.get_event_loop()
counter = loop.run_until_complete(count_data('postgresql://...'))
print(counter)
loop.close()
async with
import asyncio
import sqlalchemy as sa
from aiopg.sa import create_engine
from .utils import ConnectionContextManager
async def count_data(dsn):
engine = await create_engine(dsn)
async with ConnectionContextManager(engine) as conn:
query = sa.select(...).count()
return await conn.scalar(query)
loop = asyncio.get_event_loop()
counter = loop.run_until_complete(count_data('postgresql://...'))
print(counter)
loop.close()
async with
utils.py
class ConnectionContextManager(object):
def __init__(self, engine):
self.conn = None
self.engine = engine
async def __aenter__(self):
self.conn = await self.engine.acquire()
return self.conn
async def __aexit__(self, exc_type, exc, tb):
try:
self.engine.release(self.conn)
finally:
self.conn = None
self.engine = None
for row in (yield from ...):
import asyncio
import sqlalchemy as sa
from aiopg.sa import create_engine
@asyncio.coroutine
def fetch_data(dsn):
data = []
engine = yield from create_engine(dsn)
with (yield from engine) as conn:
result = yield from conn.execute(sa.select(...))
for row in result:
data.append(row)
return data
loop = asyncio.get_event_loop()
data = loop.run_until_complete(fetch_data('postgresql://...'))
loop.close()
async for
import asyncio
import sqlalchemy as sa
from aiopg.sa import create_engine
from .utils import ConnectionContextManager, ResultIter
async def fetch_data(dsn):
data = []
engine = await create_engine(dsn)
async with ConnectionContextManager(engine) as conn:
async for row in ResultIter(await conn.execute(sa.select(...))):
data.append(row)
return data
loop = asyncio.get_event_loop()
data = loop.run_until_complete(fetch_data('postgresql://...'))
loop.close()
async for
utils.py
from aiopg.sa.exc import ResourceClosedError
class ResultIter(object):
def __init__(self, result):
self.result = result
async def __aiter__(self):
return self
async def __anext__(self):
try:
data = await self.result.fetchone()
except ResourceClosedError:
data = None
if data:
return data
raise StopAsyncIteration
@types.coroutine
bridge between generator based and
native coroutines
__await__
magic methodinspect
library as
iscoroutine
, isawaitable
, etc
abc.Awaitable
,
abc.Coroutine
, abc.AsyncIterable
,
abc.AsyncIterator
Method | Can contain | Can't contain |
---|---|---|
async def func |
await, return value |
yield, yield from |
async def __a*__ |
await, return value |
yield, yield from |
def __a*__ |
return awaitable |
await |
def __await__ |
yield, yield from, return iterable |
await |
generator |
yield, yield from, return value |
await |
0.17.4
import asyncio
import ujson
from aiohttp import client
async def github_search(query):
response = await client.get('https://api.github.com/search/repositories',
params={'q': query})
return ujson.loads(await response.read())
loop = asyncio.get_event_loop()
response = loop.run_until_complete(github_search('asyncio'))
print('\n'.join(repo['full_name'] for repo in response['items']))
loop.close()
import ujson
from aiohttp import web
async def api_index(request):
output = ujson.dumps({...})
return web.Response(body=output, content_type='application/json')
app = web.Application()
app.router.add_route('GET', '/api/', api_index)
gunicorn -k aiohttp.worker.GunicornWebWorker -w 9 -t 60 app:app
0.7.0
import asyncio
from aiopg import create_pool
async def select_one(dsn):
pool = await create_pool(dsn)
with (await pool.cursor()) as cursor:
await cursor.execute('SELECT 1')
selected = await cursor.fetchone()
assert selected == (1, )
loop = asyncio.get_event_loop()
loop.run_until_complete(select_one('dbname=... user=... password=... host=...'))
loop.close()
import asyncio
import sqlalchemy as sa
from aiopg.sa import create_engine
from .utils import ConnectionContextManager
async def select_one(dsn):
engine = await create_engine(dsn)
async with ConnectionContextManager(engine) as conn:
await conn.execute(sa.select([sa.text('1')]))
selected = await conn.fetchone()
assert selected == (1, )
loop = asyncio.get_event_loop()
loop.run_until_complete(select_one('postgresql://...'))
loop.close()
0.2.4
import asyncio
from aioredis import create_redis
async def redis_set_get_delete(address, **options):
options.setdefault('encoding', 'utf-8')
redis = await create_redis(address, **options)
assert await redis.set('key', 'value') is True
assert await redis.get('key') == 'value'
assert await redis.delete('key') == 1
loop = asyncio.get_event_loop()
loop.run_until_complete(redis_set_get_delete(('localhost', 6379)))
loop.close()
import requests
from lxml import etree
NFL_URL = 'http://www.nfl.com/ajax/scorestrip'
def fetch_week(season, week, is_preseason=False):
response = requests.get(NFL_URL, params={
'season': season,
'seasonType': 'PRE' if is_preseason else 'REG',
'week': week,
})
return etree.fromstring(response.content)
def fetch_season(season):
pre_call(...)
for is_preseason, weeks in ((False, range(5)), (True, range(1, 18))):
for week in weeks:
fetch_week(season, week, is_preseason)
...
post_call(...)
from aiohttp import client
from lxml import etree
async def aio_fetch_week(season, week, is_preseason=False):
response = await client.get(NFL_URL, params={
'season': season,
'seasonType': 'PRE' if is_preseason else 'REG',
'week': week,
})
return etree.fromstring(await response.read())
import asyncio
def fetch_season(season):
loop = asyncio.get_event_loop()
loop.run_until_complete(pre_call(...))
tasks = [
aio_fetch_week(season, year, is_preseason)
for is_preseason, weeks in ((False, range(5)), (True, range(1, 18)))
for week in weeks
]
loop.run_until_complete(asyncio.wait(tasks, loop=loop))
loop.run_until_complete(post_call(...))
loop.close()
scripts/fetch_season_data.py
import sys
def main(*args):
fetch_season(int(args[0]))
return False
if __name__ == '__main__':
sys.exit(int(main(*sys.argv[1:])))
$ python scripts/fetch_season_data.py 2015
...
All games for Season 2015/REG. Week 17 processed!
Season 2015/REG. Week 12. Game ID 2015113000, CLE @ BAL, scheduled at 2015-11-30T20:30:00-05:00 updated in database
All games for Season 2015/REG. Week 12 processed!
Season 2015/PRE. Week 4. Game ID 2015090361, DAL @ HOU, scheduled at 2015-09-03T20:00:00-04:00 updated in database
Season 2015/REG. Week 16. Game ID 2015122711, SEA @ STL, scheduled at 2015-12-27T16:25:00-05:00 updated in database
Season 2015/PRE. Week 4. Game ID 2015090362, STL @ KC, scheduled at 2015-09-03T20:00:00-04:00 updated in database
Season 2015/REG. Week 16. Game ID 2015122712, BAL @ PIT, scheduled at 2015-12-27T20:30:00-05:00 updated in database
Season 2015/PRE. Week 4. Game ID 2015090363, TEN @ MIN, scheduled at 2015-09-03T20:00:00-04:00 updated in database
Season 2015/REG. Week 16. Game ID 2015122800, DEN @ CIN, scheduled at 2015-12-28T20:30:00-05:00 updated in database
All games for Season 2015/REG. Week 16 processed!
Season 2015/PRE. Week 4. Game ID 2015090364, DEN @ ARI, scheduled at 2015-09-03T21:00:00-04:00 updated in database
Season 2015/PRE. Week 4. Game ID 2015090365, SF @ SD, scheduled at 2015-09-03T22:00:00-04:00 updated in database
Season 2015/PRE. Week 4. Game ID 2015090366, SEA @ OAK, scheduled at 2015-09-03T22:00:00-04:00 updated in database
All games for Season 2015/PRE. Week 4 processed!
...
XML
GraphQL
JSON
from flask import Flask
from . import cache
from .data import to_game_data
app = Flask(__name__)
@app.route('/api/season/<int:season_year>/<week_slug>')
def retrieve_week(season_year, week_slug):
is_preseason = week_slug[:4] == 'pre-'
week = int(week_slug[4:] if is_preseason else week_slug)
cache_key = cache.build_key(season_year, week_slug)
week_data = cache.ensure_data(cache_key)
if week_data is None:
week_obj = fetch_week(season_year, week, is_preseason)
week_data = [to_game_data(game) for game in week_obj.iterfind('gms/g')]
cache.store_data(cache_key, week_data)
return jsonify(week_data)
$ gunicorn -k sync -w 5 -t 60 app:app
$ gunicorn -k eventlet -w 5 -t 60 app:app
import ujson
from aiohttp import web
async def aio_retrieve_week(request):
season_year = int(request.match_info['season_year'])
week_slug = request.match_info['week_slug']
is_preseason = week_slug[:4] == 'pre-'
week = int(week_slug[4:] if is_preseason else week_slug)
cache_key = cache.build_key(season_year, week, is_preseason)
week_data = await cache.aio_ensure_data(cache_key)
if week_data is None:
week_obj = await aio_fetch_week(season_year, week, is_preseason)
week_data = [to_game_data(game) for game in week_obj.iterfind('gms/g')]
await cache.store_data(cache_key, week_data)
return web.Response(ujson.dumps(week_data),
content_type='application/json')
aio_app = web.Application()
aio_app.router.add_route('GET',
'/api/season/{season_year:\d{4}}/{week_slug}',
aio_retrieve_week)
coroutine
/async def
and return response or raise an exc
request
request.GET
multidict
request.POST
multidict and
should be awaited with
await request.post()
request.match_info
dict
request
also contains an app
instance
$ gunicorn -k aiohttp.worker.GunicornWebWorker -w 5 -t 60 app:aio_app
data: Hello, PyCon Finland!
<blank line>
event: pyconfi
data: Hello, Pycon Finland!
data: I'm glad to be here
<blank link>
id: 42
event: uapycon
data: PyCon Ukraine 2016 will held a place in Lviv at April 2016
<blank link>
const element = document.getElementById("notifications-container");
const source = new EventSource("/api/notifications");
source.onerror = err => {
...
};
source.addEventListener("pyconfi", evt => {
element.innerHTML += `<code>#pyconfi</code> ${evt.data}<br>`;
});
source.addEventListener("uapycon", evt => {
element.innerHTML += `<code>#uapycon</code> ${evt.data}<br>`;
});
...
source.close();
def main(*args):
season, week = args
loop = asyncio.get_event_loop()
week_obj = loop.run_until_complete(aio_fetch_week(season, week, False))
week_data = [to_game_data(game) for game in week_obj.iterfind('gms/g')]
redis = loop.run_until_complete(create_redis(('localhost', 6379)))
loop.run_until_complete(redis.publish('CHANNEL', ujson.dumps(week_data)))
loop.close()
return False
if __name__ == '__main__':
sys.exit(int(main(*sys.argv[1:])))
async def livescore(request):
if request.headers['Accept'] != 'text/event-stream':
raise web.HTTPFound('/api/')
response = web.StreamResponse(status=200, headers={
'Content-Type': 'text/event-stream',
'Cache-Control': 'no-cache',
'Connection': 'keep-alive',
})
response.start(request) # This will be changed in asyncio==0.18.0
...
return response
...
redis = await create_redis(('localhost', 6379))
channel = (await redis.subscribe('CHANNEL'))[0]
while (await channel.wait_message()):
message = await channel.get()
response.write(b'event: update\r\n')
response.write(b'data: ' + message + b'\r\n\r\n')
...
aio_app.router.add_route('GET', '/api/livescore', livescore)
import React, {Component} from "react";
export default App extends Component {
liveScore = null;
constructor(props) {
super(props);
this.state = {games: props.games};
}
componentDidMount() {
this.liveScore = new EventSource("/api/");
this.liveScore.addEventListener("update", evt => {
this.setState({games: JSON.parse(evt.data)});
});
}
componentWillUnmount() {
this.liveScore.close();
}
render() {
return <Games data={this.state.games} />;
}
}
location /api/livescore {
chunked_transfer_encoding off;
proxy_buffering off;
proxy_cache off;
proxy_http_version 1.1;
proxy_pass http://127.0.0.1:8000;
proxy_set_header Connection '';
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
}
async
/await
statements
asyncio
stack got great addition, code now is simple
to read and understand
aiohttp
– interact with remote APIaiohttp.web
– create API backendsaiopg
– interact with PostgreSQL (DB)
aioredis
– interact with Redis (cache)
asyncio
stack is a great reason why you'll finally
need to switch to Python 3
asyncio
is good fit for:
aiohttp
out of the box