docs
0
# Transport Layer
1
2
The Transport abstract class provides a low-level interface for communicating with Claude Code. While most users should use the query() function or ClaudeSDKClient, custom transports enable advanced use cases like remote connections.
3
4
## Capabilities
5
6
### Transport Abstract Class
7
8
Abstract base class for Claude communication transports.
9
10
```python { .api }
11
class Transport(ABC):
12
"""
13
Abstract transport for Claude communication.
14
15
WARNING: This internal API is exposed for custom transport implementations
16
(e.g., remote Claude Code connections). The Claude Code team may change or
17
remove this abstract class in any future release. Custom implementations
18
must be updated to match interface changes.
19
20
This is a low-level transport interface that handles raw I/O with the Claude
21
process or service. The Query class builds on top of this to implement the
22
control protocol and message routing.
23
24
Use cases for custom transports:
25
- Connect to remote Claude Code instances
26
- Implement custom communication protocols
27
- Add middleware (logging, encryption, compression)
28
- Support alternative execution environments
29
30
Most users should use the default subprocess transport via query() or
31
ClaudeSDKClient rather than implementing custom transports.
32
"""
33
34
@abstractmethod
35
async def connect(self) -> None:
36
"""
37
Connect the transport and prepare for communication.
38
39
For subprocess transports, this starts the process.
40
For network transports, this establishes the connection.
41
42
This method must be called before any read or write operations.
43
Should be idempotent - calling multiple times should be safe.
44
45
Raises:
46
CLIConnectionError: If connection fails
47
CLINotFoundError: If Claude CLI is not found (subprocess transport)
48
49
Example:
50
transport = MyTransport()
51
await transport.connect()
52
"""
53
54
@abstractmethod
55
async def write(self, data: str) -> None:
56
"""
57
Write raw data to the transport.
58
59
Args:
60
data: Raw string data to write (typically JSON + newline)
61
62
The data should be a complete message, usually JSON-encoded with a
63
trailing newline. The transport is responsible for delivering this
64
data to the Claude process.
65
66
Raises:
67
Exception: If write fails or transport is not connected
68
69
Example:
70
await transport.write('{"type": "user", "content": "Hello"}\\n')
71
"""
72
73
@abstractmethod
74
def read_messages(self) -> AsyncIterator[dict[str, Any]]:
75
"""
76
Read and parse messages from the transport.
77
78
This method returns an async iterator that yields parsed JSON messages
79
from the Claude process. Each message is a dictionary.
80
81
The implementation should:
82
- Read lines from the transport
83
- Parse each line as JSON
84
- Yield the parsed message dictionary
85
- Continue until the transport is closed
86
87
Yields:
88
Parsed JSON messages as dictionaries
89
90
Raises:
91
CLIJSONDecodeError: If JSON parsing fails
92
Exception: If read fails or transport is not connected
93
94
Example:
95
async for message in transport.read_messages():
96
print(f"Received: {message}")
97
"""
98
99
@abstractmethod
100
async def close(self) -> None:
101
"""
102
Close the transport connection and clean up resources.
103
104
This method should:
105
- Close any open connections or file handles
106
- Terminate any associated processes (subprocess transport)
107
- Release any allocated resources
108
- Be safe to call multiple times
109
110
Should be idempotent - calling multiple times should be safe.
111
112
Example:
113
await transport.close()
114
"""
115
116
@abstractmethod
117
def is_ready(self) -> bool:
118
"""
119
Check if transport is ready for communication.
120
121
Returns:
122
True if transport is ready to send/receive messages, False otherwise
123
124
This method should return True after connect() succeeds and before
125
close() is called. It's used to verify the transport is in a usable
126
state.
127
128
Example:
129
if transport.is_ready():
130
await transport.write(data)
131
"""
132
133
@abstractmethod
134
async def end_input(self) -> None:
135
"""
136
End the input stream (close stdin for process transports).
137
138
This signals to the Claude process that no more input will be sent.
139
For subprocess transports, this closes stdin. For network transports,
140
this might send an end-of-stream signal.
141
142
After calling this, you should only read remaining output, not write
143
new messages.
144
145
Example:
146
await transport.end_input()
147
# Can still read remaining messages
148
async for msg in transport.read_messages():
149
print(msg)
150
"""
151
```
152
153
## Usage Examples
154
155
### Using Default Transport
156
157
```python
158
from claude_agent_sdk import query
159
160
# Most users should just use query() which handles transport internally
161
async for msg in query(prompt="Hello"):
162
print(msg)
163
164
# Or use ClaudeSDKClient which also manages transport
165
from claude_agent_sdk import ClaudeSDKClient
166
167
async with ClaudeSDKClient() as client:
168
await client.query("Hello")
169
async for msg in client.receive_response():
170
print(msg)
171
```
172
173
### Custom Transport Skeleton
174
175
```python
176
from claude_agent_sdk import Transport
177
from typing import AsyncIterator, Any
178
179
class MyCustomTransport(Transport):
180
"""Custom transport implementation."""
181
182
def __init__(self):
183
self._connected = False
184
self._reader = None
185
self._writer = None
186
187
async def connect(self) -> None:
188
"""Connect to Claude."""
189
if self._connected:
190
return
191
192
# Implement connection logic
193
# For network: establish TCP/HTTP connection
194
# For subprocess: start process
195
# Set self._reader and self._writer
196
197
self._connected = True
198
199
async def write(self, data: str) -> None:
200
"""Write data to Claude."""
201
if not self._connected:
202
raise Exception("Not connected")
203
204
# Implement write logic
205
# For network: send over socket
206
# For subprocess: write to stdin
207
await self._writer.write(data.encode())
208
209
async def read_messages(self) -> AsyncIterator[dict[str, Any]]:
210
"""Read messages from Claude."""
211
if not self._connected:
212
raise Exception("Not connected")
213
214
# Implement read logic
215
while True:
216
line = await self._reader.readline()
217
if not line:
218
break
219
220
# Parse and yield message
221
import json
222
try:
223
message = json.loads(line.decode())
224
yield message
225
except json.JSONDecodeError as e:
226
from claude_agent_sdk import CLIJSONDecodeError
227
raise CLIJSONDecodeError(line.decode(), e)
228
229
async def close(self) -> None:
230
"""Close the transport."""
231
if not self._connected:
232
return
233
234
# Implement close logic
235
if self._writer:
236
self._writer.close()
237
await self._writer.wait_closed()
238
239
self._connected = False
240
241
def is_ready(self) -> bool:
242
"""Check if ready."""
243
return self._connected
244
245
async def end_input(self) -> None:
246
"""End input stream."""
247
if self._writer:
248
self._writer.write_eof()
249
```
250
251
### Network Transport Example
252
253
```python
254
import asyncio
255
import json
256
from claude_agent_sdk import Transport, ClaudeSDKClient
257
from typing import AsyncIterator, Any
258
259
class NetworkTransport(Transport):
260
"""Transport for remote Claude Code over network."""
261
262
def __init__(self, host: str, port: int):
263
self.host = host
264
self.port = port
265
self._reader = None
266
self._writer = None
267
268
async def connect(self) -> None:
269
"""Connect to remote Claude Code."""
270
self._reader, self._writer = await asyncio.open_connection(
271
self.host,
272
self.port
273
)
274
275
async def write(self, data: str) -> None:
276
"""Write to network socket."""
277
self._writer.write(data.encode('utf-8'))
278
await self._writer.drain()
279
280
async def read_messages(self) -> AsyncIterator[dict[str, Any]]:
281
"""Read from network socket."""
282
while True:
283
line = await self._reader.readline()
284
if not line:
285
break
286
287
try:
288
message = json.loads(line.decode('utf-8'))
289
yield message
290
except json.JSONDecodeError as e:
291
from claude_agent_sdk import CLIJSONDecodeError
292
raise CLIJSONDecodeError(line.decode(), e)
293
294
async def close(self) -> None:
295
"""Close network connection."""
296
if self._writer:
297
self._writer.close()
298
await self._writer.wait_closed()
299
300
def is_ready(self) -> bool:
301
"""Check if connected."""
302
return self._writer is not None and not self._writer.is_closing()
303
304
async def end_input(self) -> None:
305
"""Close write side of connection."""
306
if self._writer:
307
self._writer.write_eof()
308
await self._writer.drain()
309
310
# Use custom transport
311
transport = NetworkTransport("claude.example.com", 8080)
312
client = ClaudeSDKClient(transport=transport)
313
314
async with client:
315
await client.query("Hello")
316
async for msg in client.receive_response():
317
print(msg)
318
```
319
320
### Logging Transport Wrapper
321
322
```python
323
import logging
324
from claude_agent_sdk import Transport, query
325
from typing import AsyncIterator, Any
326
327
logging.basicConfig(level=logging.DEBUG)
328
logger = logging.getLogger(__name__)
329
330
class LoggingTransport(Transport):
331
"""Transport wrapper that logs all I/O."""
332
333
def __init__(self, wrapped: Transport):
334
self._transport = wrapped
335
336
async def connect(self) -> None:
337
logger.info("Connecting transport")
338
await self._transport.connect()
339
logger.info("Transport connected")
340
341
async def write(self, data: str) -> None:
342
logger.debug(f"Write: {data[:100]}...")
343
await self._transport.write(data)
344
345
async def read_messages(self) -> AsyncIterator[dict[str, Any]]:
346
logger.info("Starting to read messages")
347
async for message in self._transport.read_messages():
348
logger.debug(f"Read: {message}")
349
yield message
350
logger.info("Finished reading messages")
351
352
async def close(self) -> None:
353
logger.info("Closing transport")
354
await self._transport.close()
355
logger.info("Transport closed")
356
357
def is_ready(self) -> bool:
358
ready = self._transport.is_ready()
359
logger.debug(f"Transport ready: {ready}")
360
return ready
361
362
async def end_input(self) -> None:
363
logger.info("Ending input")
364
await self._transport.end_input()
365
366
# Use with default transport wrapped in logging
367
from claude_agent_sdk._internal.transport.subprocess_cli import SubprocessTransport
368
369
base_transport = SubprocessTransport()
370
logged_transport = LoggingTransport(base_transport)
371
372
async for msg in query(prompt="Hello", transport=logged_transport):
373
print(msg)
374
```
375
376
### Transport with Retry Logic
377
378
```python
379
import anyio
380
from claude_agent_sdk import Transport, CLIConnectionError
381
from typing import AsyncIterator, Any
382
383
class RetryingTransport(Transport):
384
"""Transport wrapper with automatic retry."""
385
386
def __init__(self, wrapped: Transport, max_retries: int = 3):
387
self._transport = wrapped
388
self._max_retries = max_retries
389
390
async def connect(self) -> None:
391
"""Connect with retry."""
392
for attempt in range(self._max_retries):
393
try:
394
await self._transport.connect()
395
return
396
except CLIConnectionError as e:
397
if attempt < self._max_retries - 1:
398
print(f"Connection failed, retrying... ({attempt + 1}/{self._max_retries})")
399
await anyio.sleep(2 ** attempt) # Exponential backoff
400
else:
401
raise
402
403
async def write(self, data: str) -> None:
404
"""Write with retry."""
405
for attempt in range(self._max_retries):
406
try:
407
await self._transport.write(data)
408
return
409
except Exception as e:
410
if attempt < self._max_retries - 1:
411
print(f"Write failed, retrying... ({attempt + 1}/{self._max_retries})")
412
await anyio.sleep(1)
413
else:
414
raise
415
416
async def read_messages(self) -> AsyncIterator[dict[str, Any]]:
417
"""Delegate to wrapped transport."""
418
async for message in self._transport.read_messages():
419
yield message
420
421
async def close(self) -> None:
422
"""Delegate to wrapped transport."""
423
await self._transport.close()
424
425
def is_ready(self) -> bool:
426
"""Delegate to wrapped transport."""
427
return self._transport.is_ready()
428
429
async def end_input(self) -> None:
430
"""Delegate to wrapped transport."""
431
await self._transport.end_input()
432
```
433
434
### Transport with Metrics
435
436
```python
437
import time
438
from claude_agent_sdk import Transport
439
from typing import AsyncIterator, Any
440
441
class MetricsTransport(Transport):
442
"""Transport wrapper that collects metrics."""
443
444
def __init__(self, wrapped: Transport):
445
self._transport = wrapped
446
self.bytes_written = 0
447
self.bytes_read = 0
448
self.messages_read = 0
449
self.connect_time = 0
450
451
async def connect(self) -> None:
452
start = time.time()
453
await self._transport.connect()
454
self.connect_time = time.time() - start
455
456
async def write(self, data: str) -> None:
457
self.bytes_written += len(data)
458
await self._transport.write(data)
459
460
async def read_messages(self) -> AsyncIterator[dict[str, Any]]:
461
async for message in self._transport.read_messages():
462
self.messages_read += 1
463
# Estimate bytes (rough)
464
self.bytes_read += len(str(message))
465
yield message
466
467
async def close(self) -> None:
468
await self._transport.close()
469
470
def is_ready(self) -> bool:
471
return self._transport.is_ready()
472
473
async def end_input(self) -> None:
474
await self._transport.end_input()
475
476
def print_metrics(self):
477
print(f"Connect time: {self.connect_time:.3f}s")
478
print(f"Bytes written: {self.bytes_written}")
479
print(f"Bytes read: {self.bytes_read}")
480
print(f"Messages read: {self.messages_read}")
481
482
# Usage
483
from claude_agent_sdk._internal.transport.subprocess_cli import SubprocessTransport
484
485
base_transport = SubprocessTransport()
486
metrics_transport = MetricsTransport(base_transport)
487
488
async for msg in query(prompt="Hello", transport=metrics_transport):
489
print(msg)
490
491
metrics_transport.print_metrics()
492
```
493
494
### Transport State Management
495
496
```python
497
from enum import Enum
498
from claude_agent_sdk import Transport
499
from typing import AsyncIterator, Any
500
501
class TransportState(Enum):
502
DISCONNECTED = "disconnected"
503
CONNECTING = "connecting"
504
CONNECTED = "connected"
505
CLOSING = "closing"
506
CLOSED = "closed"
507
508
class StatefulTransport(Transport):
509
"""Transport with explicit state management."""
510
511
def __init__(self, wrapped: Transport):
512
self._transport = wrapped
513
self._state = TransportState.DISCONNECTED
514
515
async def connect(self) -> None:
516
if self._state == TransportState.CONNECTED:
517
return
518
519
self._state = TransportState.CONNECTING
520
try:
521
await self._transport.connect()
522
self._state = TransportState.CONNECTED
523
except Exception:
524
self._state = TransportState.DISCONNECTED
525
raise
526
527
async def write(self, data: str) -> None:
528
if self._state != TransportState.CONNECTED:
529
raise Exception(f"Cannot write in state: {self._state}")
530
await self._transport.write(data)
531
532
async def read_messages(self) -> AsyncIterator[dict[str, Any]]:
533
if self._state != TransportState.CONNECTED:
534
raise Exception(f"Cannot read in state: {self._state}")
535
async for message in self._transport.read_messages():
536
yield message
537
538
async def close(self) -> None:
539
if self._state in (TransportState.CLOSING, TransportState.CLOSED):
540
return
541
542
self._state = TransportState.CLOSING
543
try:
544
await self._transport.close()
545
finally:
546
self._state = TransportState.CLOSED
547
548
def is_ready(self) -> bool:
549
return self._state == TransportState.CONNECTED
550
551
async def end_input(self) -> None:
552
if self._state == TransportState.CONNECTED:
553
await self._transport.end_input()
554
555
@property
556
def state(self) -> TransportState:
557
return self._state
558
```
559
560
### Using Transport Directly (Advanced)
561
562
```python
563
import json
564
from claude_agent_sdk._internal.transport.subprocess_cli import SubprocessTransport
565
from claude_agent_sdk import ClaudeAgentOptions
566
567
async def use_transport_directly():
568
"""Example of using transport directly (advanced)."""
569
# Create and connect transport
570
transport = SubprocessTransport(options=ClaudeAgentOptions())
571
await transport.connect()
572
573
try:
574
# Send a message
575
message = {
576
"type": "user",
577
"message": {"role": "user", "content": "Hello"},
578
"session_id": "default"
579
}
580
await transport.write(json.dumps(message) + "\n")
581
582
# Read responses
583
async for response in transport.read_messages():
584
print(f"Response: {response}")
585
586
# Check for completion
587
if response.get("type") == "result":
588
break
589
590
finally:
591
# Clean up
592
await transport.end_input()
593
await transport.close()
594
595
# Note: Most users should use query() or ClaudeSDKClient instead
596
await use_transport_directly()
597
```
598
599
### Transport Testing
600
601
```python
602
from claude_agent_sdk import Transport
603
from typing import AsyncIterator, Any
604
import json
605
606
class MockTransport(Transport):
607
"""Mock transport for testing."""
608
609
def __init__(self, responses: list[dict]):
610
self.responses = responses
611
self.written = []
612
self._connected = False
613
self._response_index = 0
614
615
async def connect(self) -> None:
616
self._connected = True
617
618
async def write(self, data: str) -> None:
619
self.written.append(json.loads(data))
620
621
async def read_messages(self) -> AsyncIterator[dict[str, Any]]:
622
for response in self.responses:
623
yield response
624
625
async def close(self) -> None:
626
self._connected = False
627
628
def is_ready(self) -> bool:
629
return self._connected
630
631
async def end_input(self) -> None:
632
pass
633
634
# Use in tests
635
async def test_query():
636
mock = MockTransport([
637
{"type": "assistant", "content": [{"type": "text", "text": "Hello!"}]},
638
{"type": "result", "is_error": False}
639
])
640
641
# Test your code with mock transport
642
# ...
643
```
644