Advanced async/await
in Python 3.5

I am...

  • Igor Davydenko
  • Python & React.js developer
  • From Kyiv, Ukraine
  • Works at Ezhome Inc.
  • Primarily designs & develops backend API
  • Personal Site
  • @ GitHub
  • @ Twitter

PEP 492

Coroutines with async and await syntax

Quick Summary

  • Created by Yury Selivanov in April, 2015
  • Included in Python 3.5
  • Added 4 new statements to the language:
    • async def
    • await
    • async with
    • async for
  • Received great response from the Python community
  • PEP 492 @ Python.org

@asyncio.coroutine

import asyncio

@asyncio.coroutine
def hello():
    """Coroutine declared with ``@asyncio.coroutine``; returns the greeting."""
    return 'Hello, world!'

loop = asyncio.get_event_loop()
message = loop.run_until_complete(hello())
print(message)
loop.close()

async def

import asyncio

async def hello():
    """Native coroutine that resolves to the greeting string."""
    greeting = 'Hello, world!'
    return greeting

loop = asyncio.get_event_loop()
message = loop.run_until_complete(hello())
print(message)
loop.close()

@asyncio.coroutine + yield from

import asyncio
from aiohttp import client

@asyncio.coroutine
def fetch_page(url):
    """Request ``url`` via ``client.get`` and return ``response.text()``."""
    # ``yield from`` hands control back to the caller until the awaited
    # operation produces its result.
    response = yield from client.get(url)
    # Parenthesised so the value of the ``yield from`` expression is returned.
    return (yield from response.text())

loop = asyncio.get_event_loop()
content = loop.run_until_complete(fetch_page('http://lvivpy.org.ua/'))
print(content)
loop.close()

async def + await

import asyncio
from aiohttp import client

async def fetch_page(url):
    """Request ``url`` via ``client.get`` and return the body as text."""
    page = await client.get(url)
    body = await page.text()
    return body

loop = asyncio.get_event_loop()
content = loop.run_until_complete(fetch_page('http://lvivpy.org.ua/'))
print(content)
loop.close()

with (yield from ...)

import asyncio
import sqlalchemy as sa
from aiopg.sa import create_engine

@asyncio.coroutine
def count_data(dsn):
    """Create an engine for ``dsn`` and return the scalar of a count query."""
    engine = yield from create_engine(dsn)
    # The value produced by ``yield from engine`` is used as a regular
    # (synchronous) context manager and bound to ``conn``.
    with (yield from engine) as conn:
        # ``...`` presumably stands in for the selected columns omitted
        # from the slide.
        query = sa.select(...).count()
        return (yield from conn.scalar(query))

loop = asyncio.get_event_loop()
counter = loop.run_until_complete(count_data('postgresql://...'))
print(counter)
loop.close()

async with

import asyncio
import sqlalchemy as sa
from aiopg.sa import create_engine
from .utils import ConnectionContextManager

async def count_data(dsn):
    """Create an engine for ``dsn`` and return the scalar of a count query."""
    db_engine = await create_engine(dsn)
    async with ConnectionContextManager(db_engine) as connection:
        count_query = sa.select(...).count()
        total = await connection.scalar(count_query)
        return total

loop = asyncio.get_event_loop()
counter = loop.run_until_complete(count_data('postgresql://...'))
print(counter)
loop.close()

async with

utils.py

class ConnectionContextManager(object):
    """Async context manager that borrows a connection from an engine.

    Entering awaits ``engine.acquire()`` and yields the connection; exiting
    passes it to ``engine.release()`` and drops both stored references.
    """

    def __init__(self, engine):
        self.engine = engine
        self.conn = None

    async def __aenter__(self):
        acquired = await self.engine.acquire()
        self.conn = acquired
        return acquired

    async def __aexit__(self, exc_type, exc, tb):
        engine, conn = self.engine, self.conn
        # Clear the references up front so they are dropped even when
        # ``release()`` raises.
        self.conn = None
        self.engine = None
        engine.release(conn)

for row in (yield from ...):

import asyncio
import sqlalchemy as sa
from aiopg.sa import create_engine

@asyncio.coroutine
def fetch_data(dsn):
    """Run a select query against ``dsn`` and return its rows as a list."""
    data = []
    engine = yield from create_engine(dsn)
    with (yield from engine) as conn:
        result = yield from conn.execute(sa.select(...))
        # The result is walked with a plain (synchronous) ``for`` loop.
        for row in result:
            data.append(row)
    return data

loop = asyncio.get_event_loop()
data = loop.run_until_complete(fetch_data('postgresql://...'))
loop.close()

async for

import asyncio
import sqlalchemy as sa
from aiopg.sa import create_engine
from .utils import ConnectionContextManager, ResultIter

async def fetch_data(dsn):
    """Run a select query against ``dsn`` and return its rows as a list."""
    db_engine = await create_engine(dsn)
    rows = []
    async with ConnectionContextManager(db_engine) as connection:
        result = await connection.execute(sa.select(...))
        async for record in ResultIter(result):
            rows.append(record)
    return rows

loop = asyncio.get_event_loop()
data = loop.run_until_complete(fetch_data('postgresql://...'))
loop.close()

async for

utils.py

from aiopg.sa.exc import ResourceClosedError

class ResultIter(object):
    """Asynchronous iterator over the rows of a query result.

    Wraps an object exposing an awaitable ``fetchone()`` so that it can be
    consumed with ``async for``.  Iteration ends at the first falsy row, or
    when ``fetchone()`` raises ``ResourceClosedError``.
    """

    def __init__(self, result):
        self.result = result

    def __aiter__(self):
        # ``__aiter__`` must be a plain method that returns the asynchronous
        # iterator itself.  Declared as ``async def`` it returns a coroutine,
        # which ``async for`` rejects with ``TypeError`` on Python 3.7+.
        # The plain form requires Python 3.5.2 or newer.
        return self

    async def __anext__(self):
        try:
            data = await self.result.fetchone()
        except ResourceClosedError:
            # A closed result is treated the same as an exhausted one.
            data = None
        if data:
            return data
        raise StopAsyncIteration

Other additions to standard library

  • @types.coroutine bridge between generator based and native coroutines
  • New __await__ magic method
  • New functions in the inspect library, such as iscoroutine, isawaitable, etc.
  • New abstract base classes: abc.Awaitable, abc.Coroutine, abc.AsyncIterable, abc.AsyncIterator

Conclusion on functions and methods

Method Can contain Can't contain
async def func await, return value yield, yield from
async def __a*__ await, return value yield, yield from
def __a*__ return awaitable await
def __await__ yield, yield from, return iterable await
generator yield, yield from, return value await

async/await in real life

Libraries

aiohttp

import asyncio
import ujson
from aiohttp import client

async def github_search(query):
    """Search GitHub repositories for ``query`` and return the parsed JSON."""
    search_url = 'https://api.github.com/search/repositories'
    search_params = {'q': query}
    reply = await client.get(search_url, params=search_params)
    payload = await reply.json(loads=ujson.loads)
    return payload

loop = asyncio.get_event_loop()
response = loop.run_until_complete(github_search('asyncio'))
print('\n'.join(repo['full_name'] for repo in response['items']))
loop.close()

aiohttp.web

import ujson
from aiohttp import web

async def api_index(request):
    """Return a JSON response serialised with ``ujson.dumps``."""
    # ``{...}`` presumably stands in for the real payload omitted from the
    # slide.
    return web.json_response({...}, dumps=ujson.dumps)

app = web.Application()
app.router.add_route('GET', '/api/', api_index)

gunicorn -k aiohttp.worker.GunicornWebWorker -w 9 -t 60 app:app

aiopg

import asyncio
from aiopg import create_pool

async def select_one(dsn):
    """Run ``SELECT 1`` through a pooled cursor and check the fetched row."""
    connection_pool = await create_pool(dsn)
    pooled_cursor = await connection_pool.cursor()
    with pooled_cursor as cur:
        await cur.execute('SELECT 1')
        row = await cur.fetchone()
        assert row == (1, )

loop = asyncio.get_event_loop()
loop.run_until_complete(select_one('dbname=... user=... password=... host=...'))
loop.close()

aiopg.sa

import asyncio
import sqlalchemy as sa
from aiopg.sa import create_engine
from .utils import ConnectionContextManager

async def select_one(dsn):
    """Run ``SELECT 1`` through an ``aiopg.sa`` engine and check the row.

    :param dsn: Connection string passed to ``create_engine``.
    :raises AssertionError: If the fetched row is not ``(1, )``.
    """
    engine = await create_engine(dsn)
    async with ConnectionContextManager(engine) as conn:
        # Rows are fetched from the result returned by ``execute()``, not
        # from the connection itself.
        result = await conn.execute(sa.select([sa.text('1')]))
        selected = await result.fetchone()
        assert selected == (1, )

loop = asyncio.get_event_loop()
loop.run_until_complete(select_one('postgresql://...'))
loop.close()

aioredis

import asyncio
from aioredis import create_redis

async def redis_set_get_delete(address, **options):
    """Round-trip a key through Redis: set it, read it back, delete it."""
    if 'encoding' not in options:
        options['encoding'] = 'utf-8'
    connection = await create_redis(address, **options)

    was_set = await connection.set('key', 'value')
    assert was_set is True

    stored = await connection.get('key')
    assert stored == 'value'

    removed = await connection.delete('key')
    assert removed == 1

loop = asyncio.get_event_loop()
loop.run_until_complete(redis_set_get_delete(('localhost', 6379)))
loop.close()

And others

async/await in real life

Server-Sent Events

Server-Sent Events. Theory

  • Technology for receiving automatic updates from server via HTTP connection
  • Part of HTML5 by W3C
  • Supported in:
    • Google Chrome 6+
    • Mozilla Firefox 6.0+
    • Safari 5.0+
  • Spec is available at WhatWG

Server-Sent Events. When to use?

  • User notifications
  • Currency/weather changes
  • Sport game/score streaming
  • In general, message streaming

Server-Sent Events. Message Format

data: Hello, Lviv.py
<blank line>

event: lvivpy
data: Hello, Lviv.py
data: Nice to be here again!
<blank line>

id: 42
event: uapycon
data: PyCon Ukraine 2016 will take place in Lviv in April 2016
<blank line>

Server-Sent Events. EventSource

// Element that the received notifications are appended to.
const element = document.getElementById("notifications-container");
// Open an event stream for the /api/notifications URL.
const source = new EventSource("/api/notifications");
source.onerror = err => {
  ...
};
// Append messages whose event name is "lvivpy".
source.addEventListener("lvivpy", evt => {
  element.innerHTML += `<code>#lvivpy</code> ${evt.data}<br>`;
});
// Append messages whose event name is "uapycon".
source.addEventListener("uapycon", evt => {
  element.innerHTML += `<code>#uapycon</code> ${evt.data}<br>`;
});
...
// Close the event stream.
source.close();

Task. NFL Livescore stream

  • NFL stands for National Football League
  • When there are live games, update the scores automagically for the user
  • Do not reconnect each X seconds for new scores
  • When new scores are available, show them to the user
  • This is the case for Server-Sent Events (SSE)

How to implement?

  1. Run script to fetch live scores and publish new scores to Redis PUBSUB
  2. Subscribe to this channel in livescore stream view, receive message and push it to client
  3. Subscribe to livescore stream in client and update scores when new message received
  4. ...
  5. PROFIT

Step 1. Fetching & publishing scores change

def main(*args):
    """Fetch one week's games and publish them to the Redis channel.

    Expects exactly two positional arguments, ``season`` and ``week``, and
    always returns ``False``.
    """
    season, week = args

    event_loop = asyncio.get_event_loop()
    run = event_loop.run_until_complete

    week_tree = run(fetch_week(season, week, False))
    games = []
    for game in week_tree.iterfind('gms/g'):
        games.append(to_game_data(game))

    publisher = run(create_redis(('localhost', 6379)))
    run(publisher.publish('CHANNEL', ujson.dumps(games)))

    event_loop.close()
    return False

if __name__ == '__main__':
    sys.exit(int(main(*sys.argv[1:])))

Step 2. Implementing livescore stream

async def livescore(request):
    """Stream livescore updates to the client as Server-Sent Events.

    Requests that do not ask for ``text/event-stream`` are redirected to
    ``/api/``.

    :param request: Incoming aiohttp request.
    :returns: The prepared ``StreamResponse``.
    :raises web.HTTPFound: When the ``Accept`` header is missing or is not
        ``text/event-stream``.
    """
    # ``.get`` keeps a request without an ``Accept`` header from failing with
    # ``KeyError``; it is redirected like any other non-SSE request.
    if request.headers.get('Accept') != 'text/event-stream':
        raise web.HTTPFound('/api/')

    response = web.StreamResponse(status=200, headers={
        'Content-Type': 'text/event-stream',
        'Cache-Control': 'no-cache',
        'Connection': 'keep-alive',
    })
    await response.prepare(request)

    ...

    return response

Step 2. Subscribing to scores update

    ...

    # Connect to Redis and subscribe to the channel; the first element of
    # the awaited ``subscribe`` result is used as the channel object.
    redis = await create_redis(('localhost', 6379))
    channel = (await redis.subscribe('CHANNEL'))[0]

    # Keep looping for as long as ``wait_message()`` resolves to a truthy
    # value.
    while (await channel.wait_message()):
        message = await channel.get()
        # One SSE message: an ``event:`` line, a ``data:`` line, then an
        # empty line.
        response.write(b'event: update\r\n')
        response.write(b'data: ' + message + b'\r\n\r\n')

    ...

Step 3. Registering stream to the app

from aiohttp import web

app = web.Application()
app.router.add_route('GET', '/api/livescore', livescore)

Step 4. Subscribing to livescore stream on client

import React, {Component} from "react";

export default App extends Component {
  liveScore = null;
  constructor(props) {
    super(props);
    this.state = {games: props.games};
  }
  componentDidMount() {
    this.liveScore = new EventSource("/api/");
    this.liveScore.addEventListener("update", evt => {
        this.setState({games: JSON.parse(evt.data)});
    });
  }
  componentWillUnmount() {
    this.liveScore.close();
  }
  render() {
    return <Games data={this.state.games} />;
  }
}

Step 5. Setting up the nginx

# Proxy the livescore stream endpoint to the backend on 127.0.0.1:8000.
location /api/livescore {
    chunked_transfer_encoding off;
    # Turn off buffering and caching of the proxied response.
    proxy_buffering off;
    proxy_cache off;
    # Talk HTTP/1.1 to the backend and send an empty Connection header.
    proxy_http_version 1.1;
    proxy_pass http://127.0.0.1:8000;
    proxy_set_header Connection '';
    # Forward the original host and client address to the backend.
    proxy_set_header Host $host;
    proxy_set_header X-Real-IP $remote_addr;
    proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
}

Step 6. It works

EventSource works

async/await era

WebSockets

WebSockets. Theory

  • Protocol providing full-duplex communication channels over a single TCP connection
  • Still being standardized by W3C
  • Supported in:
    • Google Chrome 16+
    • Mozilla Firefox 11+
    • Safari 6+
    • Even IE 10+
  • Latest draft: RFC 6455

WebSockets. When to use?

  • Chat
  • Game updates
  • JS Live Reload

WebSockets. Server

import logging
from aiohttp import MsgType, web

logger = logging.getLogger()

async def websocket_server(request):
    """Handle a WebSocket connection, greeting every text message.

    A text message equal to ``'close'`` closes the socket; any other text is
    logged and answered with ``'Welcome, <text>'``.  Error messages are
    logged as warnings.

    :param request: Incoming aiohttp request.
    :returns: The prepared ``WebSocketResponse``.
    """
    ws = web.WebSocketResponse()
    await ws.prepare(request)

    async for message in ws:
        if message.tp == MsgType.text:
            if message.data == 'close':
                # ``close()`` is a coroutine: without ``await`` it is only
                # created, never run, and the socket stays open.
                await ws.close()
            else:
                logger.info('Message received {0}'.format(message.data))
                ws.send_str('Welcome, {0}'.format(message.data))
        elif message.tp == MsgType.error:
            logger.warning('WebSocket error', extra={'ws_exc': ws.exception()})

    logger.info('WebSocket connection closed')
    return ws

WebSockets. Server

# Register the WebSocket handler with ``add_route``, the same call used for
# the other routes in this deck.
app.router.add_route('GET', '/ws', websocket_server)

WebSockets. Python Client

from aiohttp import ClientSession

async def send_ws_message(message=None):
    """Send a single greeting over a WebSocket connection.

    :param message: Name to greet; ``'world'`` is used when it is falsy.
    """
    # Enter the session as a context manager so it is closed afterwards
    # instead of being left open.
    async with ClientSession() as session:
        async with session.ws_connect('ws://<server>/path') as ws:
            ws.send_str('Hello, {0}!'.format(message or 'world'))
        

WebSockets. Python Client

from aiohttp import ClientSession

async def ws_client(message=None):
    """Connect to the WebSocket server and iterate over incoming messages.

    :param message: Unused in the visible body.
    """
    # Enter the session as a context manager so it is closed afterwards
    # instead of being left open.
    async with ClientSession() as session:
        async with session.ws_connect('ws://<server>/path') as ws:
            async for msg in ws:
                ...
        

WebSockets. JS Client

const ws = new WebSocket('ws://<server>/path');

// Send message to WebSocket server once the connection is open;
// calling send() while the socket is still connecting throws.
ws.onopen = function() {
  ws.send("Hello, world!");
};

// Receive message from WebSocket server
ws.onmessage = function(event) {
  console.log(event.data);
};

async/await era

Conclusion

The Future is Here

  • Python 3.5 released and ready to be used in production
  • Python 3.5 contains new async/await statements
  • asyncio stack got a great addition; code is now simple to read and understand
  • asyncio stack (for web applications):
    • aiohttp – interact with remote API
    • aiohttp.web – create API backends
    • aiopg – interact with PostgreSQL (DB)
    • aioredis – interact with Redis (cache)

The Future is Here

  • asyncio stack is a great reason why you'll finally need to switch to Python 3
  • asyncio is a good fit for:
    • Database-less API
    • Requesting remote API
    • Async code execution
    • Predictable async I/O

Questions?