Python SDK for Claude Code providing simple query functions and advanced bidirectional interactive conversations with custom tool support
Abstract transport interface for custom Claude Code communication implementations. Enables custom transport implementations for remote Claude Code connections or alternative communication methods beyond the default subprocess transport.
Base class for implementing custom transport mechanisms for Claude Code communication.
class Transport(ABC):
"""
Abstract transport for Claude communication.
WARNING: This internal API is exposed for custom transport implementations
(e.g., remote Claude Code connections). The Claude Code team may change or
or remove this abstract class in any future release. Custom implementations
must be updated to match interface changes.
This is a low-level transport interface that handles raw I/O with the Claude
process or service. The Query class builds on top of this to implement the
control protocol and message routing.
"""
@abstractmethod
async def connect(self) -> None:
"""
Connect the transport and prepare for communication.
For subprocess transports, this starts the process.
For network transports, this establishes the connection.
"""
@abstractmethod
async def write(self, data: str) -> None:
"""
Write raw data to the transport.
Args:
data: Raw string data to write (typically JSON + newline)
"""
@abstractmethod
def read_messages(self) -> AsyncIterator[dict[str, Any]]:
"""
Read and parse messages from the transport.
Yields:
Parsed JSON messages from the transport
"""
@abstractmethod
async def close(self) -> None:
"""Close the transport connection and clean up resources."""
@abstractmethod
def is_ready(self) -> bool:
"""
Check if transport is ready for communication.
Returns:
True if transport is ready to send/receive messages
"""
@abstractmethod
async def end_input(self) -> None:
"""End the input stream (close stdin for process transports)."""import asyncio
import json
from typing import AsyncIterator, Any
from claude_code_sdk import Transport, query
class NetworkTransport(Transport):
"""Custom transport that communicates with Claude Code over network."""
def __init__(self, host: str, port: int):
self.host = host
self.port = port
self.reader: asyncio.StreamReader | None = None
self.writer: asyncio.StreamWriter | None = None
self.connected = False
async def connect(self) -> None:
"""Establish network connection to Claude Code server."""
try:
self.reader, self.writer = await asyncio.open_connection(
self.host, self.port
)
self.connected = True
print(f"Connected to Claude Code at {self.host}:{self.port}")
# Send initial handshake
handshake = {"type": "handshake", "version": "1.0"}
await self.write(json.dumps(handshake) + "\n")
except Exception as e:
raise ConnectionError(f"Failed to connect to {self.host}:{self.port}: {e}")
async def write(self, data: str) -> None:
"""Send data over the network connection."""
if not self.writer or not self.connected:
raise RuntimeError("Transport not connected")
self.writer.write(data.encode())
await self.writer.drain()
async def read_messages(self) -> AsyncIterator[dict[str, Any]]:
"""Read and parse JSON messages from network stream."""
if not self.reader or not self.connected:
raise RuntimeError("Transport not connected")
while self.connected:
try:
line = await self.reader.readline()
if not line:
break
line_str = line.decode().strip()
if line_str:
try:
message = json.loads(line_str)
yield message
except json.JSONDecodeError as e:
print(f"Failed to decode JSON: {line_str[:100]}")
continue
except asyncio.CancelledError:
break
except Exception as e:
print(f"Error reading message: {e}")
break
async def close(self) -> None:
"""Close the network connection."""
self.connected = False
if self.writer:
self.writer.close()
await self.writer.wait_closed()
self.reader = None
self.writer = None
def is_ready(self) -> bool:
"""Check if network transport is ready."""
return self.connected and self.writer is not None
async def end_input(self) -> None:
"""Signal end of input to remote server."""
if self.connected:
await self.write(json.dumps({"type": "end_input"}) + "\n")
# Usage
async def main():
transport = NetworkTransport("localhost", 8080)
async for message in query(
prompt="Hello from network transport",
transport=transport
):
print(message)import json
import asyncio
from pathlib import Path
from typing import AsyncIterator, Any
from claude_code_sdk import Transport
class FileTransport(Transport):
"""Transport that reads/writes to files for testing or offline processing."""
def __init__(self, input_file: str, output_file: str):
self.input_file = Path(input_file)
self.output_file = Path(output_file)
self.connected = False
self.input_queue: asyncio.Queue = asyncio.Queue()
async def connect(self) -> None:
"""Initialize file transport."""
self.output_file.parent.mkdir(parents=True, exist_ok=True)
# Clear output file
with open(self.output_file, "w") as f:
f.write("")
self.connected = True
print(f"File transport ready: {self.input_file} -> {self.output_file}")
async def write(self, data: str) -> None:
"""Write data to output file."""
if not self.connected:
raise RuntimeError("Transport not connected")
with open(self.output_file, "a") as f:
f.write(data)
# Simulate processing delay
await asyncio.sleep(0.1)
# Generate mock response
try:
request = json.loads(data.strip())
if request.get("type") == "user":
response = {
"type": "assistant",
"message": {
"role": "assistant",
"content": [
{
"type": "text",
"text": f"Mock response to: {request['message']['content']}"
}
]
}
}
await self.input_queue.put(response)
except (json.JSONDecodeError, KeyError):
pass
async def read_messages(self) -> AsyncIterator[dict[str, Any]]:
"""Read messages from input queue."""
while self.connected:
try:
message = await asyncio.wait_for(
self.input_queue.get(), timeout=1.0
)
yield message
except asyncio.TimeoutError:
continue
except asyncio.CancelledError:
break
async def close(self) -> None:
"""Close file transport."""
self.connected = False
print("File transport closed")
def is_ready(self) -> bool:
"""Check if file transport is ready."""
return self.connected
async def end_input(self) -> None:
"""Signal end of input."""
if self.connected:
await self.input_queue.put({"type": "end"})
# Usage
async def main():
transport = FileTransport("input.jsonl", "output.jsonl")
async for message in query(
prompt="Test file transport",
transport=transport
):
print(message)import json
from typing import AsyncIterator, Any
from claude_code_sdk import Transport
class DebugTransport(Transport):
"""Wrapper transport that logs all communication for debugging."""
def __init__(self, wrapped_transport: Transport, log_file: str = "debug.log"):
self.wrapped = wrapped_transport
self.log_file = log_file
def log(self, direction: str, data: Any) -> None:
"""Log communication data."""
with open(self.log_file, "a") as f:
timestamp = __import__("datetime").datetime.now().isoformat()
f.write(f"[{timestamp}] {direction}: {json.dumps(data)}\n")
async def connect(self) -> None:
"""Connect wrapped transport with logging."""
self.log("CONNECT", {"action": "connecting"})
await self.wrapped.connect()
self.log("CONNECT", {"action": "connected"})
async def write(self, data: str) -> None:
"""Write data with logging."""
try:
parsed_data = json.loads(data.strip())
self.log("WRITE", parsed_data)
except json.JSONDecodeError:
self.log("WRITE", {"raw": data[:200]})
await self.wrapped.write(data)
async def read_messages(self) -> AsyncIterator[dict[str, Any]]:
"""Read messages with logging."""
async for message in self.wrapped.read_messages():
self.log("READ", message)
yield message
async def close(self) -> None:
"""Close wrapped transport with logging."""
self.log("CLOSE", {"action": "closing"})
await self.wrapped.close()
self.log("CLOSE", {"action": "closed"})
def is_ready(self) -> bool:
"""Check if wrapped transport is ready."""
return self.wrapped.is_ready()
async def end_input(self) -> None:
"""End input on wrapped transport with logging."""
self.log("END_INPUT", {"action": "ending_input"})
await self.wrapped.end_input()
# Usage
async def main():
# Wrap any existing transport with debug logging
base_transport = NetworkTransport("localhost", 8080)
debug_transport = DebugTransport(base_transport, "claude_debug.log")
async for message in query(
prompt="Debug this communication",
transport=debug_transport
):
print(message)import asyncio
import json
from typing import AsyncIterator, Any
from claude_code_sdk import Transport
class MockTransport(Transport):
"""Mock transport for testing that returns predefined responses."""
def __init__(self, responses: list[dict[str, Any]]):
self.responses = responses
self.response_index = 0
self.connected = False
self.requests: list[dict[str, Any]] = []
async def connect(self) -> None:
"""Mock connection."""
self.connected = True
async def write(self, data: str) -> None:
"""Record requests."""
if not self.connected:
raise RuntimeError("Transport not connected")
try:
request = json.loads(data.strip())
self.requests.append(request)
except json.JSONDecodeError:
pass
async def read_messages(self) -> AsyncIterator[dict[str, Any]]:
"""Return predefined responses."""
while self.connected and self.response_index < len(self.responses):
await asyncio.sleep(0.1) # Simulate delay
response = self.responses[self.response_index]
self.response_index += 1
yield response
async def close(self) -> None:
"""Mock close."""
self.connected = False
def is_ready(self) -> bool:
"""Mock ready check."""
return self.connected
async def end_input(self) -> None:
"""Mock end input."""
pass
def get_requests(self) -> list[dict[str, Any]]:
"""Get recorded requests for testing."""
return self.requests.copy()
# Usage in tests
async def test_query():
mock_responses = [
{
"type": "assistant",
"message": {
"role": "assistant",
"content": [{"type": "text", "text": "Hello! I'm a mock response."}]
}
},
{
"type": "result",
"subtype": "result",
"duration_ms": 100,
"duration_api_ms": 50,
"is_error": False,
"num_turns": 1,
"session_id": "test",
"total_cost_usd": 0.01
}
]
transport = MockTransport(mock_responses)
messages = []
async for message in query(
prompt="Test message",
transport=transport
):
messages.append(message)
# Verify requests were recorded
requests = transport.get_requests()
assert len(requests) == 1
assert requests[0]["message"]["content"] == "Test message"
# Verify responses were received
assert len(messages) == 2connect(): Establish connection and prepare for communicationclose(): Clean up resources and close connectionis_ready(): Return current connection statuswrite(data): Send raw string data (usually JSON + newline)read_messages(): Return async iterator of parsed JSON messagesend_input(): Signal end of input streamTransports should handle errors appropriately:
connect()write() and read_messages()close()Outgoing messages (to transport):
type, message, session_id fieldsIncoming messages (from transport):
The SDK automatically selects the appropriate transport:
SubprocessCLITransport: Default subprocess transporttransport parameterfrom claude_code_sdk import query, ClaudeSDKClient
# With query function
async for message in query(
prompt="Hello",
transport=custom_transport
):
print(message)
# With ClaudeSDKClient
client = ClaudeSDKClient()
await client.connect() # Uses default transport
# Custom transport would be configured differently
# (SDK client doesn't currently support custom transports in constructor)query() function: Accepts custom transport via parameterClaudeSDKClient: Uses internal transport selection (primarily subprocess)The Transport interface is marked as internal and may change:
close()For integration with other SDK components, see Configuration and Options and Simple Queries.
Install with Tessl CLI
npx tessl i tessl/pypi-claude-code-sdk