Ariadne is a Python library for implementing GraphQL servers using a schema-first approach.
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Functions for executing GraphQL queries, mutations, and subscriptions with support for both synchronous and asynchronous execution contexts.
Executes GraphQL operations asynchronously, supporting async resolvers and I/O operations.
async def graphql(
schema: GraphQLSchema,
data: dict,
*,
context_value: Optional[Any] = None,
root_value: Optional[Any] = None,
debug: bool = False,
validation_rules: Optional[list] = None,
require_query: bool = False,
error_formatter: Optional[Callable] = None,
extensions: Optional[list[Extension]] = None,
middleware: Optional[list] = None,
**kwargs
) -> dict:
"""
Execute GraphQL operation asynchronously.
Parameters:
- schema: Executable GraphQL schema
- data: Dict containing 'query', optional 'variables', and 'operationName'
- context_value: Context object passed to all resolvers
- root_value: Root value passed to root resolvers
- debug: Enable debug mode for detailed error information
- validation_rules: Custom validation rules for the query
- require_query: Require 'query' key in data dict
- error_formatter: Custom error formatting function
- extensions: List of GraphQL extensions to apply
- middleware: List of middleware functions
Returns:
Dict with 'data' key and optional 'errors' key containing execution results
"""Executes GraphQL operations synchronously, suitable for non-async environments.
def graphql_sync(
schema: GraphQLSchema,
data: dict,
*,
context_value: Optional[Any] = None,
root_value: Optional[Any] = None,
debug: bool = False,
validation_rules: Optional[list] = None,
require_query: bool = False,
error_formatter: Optional[Callable] = None,
extensions: Optional[list[Extension]] = None,
middleware: Optional[list] = None,
**kwargs
) -> dict:
"""
Execute GraphQL operation synchronously.
Parameters:
- schema: Executable GraphQL schema
- data: Dict containing 'query', optional 'variables', and 'operationName'
- context_value: Context object passed to all resolvers
- root_value: Root value passed to root resolvers
- debug: Enable debug mode for detailed error information
- validation_rules: Custom validation rules for the query
- require_query: Require 'query' key in data dict
- error_formatter: Custom error formatting function
- extensions: List of GraphQL extensions to apply
- middleware: List of middleware functions
Returns:
Dict with 'data' key and optional 'errors' key containing execution results
"""Executes GraphQL subscriptions, returning an async iterator for real-time data streaming.
async def subscribe(
schema: GraphQLSchema,
data: dict,
*,
context_value: Optional[Any] = None,
root_value: Optional[Any] = None,
debug: bool = False,
validation_rules: Optional[list] = None,
require_query: bool = False,
error_formatter: Optional[Callable] = None,
extensions: Optional[list[Extension]] = None,
middleware: Optional[list] = None,
**kwargs
):
"""
Execute GraphQL subscription returning async iterator.
Parameters:
- schema: Executable GraphQL schema
- data: Dict containing subscription query, optional 'variables', and 'operationName'
- context_value: Context object passed to all resolvers
- root_value: Root value passed to root resolvers
- debug: Enable debug mode for detailed error information
- validation_rules: Custom validation rules for the query
- require_query: Require 'query' key in data dict
- error_formatter: Custom error formatting function
- extensions: List of GraphQL extensions to apply
- middleware: List of middleware functions
Returns:
AsyncIterator yielding subscription results or single error dict
"""import asyncio
from ariadne import graphql
async def execute_query():
query_data = {
"query": """
query GetUser($id: ID!) {
user(id: $id) {
id
name
email
}
}
""",
"variables": {"id": "123"}
}
result = await graphql(schema, query_data)
if result.get("errors"):
print("Query errors:", result["errors"])
else:
print("Query result:", result["data"])
# Run the async function
asyncio.run(execute_query())from ariadne import graphql_sync
query_data = {
"query": """
query {
hello
}
"""
}
result = graphql_sync(schema, query_data)
print(result) # {"data": {"hello": "Hello, World!"}}import asyncio
from ariadne import subscribe
async def handle_subscription():
subscription_data = {
"query": """
subscription {
messageAdded {
id
content
author
}
}
"""
}
async for result in await subscribe(schema, subscription_data):
if result.get("errors"):
print("Subscription error:", result["errors"])
break
else:
print("New message:", result["data"])
asyncio.run(handle_subscription())from ariadne import graphql
# Define context with user information
context = {
"user": {"id": "123", "role": "admin"},
"request": request_object
}
result = await graphql(
schema,
query_data,
context_value=context,
debug=True # Enable debug mode for development
)from ariadne import graphql
def custom_error_formatter(error, debug=False):
formatted = {"message": str(error)}
if debug:
formatted["locations"] = error.locations
formatted["path"] = error.path
return formatted
result = await graphql(
schema,
query_data,
error_formatter=custom_error_formatter,
debug=True
)Install with Tessl CLI
npx tessl i tessl/pypi-ariadne