GraphQL client for Python that enables developers to execute GraphQL queries, mutations, and subscriptions using multiple transport protocols, including HTTP, WebSockets, and local schemas, with support for both synchronous and asynchronous usage patterns.
—
Comprehensive transport implementations for HTTP, WebSocket, and local schema operations. Includes synchronous and asynchronous variants with protocol-specific optimizations, authentication support, and file upload capabilities.
HTTP-based transports for GraphQL operations using popular Python HTTP clients. Support synchronous and asynchronous execution, file uploads, request batching, and comprehensive configuration options.
Synchronous HTTP transport using the requests library with extensive configuration options and file upload support.
class RequestsHTTPTransport(Transport):
def __init__(
self,
url: str,
headers: Optional[Dict[str, Any]] = None,
cookies: Optional[Union[Dict[str, Any], RequestsCookieJar]] = None,
auth: Optional[AuthBase] = None,
use_json: bool = True,
timeout: Optional[int] = None,
verify: Union[bool, str] = True,
retries: int = 0,
method: str = "POST",
retry_backoff_factor: float = 0.1,
retry_status_forcelist: Collection[int] = None,
json_serialize: Callable = json.dumps,
json_deserialize: Callable = json.loads
):
"""
Initialize requests-based HTTP transport.
Args:
url: GraphQL server endpoint URL
headers: HTTP headers to send with requests
cookies: HTTP cookies for requests
auth: requests authentication object
use_json: Send requests as JSON vs form-encoded
timeout: Request timeout in seconds
verify: SSL certificate verification (bool or CA bundle path)
retries: Number of retry attempts on failure
method: HTTP method to use (POST, GET)
retry_backoff_factor: Backoff multiplier for retries
retry_status_forcelist: HTTP status codes to retry
json_serialize: JSON serialization function
json_deserialize: JSON deserialization function
"""
def execute(
self,
request: GraphQLRequest,
timeout: Optional[int] = None,
extra_args: Optional[Dict] = None,
upload_files: bool = False
) -> ExecutionResult:
"""
Execute GraphQL request via HTTP.
Args:
request: GraphQL request to execute
timeout: Override default timeout
extra_args: Additional arguments for requests
upload_files: Enable multipart file upload support
Returns:
ExecutionResult with response data
"""
def execute_batch(
self,
reqs: List[GraphQLRequest],
timeout: Optional[int] = None,
extra_args: Optional[Dict] = None
) -> List[ExecutionResult]:
"""Execute multiple requests in a single HTTP call."""
# Properties
response_headers: Optional[CaseInsensitiveDict[str]] # Last response headersHTTP transports using the httpx library with both synchronous and asynchronous variants.
class HTTPXTransport(Transport):
    """Synchronous HTTP transport for GraphQL built on the httpx library."""

    def __init__(
        self,
        url: Union[str, httpx.URL],
        json_serialize: Callable = json.dumps,
        json_deserialize: Callable = json.loads,
        **kwargs
    ):
        """
        Set up the synchronous httpx transport.

        Args:
            url: URL of the GraphQL server endpoint
            json_serialize: Function used to serialize JSON
            json_deserialize: Function used to deserialize JSON
            **kwargs: Extra parameters handed to the httpx client
        """

    def execute(
        self,
        request: GraphQLRequest,
        extra_args: Optional[Dict] = None,
        upload_files: bool = False
    ) -> ExecutionResult: ...

    # Properties
    response_headers: Optional[httpx.Headers]  # Last response headers
class HTTPXAsyncTransport(AsyncTransport):
def __init__(
self,
url: Union[str, httpx.URL],
json_serialize: Callable = json.dumps,
json_deserialize: Callable = json.loads,
**kwargs
):
"""Initialize httpx-based asynchronous HTTP transport."""
async def execute(
self,
request: GraphQLRequest,
extra_args: Optional[Dict] = None,
upload_files: bool = False
) -> ExecutionResult: ...
async def execute_batch(
self,
reqs: List[GraphQLRequest],
extra_args: Optional[Dict] = None
) -> List[ExecutionResult]: ...
def subscribe(self, request: GraphQLRequest) -> AsyncGenerator[ExecutionResult, None]:
"""Raises NotImplementedError - HTTP doesn't support subscriptions."""Asynchronous HTTP transport using aiohttp with comprehensive configuration and streaming file upload support.
class AIOHTTPTransport(AsyncTransport):
def __init__(
self,
url: str,
headers: Optional[LooseHeaders] = None,
cookies: Optional[LooseCookies] = None,
auth: Optional[Union[BasicAuth, AppSyncAuthentication]] = None,
ssl: Union[SSLContext, bool, Fingerprint] = True,
timeout: Optional[int] = None,
ssl_close_timeout: Optional[Union[int, float]] = 10,
json_serialize: Callable = json.dumps,
json_deserialize: Callable = json.loads,
client_session_args: Optional[Dict[str, Any]] = None
):
"""
Initialize aiohttp-based HTTP transport.
Args:
url: GraphQL server endpoint URL
headers: HTTP headers for requests
cookies: HTTP cookies for requests
auth: Authentication (BasicAuth or AppSync)
ssl: SSL context or verification settings
timeout: Request timeout in seconds
ssl_close_timeout: SSL connection close timeout
json_serialize: JSON serialization function
json_deserialize: JSON deserialization function
client_session_args: Extra aiohttp ClientSession arguments
"""
async def execute(
self,
request: GraphQLRequest,
extra_args: Optional[Dict] = None,
upload_files: bool = False
) -> ExecutionResult:
"""
Execute GraphQL request asynchronously.
Supports streaming file uploads using aiohttp.StreamReader
and AsyncGenerator file types.
"""
# Properties
response_headers: Optional[CIMultiDictProxy[str]] # Last response headersWebSocket-based transports for GraphQL subscriptions and real-time operations. Support multiple protocols and authentication methods.
Primary WebSocket transport using the websockets library with support for multiple GraphQL WebSocket protocols.
class WebsocketsTransport(AsyncTransport):
def __init__(
self,
url: str,
headers: Optional[HeadersLike] = None,
ssl: Union[SSLContext, bool] = False,
init_payload: Optional[Dict[str, Any]] = None,
connect_timeout: Optional[Union[int, float]] = 10,
close_timeout: Optional[Union[int, float]] = 10,
ack_timeout: Optional[Union[int, float]] = 10,
keep_alive_timeout: Optional[Union[int, float]] = None,
ping_interval: Optional[Union[int, float]] = None,
pong_timeout: Optional[Union[int, float]] = None,
answer_pings: bool = True,
connect_args: Optional[Dict[str, Any]] = None,
subprotocols: Optional[List[str]] = None
):
"""
Initialize WebSocket transport for GraphQL subscriptions.
Args:
url: WebSocket server URL (wss://example.com/graphql)
headers: HTTP headers for WebSocket handshake
ssl: SSL context or verification settings
init_payload: Connection initialization payload
connect_timeout: Connection establishment timeout
close_timeout: Connection close timeout
ack_timeout: Acknowledgment timeout for operations
keep_alive_timeout: Keep-alive message timeout
ping_interval: Ping interval for graphql-ws protocol
pong_timeout: Pong response timeout
answer_pings: Whether to respond to server pings
connect_args: Additional websockets.connect arguments
subprotocols: WebSocket subprotocols to negotiate
"""
async def execute(self, request: GraphQLRequest) -> ExecutionResult:
"""Execute single GraphQL operation over WebSocket."""
def subscribe(
self,
request: GraphQLRequest,
send_stop: bool = True
) -> AsyncGenerator[ExecutionResult, None]:
"""
Subscribe to GraphQL subscription.
Args:
request: GraphQL subscription request
send_stop: Send stop message when subscription ends
Yields:
ExecutionResult objects as they arrive from server
"""
async def send_ping(self, payload: Optional[Any] = None) -> None:
"""Send ping message (graphql-ws protocol only)."""
async def send_pong(self, payload: Optional[Any] = None) -> None:
"""Send pong message (graphql-ws protocol only)."""
# Properties
response_headers: Dict[str, str] # WebSocket handshake response headers
url: str # Connection URL
headers: Optional[HeadersLike] # Connection headers
ssl: Union[SSLContext, bool] # SSL configurationSupported Subprotocols:
- graphql-ws - Apollo GraphQL WebSocket protocol
- graphql-transport-ws - GraphQL WS protocol

WebSocket transport using aiohttp's WebSocket client with additional configuration options.
class AIOHTTPWebsocketsTransport(AsyncTransport):
def __init__(
self,
url: StrOrURL,
subprotocols: Optional[List[str]] = None,
heartbeat: Optional[float] = None,
auth: Optional[BasicAuth] = None,
origin: Optional[str] = None,
params: Optional[Mapping[str, str]] = None,
headers: Optional[LooseHeaders] = None,
proxy: Optional[StrOrURL] = None,
proxy_auth: Optional[BasicAuth] = None,
proxy_headers: Optional[LooseHeaders] = None,
ssl: Optional[Union[SSLContext, Literal[False], Fingerprint]] = None,
websocket_close_timeout: float = 10.0,
receive_timeout: Optional[float] = None,
ssl_close_timeout: Optional[Union[int, float]] = 10,
session: Optional[ClientSession] = None,
client_session_args: Optional[Dict[str, Any]] = None,
connect_args: Optional[Dict[str, Any]] = None,
# Plus all WebSocket protocol parameters
**kwargs
):
"""
Initialize aiohttp-based WebSocket transport.
Args:
url: WebSocket server URL
subprotocols: WebSocket subprotocols to negotiate
heartbeat: Low-level ping heartbeat interval
auth: Basic authentication for connection
origin: Origin header for WebSocket handshake
params: Query parameters for connection URL
headers: HTTP headers for handshake
proxy: Proxy server URL
proxy_auth: Proxy authentication
proxy_headers: Proxy-specific headers
ssl: SSL configuration
websocket_close_timeout: WebSocket close timeout
receive_timeout: Message receive timeout
ssl_close_timeout: SSL close timeout
session: Existing aiohttp ClientSession to use
client_session_args: ClientSession creation arguments
connect_args: WebSocket connection arguments
"""Specialized transport for Phoenix Framework Absinthe GraphQL servers using Phoenix Channel protocol.
class PhoenixChannelWebsocketsTransport(AsyncTransport):
def __init__(
self,
url: str,
channel_name: str = "__absinthe__:control",
heartbeat_interval: float = 30,
ack_timeout: Optional[Union[int, float]] = 10,
# Plus WebSocket connection parameters
**kwargs
):
"""
Initialize Phoenix Channel WebSocket transport.
Args:
url: Phoenix server URL
channel_name: Phoenix channel name for GraphQL operations
heartbeat_interval: Heartbeat interval in seconds
ack_timeout: Acknowledgment timeout for messages
"""
async def execute(self, request: GraphQLRequest) -> ExecutionResult: ...
def subscribe(
self,
request: GraphQLRequest,
send_stop: bool = True
) -> AsyncGenerator[ExecutionResult, None]: ...Phoenix-specific Features:
Specialized transport for AWS AppSync realtime subscriptions with AWS authentication support.
class AppSyncWebsocketsTransport(AsyncTransport):
def __init__(
self,
url: str,
auth: Optional[AppSyncAuthentication] = None,
session: Optional[botocore.session.Session] = None,
ssl: Union[SSLContext, bool] = False,
connect_timeout: int = 10,
close_timeout: int = 10,
ack_timeout: int = 10,
keep_alive_timeout: Optional[Union[int, float]] = None,
connect_args: Dict[str, Any] = {}
):
"""
Initialize AWS AppSync WebSocket transport.
Args:
url: AppSync GraphQL endpoint URL (automatically converted to realtime endpoint)
auth: AWS authentication method (defaults to IAM)
session: Boto3 session for IAM authentication
ssl: SSL configuration
connect_timeout: Connection timeout
close_timeout: Close timeout
ack_timeout: Acknowledgment timeout
keep_alive_timeout: Keep-alive timeout
connect_args: WebSocket connection arguments
"""
def execute(self, request: GraphQLRequest) -> ExecutionResult:
"""Raises AssertionError - only subscriptions supported on realtime endpoint."""
def subscribe(
self,
request: GraphQLRequest,
send_stop: bool = True
) -> AsyncGenerator[ExecutionResult, None]:
"""Subscribe to AppSync realtime subscription."""AppSync-specific Features:
Authentication implementations for AWS AppSync WebSocket connections.
class AppSyncAuthentication:
    """Abstract base class shared by the AppSync authentication methods."""

    def get_auth_url(self, url: str) -> str:
        """Turn an HTTP GraphQL URL into an authenticated WebSocket URL."""

    def get_headers(self, data=None, headers=None) -> Dict[str, Any]:
        """Return the authentication headers for the WebSocket connection."""
class AppSyncApiKeyAuthentication(AppSyncAuthentication):
    """AppSync authentication using an API key."""

    def __init__(self, host: str, api_key: str):
        """
        Configure API key authentication for AppSync.

        Args:
            host: Host of the AppSync API
            api_key: API key for AppSync
        """
class AppSyncJWTAuthentication(AppSyncAuthentication):
    """AppSync authentication using a JWT (Cognito User Pools, OIDC)."""

    def __init__(self, host: str, jwt: str):
        """
        Configure JWT authentication for AppSync.

        Args:
            host: Host of the AppSync API
            jwt: JWT access token
        """
class AppSyncIAMAuthentication(AppSyncAuthentication):
def __init__(
self,
host: str,
region_name: Optional[str] = None,
signer: Optional[botocore.auth.BaseSigner] = None,
request_creator: Optional[Callable] = None,
credentials: Optional[botocore.credentials.Credentials] = None,
session: Optional[botocore.session.Session] = None
):
"""
IAM authentication for AppSync with SigV4 signing.
Args:
host: AppSync API host
region_name: AWS region (auto-detected if not provided)
signer: Custom botocore signer
request_creator: Custom request creator
credentials: AWS credentials
session: Boto3 session for credential resolution
"""Execute GraphQL operations directly against local schemas without network communication.
class LocalSchemaTransport(AsyncTransport):
def __init__(self, schema: GraphQLSchema):
"""
Initialize local schema transport.
Args:
schema: Local GraphQL schema object to execute against
"""
async def connect(self) -> None:
"""No-op connection (no network required)."""
async def execute(
self,
request: GraphQLRequest,
*args,
**kwargs
) -> ExecutionResult:
"""
Execute GraphQL request against local schema.
Args:
request: GraphQL request to execute
*args: Additional positional arguments
**kwargs: Additional keyword arguments
Returns:
ExecutionResult from local schema execution
"""
def subscribe(
self,
request: GraphQLRequest,
*args,
**kwargs
) -> AsyncGenerator[ExecutionResult, None]:
"""
Execute GraphQL subscription against local schema.
Args:
request: GraphQL subscription request
*args: Additional positional arguments
**kwargs: Additional keyword arguments
Yields:
ExecutionResult objects from subscription
"""
async def close(self) -> None:
"""No-op close (no connection to close)."""Support for GraphQL multipart file uploads across HTTP transports.
class FileVar:
    """File variable used in GraphQL multipart uploads."""

    def __init__(
        self,
        f: Any,  # str | io.IOBase | aiohttp.StreamReader | AsyncGenerator
        *,
        filename: Optional[str] = None,
        content_type: Optional[str] = None,
        streaming: bool = False,
        streaming_block_size: int = 64 * 1024
    ):
        """
        Describe a file to send in a GraphQL multipart upload.

        Args:
            f: File object (path string, file handle, stream, or async generator)
            filename: Filename to use for the upload instead of the default
            content_type: MIME content type
            streaming: Turn on streaming upload (requires transport support)
            streaming_block_size: Size of each chunk for streaming uploads
        """

    def open_file(self, transport_supports_streaming: bool = False) -> None:
        """
        Open the file so it can be uploaded.

        Args:
            transport_supports_streaming: Whether the transport supports streaming
        """

    def close_file(self) -> None:
        """Close the file handle that was opened."""
# File utility functions
def open_files(
    filevars: List[FileVar],
    transport_supports_streaming: bool = False
) -> None:
    """Open each of the given FileVar objects for upload."""
def close_files(filevars: List[FileVar]) -> None:
    """Close each of the given FileVar objects."""
def extract_files(
    variables: Dict,
    file_classes: Tuple[Type[Any], ...]
) -> Tuple[Dict, List[FileVar]]:
    """Pull the FileVar objects out of the GraphQL variables."""

from gql import Client
from gql.transport.requests import RequestsHTTPTransport
from requests.auth import HTTPBasicAuth
endpoint = "https://api.example.com/graphql"

# Minimal setup: a custom User-Agent header and a request timeout.
transport = RequestsHTTPTransport(
    url=endpoint,
    headers={"User-Agent": "MyApp/1.0"},
    timeout=30,
)

# Alternative setup: HTTP basic auth plus retries on selected 5xx statuses.
transport = RequestsHTTPTransport(
    url=endpoint,
    auth=HTTPBasicAuth("username", "password"),
    retries=3,
    retry_backoff_factor=0.5,
    retry_status_forcelist=[500, 502, 503, 504],
)

# The client uses the most recently configured transport.
client = Client(transport=transport)

from gql import gql, Client, FileVar
from gql.transport.aiohttp import AIOHTTPTransport
async def upload_file():
    """Send document.pdf through the uploadDocument mutation and return its payload."""
    # GraphQL mutation that takes the file as an Upload variable
    upload_mutation = gql('''
        mutation UploadDocument($file: Upload!) {
            uploadDocument(file: $file) {
                id
                filename
                url
            }
        }
    ''')

    # File variable describing what to upload
    document = FileVar("document.pdf", content_type="application/pdf")

    client = Client(
        transport=AIOHTTPTransport(url="https://api.example.com/graphql")
    )

    async with client.connect_async() as session:
        response = await session.execute(
            upload_mutation,
            variable_values={"file": document},
            upload_files=True,
        )
        return response["uploadDocument"]

import asyncio
from gql import gql, Client
from gql.transport.websockets import WebsocketsTransport
async def handle_messages():
    """Subscribe to messageAdded on the "general" channel and print each message."""
    # Configure WebSocket transport
    transport = WebsocketsTransport(
        url="wss://api.example.com/graphql",
        headers={"Authorization": "Bearer token123"},
        init_payload={"authToken": "token123"},
        subprotocols=["graphql-ws"]
    )
    client = Client(transport=transport)

    async with client.connect_async() as session:
        subscription = gql('''
            subscription MessageSubscription($channelId: ID!) {
                messageAdded(channelId: $channelId) {
                    id
                    content
                    user {
                        name
                    }
                    timestamp
                }
            }
        ''')

        # By default session.subscribe yields plain dicts; request the full
        # ExecutionResult so that .data and .errors are available below.
        async for result in session.subscribe(
            subscription,
            variable_values={"channelId": "general"},
            get_execution_result=True
        ):
            if result.data:
                message = result.data["messageAdded"]
                print(f"[{message['timestamp']}] {message['user']['name']}: {message['content']}")
            if result.errors:
                print(f"Subscription error: {result.errors}")


asyncio.run(handle_messages())

import asyncio
from gql import gql, Client
from gql.transport.appsync_websockets import AppSyncWebsocketsTransport
from gql.transport.appsync_auth import AppSyncIAMAuthentication
async def appsync_subscription():
    """Subscribe to onCommentAdded for one post and print each new comment."""
    # Configure AppSync authentication
    auth = AppSyncIAMAuthentication(
        host="example.appsync-api.us-east-1.amazonaws.com",
        region_name="us-east-1"
    )

    # Create AppSync transport
    transport = AppSyncWebsocketsTransport(
        url="https://example.appsync-api.us-east-1.amazonaws.com/graphql",
        auth=auth
    )
    client = Client(transport=transport)

    async with client.connect_async() as session:
        subscription = gql('''
            subscription OnCommentAdded($postId: ID!) {
                onCommentAdded(postId: $postId) {
                    id
                    content
                    author
                    createdAt
                }
            }
        ''')

        async for result in session.subscribe(
            subscription,
            variable_values={"postId": "post-123"}
        ):
            # Each yielded result is the data dict itself, so index it directly.
            comment = result["onCommentAdded"]
            print(f"New comment by {comment['author']}: {comment['content']}")


asyncio.run(appsync_subscription())

from gql import gql, Client
from gql.transport.local_schema import LocalSchemaTransport
from graphql import build_schema
# Define schema
type_defs = '''
type Query {
hello(name: String): String
}
type Subscription {
counter: Int
}
'''
# Create executable schema with resolvers
schema = build_schema(type_defs)
# Add resolvers
def resolve_hello(root, info, name="World"):
return f"Hello, {name}!"
async def resolve_counter(root, info):
for i in range(10):
await asyncio.sleep(1)
yield {"counter": i}
schema.query_type.fields["hello"].resolve = resolve_hello
schema.subscription_type.fields["counter"].subscribe = resolve_counter
# Use local transport
transport = LocalSchemaTransport(schema)
client = Client(transport=transport)
async with client.connect_async() as session:
# Execute query
query = gql('{ hello(name: "Alice") }')
result = await session.execute(query)
print(result) # {'hello': 'Hello, Alice!'}
# Execute subscription
subscription = gql('subscription { counter }')
async for result in session.subscribe(subscription):
print(f"Counter: {result['counter']}")Install with Tessl CLI
npx tessl i tessl/pypi-gql