# Transport Layer

The `Transport` abstract class provides a low-level interface for communicating with Claude Code. Most users should use the `query()` function or `ClaudeSDKClient`; custom transports are for advanced use cases such as remote connections.

## Capabilities

### Transport Abstract Class

Abstract base class for Claude communication transports.

```python { .api }
from abc import ABC, abstractmethod
from collections.abc import AsyncIterator
from typing import Any


class Transport(ABC):
    """
    Abstract transport for Claude communication.

    WARNING: This internal API is exposed for custom transport implementations
    (e.g., remote Claude Code connections). The Claude Code team may change or
    remove this abstract class in any future release. Custom implementations
    must be updated to match interface changes.

    This is a low-level transport interface that handles raw I/O with the Claude
    process or service. The Query class builds on top of this to implement the
    control protocol and message routing.

    Use cases for custom transports:
    - Connect to remote Claude Code instances
    - Implement custom communication protocols
    - Add middleware (logging, encryption, compression)
    - Support alternative execution environments

    Most users should use the default subprocess transport via query() or
    ClaudeSDKClient rather than implementing custom transports.
    """

    @abstractmethod
    async def connect(self) -> None:
        """
        Connect the transport and prepare for communication.

        For subprocess transports, this starts the process.
        For network transports, this establishes the connection.

        This method must be called before any read or write operations.
        Should be idempotent - calling multiple times should be safe.

        Raises:
            CLIConnectionError: If connection fails
            CLINotFoundError: If Claude CLI is not found (subprocess transport)

        Example:
            transport = MyTransport()
            await transport.connect()
        """

    @abstractmethod
    async def write(self, data: str) -> None:
        """
        Write raw data to the transport.

        Args:
            data: Raw string data to write (typically JSON + newline)

        The data should be a complete message, usually JSON-encoded with a
        trailing newline. The transport is responsible for delivering this
        data to the Claude process.

        Raises:
            Exception: If write fails or transport is not connected

        Example:
            await transport.write('{"type": "user", "content": "Hello"}\\n')
        """

    @abstractmethod
    def read_messages(self) -> AsyncIterator[dict[str, Any]]:
        """
        Read and parse messages from the transport.

        This method returns an async iterator that yields parsed JSON messages
        from the Claude process. Each message is a dictionary.

        The implementation should:
        - Read lines from the transport
        - Parse each line as JSON
        - Yield the parsed message dictionary
        - Continue until the transport is closed

        Yields:
            Parsed JSON messages as dictionaries

        Raises:
            CLIJSONDecodeError: If JSON parsing fails
            Exception: If read fails or transport is not connected

        Example:
            async for message in transport.read_messages():
                print(f"Received: {message}")
        """

    @abstractmethod
    async def close(self) -> None:
        """
        Close the transport connection and clean up resources.

        This method should:
        - Close any open connections or file handles
        - Terminate any associated processes (subprocess transport)
        - Release any allocated resources
        - Be safe to call multiple times

        Should be idempotent - calling multiple times should be safe.

        Example:
            await transport.close()
        """

    @abstractmethod
    def is_ready(self) -> bool:
        """
        Check if transport is ready for communication.

        Returns:
            True if transport is ready to send/receive messages, False otherwise

        This method should return True after connect() succeeds and before
        close() is called. It's used to verify the transport is in a usable
        state.

        Example:
            if transport.is_ready():
                await transport.write(data)
        """

    @abstractmethod
    async def end_input(self) -> None:
        """
        End the input stream (close stdin for process transports).

        This signals to the Claude process that no more input will be sent.
        For subprocess transports, this closes stdin. For network transports,
        this might send an end-of-stream signal.

        After calling this, you should only read remaining output, not write
        new messages.

        Example:
            await transport.end_input()
            # Can still read remaining messages
            async for msg in transport.read_messages():
                print(msg)
        """
```
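
The call order implied by the docstrings above (connect, write, end the input, drain `read_messages()`, close) can be sketched with a small in-memory transport. This is an illustration only, not SDK code: `LoopbackTransport` and `run_lifecycle` are hypothetical names, and the class duck-types the six methods instead of subclassing the SDK's `Transport`, so the snippet runs without the SDK installed.

```python
import asyncio
import json
from collections.abc import AsyncIterator
from typing import Any


class LoopbackTransport:
    """Hypothetical in-memory transport that echoes every written message back."""

    def __init__(self) -> None:
        self._ready = False
        self._lines = None  # asyncio.Queue, created in connect()

    async def connect(self) -> None:
        if self._ready:
            return  # Idempotent: a second call changes nothing
        self._lines = asyncio.Queue()
        self._ready = True

    async def write(self, data: str) -> None:
        if not self._ready:
            raise RuntimeError("Not connected")
        await self._lines.put(data)

    async def end_input(self) -> None:
        await self._lines.put(None)  # Sentinel: no more input will arrive

    async def read_messages(self) -> AsyncIterator[dict[str, Any]]:
        # Yield parsed messages until the end-of-input sentinel is seen
        while (line := await self._lines.get()) is not None:
            yield json.loads(line)

    def is_ready(self) -> bool:
        return self._ready

    async def close(self) -> None:
        self._ready = False  # Safe to call more than once


async def run_lifecycle() -> list[dict[str, Any]]:
    transport = LoopbackTransport()
    await transport.connect()
    try:
        await transport.write(json.dumps({"type": "user", "content": "Hello"}) + "\n")
        await transport.end_input()
        return [msg async for msg in transport.read_messages()]
    finally:
        await transport.close()


received = asyncio.run(run_lifecycle())
print(received)
```

Putting `close()` in a `finally` block mirrors the idempotency requirement: cleanup runs whether or not the exchange succeeded.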

## Usage Examples

### Using Default Transport

```python
import asyncio

from claude_agent_sdk import ClaudeSDKClient, query


async def main() -> None:
    # Most users should just use query(), which handles the transport internally
    async for msg in query(prompt="Hello"):
        print(msg)

    # Or use ClaudeSDKClient, which also manages the transport
    async with ClaudeSDKClient() as client:
        await client.query("Hello")
        async for msg in client.receive_response():
            print(msg)


asyncio.run(main())
```

### Custom Transport Skeleton

```python
import json
from typing import Any, AsyncIterator

from claude_agent_sdk import CLIConnectionError, CLIJSONDecodeError, Transport


class MyCustomTransport(Transport):
    """Custom transport implementation."""

    def __init__(self):
        self._connected = False
        self._reader = None  # e.g. an asyncio.StreamReader, set in connect()
        self._writer = None  # e.g. an asyncio.StreamWriter, set in connect()

    async def connect(self) -> None:
        """Connect to Claude."""
        if self._connected:
            return  # Idempotent: already connected

        # Implement connection logic
        # For network: establish TCP/HTTP connection
        # For subprocess: start process
        # Set self._reader and self._writer

        self._connected = True

    async def write(self, data: str) -> None:
        """Write data to Claude."""
        if not self._connected:
            raise CLIConnectionError("Not connected")

        # Implement write logic
        # For network: send over socket
        # For subprocess: write to stdin
        # StreamWriter.write() is synchronous; drain() waits for the flush
        self._writer.write(data.encode())
        await self._writer.drain()

    async def read_messages(self) -> AsyncIterator[dict[str, Any]]:
        """Read messages from Claude."""
        if not self._connected:
            raise CLIConnectionError("Not connected")

        # Implement read logic
        while True:
            line = await self._reader.readline()
            if not line:
                break  # Empty bytes means the stream was closed

            # Parse and yield message
            try:
                message = json.loads(line.decode())
            except json.JSONDecodeError as e:
                raise CLIJSONDecodeError(line.decode(), e) from e
            yield message

    async def close(self) -> None:
        """Close the transport."""
        if not self._connected:
            return

        # Implement close logic
        if self._writer:
            self._writer.close()
            await self._writer.wait_closed()

        self._connected = False

    def is_ready(self) -> bool:
        """Check if ready."""
        return self._connected

    async def end_input(self) -> None:
        """End input stream."""
        if self._writer:
            self._writer.write_eof()
```
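
The skeleton's `write()` and `read_messages()` both rely on one JSON document per line. A minimal sketch of that framing follows; the helper names `encode_message` and `decode_line` are hypothetical, and skipping blank lines is an assumption about how a transport might tolerate stray newlines, not documented SDK behavior.

```python
import json
from typing import Any, Optional


def encode_message(message: dict[str, Any]) -> str:
    """Serialize one message as a single line of JSON with a trailing newline."""
    # json.dumps escapes newlines inside string values, so the frame stays on one line
    return json.dumps(message) + "\n"


def decode_line(line: bytes) -> Optional[dict[str, Any]]:
    """Parse one received line; return None for a blank line (assumed skippable)."""
    text = line.decode("utf-8").strip()
    if not text:
        return None
    return json.loads(text)


frame = encode_message({"type": "user", "content": "line one\nline two"})
decoded = decode_line(frame.encode("utf-8"))
print(repr(frame))
print(decoded)
```

Because the only raw newline in a frame is the terminator, a reader can split the stream with `readline()` without inspecting the payload.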

### Network Transport Example

```python
import asyncio
import json
from typing import Any, AsyncIterator

from claude_agent_sdk import CLIJSONDecodeError, ClaudeSDKClient, Transport


class NetworkTransport(Transport):
    """Transport for remote Claude Code over network."""

    def __init__(self, host: str, port: int):
        self.host = host
        self.port = port
        self._reader = None
        self._writer = None

    async def connect(self) -> None:
        """Connect to remote Claude Code."""
        if self.is_ready():
            return  # Idempotent: keep the existing connection
        self._reader, self._writer = await asyncio.open_connection(
            self.host,
            self.port,
        )

    async def write(self, data: str) -> None:
        """Write to network socket."""
        self._writer.write(data.encode('utf-8'))
        await self._writer.drain()

    async def read_messages(self) -> AsyncIterator[dict[str, Any]]:
        """Read from network socket."""
        while True:
            line = await self._reader.readline()
            if not line:
                break  # Peer closed the connection

            try:
                message = json.loads(line.decode('utf-8'))
            except json.JSONDecodeError as e:
                raise CLIJSONDecodeError(line.decode('utf-8'), e) from e
            yield message

    async def close(self) -> None:
        """Close network connection."""
        if self._writer:
            self._writer.close()
            await self._writer.wait_closed()
            self._writer = None  # Makes a repeated close() a no-op

    def is_ready(self) -> bool:
        """Check if connected."""
        return self._writer is not None and not self._writer.is_closing()

    async def end_input(self) -> None:
        """Close write side of connection."""
        if self._writer:
            self._writer.write_eof()
            await self._writer.drain()


async def main() -> None:
    # Use custom transport
    transport = NetworkTransport("claude.example.com", 8080)
    client = ClaudeSDKClient(transport=transport)

    async with client:
        await client.query("Hello")
        async for msg in client.receive_response():
            print(msg)


asyncio.run(main())
```
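
The `end_input()` method above relies on a half-close: `write_eof()` shuts down the sending direction while reads keep working. The sketch below demonstrates that behavior over a local socket pair, so it needs neither the SDK nor a remote host. The peer handler and its `"echo"` reply format are hypothetical stand-ins, not the Claude Code protocol.

```python
import asyncio
import json
import socket


async def handle_peer(reader: asyncio.StreamReader, writer: asyncio.StreamWriter) -> None:
    """Hypothetical peer: answer each JSON line until the client half-closes."""
    while line := await reader.readline():  # b"" signals EOF from the client
        request = json.loads(line)
        reply = {"type": "echo", "content": request["content"]}
        writer.write((json.dumps(reply) + "\n").encode("utf-8"))
        await writer.drain()
    writer.close()
    await writer.wait_closed()


async def demo() -> list[dict]:
    # A connected socket pair stands in for the TCP connection
    client_sock, peer_sock = socket.socketpair()
    peer_reader, peer_writer = await asyncio.open_connection(sock=peer_sock)
    reader, writer = await asyncio.open_connection(sock=client_sock)
    peer_task = asyncio.create_task(handle_peer(peer_reader, peer_writer))

    writer.write(b'{"content": "Hello"}\n')
    await writer.drain()
    writer.write_eof()  # Half-close: no more writes, reads still work

    collected = []
    while line := await reader.readline():  # Drain replies until the peer closes
        collected.append(json.loads(line))

    writer.close()
    await writer.wait_closed()
    await peer_task
    return collected


replies = asyncio.run(demo())
print(replies)
```

The reply arrives after `write_eof()` was called, which is the property `end_input()` depends on.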

### Logging Transport Wrapper

```python
import asyncio
import logging
from typing import Any, AsyncIterator

from claude_agent_sdk import Transport, query
# NOTE: private module. The class name and constructor arguments may differ
# between SDK releases, so check them against your installed version.
from claude_agent_sdk._internal.transport.subprocess_cli import SubprocessTransport

logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)


class LoggingTransport(Transport):
    """Transport wrapper that logs all I/O."""

    def __init__(self, wrapped: Transport):
        self._transport = wrapped

    async def connect(self) -> None:
        logger.info("Connecting transport")
        await self._transport.connect()
        logger.info("Transport connected")

    async def write(self, data: str) -> None:
        logger.debug("Write: %s...", data[:100])
        await self._transport.write(data)

    async def read_messages(self) -> AsyncIterator[dict[str, Any]]:
        logger.info("Starting to read messages")
        async for message in self._transport.read_messages():
            logger.debug("Read: %s", message)
            yield message
        logger.info("Finished reading messages")

    async def close(self) -> None:
        logger.info("Closing transport")
        await self._transport.close()
        logger.info("Transport closed")

    def is_ready(self) -> bool:
        ready = self._transport.is_ready()
        logger.debug("Transport ready: %s", ready)
        return ready

    async def end_input(self) -> None:
        logger.info("Ending input")
        await self._transport.end_input()


async def main() -> None:
    # Use with default transport wrapped in logging
    base_transport = SubprocessTransport()
    logged_transport = LoggingTransport(base_transport)

    async for msg in query(prompt="Hello", transport=logged_transport):
        print(msg)


asyncio.run(main())
```
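
The `data[:100]` slice in the wrapper above appends an ellipsis even when nothing was cut off, and it leaves the trailing newline in the log line. One possible refinement is a small preview helper. `preview` is a hypothetical name, and the 100-character default only mirrors the slice used above.

```python
def preview(data: str, limit: int = 100) -> str:
    """Return a single-line preview, marking truncation only when it happens."""
    # Drop the frame terminator and escape any remaining newlines
    flat = data.rstrip("\n").replace("\n", "\\n")
    if len(flat) <= limit:
        return flat
    return f"{flat[:limit]}... ({len(flat) - limit} more chars)"


short_preview = preview('{"type": "user"}\n')
long_preview = preview("x" * 120, limit=10)
print(short_preview)
print(long_preview)
```

In the wrapper, the call would become `logger.debug("Write: %s", preview(data))`.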

### Transport with Retry Logic

```python
from typing import Any, AsyncIterator

import anyio
from claude_agent_sdk import CLIConnectionError, Transport


class RetryingTransport(Transport):
    """Transport wrapper with automatic retry."""

    def __init__(self, wrapped: Transport, max_retries: int = 3):
        self._transport = wrapped
        self._max_retries = max_retries

    async def connect(self) -> None:
        """Connect with retry."""
        for attempt in range(self._max_retries):
            try:
                await self._transport.connect()
                return
            except CLIConnectionError:
                if attempt < self._max_retries - 1:
                    print(f"Connection failed, retrying... ({attempt + 1}/{self._max_retries})")
                    await anyio.sleep(2 ** attempt)  # Exponential backoff: 1s, 2s, 4s, ...
                else:
                    raise

    async def write(self, data: str) -> None:
        """Write with retry."""
        # Caution: if a failed write was partially delivered, retrying can
        # send the same message twice.
        for attempt in range(self._max_retries):
            try:
                await self._transport.write(data)
                return
            except Exception:
                if attempt < self._max_retries - 1:
                    print(f"Write failed, retrying... ({attempt + 1}/{self._max_retries})")
                    await anyio.sleep(1)
                else:
                    raise

    async def read_messages(self) -> AsyncIterator[dict[str, Any]]:
        """Delegate to wrapped transport."""
        async for message in self._transport.read_messages():
            yield message

    async def close(self) -> None:
        """Delegate to wrapped transport."""
        await self._transport.close()

    def is_ready(self) -> bool:
        """Delegate to wrapped transport."""
        return self._transport.is_ready()

    async def end_input(self) -> None:
        """Delegate to wrapped transport."""
        await self._transport.end_input()
```
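
The waits produced by `2 ** attempt` grow without bound as `max_retries` increases. A capped schedule is one common variation, sketched here. `backoff_delays`, `base`, and `cap` are hypothetical names chosen for illustration and are not part of the SDK.

```python
def backoff_delays(max_retries: int, base: float = 1.0, cap: float = 30.0) -> list[float]:
    """Delays to sleep between attempts; no delay follows the final attempt."""
    # attempt 0 waits base, attempt 1 waits 2 * base, ... never more than cap
    return [min(cap, base * 2 ** attempt) for attempt in range(max_retries - 1)]


default_schedule = backoff_delays(3)
capped_schedule = backoff_delays(6, cap=8.0)
print(default_schedule)
print(capped_schedule)
```

With `max_retries=3` the schedule matches the waits in `connect()` above: one second after the first failure, two after the second, and none after the last.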

### Transport with Metrics

```python
import asyncio
import json
import time
from typing import Any, AsyncIterator

from claude_agent_sdk import Transport, query
# NOTE: private module. The class name and constructor arguments may differ
# between SDK releases, so check them against your installed version.
from claude_agent_sdk._internal.transport.subprocess_cli import SubprocessTransport


class MetricsTransport(Transport):
    """Transport wrapper that collects metrics."""

    def __init__(self, wrapped: Transport):
        self._transport = wrapped
        self.bytes_written = 0
        self.bytes_read = 0
        self.messages_read = 0
        self.connect_time = 0.0

    async def connect(self) -> None:
        start = time.perf_counter()  # Monotonic clock, suited to measuring durations
        await self._transport.connect()
        self.connect_time = time.perf_counter() - start

    async def write(self, data: str) -> None:
        # Count encoded bytes; len(data) would count characters
        self.bytes_written += len(data.encode("utf-8"))
        await self._transport.write(data)

    async def read_messages(self) -> AsyncIterator[dict[str, Any]]:
        async for message in self._transport.read_messages():
            self.messages_read += 1
            # Estimate bytes (rough): the raw wire bytes are not visible at
            # this layer, so re-serialize the parsed message
            self.bytes_read += len(json.dumps(message).encode("utf-8"))
            yield message

    async def close(self) -> None:
        await self._transport.close()

    def is_ready(self) -> bool:
        return self._transport.is_ready()

    async def end_input(self) -> None:
        await self._transport.end_input()

    def print_metrics(self):
        print(f"Connect time: {self.connect_time:.3f}s")
        print(f"Bytes written: {self.bytes_written}")
        print(f"Bytes read: {self.bytes_read}")
        print(f"Messages read: {self.messages_read}")


async def main() -> None:
    # Usage
    base_transport = SubprocessTransport()
    metrics_transport = MetricsTransport(base_transport)

    async for msg in query(prompt="Hello", transport=metrics_transport):
        print(msg)

    metrics_transport.print_metrics()


asyncio.run(main())
```
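
A byte counter for text payloads has to measure the encoded form, because `len()` on a `str` counts code points rather than bytes. The two diverge as soon as a message contains non-ASCII text. A short illustration, with `utf8_size` as a hypothetical helper name:

```python
def utf8_size(data: str) -> int:
    """Number of bytes the string occupies once encoded as UTF-8."""
    return len(data.encode("utf-8"))


sample = '{"content": "héllo ✓"}\n'
char_count = len(sample)
byte_count = utf8_size(sample)
print(char_count, byte_count)
```

Here `é` takes two bytes and `✓` takes three, so the byte count exceeds the character count by three.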

### Transport State Management

```python
from enum import Enum
from claude_agent_sdk import Transport
from typing import AsyncIterator, Any


class TransportState(Enum):
    DISCONNECTED = "disconnected"
    CONNECTING = "connecting"
    CONNECTED = "connected"
    CLOSING = "closing"
    CLOSED = "closed"


class StatefulTransport(Transport):
    """Transport with explicit state management."""

    def __init__(self, wrapped: Transport):
        self._transport = wrapped
        self._state = TransportState.DISCONNECTED

    async def connect(self) -> None:
        if self._state == TransportState.CONNECTED:
            return

        self._state = TransportState.CONNECTING
        try:
            await self._transport.connect()
            self._state = TransportState.CONNECTED
        except Exception:
            self._state = TransportState.DISCONNECTED
            raise

    async def write(self, data: str) -> None:
        if self._state != TransportState.CONNECTED:
            raise Exception(f"Cannot write in state: {self._state}")
        await self._transport.write(data)

    async def read_messages(self) -> AsyncIterator[dict[str, Any]]:
        if self._state != TransportState.CONNECTED:
            raise Exception(f"Cannot read in state: {self._state}")
        async for message in self._transport.read_messages():
            yield message

    async def close(self) -> None:
        if self._state in (TransportState.CLOSING, TransportState.CLOSED):
            return

        self._state = TransportState.CLOSING
        try:
            await self._transport.close()
        finally:
            self._state = TransportState.CLOSED

    def is_ready(self) -> bool:
        return self._state == TransportState.CONNECTED

    async def end_input(self) -> None:
        if self._state == TransportState.CONNECTED:
            await self._transport.end_input()

    @property
    def state(self) -> TransportState:
        return self._state
```
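
The state changes that `StatefulTransport` performs can also be written down as an explicit transition table, which makes illegal moves easy to detect. The sketch below is one possible reading: the `ALLOWED` table and `can_transition` are hypothetical, and treating `CLOSED` as terminal is an assumption that is stricter than the wrapper above, which does not block `connect()` after `close()`.

```python
from enum import Enum


class TransportState(Enum):
    DISCONNECTED = "disconnected"
    CONNECTING = "connecting"
    CONNECTED = "connected"
    CLOSING = "closing"
    CLOSED = "closed"


# Assumed transition table: each state maps to the states it may move to
ALLOWED = {
    TransportState.DISCONNECTED: {TransportState.CONNECTING, TransportState.CLOSING},
    TransportState.CONNECTING: {TransportState.CONNECTED, TransportState.DISCONNECTED},
    TransportState.CONNECTED: {TransportState.CLOSING},
    TransportState.CLOSING: {TransportState.CLOSED},
    TransportState.CLOSED: set(),  # Assumption: a closed transport is not reused
}


def can_transition(current: TransportState, target: TransportState) -> bool:
    """Return True if moving from current to target is in the table."""
    return target in ALLOWED[current]


print(can_transition(TransportState.DISCONNECTED, TransportState.CONNECTING))
print(can_transition(TransportState.CLOSED, TransportState.CONNECTING))
```

A wrapper could call `can_transition()` before assigning `self._state` and raise on an unexpected move.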

### Using Transport Directly (Advanced)

```python
import asyncio
import json

from claude_agent_sdk import ClaudeAgentOptions
# NOTE: private module. The class name and constructor arguments may differ
# between SDK releases, so check them against your installed version.
from claude_agent_sdk._internal.transport.subprocess_cli import SubprocessTransport


async def use_transport_directly():
    """Example of using transport directly (advanced)."""
    # Create and connect transport
    transport = SubprocessTransport(options=ClaudeAgentOptions())
    await transport.connect()

    try:
        # Send a message
        message = {
            "type": "user",
            "message": {"role": "user", "content": "Hello"},
            "session_id": "default"
        }
        await transport.write(json.dumps(message) + "\n")

        # Read responses
        async for response in transport.read_messages():
            print(f"Response: {response}")

            # Check for completion
            if response.get("type") == "result":
                break

    finally:
        # Clean up
        await transport.end_input()
        await transport.close()


# Note: Most users should use query() or ClaudeSDKClient instead
asyncio.run(use_transport_directly())
```

### Transport Testing

```python
import json
from typing import Any, AsyncIterator

from claude_agent_sdk import Transport


class MockTransport(Transport):
    """Mock transport for testing."""

    def __init__(self, responses: list[dict]):
        self.responses = responses  # Canned messages to replay
        self.written = []           # Parsed copies of everything written
        self._connected = False

    async def connect(self) -> None:
        self._connected = True

    async def write(self, data: str) -> None:
        self.written.append(json.loads(data))

    async def read_messages(self) -> AsyncIterator[dict[str, Any]]:
        for response in self.responses:
            yield response

    async def close(self) -> None:
        self._connected = False

    def is_ready(self) -> bool:
        return self._connected

    async def end_input(self) -> None:
        pass


# Use in tests
async def test_query():
    mock = MockTransport([
        {"type": "assistant", "content": [{"type": "text", "text": "Hello!"}]},
        {"type": "result", "is_error": False}
    ])

    # Test your code with mock transport
    # ...
```
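
A test built on this pattern usually checks two things: what the code under test wrote, and how it handled the canned responses. The sketch below shows the shape of such a test. It is an assumption-laden illustration: `FakeTransport` is a duck-typed stand-in so the snippet runs without the SDK, and `ask` is a hypothetical function under test, not an SDK API.

```python
import asyncio
import json
from typing import Any


class FakeTransport:
    """Stand-in with the same write/read shape as the mock above."""

    def __init__(self, responses: list[dict[str, Any]]) -> None:
        self.responses = responses
        self.written: list[dict[str, Any]] = []

    async def write(self, data: str) -> None:
        self.written.append(json.loads(data))

    async def read_messages(self):
        for response in self.responses:
            yield response


async def ask(transport: Any, text: str) -> list[dict[str, Any]]:
    """Hypothetical code under test: send one message, collect up to the result."""
    await transport.write(json.dumps({"type": "user", "content": text}) + "\n")
    collected = []
    async for message in transport.read_messages():
        collected.append(message)
        if message.get("type") == "result":
            break
    return collected


async def run_example() -> tuple[list[dict[str, Any]], list[dict[str, Any]]]:
    fake = FakeTransport([
        {"type": "assistant", "content": [{"type": "text", "text": "Hello!"}]},
        {"type": "result", "is_error": False},
    ])
    messages = await ask(fake, "Hello")
    # The assertions a real test would make
    assert fake.written == [{"type": "user", "content": "Hello"}]
    assert [m["type"] for m in messages] == ["assistant", "result"]
    return messages, fake.written


messages, written = asyncio.run(run_example())
print(len(messages), written)
```

Recording parsed writes rather than raw strings keeps the assertions independent of JSON key order and whitespace.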
