Makefile
s via GitHub
webhooks
Registering projects
git clone
the repo
Testing projects
make test
Deploying projects
make deploy
Configuring projects
COVERALLS_TOKEN
)
LEVEL
)DEBUG
if something doesn’t
work)
openapi: "3.0.2"
info:
...
paths:
...
components:
...
tags:
...
from apispec import APISpec
from marshmallow import fields, Schema
spec = APISpec(...)
class ConferenceSchema(Schema):
name = fields.Str(required=True)
Sync View
from rest_framework.decorators import api_view
from rest_framework.request import Request
from rest_framework.response import Response
from .models import Model
@api_view(['GET'])
def hello_world(request: Request) -> Response:
instance = Model.objects.get(...)
return Response(...)
Async View
from aiohttp import web
routes = web.RouteTableDef()
@routes.get('/')
async def hello_world(request: web.Request) -> web.Response:
async with request.app['db'].acquire() as conn:
instance = await conn.fetch(...)
return web.json_response(...)
aiohttp
mypy
config :)
rororo==2.0.0
What If?
openapi.yaml
operationId
to
use
request
instance
Step 1. Initialization (app.py
)
from pathlib import Path
from aiohttp import web
from rororo import setup_api
def create_app(argv: List[str] = None) -> web.Application:
app = web.Application()
setup_api(app, Path(__file__).parent / 'openapi.yaml')
return app
Step 2. Routing (views.py
)
from aiohttp import web
from rororo import RouteTableDef
routes = RouteTableDef(prefix="/api/repositories")
@routes.post("")
async def add_repository(request: web.Request) -> web.Response:
...
@routes.get("/{owner}/{name}")
async def retrieve_repository(request: web.Request) -> web.Response:
...
Step 3. Access the data (views.py
)
Request:
http POST /api/repositories \
Authorization:"Bearer {token}" X-GitHub-Username:{username} \
owner={owner} name={name}
Step 3. Access the data (views.py
)
Request Handler:
from rororo import openapi_context
@routes.post("")
async def add_repository(request: web.Request) -> web.Response:
with openapi_context(request) as context: # IMPORTANT: Sync context manager!
github_user = await github.authenticate(
context.parameters["X-GitHub-Username"], # Parameter from header
context.security["jwt"] # Security scheme
)
if await github.has_access(
github_user,
context.data["owner"], # Request body data
context.data["name"] # Request body data
):
...
In the past, rororo was:
Now rororo is a library for:
OpenAPI 3 schema support for aiohttp.web applications
aiohttp.client
for interacting with
it
github.py
from aiohttp import ClientSession
def session_context(personal_token: str) -> ClientSession:
return ClientSession(
headers={
"Authorization": f"Bearer {personal_token}",
"User-Agent": "YourUserAgent/1.0",
},
raise_for_status=True
)
github.py
from .constants import GITHUB_API_URL, GQL_VIEWER
async def authenticate(username: str, token: str) -> GitHubUser:
    """Check that ``token`` belongs to the GitHub user ``username``.

    Posts the ``GQL_VIEWER`` query to ``GITHUB_API_URL`` using a session
    created by ``session_context(token)`` and builds a ``GitHubUser`` from
    the ``data.viewer`` object of the JSON response.

    :param username: Login the caller claims to own.
    :param token: Personal token passed to ``session_context``.
    :returns: The ``GitHubUser`` built from the viewer payload.
    :raises InvalidCredentials: If the viewer login differs from ``username``.
    """
    # aiohttp's ClientSession only supports the asynchronous context manager
    # protocol, so it has to be entered with ``async with``.
    async with session_context(token) as session:
        response = await session.post(GITHUB_API_URL, json={"query": GQL_VIEWER})
        # Read the body while the session (and its connection) is still open.
        response_data = await response.json()

    viewer = GitHubUser(**response_data["data"]["viewer"])
    if viewer.login != username:
        raise InvalidCredentials()
    return viewer
constants.py
GITHUB_API_URL = "https://api.github.com/graphql"
GQL_VIEWER = """
query GetViewer {
viewer {
id
login
name
url
}
}
"""
constants.py
GQL_REPOSITORY = """
query GetRepository($owner: String!, $name: String!) {
repository(owner: $owner, name: $name) {
id
name
description
}
}
"""
from .constants import GQL_REPOSITORY
async def has_access(user: GitHubUser, owner: str, name: str) -> bool:
    """Tell whether ``user`` can see the ``owner/name`` repository.

    Posts the ``GQL_REPOSITORY`` query, with ``owner`` and ``name`` as
    variables, to ``GITHUB_API_URL`` using a session created from
    ``user.personal_token``.

    :param user: GitHub user whose ``personal_token`` authorizes the request.
    :param owner: Repository owner sent as the ``owner`` query variable.
    :param name: Repository name sent as the ``name`` query variable.
    :returns: ``True`` when ``data.repository`` in the response is not null.
    """
    # aiohttp's ClientSession only supports the asynchronous context manager
    # protocol, so it has to be entered with ``async with``.
    async with session_context(user.personal_token) as session:
        response = await session.post(
            GITHUB_API_URL,
            json={
                "query": GQL_REPOSITORY,
                "variables": {
                    "owner": owner,
                    "name": name,
                },
            },
        )
        # Await the coroutine first, then subscript the decoded payload;
        # ``await response.json()[...]`` would subscript the coroutine itself.
        response_data = await response.json()

    return response_data["data"]["repository"] is not None
Clone the repo
git clone https://${username}:${personal_token}@github.com/${owner}/${name}.git
Update the repo?
git checkout ${branch}
git pull origin ${branch}
Important: Remove the repo after user access has been revoked!
asyncio.subprocess
python
at
/Users/playpauseandstop/Projects
import subprocess
# All calls below run ``pwd`` from /Users/playpauseandstop/Projects.

# ``call`` only reports the exit code.
assert subprocess.call(["pwd"]) == 0

# ``check_output`` returns the raw bytes the command wrote, newline included.
assert subprocess.check_output(["pwd"]) == b"/Users/playpauseandstop/Projects\n"

# Without a pipe the output goes to the parent's stdout and is not captured.
result = subprocess.run(["pwd"])
assert result.returncode == 0
assert result.stdout is None

# With pipes the captured bytes are available on the result.
result = subprocess.run(["pwd"], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
assert result.stdout == b"/Users/playpauseandstop/Projects\n"
subprocess
doesn’t fit?
subprocess.run
blocks the request handler
gunicorn
or
uwsgi
asyncio
library has a way to make any code
work async
import asyncio
import subprocess
from concurrent import futures
from subprocess import CompletedProcess
from typing import Any
async def subprocess_run(*args: Any, **kwargs: Any) -> CompletedProcess:
    """Run ``subprocess.run`` in the loop's default executor.

    The blocking call is handed to the executor so the event loop is not
    blocked while the child process runs. ``stdout`` and ``stderr`` default
    to ``subprocess.PIPE`` unless the caller passes them explicitly.

    :param args: Positional arguments forwarded to ``subprocess.run``.
    :param kwargs: Keyword arguments forwarded to ``subprocess.run``.
    :returns: The ``CompletedProcess`` produced by ``subprocess.run``.
    """
    kwargs.setdefault("stdout", subprocess.PIPE)
    kwargs.setdefault("stderr", subprocess.PIPE)

    # ``run_in_executor`` is a method of the event loop, not of the asyncio
    # module, and it forwards positional arguments only. The call is wrapped
    # in a closure so the keyword arguments are kept.
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(
        None, lambda: subprocess.run(*args, **kwargs)
    )
executor
instead of None
, how
to share it within the app?
shell.py
import asyncio
from asyncio.subprocess import PIPE, Process
async def run(cmd: str) -> Process:
    """Start ``cmd`` in a shell with stdout and stderr connected to pipes.

    The process is only started here. Callers wait for it and collect the
    output with ``await proc.communicate()``.

    NOTE: ``cmd`` is interpreted by the shell. Any untrusted value put into
    it must be quoted by the caller (for example with ``shlex.quote``).

    :param cmd: Shell command line to execute.
    :returns: The started ``asyncio.subprocess.Process``.
    """
    return await asyncio.create_subprocess_shell(cmd, stdout=PIPE, stderr=PIPE)
views.py
from . import shell
@routes.post("")
async def add_repository(request: web.Request) -> web.Response:
    """Clone the requested GitHub repository and report the outcome.

    Responds with JSON holding ``cloned`` (whether ``git clone`` exited with
    code 0) and ``errors`` (the decoded stderr of the command).
    """
    ...
    # NOTE(review): these values are interpolated into a shell command line.
    # If they come from the request they should be quoted (shlex.quote)
    # before reaching this point - confirm against the elided code above.
    proc = await shell.run(
        f"git clone https://{username}:{personal_token}@github.com/{owner}/{name}.git "
        f"{path_to_repo}"
    )
    _, stderr = await proc.communicate()
    return web.json_response(
        {
            "cloned": proc.returncode == 0,
            # communicate() yields bytes, which the JSON encoder rejects.
            # NOTE(review): git may echo the clone URL (token included) in
            # its error output - consider redacting before returning it.
            "errors": stderr.decode(errors="replace"),
        }
    )
Still searching for best workflow…
Makefile
SWAGGER_HOST ?= 0.0.0.0
SWAGGER_PORT ?= 8422
# Edit OpenAPI schema
swagger-editor:
docker run --rm -h $(SWAGGER_HOST) -p $(SWAGGER_PORT):8080 \
swaggerapi/swagger-editor:v3.6.36
# Test OpenAPI schema (requires project to be run as well)
swagger-ui:
docker run --rm -e URL="http://$(API_HOST):$(API_PORT)/api/openapi.json" \
-h $(SWAGGER_HOST) -p $(SWAGGER_PORT):8080 swaggerapi/swagger-ui:v3.23.11
Step 1. Add openapi-spec-validator to dev requirements
poetry add -D openapi-spec-validator
Step 2. test_openapi.py
from openapi_spec_validator import validate_spec
from rororo import get_openapi_schema
from api.app import create_app
def test_openapi_schema():
validate_spec(get_openapi_schema(create_app()))
Step 3 (Optional). Validate openapi.json
/
openapi.yaml
response
shell.run
Avoid passing entire os.environ
as:
await asyncio.create_subprocess_shell(cmd, env=os.environ)
Instead filter out virtual environment env vars,
IGNORE_ENV_KEYS = {"POETRY", "VIRTUAL_ENV"}
await asyncio.create_subprocess_shell(
cmd,
env={
key: value
for key, value in os.environ.items()
if key not in IGNORE_ENV_KEYS
}
)
shell.run
actually "blocks" the request handler
async def add_repository(request: web.Request) -> web.Response:
...
proc = await shell.run("...")
...
git clone
to finish
git clone
as with subprocess.run
git clone
asyncio.create_task
to the rescue!
import asyncio
async def add_repository(request: web.Request) -> web.Response:
...
# Create the task and not await on its result
asyncio.create_task(shell.run("..."))
# Which means status is in progress instead of done
return web.json_response({"status": "cloning"})
Step 1. Ensure all non-idempotent methods are safe against
CancelledError
s
from aiohttp_middlewares import NON_IDEMPOTENT_METHODS, shield_middleware
def create_app(argv: List[str] = None) -> web.Application:
app = web.Application(middlewares=(
shield_middleware(methods=NON_IDEMPOTENT_METHODS),
))
...
Step 2. Ensure the application exits only after the last job is completed!
On start job:
request.app["jobs"].append(job.uid)
On finishing (canceling) job:
job_context.app["jobs"].remove(job.uid)
Step 2. Ensure the application exits only after the last job is completed!
app.on_shutdown
signal:
from async_timeout import timeout
async def wait_for_empty_jobs(app: web.Application) -> None:
    """Block application shutdown until ``app["jobs"]`` is empty.

    Polls the jobs collection every 15 seconds and returns once it is empty.
    If jobs are still registered when the time limit expires, the timeout
    context raises its timeout error out of this handler.

    :param app: Application whose ``"jobs"`` entry tracks running jobs.
    """
    # 30 minutes should be enough to finish all the jobs
    async with timeout(30 * 60):
        # Keep polling; a single check followed by one sleep would return
        # while jobs are still running.
        while app["jobs"]:
            await asyncio.sleep(15.)
Unit Tests
async def test_shell_run():
proc = await shell.run("echo 'Hello, world!'")
stdout, stderr = await proc.communicate()
assert proc.returncode == 0
assert stdout == b"Hello, world!\n"
assert stderr == b""
Mock GitHub Requests with aioresponses
import asyncio
from aioresponses import aioresponses
def test_has_not_access(github_user_factory):
loop = asyncio.get_event_loop()
with aioresponses() as mocked:
mocked.post(GITHUB_API_URL, payload={"data": {"repository": None}})
assert loop.run_until_complete(
github.has_access(github_user_factory(), "fake-owner", "fake-name")
) is False
Integrational Tests
conftest.py
def pytest_configure(config):
config.addinivalue_line("markers", "integrational: mark test as an integrational.")
api/repositories/tests/test_integrational.py
@pytest.mark.integrational
async def test_add_repository():
...
To run:
poetry run python -m pytest -m integrational
Before:
PORT
proxy_pass
PORT
After:
PORT_X
systemctl stop hobotnica-X &
)
old service on PORT_X
PORT_Y
proxy_pass
PORT_Y
asyncio
for years now