CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/pypi-gql

GraphQL client for Python that enables developers to execute GraphQL queries, mutations, and subscriptions using multiple transport protocols including HTTP, WebSockets, and local schemas with support for both synchronous and asynchronous usage patterns

Pending
Overview
Eval results
Files

docs/transports.md

Transport Protocols

Comprehensive transport implementations for HTTP, WebSocket, and local schema operations. Includes synchronous and asynchronous variants with protocol-specific optimizations, authentication support, and file upload capabilities.

Capabilities

HTTP Transports

HTTP-based transports for GraphQL operations using popular Python HTTP clients. They support synchronous and asynchronous execution, file uploads, request batching, and comprehensive configuration options.

RequestsHTTPTransport

Synchronous HTTP transport using the requests library with extensive configuration options and file upload support.

class RequestsHTTPTransport(Transport):
    """Synchronous HTTP transport using the requests library.

    Exposes single-request execution (optionally with multipart file
    upload) and batched execution of several requests in one HTTP call.
    """

    def __init__(
        self,
        url: str,
        headers: Optional[Dict[str, Any]] = None,
        cookies: Optional[Union[Dict[str, Any], RequestsCookieJar]] = None,
        auth: Optional[AuthBase] = None,
        use_json: bool = True,
        timeout: Optional[int] = None,
        verify: Union[bool, str] = True,
        retries: int = 0,
        method: str = "POST",
        retry_backoff_factor: float = 0.1,
        retry_status_forcelist: Optional[Collection[int]] = None,
        json_serialize: Callable = json.dumps,
        json_deserialize: Callable = json.loads
    ):
        """
        Initialize requests-based HTTP transport.

        Args:
            url: GraphQL server endpoint URL
            headers: HTTP headers to send with requests
            cookies: HTTP cookies for requests (dict or RequestsCookieJar)
            auth: requests authentication object
            use_json: Send requests as JSON vs form-encoded (default True)
            timeout: Request timeout in seconds (default None)
            verify: SSL certificate verification (bool or CA bundle path)
            retries: Number of retry attempts on failure (default 0)
            method: HTTP method to use (POST, GET); default "POST"
            retry_backoff_factor: Backoff multiplier for retries (default 0.1)
            retry_status_forcelist: HTTP status codes to retry (default None)
            json_serialize: JSON serialization function (default json.dumps)
            json_deserialize: JSON deserialization function (default json.loads)
        """

    def execute(
        self,
        request: GraphQLRequest,
        timeout: Optional[int] = None,
        extra_args: Optional[Dict] = None,
        upload_files: bool = False
    ) -> ExecutionResult:
        """
        Execute GraphQL request via HTTP.

        Args:
            request: GraphQL request to execute
            timeout: Override default timeout
            extra_args: Additional arguments for requests
            upload_files: Enable multipart file upload support (default False)

        Returns:
            ExecutionResult with response data
        """

    def execute_batch(
        self,
        reqs: List[GraphQLRequest],
        timeout: Optional[int] = None,
        extra_args: Optional[Dict] = None
    ) -> List[ExecutionResult]:
        """
        Execute multiple requests in a single HTTP call.

        Args:
            reqs: GraphQL requests to send together
            timeout: Override default timeout
            extra_args: Additional arguments for requests

        Returns:
            List of ExecutionResult objects
            (presumably one per request, in request order; TODO confirm)
        """

    # Properties
    response_headers: Optional[CaseInsensitiveDict[str]]  # Last response headers

HTTPXTransport and HTTPXAsyncTransport

HTTP transports using the httpx library with both synchronous and asynchronous variants.

class HTTPXTransport(Transport):
    """Synchronous HTTP transport using the httpx library."""

    def __init__(
        self,
        url: Union[str, httpx.URL],
        json_serialize: Callable = json.dumps,
        json_deserialize: Callable = json.loads,
        **kwargs
    ):
        """
        Initialize httpx-based synchronous HTTP transport.

        Args:
            url: GraphQL server endpoint URL (string or httpx.URL)
            json_serialize: JSON serialization function (default json.dumps)
            json_deserialize: JSON deserialization function (default json.loads)
            **kwargs: Additional httpx client parameters
        """

    # Execute a single GraphQL request and return its ExecutionResult.
    # NOTE(review): extra_args / upload_files presumably carry the same meaning
    # as on RequestsHTTPTransport.execute (extra client arguments, file upload
    # support); confirm against the library.
    def execute(
        self,
        request: GraphQLRequest,
        extra_args: Optional[Dict] = None,
        upload_files: bool = False
    ) -> ExecutionResult: ...

    # Properties
    response_headers: Optional[httpx.Headers]  # Last response headers

class HTTPXAsyncTransport(AsyncTransport):
    """Asynchronous HTTP transport using the httpx library."""

    def __init__(
        self,
        url: Union[str, httpx.URL],
        json_serialize: Callable = json.dumps,
        json_deserialize: Callable = json.loads,
        **kwargs
    ):
        """
        Initialize httpx-based asynchronous HTTP transport.

        Args:
            url: GraphQL server endpoint URL (string or httpx.URL)
            json_serialize: JSON serialization function (default json.dumps)
            json_deserialize: JSON deserialization function (default json.loads)
            **kwargs: Additional httpx client parameters
                (as documented for HTTPXTransport; TODO confirm)
        """

    # Coroutine: execute a single GraphQL request and return its
    # ExecutionResult.
    async def execute(
        self,
        request: GraphQLRequest,
        extra_args: Optional[Dict] = None,
        upload_files: bool = False
    ) -> ExecutionResult: ...

    # Coroutine: execute several GraphQL requests and return a list of
    # ExecutionResult objects.
    async def execute_batch(
        self,
        reqs: List[GraphQLRequest],
        extra_args: Optional[Dict] = None
    ) -> List[ExecutionResult]: ...

    def subscribe(self, request: GraphQLRequest) -> AsyncGenerator[ExecutionResult, None]:
        """Raises NotImplementedError - HTTP doesn't support subscriptions."""

AIOHTTPTransport

Asynchronous HTTP transport using aiohttp with comprehensive configuration and streaming file upload support.

class AIOHTTPTransport(AsyncTransport):
    """Asynchronous HTTP transport using aiohttp, with streaming file uploads."""

    def __init__(
        self,
        url: str,
        headers: Optional[LooseHeaders] = None,
        cookies: Optional[LooseCookies] = None,
        auth: Optional[Union[BasicAuth, AppSyncAuthentication]] = None,
        ssl: Union[SSLContext, bool, Fingerprint] = True,
        timeout: Optional[int] = None,
        ssl_close_timeout: Optional[Union[int, float]] = 10,
        json_serialize: Callable = json.dumps,
        json_deserialize: Callable = json.loads,
        client_session_args: Optional[Dict[str, Any]] = None
    ):
        """
        Initialize aiohttp-based HTTP transport.

        Args:
            url: GraphQL server endpoint URL
            headers: HTTP headers for requests
            cookies: HTTP cookies for requests
            auth: Authentication (BasicAuth or AppSync)
            ssl: SSL context or verification settings (default True)
            timeout: Request timeout in seconds (default None)
            ssl_close_timeout: SSL connection close timeout (default 10)
            json_serialize: JSON serialization function (default json.dumps)
            json_deserialize: JSON deserialization function (default json.loads)
            client_session_args: Extra aiohttp ClientSession arguments
        """

    async def execute(
        self,
        request: GraphQLRequest,
        extra_args: Optional[Dict] = None,
        upload_files: bool = False
    ) -> ExecutionResult:
        """
        Execute GraphQL request asynchronously.

        Supports streaming file uploads using aiohttp.StreamReader
        and AsyncGenerator file types.

        Args:
            request: GraphQL request to execute
            extra_args: Optional dict of additional arguments
                (presumably forwarded to the aiohttp request; TODO confirm)
            upload_files: Whether to enable file upload handling
                (default False)

        Returns:
            ExecutionResult for the request
        """

    # Properties
    response_headers: Optional[CIMultiDictProxy[str]]  # Last response headers

WebSocket Transports

WebSocket-based transports for GraphQL subscriptions and real-time operations. They support multiple protocols and authentication methods.

WebsocketsTransport

Primary WebSocket transport using the websockets library with support for multiple GraphQL WebSocket protocols.

class WebsocketsTransport(AsyncTransport):
    """Primary WebSocket transport, built on the websockets library.

    Supports multiple GraphQL WebSocket protocols, selected through the
    ``subprotocols`` argument.
    """

    def __init__(
        self,
        url: str,
        headers: Optional[HeadersLike] = None,
        ssl: Union[SSLContext, bool] = False,
        init_payload: Optional[Dict[str, Any]] = None,
        connect_timeout: Optional[Union[int, float]] = 10,
        close_timeout: Optional[Union[int, float]] = 10,
        ack_timeout: Optional[Union[int, float]] = 10,
        keep_alive_timeout: Optional[Union[int, float]] = None,
        ping_interval: Optional[Union[int, float]] = None,
        pong_timeout: Optional[Union[int, float]] = None,
        answer_pings: bool = True,
        connect_args: Optional[Dict[str, Any]] = None,
        subprotocols: Optional[List[str]] = None
    ):
        """
        Initialize WebSocket transport for GraphQL subscriptions.

        Args:
            url: WebSocket server URL (wss://example.com/graphql)
            headers: HTTP headers for WebSocket handshake
            ssl: SSL context or verification settings (default False)
            init_payload: Connection initialization payload
            connect_timeout: Connection establishment timeout (default 10)
            close_timeout: Connection close timeout (default 10)
            ack_timeout: Acknowledgment timeout for operations (default 10)
            keep_alive_timeout: Keep-alive message timeout (default None)
            ping_interval: Ping interval for graphql-ws protocol (default None)
            pong_timeout: Pong response timeout (default None)
            answer_pings: Whether to respond to server pings (default True)
            connect_args: Additional websockets.connect arguments
            subprotocols: WebSocket subprotocols to negotiate

        NOTE(review): "graphql-ws protocol" in the ping/pong descriptions
        presumably refers to the protocol of the graphql-ws library rather
        than the subprotocol string "graphql-ws"; confirm.
        """

    async def execute(self, request: GraphQLRequest) -> ExecutionResult:
        """Execute single GraphQL operation over WebSocket."""

    def subscribe(
        self,
        request: GraphQLRequest,
        send_stop: bool = True
    ) -> AsyncGenerator[ExecutionResult, None]:
        """
        Subscribe to GraphQL subscription.

        Args:
            request: GraphQL subscription request
            send_stop: Send stop message when subscription ends (default True)

        Yields:
            ExecutionResult objects as they arrive from server
        """

    async def send_ping(self, payload: Optional[Any] = None) -> None:
        """Send ping message (graphql-ws protocol only)."""

    async def send_pong(self, payload: Optional[Any] = None) -> None:
        """Send pong message (graphql-ws protocol only)."""

    # Properties
    response_headers: Dict[str, str]  # WebSocket handshake response headers
    url: str  # Connection URL
    headers: Optional[HeadersLike]  # Connection headers
    ssl: Union[SSLContext, bool]  # SSL configuration

Supported Subprotocols:

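  • graphql-ws - Apollo GraphQL WebSocket protocol (the legacy subscriptions-transport-ws protocol)
  • graphql-transport-ws - GraphQL WS protocol (the protocol of the graphql-ws library, which, despite the library's name, negotiates this subprotocol)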

AIOHTTPWebsocketsTransport

WebSocket transport using aiohttp's WebSocket client with additional configuration options.

class AIOHTTPWebsocketsTransport(AsyncTransport):
    """WebSocket transport using aiohttp's WebSocket client."""

    def __init__(
        self,
        url: StrOrURL,
        subprotocols: Optional[List[str]] = None,
        heartbeat: Optional[float] = None,
        auth: Optional[BasicAuth] = None,
        origin: Optional[str] = None,
        params: Optional[Mapping[str, str]] = None,
        headers: Optional[LooseHeaders] = None,
        proxy: Optional[StrOrURL] = None,
        proxy_auth: Optional[BasicAuth] = None,
        proxy_headers: Optional[LooseHeaders] = None,
        ssl: Optional[Union[SSLContext, Literal[False], Fingerprint]] = None,
        websocket_close_timeout: float = 10.0,
        receive_timeout: Optional[float] = None,
        ssl_close_timeout: Optional[Union[int, float]] = 10,
        session: Optional[ClientSession] = None,
        client_session_args: Optional[Dict[str, Any]] = None,
        connect_args: Optional[Dict[str, Any]] = None,
        # Plus all WebSocket protocol parameters
        **kwargs
    ):
        """
        Initialize aiohttp-based WebSocket transport.

        Args:
            url: WebSocket server URL
            subprotocols: WebSocket subprotocols to negotiate
            heartbeat: Low-level ping heartbeat interval
            auth: Basic authentication for connection
            origin: Origin header for WebSocket handshake
            params: Query parameters for connection URL
            headers: HTTP headers for handshake
            proxy: Proxy server URL
            proxy_auth: Proxy authentication
            proxy_headers: Proxy-specific headers
            ssl: SSL configuration (default None)
            websocket_close_timeout: WebSocket close timeout (default 10.0)
            receive_timeout: Message receive timeout (default None)
            ssl_close_timeout: SSL close timeout (default 10)
            session: Existing aiohttp ClientSession to use
            client_session_args: ClientSession creation arguments
            connect_args: WebSocket connection arguments
            **kwargs: WebSocket protocol parameters (presumably the timeout
                and ping/pong options listed for WebsocketsTransport;
                TODO confirm)
        """

PhoenixChannelWebsocketsTransport

Specialized transport for Phoenix Framework Absinthe GraphQL servers using Phoenix Channel protocol.

class PhoenixChannelWebsocketsTransport(AsyncTransport):
    """Transport for Phoenix Framework Absinthe servers (Phoenix Channel protocol)."""

    def __init__(
        self,
        url: str,
        channel_name: str = "__absinthe__:control",
        heartbeat_interval: float = 30,
        ack_timeout: Optional[Union[int, float]] = 10,
        # Plus WebSocket connection parameters
        **kwargs
    ):
        """
        Initialize Phoenix Channel WebSocket transport.

        Args:
            url: Phoenix server URL
            channel_name: Phoenix channel name for GraphQL operations
                (default "__absinthe__:control")
            heartbeat_interval: Heartbeat interval in seconds (default 30)
            ack_timeout: Acknowledgment timeout for messages (default 10)
            **kwargs: WebSocket connection parameters
        """

    # Coroutine: execute a single GraphQL request and return its
    # ExecutionResult.
    async def execute(self, request: GraphQLRequest) -> ExecutionResult: ...

    # Returns an async generator of ExecutionResult objects for a subscription.
    # NOTE(review): send_stop presumably controls whether a stop message is
    # sent when the subscription ends, as on WebsocketsTransport.subscribe;
    # confirm.
    def subscribe(
        self,
        request: GraphQLRequest,
        send_stop: bool = True
    ) -> AsyncGenerator[ExecutionResult, None]: ...

Phoenix-specific Features:

  • Automatic Phoenix Channel protocol handling (phx_join, phx_leave)
  • Heartbeat messages to maintain connection
  • Subscription management with subscriptionId tracking

AppSyncWebsocketsTransport

Specialized transport for AWS AppSync realtime subscriptions with AWS authentication support.

class AppSyncWebsocketsTransport(AsyncTransport):
    """Transport for AWS AppSync realtime subscriptions.

    Only subscriptions are supported on the realtime endpoint; queries and
    mutations are rejected by ``execute``.
    """

    def __init__(
        self,
        url: str,
        auth: Optional[AppSyncAuthentication] = None,
        session: Optional[botocore.session.Session] = None,
        ssl: Union[SSLContext, bool] = False,
        connect_timeout: int = 10,
        close_timeout: int = 10,
        ack_timeout: int = 10,
        keep_alive_timeout: Optional[Union[int, float]] = None,
        # NOTE(review): mutable default kept to mirror the documented
        # signature; it is shared between calls, so it must never be mutated.
        connect_args: Dict[str, Any] = {}
    ):
        """
        Initialize AWS AppSync WebSocket transport.

        Args:
            url: AppSync GraphQL endpoint URL (automatically converted to realtime endpoint)
            auth: AWS authentication method (defaults to IAM)
            session: botocore session for IAM authentication
            ssl: SSL configuration (default False)
            connect_timeout: Connection timeout (default 10)
            close_timeout: Close timeout (default 10)
            ack_timeout: Acknowledgment timeout (default 10)
            keep_alive_timeout: Keep-alive timeout (default None)
            connect_args: WebSocket connection arguments
        """

    async def execute(self, request: GraphQLRequest) -> ExecutionResult:
        """Raises AssertionError - only subscriptions supported on realtime endpoint."""

    def subscribe(
        self,
        request: GraphQLRequest,
        send_stop: bool = True
    ) -> AsyncGenerator[ExecutionResult, None]:
        """
        Subscribe to AppSync realtime subscription.

        Args:
            request: GraphQL subscription request
            send_stop: Defaults to True (presumably sends a stop message
                when the subscription ends; TODO confirm)

        Yields:
            ExecutionResult objects
        """

AppSync-specific Features:

  • Only supports subscriptions (queries/mutations not allowed on realtime endpoint)
  • Automatic URL conversion from GraphQL to realtime endpoint
  • AWS signature-based authentication
  • Supports multiple AWS auth methods

Authentication Classes

Authentication implementations for AWS AppSync WebSocket connections.

class AppSyncAuthentication:
    """Abstract base class for AppSync authentication methods."""

    def get_auth_url(self, url: str) -> str:
        """
        Convert HTTP GraphQL URL to authenticated WebSocket URL.

        Args:
            url: HTTP GraphQL URL

        Returns:
            Authenticated WebSocket URL
        """

    def get_headers(self, data=None, headers=None) -> Dict[str, Any]:
        """
        Get authentication headers for WebSocket connection.

        Args:
            data: Optional, default None (presumably the payload to
                authenticate; TODO confirm)
            headers: Optional, default None (presumably existing headers to
                take into account; TODO confirm)

        Returns:
            Dict of authentication headers
        """

class AppSyncApiKeyAuthentication(AppSyncAuthentication):
    """AppSync authentication using an API key."""

    def __init__(self, host: str, api_key: str):
        """
        API key authentication for AppSync.

        Args:
            host: AppSync API host
            api_key: AppSync API key
        """

class AppSyncJWTAuthentication(AppSyncAuthentication):
    """AppSync authentication using a JWT (Cognito User Pools, OIDC)."""

    def __init__(self, host: str, jwt: str):
        """
        JWT authentication for AppSync (Cognito User Pools, OIDC).

        Args:
            host: AppSync API host
            jwt: JWT access token
        """

class AppSyncIAMAuthentication(AppSyncAuthentication):
    """AppSync authentication using IAM credentials and SigV4 signing."""

    def __init__(
        self,
        host: str,
        region_name: Optional[str] = None,
        signer: Optional[botocore.auth.BaseSigner] = None,
        request_creator: Optional[Callable] = None,
        credentials: Optional[botocore.credentials.Credentials] = None,
        session: Optional[botocore.session.Session] = None
    ):
        """
        IAM authentication for AppSync with SigV4 signing.

        Args:
            host: AppSync API host
            region_name: AWS region (auto-detected if not provided)
            signer: Custom botocore signer
            request_creator: Custom request creator
            credentials: AWS credentials (botocore Credentials object)
            session: botocore session for credential resolution
        """

Local Schema Transport

Execute GraphQL operations directly against local schemas without network communication.

class LocalSchemaTransport(AsyncTransport):
    """Transport that executes operations against a local schema, without network communication."""

    def __init__(self, schema: GraphQLSchema):
        """
        Initialize local schema transport.

        Args:
            schema: Local GraphQL schema object to execute against
        """

    async def connect(self) -> None:
        """No-op connection (no network required)."""

    async def execute(
        self,
        request: GraphQLRequest,
        *args,
        **kwargs
    ) -> ExecutionResult:
        """
        Execute GraphQL request against local schema.

        Args:
            request: GraphQL request to execute
            *args: Additional positional arguments
            **kwargs: Additional keyword arguments

        Returns:
            ExecutionResult from local schema execution
        """

    def subscribe(
        self,
        request: GraphQLRequest,
        *args,
        **kwargs
    ) -> AsyncGenerator[ExecutionResult, None]:
        """
        Execute GraphQL subscription against local schema.

        Args:
            request: GraphQL subscription request
            *args: Additional positional arguments
            **kwargs: Additional keyword arguments

        Yields:
            ExecutionResult objects from subscription
        """

    async def close(self) -> None:
        """No-op close (no connection to close)."""

File Upload Support

Support for GraphQL multipart file uploads across HTTP transports.

class FileVar:
    """File variable used for GraphQL multipart uploads."""

    def __init__(
        self,
        f: Any,  # str | io.IOBase | aiohttp.StreamReader | AsyncGenerator
        *,
        filename: Optional[str] = None,
        content_type: Optional[str] = None,
        streaming: bool = False,
        streaming_block_size: int = 64 * 1024
    ):
        """
        File variable for GraphQL multipart uploads.

        All arguments after ``f`` are keyword-only.

        Args:
            f: File object (path string, file handle, stream, or async generator)
            filename: Override filename for upload
            content_type: MIME content type
            streaming: Enable streaming upload (requires transport support);
                default False
            streaming_block_size: Chunk size for streaming uploads
                (default 64 * 1024)
        """

    def open_file(self, transport_supports_streaming: bool = False) -> None:
        """
        Open file for upload.

        Args:
            transport_supports_streaming: Whether transport supports streaming
                (default False)
        """

    def close_file(self) -> None:
        """Close opened file handle."""

# File utility functions
def open_files(
    filevars: List[FileVar],
    transport_supports_streaming: bool = False
) -> None:
    """
    Open multiple FileVar objects for upload.

    Args:
        filevars: FileVar objects to open
        transport_supports_streaming: Whether transport supports streaming
            (default False)
    """

def close_files(filevars: List[FileVar]) -> None:
    """
    Close multiple FileVar objects.

    Args:
        filevars: FileVar objects to close
    """

def extract_files(
    variables: Dict,
    file_classes: Tuple[Type[Any], ...]
) -> Tuple[Dict, List[FileVar]]:
    """
    Extract FileVar objects from GraphQL variables.

    Args:
        variables: GraphQL variables dict to search
        file_classes: Tuple of types (presumably the classes whose instances
            count as files; TODO confirm)

    Returns:
        Tuple of a dict and the list of extracted FileVar objects
        (the dict is presumably the variables with files removed or
        replaced; TODO confirm)
    """

Usage Examples

HTTP Transport Configuration

from gql import Client
from gql.transport.requests import RequestsHTTPTransport
from requests.auth import HTTPBasicAuth

# Minimal setup: endpoint, a custom User-Agent and a 30-second timeout
transport = RequestsHTTPTransport(
    url="https://api.example.com/graphql",
    timeout=30,
    headers={"User-Agent": "MyApp/1.0"},
)

# Same endpoint, this time with HTTP basic auth and a retry policy:
# up to 3 retries on the listed 5xx status codes, backoff factor 0.5
transport = RequestsHTTPTransport(
    url="https://api.example.com/graphql",
    retries=3,
    retry_status_forcelist=[500, 502, 503, 504],
    retry_backoff_factor=0.5,
    auth=HTTPBasicAuth("username", "password"),
)

# The client wraps whichever transport was configured last
client = Client(transport=transport)

File Upload Example

from gql import gql, Client, FileVar
from gql.transport.aiohttp import AIOHTTPTransport

async def upload_file():
    """Upload ``document.pdf`` with the ``uploadDocument`` mutation.

    Returns:
        The ``uploadDocument`` entry of the response data.
    """
    transport = AIOHTTPTransport(url="https://api.example.com/graphql")
    client = Client(transport=transport)

    # Create file variable
    file_var = FileVar(
        "document.pdf",
        content_type="application/pdf"
    )

    # GraphQL mutation with file upload
    mutation = gql('''
        mutation UploadDocument($file: Upload!) {
            uploadDocument(file: $file) {
                id
                filename
                url
            }
        }
    ''')

    # The Client itself is the async context manager; connect_async() is a
    # coroutine and cannot be used directly in "async with".
    async with client as session:
        result = await session.execute(
            mutation,
            variable_values={"file": file_var},
            upload_files=True
        )

    return result["uploadDocument"]

WebSocket Subscription Setup

import asyncio
from gql import gql, Client
from gql.transport.exceptions import TransportQueryError
from gql.transport.websockets import WebsocketsTransport

async def handle_messages():
    """Print every message delivered by the ``messageAdded`` subscription."""
    # Configure WebSocket transport
    transport = WebsocketsTransport(
        url="wss://api.example.com/graphql",
        headers={"Authorization": "Bearer token123"},
        init_payload={"authToken": "token123"},
        subprotocols=["graphql-ws"]
    )

    client = Client(transport=transport)

    # The Client itself is the async context manager; connect_async() is a
    # coroutine and cannot be used directly in "async with".
    async with client as session:
        subscription = gql('''
            subscription MessageSubscription($channelId: ID!) {
                messageAdded(channelId: $channelId) {
                    id
                    content
                    user {
                        name
                    }
                    timestamp
                }
            }
        ''')

        try:
            # By default each yielded result is the plain "data" dict
            async for result in session.subscribe(
                subscription,
                variable_values={"channelId": "general"}
            ):
                message = result["messageAdded"]
                print(f"[{message['timestamp']}] {message['user']['name']}: {message['content']}")
        except TransportQueryError as exc:
            # GraphQL errors are raised by the session instead of being yielded
            print(f"Subscription error: {exc.errors}")

asyncio.run(handle_messages())

AWS AppSync Integration

import asyncio
from gql import gql, Client
from gql.transport.appsync_websockets import AppSyncWebsocketsTransport
from gql.transport.appsync_auth import AppSyncIAMAuthentication

async def appsync_subscription():
    """Print every comment delivered by the ``onCommentAdded`` subscription."""
    # Configure AppSync authentication
    auth = AppSyncIAMAuthentication(
        host="example.appsync-api.us-east-1.amazonaws.com",
        region_name="us-east-1"
    )

    # Create AppSync transport
    transport = AppSyncWebsocketsTransport(
        url="https://example.appsync-api.us-east-1.amazonaws.com/graphql",
        auth=auth
    )

    client = Client(transport=transport)

    # The Client itself is the async context manager; connect_async() is a
    # coroutine and cannot be used directly in "async with".
    async with client as session:
        subscription = gql('''
            subscription OnCommentAdded($postId: ID!) {
                onCommentAdded(postId: $postId) {
                    id
                    content
                    author
                    createdAt
                }
            }
        ''')

        # By default each yielded result is the plain "data" dict
        async for result in session.subscribe(
            subscription,
            variable_values={"postId": "post-123"}
        ):
            comment = result["onCommentAdded"]
            print(f"New comment by {comment['author']}: {comment['content']}")

asyncio.run(appsync_subscription())

Local Schema Testing

import asyncio

from gql import gql, Client
from gql.transport.local_schema import LocalSchemaTransport
from graphql import build_schema

# Define schema
type_defs = '''
    type Query {
        hello(name: String): String
    }
    
    type Subscription {
        counter: Int
    }
'''

# Build the schema; resolvers are attached below
schema = build_schema(type_defs)

# Add resolvers
def resolve_hello(root, info, name="World"):
    """Resolve Query.hello with a greeting for ``name``."""
    return f"Hello, {name}!"

async def resolve_counter(root, info):
    """Yield the counter values 0..9, one per second."""
    for i in range(10):
        await asyncio.sleep(1)
        yield {"counter": i}

schema.query_type.fields["hello"].resolve = resolve_hello
schema.subscription_type.fields["counter"].subscribe = resolve_counter

# Use local transport
transport = LocalSchemaTransport(schema)
client = Client(transport=transport)

async def main():
    """Run one query and one subscription against the local schema."""
    # "async with" is only valid inside a coroutine, and the Client itself
    # is the async context manager.
    async with client as session:
        # Execute query
        query = gql('{ hello(name: "Alice") }')
        result = await session.execute(query)
        print(result)  # {'hello': 'Hello, Alice!'}

        # Execute subscription
        subscription = gql('subscription { counter }')
        async for result in session.subscribe(subscription):
            print(f"Counter: {result['counter']}")

asyncio.run(main())

Install with Tessl CLI

npx tessl i tessl/pypi-gql

docs

cli.md

client-sessions.md

dsl.md

index.md

transports.md

utilities.md

tile.json