graphql-core, graphene, and Hiku
graphql-core for parsing/resolving GraphQL queries
graphene as a GraphQL framework
graphene-django for Django integration
graphene-sqlalchemy for SQLAlchemy integration
flask-graphql
import graphql
# Minimal graphql-core schema: a root ``Query`` type with a single ``hello``
# string field. The field map is passed as a callable rather than a dict.
GRAPHQL_CORE_SCHEMA = graphql.GraphQLSchema(
    query=graphql.GraphQLObjectType('Query', lambda: {
        'hello': graphql.GraphQLField(
            graphql.GraphQLString,
            # The resolver ignores every argument and returns a constant.
            resolver=lambda *_: 'Hello, world!',
        ),
    }),  # closes GraphQLObjectType(...); this paren was missing before
)
def _core_team_fields():
    """Return the field map of the ``Team`` type.

    Passed to ``GraphQLObjectType`` as a callable, so ``core_resolve_attr``
    is only looked up when the callable runs, not at definition time.
    """
    string_type = graphql.GraphQLString
    return {
        # ``slug`` and ``name`` are read under the field's own name.
        'slug': graphql.GraphQLField(
            string_type, resolver=core_resolve_attr()),
        'name': graphql.GraphQLField(
            string_type, resolver=core_resolve_attr()),
        # The camelCase field is read from the snake_case source key.
        'originalName': graphql.GraphQLField(
            string_type, resolver=core_resolve_attr('original_name')),
    }


GRAPHQL_CORE_TEAM = graphql.GraphQLObjectType('Team', _core_team_fields)
# Schema whose root ``Query`` type exposes ``allTeams``: a list of ``Team``
# objects resolved by ``core_resolve_teams``.
GRAPHQL_CORE_SCHEMA = graphql.GraphQLSchema(
    query=graphql.GraphQLObjectType('Query', lambda: {
        # ... (other Query fields were elided in this snippet; a bare
        # Ellipsis is not valid inside a dict literal, hence the comment)
        'allTeams': graphql.GraphQLField(
            graphql.GraphQLList(GRAPHQL_CORE_TEAM),
            resolver=core_resolve_teams,
        ),
    }),  # closes GraphQLObjectType(...); this paren was missing before
)
def core_resolve_attr(name=None):
    """Build a resolver that reads one key from the ``source`` mapping.

    When ``name`` is falsy the key defaults to ``info.field_name``, i.e. the
    name of the field being resolved.
    """
    def resolver(source, args, context, info):
        key = info.field_name if not name else name
        return source[key]

    return resolver
async def core_resolve_teams(source, args, context, info):
    """Fetch all rows of the ``teams`` table.

    A connection is acquired from ``context['db']`` for the duration of the
    query; ``source``, ``args`` and ``info`` are not used.
    """
    db = context['db']
    async with db.acquire() as connection:
        teams = await connection.fetch('TABLE teams')
    return teams
from aiohttp import web
from graphql.execution.executors.asyncio import AsyncioExecutor
async def graphql_core(request: web.Request) -> web.Response:
    """Execute a GraphQL request against ``GRAPHQL_CORE_SCHEMA``.

    The query is parsed from the request, executed on the application's
    event loop with ``request.app['db']`` exposed to resolvers as
    ``context['db']``, and returned as JSON.

    :param request: Incoming aiohttp request.
    :return: JSON response with ``data`` and, on failure, ``errors``;
        the status is 400 when the execution reported errors, else 200.
    """
    # Import the entry point locally: at module level the name ``graphql``
    # refers to the package itself, which is not callable.
    from graphql import graphql as execute_graphql

    query = await parse_graphql_query(request)
    result = await execute_graphql(
        GRAPHQL_CORE_SCHEMA,
        query.query,
        context_value={'db': request.app['db']},
        executor=AsyncioExecutor(loop=request.app.loop),
        variable_values=query.variables,
        # The NamedTuple field is ``operationName``, not ``operation_name``.
        operation_name=query.operationName,
        return_promise=True)

    # The execution result object is not JSON serialisable; build a plain
    # dict from its ``data`` and ``errors`` attributes instead.
    payload = {'data': result.data}
    if result.errors:
        payload['errors'] = [
            {'message': str(error)} for error in result.errors]

    status = 400 if result.errors else 200
    return web.json_response(payload, status=status)
from typing import NamedTuple, Optional
class GraphQLQuery(NamedTuple):
    """Parsed parts of an incoming GraphQL request."""

    # GraphQL document text.
    query: str
    # Variable values sent with the request, if any.
    variables: Optional[dict]
    # Name of the operation to run, if any (camelCase, as in the JSON body).
    operationName: Optional[str]
async def parse_graphql_query(request: web.Request) -> GraphQLQuery:
    """Extract the GraphQL query from an aiohttp request.

    Three request shapes are supported:

    * ``GET`` -- the document is read from the ``query`` URL parameter;
    * ``application/graphql`` body -- the whole body is the document;
    * anything else -- the body is parsed as JSON with a required ``query``
      key and optional ``variables`` / ``operationName`` keys.

    :param request: Incoming aiohttp request.
    :return: The parsed :class:`GraphQLQuery`.
    :raises KeyError: If the ``query`` parameter or JSON key is missing.
    """
    if request.method == 'GET':
        return GraphQLQuery(request.rel_url.query['query'], None, None)

    # ``content_type`` is the media type parsed from the header, so neither
    # a missing header (previously a KeyError) nor a "; charset=..."
    # parameter breaks the comparison.
    if request.content_type == 'application/graphql':
        return GraphQLQuery(await request.text(), None, None)

    data = await request.json()
    return GraphQLQuery(
        data['query'],
        data.get('variables'),
        data.get('operationName'))
...
'allTweets': graphql.GraphQLField(
graphql.GraphQLList(GRAPHQL_CORE_TWEET),
{
'limit': graphql.GraphQLArgument(graphql.GraphQLInt, 'ALL'),
'offset': graphql.GraphQLArgument(graphql.GraphQLInt, 0),
},
resolver=core_resolve_tweets,
),
...
async def core_resolve_tweets(source, args, context, info):
    """Fetch tweets, newest first, honouring ``limit`` / ``offset`` paging.

    The paging values come from the client, so they are coerced to integers
    and bound as query parameters instead of being interpolated into the
    SQL text.

    :param args: Field arguments. ``limit`` may be an integer, ``'ALL'`` or
        missing/``None`` (the last three mean "no limit"); ``offset`` may be
        an integer or numeric string and defaults to 0.
    :param context: Mapping whose ``'db'`` entry provides ``acquire()``.
    :return: The rows returned by ``conn.fetch``.
    :raises ValueError: If ``limit`` or ``offset`` is not an integer value.
    """
    raw_limit = args.get('limit')
    # PostgreSQL treats ``LIMIT NULL`` like ``LIMIT ALL``; NULL, unlike the
    # keyword, can be passed as a bound parameter.
    if raw_limit is None or raw_limit == 'ALL':
        limit = None
    else:
        limit = int(raw_limit)
    offset = int(args.get('offset') or 0)

    sql = """
        SELECT *
        FROM tweets
        ORDER BY created_at DESC
        LIMIT $1
        OFFSET $2
    """
    async with context['db'].acquire() as conn:
        return await conn.fetch(sql, limit, offset)
import graphene
class GrapheneQuery(graphene.ObjectType):
    """Root graphene query type.

    Exposes the same three fields as the graphql-core schema (``hello``,
    ``allTeams``, ``allTweets``) and delegates data access to the
    ``core_resolve_*`` functions, wrapping fetched rows in graphene objects.
    """

    hello = graphene.String()
    allTeams = graphene.List(GrapheneTeam)
    allTweets = graphene.Field(
        graphene.List(GrapheneTweet),
        # 'ALL' is the "no limit" marker understood by core_resolve_tweets.
        limit=graphene.Argument(graphene.Int, 'ALL'),
        # Declared as Int (was String): the default is the integer 0, the
        # value is used as a SQL OFFSET, and the graphql-core schema
        # declares the same argument as an integer.
        offset=graphene.Argument(graphene.Int, 0))
    ...

    def resolve_hello(self, args, context, info):
        """Delegate to ``core_resolve_hello``."""
        return core_resolve_hello(self, args, context, info)

    async def resolve_allTeams(self, args, context, info):
        """Fetch team rows and wrap each one in a ``GrapheneTeam``."""
        rows = await core_resolve_teams(self, args, context, info)
        return [
            GrapheneTeam(slug=row['slug'],
                         name=row['name'],
                         # camelCase field fed from the snake_case column
                         originalName=row['original_name'])
            for row in rows]

    async def resolve_allTweets(self, args, context, info):
        """Fetch tweet rows and wrap each one in a ``GrapheneTweet``."""
        rows = await core_resolve_tweets(self, args, context, info)
        return [
            GrapheneTweet(id=row['id'], text=row['text'], url=row['url'])
            for row in rows]
# Graphene schema with ``GrapheneQuery`` as its root query type.
GRAPHENE_SCHEMA = graphene.Schema(query=GrapheneQuery)
from hiku import graph as hiku
async def hiku_resolve_hello(param):
    """Return the greeting as a one-element list; ``param`` is ignored."""
    greeting = 'Hello, world!'
    return [greeting]
# Root node with a single string field ``hello`` served by
# ``hiku_resolve_hello``.
HIKU_ROOT = hiku.Root(
    [hiku.Field('hello', hiku_types.String, hiku_resolve_hello)],
)

# Low-level graph consisting of the root node only.
HIKU_LOW_LEVEL_GRAPH = hiku.Graph([HIKU_ROOT])
from ..models import teams_table
# Key under which the database object is placed in the Hiku execution
# context (the request handlers pass ``ctx={HIKU_DB_ENGINE_KEY: ...}``).
HIKU_DB_ENGINE_KEY = 'db'
# Fields source built from ``teams_table`` and the context key above.
# NOTE(review): ``hiku_asyncpg`` is not defined in the visible code --
# presumably a project-local asyncpg adapter; confirm where it comes from.
HIKU_TEAM_SRC = hiku_asyncpg.FieldsQuery(HIKU_DB_ENGINE_KEY, teams_table)
@hiku_pass_context
async def hiku_resolve_teams(ctx: dict) -> List[str]:
    """Return the slug of every team stored in the database.

    A connection is acquired from ``ctx['db']`` only for the duration of the
    query; the slugs are extracted after it has been released.
    """
    db = ctx['db']
    async with db.acquire() as connection:
        records = await connection.fetch('SELECT slug FROM teams')
    return [record['slug'] for record in records]
# Root-level entries; the leading ``...`` stands in for entries elided from
# this snippet.
# NOTE(review): HIKU_ROOT is a plain list here, while the earlier definition
# wraps its entries in ``hiku.Root([...])`` -- confirm which form is real.
HIKU_ROOT = [
...,
# ``allTeams``: a sequence of references to the ``team`` node, produced by
# ``hiku_resolve_teams``; ``requires=None`` means no parent field is needed.
hiku.Link(
'allTeams',
hiku_types.Sequence[hiku_types.TypeRef['team']],
hiku_resolve_teams,
requires=None),
]
from hiku.readers.graphql import read
from hiku.result import denormalize
async def hiku(request: web.Request) -> web.Response:
    """Serve a GraphQL request with Hiku using ``HIKU_LOW_LEVEL_GRAPH``.

    Any exception raised while parsing, reading, executing or denormalizing
    the query is reported as a 400 JSON response whose ``data`` is ``None``
    and whose ``errors`` list holds the exception text.
    """
    graph = HIKU_LOW_LEVEL_GRAPH
    engine = request.app['hiku_engine']

    try:
        parsed = await parse_graphql_query(request)
        hiku_query = read(parsed.query)
        # The database object travels to resolvers through the context.
        execution_ctx = {HIKU_DB_ENGINE_KEY: request.app['db']}
        result = await engine.execute(graph, hiku_query, ctx=execution_ctx)
        data = denormalize(graph, result, hiku_query)
    except Exception as exc:
        failure = {
            'data': None,
            'errors': [str(exc)],
        }
        return web.json_response(failure, status=400)
    else:
        return web.json_response({'data': data})
from hiku.engine import Engine as HikuEngine
from hiku.executors.asyncio import AsyncIOExecutor
async def init_hiku(app: web.Application) -> None:
    """Create the Hiku engine and store it on the app as ``hiku_engine``.

    The engine runs on an asyncio executor bound to ``app.loop``.
    """
    executor = AsyncIOExecutor(app.loop)
    app['hiku_engine'] = HikuEngine(executor)
# Sub-graphs over the ``team`` and ``tweet`` nodes of the low-level graph.
# NOTE(review): ``SubGraph``, ``Expr`` and ``S`` are not imported in the
# visible code -- presumably Hiku's expression helpers; confirm the imports.
HIKU_TEAM_SG = SubGraph(HIKU_LOW_LEVEL_GRAPH, 'team')
HIKU_TWEET_SG = SubGraph(HIKU_LOW_LEVEL_GRAPH, 'tweet')
# High-level graph: every public field is an expression over the matching
# sub-graph, followed by the shared root.
HIKU_HIGH_LEVEL_GRAPH = hiku.Graph([
hiku.Node('team', [
Expr('slug', HIKU_TEAM_SG, S.this.slug),
Expr('name', HIKU_TEAM_SG, S.this.name),
# The only renamed field: camelCase outside, snake_case underneath.
Expr('originalName', HIKU_TEAM_SG, S.this.original_name),
]),
hiku.Node('tweet', [
Expr('id', HIKU_TWEET_SG, S.this.id),
Expr('text', HIKU_TWEET_SG, S.this.text),
Expr('url', HIKU_TWEET_SG, S.this.url),
]),
HIKU_ROOT,
])
from hiku.readers.graphql import read
from hiku.result import denormalize
async def hiku(request: web.Request) -> web.Response:
    """Serve a GraphQL request with Hiku using ``HIKU_HIGH_LEVEL_GRAPH``.

    On success the denormalized result is returned under ``data``. Any
    exception raised along the way yields a 400 JSON response with ``data``
    set to ``None`` and the exception text in ``errors``.
    """
    graph = HIKU_HIGH_LEVEL_GRAPH
    engine = request.app['hiku_engine']

    try:
        graphql_query = await parse_graphql_query(request)
        hiku_query = read(graphql_query.query)
        # Resolvers find the database object in the context under this key.
        execution_ctx = {HIKU_DB_ENGINE_KEY: request.app['db']}
        raw_result = await engine.execute(
            graph, hiku_query, ctx=execution_ctx)
        data = denormalize(graph, raw_result, hiku_query)
    except Exception as exc:
        body = {
            'data': None,
            'errors': [str(exc)],
        }
        return web.json_response(body, status=400)
    else:
        return web.json_response({'data': data})
fetch, Apollo, and Relay
fetch('/path/to/grapqhl', {
method: 'POST',
credentials: 'same-origin',
body: JSON.stringify({
'query': ...
})
})
.then(...)
import React, { Component, PropTypes } from 'react';
import { gql, graphql } from 'react-apollo';
class Profile extends Component { ... }
// The gql tag turns the query string below into a query document.
const CurrentUserForLayout = gql`
query CurrentUserForLayout {
currentUser {
login
avatar_url
}
}
`;
// Build the higher-order component first, then wrap Profile with it.
const withCurrentUser = graphql(CurrentUserForLayout);
const ProfileWithData = withCurrentUser(Profile);
// Default export: ListPage wrapped in a Relay container. The container
// declares one fragment, `viewer`, on type Viewer, selecting only `id`.
export default Relay.createContainer(
ListPage,
{
fragments: {
// Fragments are given as functions returning a Relay.QL tagged template.
viewer: () => Relay.QL`
fragment on Viewer {
id
}
`,
},
},
);