tessl/pypi-claude-code-sdk

Python SDK for Claude Code providing simple query functions and advanced bidirectional interactive conversations with custom tool support

Transport System

An abstract transport interface for custom Claude Code communication. Implement it to talk to a remote Claude Code instance or to use a communication channel other than the default subprocess transport.

Capabilities

Abstract Transport Class

Base class for implementing custom transport mechanisms for Claude Code communication.

class Transport(ABC):
    """
    Abstract transport for Claude communication.

    WARNING: This internal API is exposed for custom transport implementations
    (e.g., remote Claude Code connections). The Claude Code team may change
    or remove this abstract class in any future release. Custom implementations
    must be updated to match interface changes.

    This is a low-level transport interface that handles raw I/O with the Claude
    process or service. The Query class builds on top of this to implement the
    control protocol and message routing.
    """

    @abstractmethod
    async def connect(self) -> None:
        """
        Connect the transport and prepare for communication.

        For subprocess transports, this starts the process.
        For network transports, this establishes the connection.
        """

    @abstractmethod
    async def write(self, data: str) -> None:
        """
        Write raw data to the transport.

        Args:
            data: Raw string data to write (typically JSON + newline)
        """

    @abstractmethod
    def read_messages(self) -> AsyncIterator[dict[str, Any]]:
        """
        Read and parse messages from the transport.

        Yields:
            Parsed JSON messages from the transport
        """

    @abstractmethod
    async def close(self) -> None:
        """Close the transport connection and clean up resources."""

    @abstractmethod
    def is_ready(self) -> bool:
        """
        Check if transport is ready for communication.

        Returns:
            True if transport is ready to send/receive messages
        """

    @abstractmethod
    async def end_input(self) -> None:
        """End the input stream (close stdin for process transports)."""

Usage Examples

Custom Network Transport

import asyncio
import json
from typing import AsyncIterator, Any
from claude_code_sdk import Transport, query

class NetworkTransport(Transport):
    """Custom transport that communicates with Claude Code over network."""

    def __init__(self, host: str, port: int):
        self.host = host
        self.port = port
        self.reader: asyncio.StreamReader | None = None
        self.writer: asyncio.StreamWriter | None = None
        self.connected = False

    async def connect(self) -> None:
        """Establish network connection to Claude Code server."""
        try:
            self.reader, self.writer = await asyncio.open_connection(
                self.host, self.port
            )
            self.connected = True
            print(f"Connected to Claude Code at {self.host}:{self.port}")

            # Send initial handshake
            handshake = {"type": "handshake", "version": "1.0"}
            await self.write(json.dumps(handshake) + "\n")

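        except Exception as e:
            # Reset state and chain the original error so the root cause stays visible
            self.connected = False
            raise ConnectionError(
                f"Failed to connect to {self.host}:{self.port}: {e}"
            ) from e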

    async def write(self, data: str) -> None:
        """Send data over the network connection."""
        if not self.writer or not self.connected:
            raise RuntimeError("Transport not connected")

        self.writer.write(data.encode())
        await self.writer.drain()

    async def read_messages(self) -> AsyncIterator[dict[str, Any]]:
        """Read and parse JSON messages from network stream."""
        if not self.reader or not self.connected:
            raise RuntimeError("Transport not connected")

        while self.connected:
            try:
                line = await self.reader.readline()
                if not line:
                    break

                line_str = line.decode().strip()
                if line_str:
                    try:
                        message = json.loads(line_str)
                        yield message
                    except json.JSONDecodeError:
                        print(f"Failed to decode JSON: {line_str[:100]}")
                        continue

            except asyncio.CancelledError:
                # Re-raise so cancellation of the consuming task is not swallowed
                raise
            except Exception as e:
                print(f"Error reading message: {e}")
                break

    async def close(self) -> None:
        """Close the network connection."""
        self.connected = False

        if self.writer:
            self.writer.close()
            await self.writer.wait_closed()

        self.reader = None
        self.writer = None

    def is_ready(self) -> bool:
        """Check if network transport is ready."""
        return self.connected and self.writer is not None

    async def end_input(self) -> None:
        """Signal end of input to remote server."""
        if self.connected:
            await self.write(json.dumps({"type": "end_input"}) + "\n")

# Usage
async def main():
    transport = NetworkTransport("localhost", 8080)

    async for message in query(
        prompt="Hello from network transport",
        transport=transport
    ):
        print(message)

Custom File Transport

import json
import asyncio
from pathlib import Path
from typing import AsyncIterator, Any
from claude_code_sdk import Transport, query

class FileTransport(Transport):
    """Transport that reads/writes to files for testing or offline processing."""

    def __init__(self, input_file: str, output_file: str):
        self.input_file = Path(input_file)
        self.output_file = Path(output_file)
        self.connected = False
        self.input_queue: asyncio.Queue = asyncio.Queue()

    async def connect(self) -> None:
        """Initialize file transport."""
        self.output_file.parent.mkdir(parents=True, exist_ok=True)

        # Clear output file
        with open(self.output_file, "w") as f:
            f.write("")

        self.connected = True
        print(f"File transport ready: {self.input_file} -> {self.output_file}")

    async def write(self, data: str) -> None:
        """Write data to output file."""
        if not self.connected:
            raise RuntimeError("Transport not connected")

        with open(self.output_file, "a") as f:
            f.write(data)

        # Simulate processing delay
        await asyncio.sleep(0.1)

        # Generate mock response
        try:
            request = json.loads(data.strip())
            if request.get("type") == "user":
                response = {
                    "type": "assistant",
                    "message": {
                        "role": "assistant",
                        "content": [
                            {
                                "type": "text",
                                "text": f"Mock response to: {request['message']['content']}"
                            }
                        ]
                    }
                }
                await self.input_queue.put(response)

        except (json.JSONDecodeError, KeyError):
            pass

    async def read_messages(self) -> AsyncIterator[dict[str, Any]]:
        """Read messages from input queue."""
        while self.connected:
            try:
                message = await asyncio.wait_for(
                    self.input_queue.get(), timeout=1.0
                )
                yield message
            except asyncio.TimeoutError:
                continue
            except asyncio.CancelledError:
                # Re-raise so cancellation of the consuming task is not swallowed
                raise

    async def close(self) -> None:
        """Close file transport."""
        self.connected = False
        print("File transport closed")

    def is_ready(self) -> bool:
        """Check if file transport is ready."""
        return self.connected

    async def end_input(self) -> None:
        """Signal end of input."""
        if self.connected:
            await self.input_queue.put({"type": "end"})

# Usage
async def main():
    transport = FileTransport("input.jsonl", "output.jsonl")

    async for message in query(
        prompt="Test file transport",
        transport=transport
    ):
        print(message)

Debug Transport Wrapper

import json
from datetime import datetime
from typing import AsyncIterator, Any
from claude_code_sdk import Transport, query

class DebugTransport(Transport):
    """Wrapper transport that logs all communication for debugging."""

    def __init__(self, wrapped_transport: Transport, log_file: str = "debug.log"):
        self.wrapped = wrapped_transport
        self.log_file = log_file

    def log(self, direction: str, data: Any) -> None:
        """Append one timestamped log line per event."""
        timestamp = datetime.now().isoformat()
        with open(self.log_file, "a") as f:
            f.write(f"[{timestamp}] {direction}: {json.dumps(data)}\n")

    async def connect(self) -> None:
        """Connect wrapped transport with logging."""
        self.log("CONNECT", {"action": "connecting"})
        await self.wrapped.connect()
        self.log("CONNECT", {"action": "connected"})

    async def write(self, data: str) -> None:
        """Write data with logging."""
        try:
            parsed_data = json.loads(data.strip())
            self.log("WRITE", parsed_data)
        except json.JSONDecodeError:
            self.log("WRITE", {"raw": data[:200]})

        await self.wrapped.write(data)

    async def read_messages(self) -> AsyncIterator[dict[str, Any]]:
        """Read messages with logging."""
        async for message in self.wrapped.read_messages():
            self.log("READ", message)
            yield message

    async def close(self) -> None:
        """Close wrapped transport with logging."""
        self.log("CLOSE", {"action": "closing"})
        await self.wrapped.close()
        self.log("CLOSE", {"action": "closed"})

    def is_ready(self) -> bool:
        """Check if wrapped transport is ready."""
        return self.wrapped.is_ready()

    async def end_input(self) -> None:
        """End input on wrapped transport with logging."""
        self.log("END_INPUT", {"action": "ending_input"})
        await self.wrapped.end_input()

# Usage
async def main():
    # Wrap any existing transport with debug logging
    # (NetworkTransport is the class from the Custom Network Transport example)
    base_transport = NetworkTransport("localhost", 8080)
    debug_transport = DebugTransport(base_transport, "claude_debug.log")

    async for message in query(
        prompt="Debug this communication",
        transport=debug_transport
    ):
        print(message)

Mock Transport for Testing

import asyncio
import json
from typing import AsyncIterator, Any
from claude_code_sdk import Transport, query

class MockTransport(Transport):
    """Mock transport for testing that returns predefined responses."""

    def __init__(self, responses: list[dict[str, Any]]):
        self.responses = responses
        self.response_index = 0
        self.connected = False
        self.requests: list[dict[str, Any]] = []

    async def connect(self) -> None:
        """Mock connection."""
        self.connected = True

    async def write(self, data: str) -> None:
        """Record requests."""
        if not self.connected:
            raise RuntimeError("Transport not connected")

        try:
            request = json.loads(data.strip())
            self.requests.append(request)
        except json.JSONDecodeError:
            pass

    async def read_messages(self) -> AsyncIterator[dict[str, Any]]:
        """Return predefined responses."""
        while self.connected and self.response_index < len(self.responses):
            await asyncio.sleep(0.1)  # Simulate delay
            response = self.responses[self.response_index]
            self.response_index += 1
            yield response

    async def close(self) -> None:
        """Mock close."""
        self.connected = False

    def is_ready(self) -> bool:
        """Mock ready check."""
        return self.connected

    async def end_input(self) -> None:
        """Mock end input."""
        pass

    def get_requests(self) -> list[dict[str, Any]]:
        """Get recorded requests for testing."""
        return self.requests.copy()

# Usage in tests
async def test_query():
    mock_responses = [
        {
            "type": "assistant",
            "message": {
                "role": "assistant",
                "content": [{"type": "text", "text": "Hello! I'm a mock response."}]
            }
        },
        {
            "type": "result",
            "subtype": "result",
            "duration_ms": 100,
            "duration_api_ms": 50,
            "is_error": False,
            "num_turns": 1,
            "session_id": "test",
            "total_cost_usd": 0.01
        }
    ]

    transport = MockTransport(mock_responses)

    messages = []
    async for message in query(
        prompt="Test message",
        transport=transport
    ):
        messages.append(message)

    # Verify requests were recorded
    requests = transport.get_requests()
    assert len(requests) == 1
    assert requests[0]["message"]["content"] == "Test message"

    # Verify responses were received
    assert len(messages) == 2

Transport Interface Requirements

Connection Management

  • connect(): Establish connection and prepare for communication
  • close(): Clean up resources and close connection
  • is_ready(): Return current connection status

Communication

  • write(data): Send raw string data (usually JSON + newline)
  • read_messages(): Return async iterator of parsed JSON messages
  • end_input(): Signal end of input stream
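
The order in which the methods above are typically exercised can be sketched without the SDK at all. The example below is a minimal illustration, assuming a hypothetical in-memory `EchoTransport` that exposes the same six method names but does not subclass the real `Transport`. It echoes every written line back as an incoming message and uses a private `None` sentinel to end the stream; neither behavior is something the SDK prescribes.

```python
import asyncio
import json
from typing import Any, AsyncIterator


class EchoTransport:
    """In-memory stand-in with the six transport methods (not an SDK class)."""

    def __init__(self) -> None:
        self._queue: asyncio.Queue = asyncio.Queue()
        self._ready = False

    async def connect(self) -> None:
        self._ready = True

    async def write(self, data: str) -> None:
        if not self._ready:
            raise RuntimeError("Transport not connected")
        # Echo every written JSON line back as an incoming message.
        await self._queue.put(json.loads(data))

    async def end_input(self) -> None:
        # None is a private sentinel telling read_messages() to stop.
        await self._queue.put(None)

    async def read_messages(self) -> AsyncIterator[dict[str, Any]]:
        while (message := await self._queue.get()) is not None:
            yield message

    def is_ready(self) -> bool:
        return self._ready

    async def close(self) -> None:
        self._ready = False


async def drive() -> list[dict[str, Any]]:
    """Connect, write two messages, end input, drain the replies, then close."""
    transport = EchoTransport()
    await transport.connect()
    try:
        await transport.write(json.dumps({"type": "user", "n": 1}) + "\n")
        await transport.write(json.dumps({"type": "user", "n": 2}) + "\n")
        await transport.end_input()
        return [message async for message in transport.read_messages()]
    finally:
        await transport.close()


received = asyncio.run(drive())
```

Driving a transport by hand like this can be a quick way to check a new implementation before handing it to `query()`.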

Error Handling

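Each method is responsible for the failures that can occur at its stage:

  • connect(): report connection failures with a clear error
  • write() and read_messages(): detect I/O errors and surface them to the caller
  • close(): release resources even after an earlier failure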
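
One possible policy for connection failures is to retry a few times before giving up. The sketch below is illustrative only: `connect_with_retry` and `FlakyConnector` are hypothetical names, the retry count and delays are arbitrary, and nothing here is part of the SDK. It retries any async `connect` callable with exponential backoff and chains the last low-level error into a `ConnectionError`.

```python
import asyncio
from typing import Awaitable, Callable


async def connect_with_retry(
    connect: Callable[[], Awaitable[None]],
    attempts: int = 3,
    base_delay: float = 0.01,
) -> int:
    """Hypothetical helper: retry connect() with exponential backoff.

    Returns the number of the attempt that succeeded.
    """
    if attempts < 1:
        raise ValueError("attempts must be at least 1")
    for attempt in range(1, attempts + 1):
        try:
            await connect()
            return attempt
        except OSError as exc:
            if attempt == attempts:
                # Chain the last low-level error so the root cause is preserved.
                raise ConnectionError(f"gave up after {attempts} attempts") from exc
            # Wait base_delay, then 2x, 4x, ... before the next attempt.
            await asyncio.sleep(base_delay * 2 ** (attempt - 1))
    raise AssertionError("unreachable")


class FlakyConnector:
    """Test double that fails a fixed number of times before succeeding."""

    def __init__(self, failures: int) -> None:
        self.failures = failures
        self.calls = 0

    async def connect(self) -> None:
        self.calls += 1
        if self.calls <= self.failures:
            raise ConnectionRefusedError("simulated refusal")


flaky = FlakyConnector(failures=2)
succeeded_on = asyncio.run(connect_with_retry(flaky.connect))
```

Whether retrying belongs inside the transport or in the calling code is a design choice; keeping it outside, as here, leaves `connect()` itself simple.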

Message Format

Outgoing messages (to transport):

  • JSON strings ending with newline
  • Usually contain type, message, session_id fields
  • Control messages for SDK features

Incoming messages (from transport):

  • JSON objects with parsed message data
  • Various types: user, assistant, system, result, stream events
  • Processed by internal message parser
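
The newline-delimited JSON framing described above can be sketched with the standard library alone. The helper names `encode_message` and `decode_lines` are hypothetical, and the payload values are made up for illustration; only the "one JSON object per line" shape comes from the description above.

```python
import json
from typing import Any, Iterator


def encode_message(message: dict[str, Any]) -> str:
    """Serialize one message as a single JSON line (hypothetical helper)."""
    # json.dumps escapes newlines inside strings, so the payload stays on one line.
    return json.dumps(message) + "\n"


def decode_lines(raw: str) -> Iterator[dict[str, Any]]:
    """Yield each JSON object found in newline-delimited text, skipping blank lines."""
    for line in raw.split("\n"):
        line = line.strip()
        if not line:
            continue
        yield json.loads(line)


# Illustrative payload; the field values are invented for this sketch.
outgoing = encode_message(
    {
        "type": "user",
        "message": {"role": "user", "content": "Hello\nworld"},
        "session_id": "demo",
    }
)

# Two messages separated by an extra blank line, as a stream might deliver them.
incoming = list(decode_lines(outgoing + "\n" + encode_message({"type": "result"})))
```

Because the newline inside `"Hello\nworld"` is escaped during serialization, a line-oriented reader such as `StreamReader.readline()` sees exactly one line per message.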

Integration with SDK

Default Transport

When no transport is passed, the SDK uses its built-in subprocess transport; a custom transport takes its place only when supplied explicitly:

  • SubprocessCLITransport: Default subprocess transport
  • Custom transport via transport parameter

Transport Configuration

from claude_code_sdk import query, ClaudeSDKClient, Transport

# custom_transport is any Transport implementation, such as the examples above
async def main(custom_transport: Transport) -> None:
    # With query function
    async for message in query(
        prompt="Hello",
        transport=custom_transport
    ):
        print(message)

    # With ClaudeSDKClient
    client = ClaudeSDKClient()
    await client.connect()  # Uses default transport

    # Custom transport would be configured differently
    # (SDK client doesn't currently support custom transports in constructor)

Query vs Client Integration

  • query() function: Accepts custom transport via parameter
  • ClaudeSDKClient: Uses internal transport selection (primarily subprocess)

Important Warnings

API Stability

The Transport interface is marked as internal and may change:

  • Interface methods may be added, removed, or modified
  • Custom implementations must be updated with SDK releases
  • Not covered by semantic versioning guarantees

Thread Safety

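  • Transport implementations should be safe to call from concurrent asyncio tasks
  • Several operations may be in flight at once, for example a write() while read_messages() is being consumed
  • Guard shared resources such as sockets and buffers with proper synchronization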
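
One common way to protect a shared resource in asyncio code is to serialize access with an `asyncio.Lock`. The sketch below assumes a hypothetical `LockedWriter` whose list of chunks stands in for a real socket or pipe. Each write is deliberately split across an await point to mimic a partial write, which is the situation where two tasks could otherwise interleave their output.

```python
import asyncio


class LockedWriter:
    """Hypothetical helper: serializes concurrent writes with an asyncio.Lock."""

    def __init__(self) -> None:
        self._lock = asyncio.Lock()
        self.chunks: list[str] = []  # stands in for a real socket or pipe

    async def write(self, data: str) -> None:
        async with self._lock:
            # Split the message across an await point to mimic a partial write;
            # without the lock, another task could slip its data in between.
            half = len(data) // 2
            self.chunks.append(data[:half])
            await asyncio.sleep(0)
            self.chunks.append(data[half:])


async def demo() -> str:
    writer = LockedWriter()
    # Three tasks write at the same time.
    await asyncio.gather(*(writer.write(f'{{"id": {i}}}\n') for i in range(3)))
    return "".join(writer.chunks)


result = asyncio.run(demo())
```

With the lock in place, every line in `result` is a complete JSON object; removing the `async with` lets the halves of different messages mix.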

Resource Management

  • Implement proper cleanup in close()
  • Handle connection failures gracefully
  • Avoid resource leaks in long-running applications
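
A simple way to make cleanup hard to forget is to wrap the connect/close pair in an async context manager. The sketch below is an assumption-laden illustration: `FakeTransport` is a stand-in with the same `connect()`/`close()` shape rather than the SDK class, and `open_transport` is a hypothetical helper built on the standard library's `contextlib.asynccontextmanager`.

```python
import asyncio
from contextlib import asynccontextmanager
from typing import AsyncIterator


class FakeTransport:
    """Stand-in with the same connect/close shape as a transport (not the SDK class)."""

    def __init__(self) -> None:
        self.connected = False
        self.close_calls = 0

    async def connect(self) -> None:
        self.connected = True

    async def close(self) -> None:
        # Safe to call more than once in this sketch.
        self.connected = False
        self.close_calls += 1


@asynccontextmanager
async def open_transport(transport: FakeTransport) -> AsyncIterator[FakeTransport]:
    """Hypothetical helper: connect on entry, always close on exit."""
    await transport.connect()
    try:
        yield transport
    finally:
        await transport.close()


async def demo() -> FakeTransport:
    transport = FakeTransport()
    try:
        async with open_transport(transport):
            raise RuntimeError("simulated failure mid-conversation")
    except RuntimeError:
        pass  # the failure is expected; close() has still run
    return transport


transport = asyncio.run(demo())
```

Even though the body raises, `close()` runs exactly once, so the connection is not left open.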

For integration with other SDK components, see Configuration and Options and Simple Queries.
