async def
await
async with
async for
@asyncio.coroutine
import asyncio
@asyncio.coroutine
def hello():
return 'Hello, world!'
loop = asyncio.get_event_loop()
message = loop.run_until_complete(hello())
print(message)
loop.close()
async def
import asyncio
async def hello():
return 'Hello, world!'
loop = asyncio.get_event_loop()
message = loop.run_until_complete(hello())
print(message)
loop.close()
@asyncio.coroutine + yield from
import asyncio
from aiohttp import client
@asyncio.coroutine
def fetch_page(url):
response = yield from client.get(url)
return (yield from response.text())
loop = asyncio.get_event_loop()
content = loop.run_until_complete(fetch_page('http://lvivpy.org.ua/'))
print(content)
loop.close()
async def + await
import asyncio
from aiohttp import client
async def fetch_page(url):
response = await client.get(url)
return await response.text()
loop = asyncio.get_event_loop()
content = loop.run_until_complete(fetch_page('http://lvivpy.org.ua/'))
print(content)
loop.close()
with (yield from ...)
import asyncio
import sqlalchemy as sa
from aiopg.sa import create_engine
@asyncio.coroutine
def count_data(dsn):
engine = yield from create_engine(dsn)
with (yield from engine) as conn:
query = sa.select(...).count()
return (yield from conn.scalar(query))
loop = asyncio.get_event_loop()
counter = loop.run_until_complete(count_data('postgresql://...'))
print(counter)
loop.close()
async with
import asyncio
import sqlalchemy as sa
from aiopg.sa import create_engine
from .utils import ConnectionContextManager
async def count_data(dsn):
engine = await create_engine(dsn)
async with ConnectionContextManager(engine) as conn:
query = sa.select(...).count()
return await conn.scalar(query)
loop = asyncio.get_event_loop()
counter = loop.run_until_complete(count_data('postgresql://...'))
print(counter)
loop.close()
async with
utils.py
class ConnectionContextManager(object):
def __init__(self, engine):
self.conn = None
self.engine = engine
async def __aenter__(self):
self.conn = await self.engine.acquire()
return self.conn
async def __aexit__(self, exc_type, exc, tb):
try:
self.engine.release(self.conn)
finally:
self.conn = None
self.engine = None
for row in (yield from ...):
import asyncio
import sqlalchemy as sa
from aiopg.sa import create_engine
@asyncio.coroutine
def fetch_data(dsn):
data = []
engine = yield from create_engine(dsn)
with (yield from engine) as conn:
result = yield from conn.execute(sa.select(...))
for row in result:
data.append(row)
return data
loop = asyncio.get_event_loop()
data = loop.run_until_complete(fetch_data('postgresql://...'))
loop.close()
async for
import asyncio
import sqlalchemy as sa
from aiopg.sa import create_engine
from .utils import ConnectionContextManager, ResultIter
async def fetch_data(dsn):
data = []
engine = await create_engine(dsn)
async with ConnectionContextManager(engine) as conn:
async for row in ResultIter(await conn.execute(sa.select(...))):
data.append(row)
return data
loop = asyncio.get_event_loop()
data = loop.run_until_complete(fetch_data('postgresql://...'))
loop.close()
async for
utils.py
from aiopg.sa.exc import ResourceClosedError
class ResultIter(object):
def __init__(self, result):
self.result = result
async def __aiter__(self):
return self
async def __anext__(self):
try:
data = await self.result.fetchone()
except ResourceClosedError:
data = None
if data:
return data
raise StopAsyncIteration
@types.coroutine
bridge between generator based and
native coroutines
__await__
magic methodinspect
library as
iscoroutine
, isawaitable
, etc
abc.Awaitable
,
abc.Coroutine
, abc.AsyncIterable
,
abc.AsyncIterator
Method | Can contain | Can't contain |
---|---|---|
async def func |
await, return value |
yield, yield from |
async def __a*__ |
await, return value |
yield, yield from |
def __a*__ |
return awaitable |
await |
def __await__ |
yield, yield from, return iterable |
await |
generator |
yield, yield from, return value |
await |
0.19.0
import asyncio
import ujson
from aiohttp import client
async def github_search(query):
response = await client.get('https://api.github.com/search/repositories',
params={'q': query})
return await response.json(loads=ujson.loads)
loop = asyncio.get_event_loop()
response = loop.run_until_complete(github_search('asyncio'))
print('\n'.join(repo['full_name'] for repo in response['items']))
loop.close()
import ujson
from aiohttp import web
async def api_index(request):
return web.json_response({...}, dumps=ujson.dumps)
app = web.Application()
app.router.add_route('GET', '/api/', api_index)
gunicorn -k aiohttp.worker.GunicornWebWorker -w 9 -t 60 app:app
0.7.0
import asyncio
from aiopg import create_pool
async def select_one(dsn):
pool = await create_pool(dsn)
with (await pool.cursor()) as cursor:
await cursor.execute('SELECT 1')
selected = await cursor.fetchone()
assert selected == (1, )
loop = asyncio.get_event_loop()
loop.run_until_complete(select_one('dbname=... user=... password=... host=...'))
loop.close()
import asyncio
import sqlalchemy as sa
from aiopg.sa import create_engine
from .utils import ConnectionContextManager
async def select_one(dsn):
engine = await create_engine(dsn)
async with ConnectionContextManager(engine) as conn:
await conn.execute(sa.select([sa.text('1')]))
selected = await conn.fetchone()
assert selected == (1, )
loop = asyncio.get_event_loop()
loop.run_until_complete(select_one('postgresql://...'))
loop.close()
0.2.4
import asyncio
from aioredis import create_redis
async def redis_set_get_delete(address, **options):
options.setdefault('encoding', 'utf-8')
redis = await create_redis(address, **options)
assert await redis.set('key', 'value') is True
assert await redis.get('key') == 'value'
assert await redis.delete('key') == 1
loop = asyncio.get_event_loop()
loop.run_until_complete(redis_set_get_delete(('localhost', 6379)))
loop.close()
data: Hello, Lviv.py
<blank line>
event: lvivpy
data: Hello, Lviv.py
data: Nice to be here again!
<blank link>
id: 42
event: uapycon
data: PyCon Ukraine 2016 will held a place in Lviv at April 2016
<blank link>
const element = document.getElementById("notifications-container");
const source = new EventSource("/api/notifications");
source.onerror = err => {
...
};
source.addEventListener("lvivpy", evt => {
element.innerHTML += `<code>#lvivpy</code> ${evt.data}<br>`;
});
source.addEventListener("uapycon", evt => {
element.innerHTML += `<code>#uapycon</code> ${evt.data}<br>`;
});
...
source.close();
def main(*args):
season, week = args
loop = asyncio.get_event_loop()
week_obj = loop.run_until_complete(fetch_week(season, week, False))
week_data = [to_game_data(game) for game in week_obj.iterfind('gms/g')]
redis = loop.run_until_complete(create_redis(('localhost', 6379)))
loop.run_until_complete(redis.publish('CHANNEL', ujson.dumps(week_data)))
loop.close()
return False
if __name__ == '__main__':
sys.exit(int(main(*sys.argv[1:])))
async def livescore(request):
if request.headers['Accept'] != 'text/event-stream':
raise web.HTTPFound('/api/')
response = web.StreamResponse(status=200, headers={
'Content-Type': 'text/event-stream',
'Cache-Control': 'no-cache',
'Connection': 'keep-alive',
})
await response.prepare(request)
...
return response
...
redis = await create_redis(('localhost', 6379))
channel = (await redis.subscribe('CHANNEL'))[0]
while (await channel.wait_message()):
message = await channel.get()
response.write(b'event: update\r\n')
response.write(b'data: ' + message + b'\r\n\r\n')
...
from aiohttp import web
app = web.Application()
app.router.add_route('GET', '/api/livescore', livescore)
import React, {Component} from "react";
export default App extends Component {
liveScore = null;
constructor(props) {
super(props);
this.state = {games: props.games};
}
componentDidMount() {
this.liveScore = new EventSource("/api/");
this.liveScore.addEventListener("update", evt => {
this.setState({games: JSON.parse(evt.data)});
});
}
componentWillUnmount() {
this.liveScore.close();
}
render() {
return <Games data={this.state.games} />;
}
}
location /api/livescore {
chunked_transfer_encoding off;
proxy_buffering off;
proxy_cache off;
proxy_http_version 1.1;
proxy_pass http://127.0.0.1:8000;
proxy_set_header Connection '';
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
}
import logging
from aiohttp import MsgType, web
logger = logging.getLogger()
async def websocket_server(request):
ws = web.WebSocketResponse()
await ws.prepare(request)
async for message in ws:
if message.tp == MsgType.text:
if message.data == 'close':
ws.close()
else:
logger.info('Message received {0}'.format(message.data))
ws.send_str('Welcome, {0}'.format(message.data))
elif message.tp == MsgType.error:
logger.warning('WebSocket error', extra={'ws_exc': ws.exception()})
logger.info('WebSocket connection closed')
return ws
app.router.add('GET', '/ws', websocket_server)
from aiohttp import ClientSession
async def send_ws_message(message=None):
session = ClientSession()
async with session.ws_connect('ws://<server>/path') as ws:
ws.send_str('Hello, {0}!'.format(message or 'world'))
from aiohttp import ClientSession
async def ws_client(message=None):
session = ClientSession()
async with session.ws_connect('ws://<server>/path') as ws:
async for msg in ws:
...
const ws = new WebSocket('ws://<server>/path');
// Send message to WebSocket server
ws.send("Hello, world!");
// Receive message from WebSocket server
ws.onmessage = function(event) {
console.log(event.data);
}
async
/await
statements
asyncio
stack got great addition, code now is simple
to read and understand
aiohttp
– interact with remote APIaiohttp.web
– create API backendsaiopg
– interact with PostgreSQL (DB)
aioredis
– interact with Redis (cache)
asyncio
stack is a great reason why you'll finally
need to switch to Python 3
asyncio
is good fit for: