CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-gql

GraphQL client for Python that enables developers to execute GraphQL queries, mutations, and subscriptions using multiple transport protocols including HTTP, WebSockets, and local schemas with support for both synchronous and asynchronous usage patterns

Pending
Overview
Eval results
Files

client-sessions.mddocs/

Client and Sessions

Core client functionality for executing GraphQL operations, managing connections, and handling schemas. Provides both synchronous and asynchronous execution patterns with session management for connection pooling and batched operations.

Capabilities

Client Class

The main entrypoint for executing GraphQL requests on a transport. Provides both synchronous and asynchronous execution methods with automatic schema management and result parsing.

class Client:
    def __init__(
        self,
        transport: Optional[Union[Transport, AsyncTransport]] = None,
        fetch_schema_from_transport: bool = False,
        schema: Optional[GraphQLSchema] = None,
        parse_results: bool = False,
        introspection: Optional[Dict[str, Any]] = None,
        type_def: Optional[str] = None,
        execute_timeout: Optional[Union[int, float]] = 10
    ):
        """
        Initialize GraphQL client.

        Args:
            transport: Transport to use for GraphQL operations
            fetch_schema_from_transport: Automatically fetch schema via introspection
            schema: Pre-built GraphQL schema
            parse_results: Parse results using schema for custom scalars/enums
            introspection: Pre-fetched introspection result
            type_def: Schema definition string
            execute_timeout: Default timeout for operations in seconds
        """

    def execute(
        self, 
        request: GraphQLRequest, 
        **kwargs
    ) -> ExecutionResult:
        """
        Execute GraphQL request synchronously.

        Args:
            request: GraphQL request to execute
            **kwargs: Additional arguments passed to transport

        Returns:
            ExecutionResult with data, errors, and extensions

        Raises:
            TransportError: If transport operation fails
            GraphQLError: If GraphQL validation fails
        """

    async def execute_async(
        self, 
        request: GraphQLRequest, 
        **kwargs
    ) -> ExecutionResult:
        """
        Execute GraphQL request asynchronously.

        Args:
            request: GraphQL request to execute
            **kwargs: Additional arguments passed to transport

        Returns:
            ExecutionResult with data, errors, and extensions
        """

    def session(self, **kwargs) -> SyncClientSession:
        """
        Create synchronous session for batched operations.

        Args:
            **kwargs: Additional arguments for session configuration

        Returns:
            SyncClientSession context manager
        """

    async def connect_async(self, **kwargs) -> AsyncClientSession:
        """
        Create asynchronous session for batched operations.

        Args:
            **kwargs: Additional arguments for session configuration

        Returns:
            AsyncClientSession context manager
        """

    def get_schema(self) -> GraphQLSchema:
        """
        Get the GraphQL schema.

        Returns:
            GraphQLSchema object

        Raises:
            Exception: If no schema is available
        """

    def close(self) -> None:
        """Close synchronous client and transport."""

    async def close_async(self) -> None:
        """Close asynchronous client and transport."""

GraphQL Request Creation

Create GraphQL requests from query strings with variable support and operation naming.

def gql(request_string: str) -> GraphQLRequest:
    """
    Parse GraphQL query string into GraphQLRequest object.

    Args:
        request_string: GraphQL query, mutation, or subscription as string

    Returns:
        GraphQLRequest object ready for execution

    Raises:
        GraphQLError: If query string has syntax errors
    """

class GraphQLRequest:
    def __init__(
        self,
        request: Union[DocumentNode, "GraphQLRequest", str],
        *,
        variable_values: Optional[Dict[str, Any]] = None,
        operation_name: Optional[str] = None
    ):
        """
        Initialize GraphQL request.

        Args:
            request: GraphQL query as string, DocumentNode, or existing GraphQLRequest
            variable_values: Variables for the GraphQL operation
            operation_name: Operation name for multi-operation documents

        Raises:
            GraphQLError: If request has syntax errors
            TypeError: If request type is invalid
        """

    def serialize_variable_values(self, schema: GraphQLSchema) -> GraphQLRequest:
        """
        Serialize variable values using schema type information.

        Args:
            schema: GraphQL schema for type information

        Returns:
            New GraphQLRequest with serialized variables
        """

    @property
    def payload(self) -> Dict[str, Any]:
        """
        Get JSON payload representation of the request.

        Returns:
            Dictionary with 'query', 'variables', and 'operationName' keys
        """

    # Properties
    document: DocumentNode  # Parsed GraphQL document
    variable_values: Optional[Dict[str, Any]]  # Query variables
    operation_name: Optional[str]  # Operation name

Synchronous Sessions

Synchronous session management for batched operations and connection reuse.

class SyncClientSession:
    def execute(
        self,
        request: GraphQLRequest,
        **kwargs
    ) -> ExecutionResult:
        """
        Execute GraphQL request in session context.

        Args:
            request: GraphQL request to execute
            **kwargs: Additional arguments for execution

        Returns:
            ExecutionResult with data, errors, and extensions
        """

    def execute_batch(
        self,
        requests: List[GraphQLRequest],
        **kwargs
    ) -> List[ExecutionResult]:
        """
        Execute multiple GraphQL requests in a batch.

        Args:
            requests: List of GraphQL requests to execute
            **kwargs: Additional arguments for batch execution

        Returns:
            List of ExecutionResult objects
        """

    def close(self) -> None:
        """Close the session and transport connection."""

    # Context manager support
    def __enter__(self) -> SyncClientSession: ...
    def __exit__(self, exc_type, exc_val, exc_tb) -> None: ...

Asynchronous Sessions

Asynchronous session management with subscription support and automatic reconnection capabilities.

class AsyncClientSession:
    async def execute(
        self,
        request: GraphQLRequest,
        **kwargs
    ) -> ExecutionResult:
        """
        Execute GraphQL request asynchronously in session context.

        Args:
            request: GraphQL request to execute
            **kwargs: Additional arguments for execution

        Returns:
            ExecutionResult with data, errors, and extensions
        """

    async def execute_batch(
        self,
        requests: List[GraphQLRequest],
        **kwargs
    ) -> List[ExecutionResult]:
        """
        Execute multiple GraphQL requests in a batch asynchronously.

        Args:
            requests: List of GraphQL requests to execute
            **kwargs: Additional arguments for batch execution

        Returns:
            List of ExecutionResult objects
        """

    def subscribe(
        self,
        request: GraphQLRequest,
        **kwargs
    ) -> AsyncGenerator[ExecutionResult, None]:
        """
        Subscribe to GraphQL subscription.

        Args:
            request: GraphQL subscription request
            **kwargs: Additional arguments for subscription

        Yields:
            ExecutionResult objects as they arrive

        Raises:
            TransportError: If subscription setup fails
        """

    async def close(self) -> None:
        """Close the async session and transport connection."""

    # Async context manager support
    async def __aenter__(self) -> AsyncClientSession: ...
    async def __aexit__(self, exc_type, exc_val, exc_tb) -> None: ...

class ReconnectingAsyncClientSession(AsyncClientSession):
    """
    Async session with automatic reconnection on connection failures.
    
    Inherits all methods from AsyncClientSession with added reconnection logic.
    """

Usage Examples

Basic Client Usage

from gql import gql, Client
from gql.transport.requests import RequestsHTTPTransport

# Create client with automatic schema fetching
transport = RequestsHTTPTransport(url="https://api.example.com/graphql")
client = Client(transport=transport, fetch_schema_from_transport=True)

# Execute query
query = gql('''
    query GetUsers($limit: Int!) {
        users(limit: $limit) {
            id
            name
            email
        }
    }
''')

result = client.execute(query, variable_values={"limit": 10})
users = result["users"]

# Close client when done
client.close()

Session-based Batch Operations

from gql import gql, Client
from gql.transport.requests import RequestsHTTPTransport

transport = RequestsHTTPTransport(url="https://api.example.com/graphql")
client = Client(transport=transport)

# Using synchronous session
with client.session() as session:
    query1 = gql('{ user(id: "1") { name } }')
    query2 = gql('{ user(id: "2") { name } }')
    
    # Execute individually
    result1 = session.execute(query1)
    result2 = session.execute(query2)
    
    # Or execute as batch
    results = session.execute_batch([query1, query2])

Asynchronous Operations

import asyncio
from gql import gql, Client
from gql.transport.aiohttp import AIOHTTPTransport

async def main():
    transport = AIOHTTPTransport(url="https://api.example.com/graphql")
    client = Client(transport=transport)
    
    # Using async context manager
    async with client.connect_async() as session:
        query = gql('{ hello }')
        result = await session.execute(query)
        print(result)
        
        # Batch execution
        queries = [gql('{ user1: user(id: "1") { name } }'), 
                  gql('{ user2: user(id: "2") { name } }')]
        results = await session.execute_batch(queries)
        
    await client.close_async()

asyncio.run(main())

Subscription Handling

import asyncio
from gql import gql, Client
from gql.transport.websockets import WebsocketsTransport

async def handle_subscriptions():
    transport = WebsocketsTransport(url="wss://api.example.com/graphql")
    client = Client(transport=transport)
    
    async with client.connect_async() as session:
        subscription = gql('''
            subscription {
                messageAdded {
                    id
                    content
                    user {
                        name
                    }
                }
            }
        ''')
        
        # Handle subscription messages
        async for result in session.subscribe(subscription):
            if result.data:
                message = result.data["messageAdded"]
                print(f"New message from {message['user']['name']}: {message['content']}")
            
            if result.errors:
                print(f"Subscription error: {result.errors}")
                break

asyncio.run(handle_subscriptions())

Custom Schema Handling

from gql import gql, Client
from gql.transport.requests import RequestsHTTPTransport
from gql.utilities import update_schema_scalar
from graphql import GraphQLScalarType

# Custom scalar for datetime handling
datetime_scalar = GraphQLScalarType(
    name="DateTime",
    serialize=lambda dt: dt.isoformat(),
    parse_value=lambda value: datetime.fromisoformat(value),
    parse_literal=lambda ast: datetime.fromisoformat(ast.value)
)

transport = RequestsHTTPTransport(url="https://api.example.com/graphql")
client = Client(
    transport=transport, 
    fetch_schema_from_transport=True,
    parse_results=True  # Enable result parsing with custom scalars
)

# Update schema with custom scalar
update_schema_scalar(client.schema, "DateTime", datetime_scalar)

# Queries will now automatically parse DateTime fields
query = gql('{ posts { title createdAt } }')
result = client.execute(query)
# result["posts"][0]["createdAt"] is now a datetime object

Install with Tessl CLI

npx tessl i tessl/pypi-gql

docs

cli.md

client-sessions.md

dsl.md

index.md

transports.md

utilities.md

tile.json