0
# Transport System
1
2
Abstract transport interface for custom Claude Code communication implementations. Enables custom transport implementations for remote Claude Code connections or alternative communication methods beyond the default subprocess transport.
3
4
## Capabilities
5
6
### Abstract Transport Class
7
8
Base class for implementing custom transport mechanisms for Claude Code communication.
9
10
```python { .api }
11
class Transport(ABC):
12
"""
13
Abstract transport for Claude communication.
14
15
WARNING: This internal API is exposed for custom transport implementations
16
(e.g., remote Claude Code connections). The Claude Code team may change or
17
or remove this abstract class in any future release. Custom implementations
18
must be updated to match interface changes.
19
20
This is a low-level transport interface that handles raw I/O with the Claude
21
process or service. The Query class builds on top of this to implement the
22
control protocol and message routing.
23
"""
24
25
@abstractmethod
26
async def connect(self) -> None:
27
"""
28
Connect the transport and prepare for communication.
29
30
For subprocess transports, this starts the process.
31
For network transports, this establishes the connection.
32
"""
33
34
@abstractmethod
35
async def write(self, data: str) -> None:
36
"""
37
Write raw data to the transport.
38
39
Args:
40
data: Raw string data to write (typically JSON + newline)
41
"""
42
43
@abstractmethod
44
def read_messages(self) -> AsyncIterator[dict[str, Any]]:
45
"""
46
Read and parse messages from the transport.
47
48
Yields:
49
Parsed JSON messages from the transport
50
"""
51
52
@abstractmethod
53
async def close(self) -> None:
54
"""Close the transport connection and clean up resources."""
55
56
@abstractmethod
57
def is_ready(self) -> bool:
58
"""
59
Check if transport is ready for communication.
60
61
Returns:
62
True if transport is ready to send/receive messages
63
"""
64
65
@abstractmethod
66
async def end_input(self) -> None:
67
"""End the input stream (close stdin for process transports)."""
68
```
69
70
## Usage Examples
71
72
### Custom Network Transport
73
74
```python
75
import asyncio
76
import json
77
from typing import AsyncIterator, Any
78
from claude_code_sdk import Transport, query
79
80
class NetworkTransport(Transport):
81
"""Custom transport that communicates with Claude Code over network."""
82
83
def __init__(self, host: str, port: int):
84
self.host = host
85
self.port = port
86
self.reader: asyncio.StreamReader | None = None
87
self.writer: asyncio.StreamWriter | None = None
88
self.connected = False
89
90
async def connect(self) -> None:
91
"""Establish network connection to Claude Code server."""
92
try:
93
self.reader, self.writer = await asyncio.open_connection(
94
self.host, self.port
95
)
96
self.connected = True
97
print(f"Connected to Claude Code at {self.host}:{self.port}")
98
99
# Send initial handshake
100
handshake = {"type": "handshake", "version": "1.0"}
101
await self.write(json.dumps(handshake) + "\n")
102
103
except Exception as e:
104
raise ConnectionError(f"Failed to connect to {self.host}:{self.port}: {e}")
105
106
async def write(self, data: str) -> None:
107
"""Send data over the network connection."""
108
if not self.writer or not self.connected:
109
raise RuntimeError("Transport not connected")
110
111
self.writer.write(data.encode())
112
await self.writer.drain()
113
114
async def read_messages(self) -> AsyncIterator[dict[str, Any]]:
115
"""Read and parse JSON messages from network stream."""
116
if not self.reader or not self.connected:
117
raise RuntimeError("Transport not connected")
118
119
while self.connected:
120
try:
121
line = await self.reader.readline()
122
if not line:
123
break
124
125
line_str = line.decode().strip()
126
if line_str:
127
try:
128
message = json.loads(line_str)
129
yield message
130
except json.JSONDecodeError as e:
131
print(f"Failed to decode JSON: {line_str[:100]}")
132
continue
133
134
except asyncio.CancelledError:
135
break
136
except Exception as e:
137
print(f"Error reading message: {e}")
138
break
139
140
async def close(self) -> None:
141
"""Close the network connection."""
142
self.connected = False
143
144
if self.writer:
145
self.writer.close()
146
await self.writer.wait_closed()
147
148
self.reader = None
149
self.writer = None
150
151
def is_ready(self) -> bool:
152
"""Check if network transport is ready."""
153
return self.connected and self.writer is not None
154
155
async def end_input(self) -> None:
156
"""Signal end of input to remote server."""
157
if self.connected:
158
await self.write(json.dumps({"type": "end_input"}) + "\n")
159
160
# Usage
161
async def main():
162
transport = NetworkTransport("localhost", 8080)
163
164
async for message in query(
165
prompt="Hello from network transport",
166
transport=transport
167
):
168
print(message)
169
```
170
171
### Custom File Transport
172
173
```python
174
import json
175
import asyncio
176
from pathlib import Path
177
from typing import AsyncIterator, Any
178
from claude_code_sdk import Transport
179
180
class FileTransport(Transport):
181
"""Transport that reads/writes to files for testing or offline processing."""
182
183
def __init__(self, input_file: str, output_file: str):
184
self.input_file = Path(input_file)
185
self.output_file = Path(output_file)
186
self.connected = False
187
self.input_queue: asyncio.Queue = asyncio.Queue()
188
189
async def connect(self) -> None:
190
"""Initialize file transport."""
191
self.output_file.parent.mkdir(parents=True, exist_ok=True)
192
193
# Clear output file
194
with open(self.output_file, "w") as f:
195
f.write("")
196
197
self.connected = True
198
print(f"File transport ready: {self.input_file} -> {self.output_file}")
199
200
async def write(self, data: str) -> None:
201
"""Write data to output file."""
202
if not self.connected:
203
raise RuntimeError("Transport not connected")
204
205
with open(self.output_file, "a") as f:
206
f.write(data)
207
208
# Simulate processing delay
209
await asyncio.sleep(0.1)
210
211
# Generate mock response
212
try:
213
request = json.loads(data.strip())
214
if request.get("type") == "user":
215
response = {
216
"type": "assistant",
217
"message": {
218
"role": "assistant",
219
"content": [
220
{
221
"type": "text",
222
"text": f"Mock response to: {request['message']['content']}"
223
}
224
]
225
}
226
}
227
await self.input_queue.put(response)
228
229
except (json.JSONDecodeError, KeyError):
230
pass
231
232
async def read_messages(self) -> AsyncIterator[dict[str, Any]]:
233
"""Read messages from input queue."""
234
while self.connected:
235
try:
236
message = await asyncio.wait_for(
237
self.input_queue.get(), timeout=1.0
238
)
239
yield message
240
except asyncio.TimeoutError:
241
continue
242
except asyncio.CancelledError:
243
break
244
245
async def close(self) -> None:
246
"""Close file transport."""
247
self.connected = False
248
print("File transport closed")
249
250
def is_ready(self) -> bool:
251
"""Check if file transport is ready."""
252
return self.connected
253
254
async def end_input(self) -> None:
255
"""Signal end of input."""
256
if self.connected:
257
await self.input_queue.put({"type": "end"})
258
259
# Usage
260
async def main():
261
transport = FileTransport("input.jsonl", "output.jsonl")
262
263
async for message in query(
264
prompt="Test file transport",
265
transport=transport
266
):
267
print(message)
268
```
269
270
### Debug Transport Wrapper
271
272
```python
273
import json
274
from typing import AsyncIterator, Any
275
from claude_code_sdk import Transport
276
277
class DebugTransport(Transport):
278
"""Wrapper transport that logs all communication for debugging."""
279
280
def __init__(self, wrapped_transport: Transport, log_file: str = "debug.log"):
281
self.wrapped = wrapped_transport
282
self.log_file = log_file
283
284
def log(self, direction: str, data: Any) -> None:
285
"""Log communication data."""
286
with open(self.log_file, "a") as f:
287
timestamp = __import__("datetime").datetime.now().isoformat()
288
f.write(f"[{timestamp}] {direction}: {json.dumps(data)}\n")
289
290
async def connect(self) -> None:
291
"""Connect wrapped transport with logging."""
292
self.log("CONNECT", {"action": "connecting"})
293
await self.wrapped.connect()
294
self.log("CONNECT", {"action": "connected"})
295
296
async def write(self, data: str) -> None:
297
"""Write data with logging."""
298
try:
299
parsed_data = json.loads(data.strip())
300
self.log("WRITE", parsed_data)
301
except json.JSONDecodeError:
302
self.log("WRITE", {"raw": data[:200]})
303
304
await self.wrapped.write(data)
305
306
async def read_messages(self) -> AsyncIterator[dict[str, Any]]:
307
"""Read messages with logging."""
308
async for message in self.wrapped.read_messages():
309
self.log("READ", message)
310
yield message
311
312
async def close(self) -> None:
313
"""Close wrapped transport with logging."""
314
self.log("CLOSE", {"action": "closing"})
315
await self.wrapped.close()
316
self.log("CLOSE", {"action": "closed"})
317
318
def is_ready(self) -> bool:
319
"""Check if wrapped transport is ready."""
320
return self.wrapped.is_ready()
321
322
async def end_input(self) -> None:
323
"""End input on wrapped transport with logging."""
324
self.log("END_INPUT", {"action": "ending_input"})
325
await self.wrapped.end_input()
326
327
# Usage
328
async def main():
329
# Wrap any existing transport with debug logging
330
base_transport = NetworkTransport("localhost", 8080)
331
debug_transport = DebugTransport(base_transport, "claude_debug.log")
332
333
async for message in query(
334
prompt="Debug this communication",
335
transport=debug_transport
336
):
337
print(message)
338
```
339
340
### Mock Transport for Testing
341
342
```python
343
import asyncio
344
import json
345
from typing import AsyncIterator, Any
346
from claude_code_sdk import Transport
347
348
class MockTransport(Transport):
349
"""Mock transport for testing that returns predefined responses."""
350
351
def __init__(self, responses: list[dict[str, Any]]):
352
self.responses = responses
353
self.response_index = 0
354
self.connected = False
355
self.requests: list[dict[str, Any]] = []
356
357
async def connect(self) -> None:
358
"""Mock connection."""
359
self.connected = True
360
361
async def write(self, data: str) -> None:
362
"""Record requests."""
363
if not self.connected:
364
raise RuntimeError("Transport not connected")
365
366
try:
367
request = json.loads(data.strip())
368
self.requests.append(request)
369
except json.JSONDecodeError:
370
pass
371
372
async def read_messages(self) -> AsyncIterator[dict[str, Any]]:
373
"""Return predefined responses."""
374
while self.connected and self.response_index < len(self.responses):
375
await asyncio.sleep(0.1) # Simulate delay
376
response = self.responses[self.response_index]
377
self.response_index += 1
378
yield response
379
380
async def close(self) -> None:
381
"""Mock close."""
382
self.connected = False
383
384
def is_ready(self) -> bool:
385
"""Mock ready check."""
386
return self.connected
387
388
async def end_input(self) -> None:
389
"""Mock end input."""
390
pass
391
392
def get_requests(self) -> list[dict[str, Any]]:
393
"""Get recorded requests for testing."""
394
return self.requests.copy()
395
396
# Usage in tests
397
async def test_query():
398
mock_responses = [
399
{
400
"type": "assistant",
401
"message": {
402
"role": "assistant",
403
"content": [{"type": "text", "text": "Hello! I'm a mock response."}]
404
}
405
},
406
{
407
"type": "result",
408
"subtype": "result",
409
"duration_ms": 100,
410
"duration_api_ms": 50,
411
"is_error": False,
412
"num_turns": 1,
413
"session_id": "test",
414
"total_cost_usd": 0.01
415
}
416
]
417
418
transport = MockTransport(mock_responses)
419
420
messages = []
421
async for message in query(
422
prompt="Test message",
423
transport=transport
424
):
425
messages.append(message)
426
427
# Verify requests were recorded
428
requests = transport.get_requests()
429
assert len(requests) == 1
430
assert requests[0]["message"]["content"] == "Test message"
431
432
# Verify responses were received
433
assert len(messages) == 2
434
```
435
436
## Transport Interface Requirements
437
438
### Connection Management
439
440
- `connect()`: Establish connection and prepare for communication
441
- `close()`: Clean up resources and close connection
442
- `is_ready()`: Return current connection status
443
444
### Communication
445
446
- `write(data)`: Send raw string data (usually JSON + newline)
447
- `read_messages()`: Return async iterator of parsed JSON messages
448
- `end_input()`: Signal end of input stream
449
450
### Error Handling
451
452
Transports should handle errors appropriately:
453
- Connection failures in `connect()`
454
- I/O errors in `write()` and `read_messages()`
455
- Resource cleanup in `close()`
456
457
### Message Format
458
459
**Outgoing messages** (to transport):
460
- JSON strings ending with newline
461
- Usually contain `type`, `message`, `session_id` fields
462
- Control messages for SDK features
463
464
**Incoming messages** (from transport):
465
- JSON objects with parsed message data
466
- Various types: user, assistant, system, result, stream events
467
- Processed by internal message parser
468
469
## Integration with SDK
470
471
### Default Transport
472
473
The SDK automatically selects the appropriate transport:
474
- `SubprocessCLITransport`: Default subprocess transport
475
- Custom transport via `transport` parameter
476
477
### Transport Configuration
478
479
```python
480
from claude_code_sdk import query, ClaudeSDKClient
481
482
# With query function
483
async for message in query(
484
prompt="Hello",
485
transport=custom_transport
486
):
487
print(message)
488
489
# With ClaudeSDKClient
490
client = ClaudeSDKClient()
491
await client.connect() # Uses default transport
492
493
# Custom transport would be configured differently
494
# (SDK client doesn't currently support custom transports in constructor)
495
```
496
497
### Query vs Client Integration
498
499
- `query()` function: Accepts custom transport via parameter
500
- `ClaudeSDKClient`: Uses internal transport selection (primarily subprocess)
501
502
## Important Warnings
503
504
### API Stability
505
506
The Transport interface is marked as internal and may change:
507
- Interface methods may be added, removed, or modified
508
- Custom implementations must be updated with SDK releases
509
- Not covered by semantic versioning guarantees
510
511
### Thread Safety
512
513
- Transport implementations should be async-safe
514
- Multiple concurrent operations may occur
515
- Proper synchronization required for shared resources
516
517
### Resource Management
518
519
- Implement proper cleanup in `close()`
520
- Handle connection failures gracefully
521
- Avoid resource leaks in long-running applications
522
523
For integration with other SDK components, see [Configuration and Options](./configuration-options.md) and [Simple Queries](./simple-queries.md).