GraphQL client for Python that enables developers to execute GraphQL queries, mutations, and subscriptions using multiple transport protocols including HTTP, WebSockets, and local schemas with support for both synchronous and asynchronous usage patterns
—
Core client functionality for executing GraphQL operations, managing connections, and handling schemas. Provides both synchronous and asynchronous execution patterns with session management for connection pooling and batched operations.
The main entrypoint for executing GraphQL requests on a transport. Provides both synchronous and asynchronous execution methods with automatic schema management and result parsing.
class Client:
def __init__(
self,
transport: Optional[Union[Transport, AsyncTransport]] = None,
fetch_schema_from_transport: bool = False,
schema: Optional[GraphQLSchema] = None,
parse_results: bool = False,
introspection: Optional[Dict[str, Any]] = None,
type_def: Optional[str] = None,
execute_timeout: Optional[Union[int, float]] = 10
):
"""
Initialize GraphQL client.
Args:
transport: Transport to use for GraphQL operations
fetch_schema_from_transport: Automatically fetch schema via introspection
schema: Pre-built GraphQL schema
parse_results: Parse results using schema for custom scalars/enums
introspection: Pre-fetched introspection result
type_def: Schema definition string
execute_timeout: Default timeout for operations in seconds
"""
def execute(
self,
request: GraphQLRequest,
**kwargs
) -> ExecutionResult:
"""
Execute GraphQL request synchronously.
Args:
request: GraphQL request to execute
**kwargs: Additional arguments passed to transport
Returns:
ExecutionResult with data, errors, and extensions
Raises:
TransportError: If transport operation fails
GraphQLError: If GraphQL validation fails
"""
async def execute_async(
self,
request: GraphQLRequest,
**kwargs
) -> ExecutionResult:
"""
Execute GraphQL request asynchronously.
Args:
request: GraphQL request to execute
**kwargs: Additional arguments passed to transport
Returns:
ExecutionResult with data, errors, and extensions
"""
def session(self, **kwargs) -> SyncClientSession:
"""
Create synchronous session for batched operations.
Args:
**kwargs: Additional arguments for session configuration
Returns:
SyncClientSession context manager
"""
async def connect_async(self, **kwargs) -> AsyncClientSession:
"""
Create asynchronous session for batched operations.
Args:
**kwargs: Additional arguments for session configuration
Returns:
AsyncClientSession context manager
"""
def get_schema(self) -> GraphQLSchema:
"""
Get the GraphQL schema.
Returns:
GraphQLSchema object
Raises:
Exception: If no schema is available
"""
def close(self) -> None:
"""Close synchronous client and transport."""
async def close_async(self) -> None:
"""Close asynchronous client and transport."""Create GraphQL requests from query strings with variable support and operation naming.
def gql(request_string: str) -> GraphQLRequest:
"""
Parse GraphQL query string into GraphQLRequest object.
Args:
request_string: GraphQL query, mutation, or subscription as string
Returns:
GraphQLRequest object ready for execution
Raises:
GraphQLError: If query string has syntax errors
"""
class GraphQLRequest:
def __init__(
self,
request: Union[DocumentNode, "GraphQLRequest", str],
*,
variable_values: Optional[Dict[str, Any]] = None,
operation_name: Optional[str] = None
):
"""
Initialize GraphQL request.
Args:
request: GraphQL query as string, DocumentNode, or existing GraphQLRequest
variable_values: Variables for the GraphQL operation
operation_name: Operation name for multi-operation documents
Raises:
GraphQLError: If request has syntax errors
TypeError: If request type is invalid
"""
def serialize_variable_values(self, schema: GraphQLSchema) -> GraphQLRequest:
"""
Serialize variable values using schema type information.
Args:
schema: GraphQL schema for type information
Returns:
New GraphQLRequest with serialized variables
"""
@property
def payload(self) -> Dict[str, Any]:
"""
Get JSON payload representation of the request.
Returns:
Dictionary with 'query', 'variables', and 'operationName' keys
"""
# Properties
document: DocumentNode # Parsed GraphQL document
variable_values: Optional[Dict[str, Any]] # Query variables
operation_name: Optional[str] # Operation nameSynchronous session management for batched operations and connection reuse.
class SyncClientSession:
def execute(
self,
request: GraphQLRequest,
**kwargs
) -> ExecutionResult:
"""
Execute GraphQL request in session context.
Args:
request: GraphQL request to execute
**kwargs: Additional arguments for execution
Returns:
ExecutionResult with data, errors, and extensions
"""
def execute_batch(
self,
requests: List[GraphQLRequest],
**kwargs
) -> List[ExecutionResult]:
"""
Execute multiple GraphQL requests in a batch.
Args:
requests: List of GraphQL requests to execute
**kwargs: Additional arguments for batch execution
Returns:
List of ExecutionResult objects
"""
def close(self) -> None:
"""Close the session and transport connection."""
# Context manager support
def __enter__(self) -> SyncClientSession: ...
def __exit__(self, exc_type, exc_val, exc_tb) -> None: ...Asynchronous session management with subscription support and automatic reconnection capabilities.
class AsyncClientSession:
async def execute(
self,
request: GraphQLRequest,
**kwargs
) -> ExecutionResult:
"""
Execute GraphQL request asynchronously in session context.
Args:
request: GraphQL request to execute
**kwargs: Additional arguments for execution
Returns:
ExecutionResult with data, errors, and extensions
"""
async def execute_batch(
self,
requests: List[GraphQLRequest],
**kwargs
) -> List[ExecutionResult]:
"""
Execute multiple GraphQL requests in a batch asynchronously.
Args:
requests: List of GraphQL requests to execute
**kwargs: Additional arguments for batch execution
Returns:
List of ExecutionResult objects
"""
def subscribe(
self,
request: GraphQLRequest,
**kwargs
) -> AsyncGenerator[ExecutionResult, None]:
"""
Subscribe to GraphQL subscription.
Args:
request: GraphQL subscription request
**kwargs: Additional arguments for subscription
Yields:
ExecutionResult objects as they arrive
Raises:
TransportError: If subscription setup fails
"""
async def close(self) -> None:
"""Close the async session and transport connection."""
# Async context manager support
async def __aenter__(self) -> AsyncClientSession: ...
async def __aexit__(self, exc_type, exc_val, exc_tb) -> None: ...
class ReconnectingAsyncClientSession(AsyncClientSession):
"""
Async session with automatic reconnection on connection failures.
Inherits all methods from AsyncClientSession with added reconnection logic.
"""from gql import gql, Client
from gql.transport.requests import RequestsHTTPTransport
# Create client with automatic schema fetching
transport = RequestsHTTPTransport(url="https://api.example.com/graphql")
client = Client(transport=transport, fetch_schema_from_transport=True)
# Execute query
query = gql('''
query GetUsers($limit: Int!) {
users(limit: $limit) {
id
name
email
}
}
''')
result = client.execute(query, variable_values={"limit": 10})
users = result["users"]
# Close client when done
client.close()from gql import gql, Client
from gql.transport.requests import RequestsHTTPTransport
transport = RequestsHTTPTransport(url="https://api.example.com/graphql")
client = Client(transport=transport)
# Using synchronous session
with client.session() as session:
query1 = gql('{ user(id: "1") { name } }')
query2 = gql('{ user(id: "2") { name } }')
# Execute individually
result1 = session.execute(query1)
result2 = session.execute(query2)
# Or execute as batch
results = session.execute_batch([query1, query2])import asyncio
from gql import gql, Client
from gql.transport.aiohttp import AIOHTTPTransport
async def main():
transport = AIOHTTPTransport(url="https://api.example.com/graphql")
client = Client(transport=transport)
# Using async context manager
async with client.connect_async() as session:
query = gql('{ hello }')
result = await session.execute(query)
print(result)
# Batch execution
queries = [gql('{ user1: user(id: "1") { name } }'),
gql('{ user2: user(id: "2") { name } }')]
results = await session.execute_batch(queries)
await client.close_async()
asyncio.run(main())import asyncio
from gql import gql, Client
from gql.transport.websockets import WebsocketsTransport
async def handle_subscriptions():
transport = WebsocketsTransport(url="wss://api.example.com/graphql")
client = Client(transport=transport)
async with client.connect_async() as session:
subscription = gql('''
subscription {
messageAdded {
id
content
user {
name
}
}
}
''')
# Handle subscription messages
async for result in session.subscribe(subscription):
if result.data:
message = result.data["messageAdded"]
print(f"New message from {message['user']['name']}: {message['content']}")
if result.errors:
print(f"Subscription error: {result.errors}")
break
asyncio.run(handle_subscriptions())from gql import gql, Client
from gql.transport.requests import RequestsHTTPTransport
from gql.utilities import update_schema_scalar
from graphql import GraphQLScalarType
# Custom scalar for datetime handling
datetime_scalar = GraphQLScalarType(
name="DateTime",
serialize=lambda dt: dt.isoformat(),
parse_value=lambda value: datetime.fromisoformat(value),
parse_literal=lambda ast: datetime.fromisoformat(ast.value)
)
transport = RequestsHTTPTransport(url="https://api.example.com/graphql")
client = Client(
transport=transport,
fetch_schema_from_transport=True,
parse_results=True # Enable result parsing with custom scalars
)
# Update schema with custom scalar
update_schema_scalar(client.schema, "DateTime", datetime_scalar)
# Queries will now automatically parse DateTime fields
query = gql('{ posts { title createdAt } }')
result = client.execute(query)
# result["posts"][0]["createdAt"] is now a datetime objectInstall with Tessl CLI
npx tessl i tessl/pypi-gql