
# Transport System

An abstract transport interface for Claude Code communication. Implement it to connect to a remote Claude Code instance, or to use a communication method other than the default subprocess transport.

## Capabilities

### Abstract Transport Class

Base class for implementing custom transport mechanisms for Claude Code communication.

```python { .api }
class Transport(ABC):
    """
    Abstract transport for Claude communication.

    WARNING: This internal API is exposed for custom transport implementations
    (e.g., remote Claude Code connections). The Claude Code team may change or
    remove this abstract class in any future release. Custom implementations
    must be updated to match interface changes.

    This is a low-level transport interface that handles raw I/O with the Claude
    process or service. The Query class builds on top of this to implement the
    control protocol and message routing.
    """

    @abstractmethod
    async def connect(self) -> None:
        """
        Connect the transport and prepare for communication.

        For subprocess transports, this starts the process.
        For network transports, this establishes the connection.
        """

    @abstractmethod
    async def write(self, data: str) -> None:
        """
        Write raw data to the transport.

        Args:
            data: Raw string data to write (typically JSON + newline)
        """

    @abstractmethod
    def read_messages(self) -> AsyncIterator[dict[str, Any]]:
        """
        Read and parse messages from the transport.

        Yields:
            Parsed JSON messages from the transport
        """

    @abstractmethod
    async def close(self) -> None:
        """Close the transport connection and clean up resources."""

    @abstractmethod
    def is_ready(self) -> bool:
        """
        Check if transport is ready for communication.

        Returns:
            True if transport is ready to send/receive messages
        """

    @abstractmethod
    async def end_input(self) -> None:
        """End the input stream (close stdin for process transports)."""
```

## Usage Examples

### Custom Network Transport

```python
import asyncio
import json
from typing import AsyncIterator, Any
from claude_code_sdk import Transport, query

class NetworkTransport(Transport):
    """Custom transport that communicates with Claude Code over network."""

    def __init__(self, host: str, port: int):
        self.host = host
        self.port = port
        self.reader: asyncio.StreamReader | None = None
        self.writer: asyncio.StreamWriter | None = None
        self.connected = False

    async def connect(self) -> None:
        """Establish network connection to Claude Code server."""
        try:
            self.reader, self.writer = await asyncio.open_connection(
                self.host, self.port
            )
            self.connected = True
            print(f"Connected to Claude Code at {self.host}:{self.port}")

            # Send initial handshake
            handshake = {"type": "handshake", "version": "1.0"}
            await self.write(json.dumps(handshake) + "\n")

        except Exception as e:
            # Chain the original exception so the root cause is preserved
            raise ConnectionError(f"Failed to connect to {self.host}:{self.port}: {e}") from e

    async def write(self, data: str) -> None:
        """Send data over the network connection."""
        if not self.writer or not self.connected:
            raise RuntimeError("Transport not connected")

        self.writer.write(data.encode())
        await self.writer.drain()

    async def read_messages(self) -> AsyncIterator[dict[str, Any]]:
        """Read and parse JSON messages from network stream."""
        if not self.reader or not self.connected:
            raise RuntimeError("Transport not connected")

        while self.connected:
            try:
                line = await self.reader.readline()
                if not line:
                    # Empty read means the peer closed the connection
                    break

                line_str = line.decode().strip()
                if line_str:
                    try:
                        message = json.loads(line_str)
                        yield message
                    except json.JSONDecodeError:
                        # Skip malformed lines instead of ending the stream
                        print(f"Failed to decode JSON: {line_str[:100]}")
                        continue

            except asyncio.CancelledError:
                break
            except Exception as e:
                print(f"Error reading message: {e}")
                break

    async def close(self) -> None:
        """Close the network connection."""
        self.connected = False

        if self.writer:
            self.writer.close()
            await self.writer.wait_closed()

        self.reader = None
        self.writer = None

    def is_ready(self) -> bool:
        """Check if network transport is ready."""
        return self.connected and self.writer is not None

    async def end_input(self) -> None:
        """Signal end of input to remote server."""
        if self.connected:
            await self.write(json.dumps({"type": "end_input"}) + "\n")

# Usage
async def main():
    transport = NetworkTransport("localhost", 8080)

    async for message in query(
        prompt="Hello from network transport",
        transport=transport
    ):
        print(message)
```

### Custom File Transport

```python
import json
import asyncio
from pathlib import Path
from typing import AsyncIterator, Any
from claude_code_sdk import Transport, query

class FileTransport(Transport):
    """Transport that reads/writes to files for testing or offline processing."""

    def __init__(self, input_file: str, output_file: str):
        self.input_file = Path(input_file)
        self.output_file = Path(output_file)
        self.connected = False
        self.input_queue: asyncio.Queue = asyncio.Queue()

    async def connect(self) -> None:
        """Initialize file transport."""
        self.output_file.parent.mkdir(parents=True, exist_ok=True)

        # Clear output file
        with open(self.output_file, "w") as f:
            f.write("")

        self.connected = True
        print(f"File transport ready: {self.input_file} -> {self.output_file}")

    async def write(self, data: str) -> None:
        """Write data to output file."""
        if not self.connected:
            raise RuntimeError("Transport not connected")

        with open(self.output_file, "a") as f:
            f.write(data)

        # Simulate processing delay
        await asyncio.sleep(0.1)

        # Generate mock response
        try:
            request = json.loads(data.strip())
            if request.get("type") == "user":
                response = {
                    "type": "assistant",
                    "message": {
                        "role": "assistant",
                        "content": [
                            {
                                "type": "text",
                                "text": f"Mock response to: {request['message']['content']}"
                            }
                        ]
                    }
                }
                await self.input_queue.put(response)

        except (json.JSONDecodeError, KeyError):
            # Ignore payloads that are not well-formed user messages
            pass

    async def read_messages(self) -> AsyncIterator[dict[str, Any]]:
        """Read messages from input queue."""
        while self.connected:
            try:
                # Wake up once per second so a close() is noticed promptly
                message = await asyncio.wait_for(
                    self.input_queue.get(), timeout=1.0
                )
                yield message
            except asyncio.TimeoutError:
                continue
            except asyncio.CancelledError:
                break

    async def close(self) -> None:
        """Close file transport."""
        self.connected = False
        print("File transport closed")

    def is_ready(self) -> bool:
        """Check if file transport is ready."""
        return self.connected

    async def end_input(self) -> None:
        """Signal end of input."""
        if self.connected:
            await self.input_queue.put({"type": "end"})

# Usage
async def main():
    transport = FileTransport("input.jsonl", "output.jsonl")

    async for message in query(
        prompt="Test file transport",
        transport=transport
    ):
        print(message)
```

### Debug Transport Wrapper

```python
import json
from datetime import datetime
from typing import AsyncIterator, Any
from claude_code_sdk import Transport, query

class DebugTransport(Transport):
    """Wrapper transport that logs all communication for debugging."""

    def __init__(self, wrapped_transport: Transport, log_file: str = "debug.log"):
        self.wrapped = wrapped_transport
        self.log_file = log_file

    def log(self, direction: str, data: Any) -> None:
        """Log communication data."""
        with open(self.log_file, "a") as f:
            timestamp = datetime.now().isoformat()
            f.write(f"[{timestamp}] {direction}: {json.dumps(data)}\n")

    async def connect(self) -> None:
        """Connect wrapped transport with logging."""
        self.log("CONNECT", {"action": "connecting"})
        await self.wrapped.connect()
        self.log("CONNECT", {"action": "connected"})

    async def write(self, data: str) -> None:
        """Write data with logging."""
        try:
            parsed_data = json.loads(data.strip())
            self.log("WRITE", parsed_data)
        except json.JSONDecodeError:
            # Log a truncated raw payload when the data is not valid JSON
            self.log("WRITE", {"raw": data[:200]})

        await self.wrapped.write(data)

    async def read_messages(self) -> AsyncIterator[dict[str, Any]]:
        """Read messages with logging."""
        async for message in self.wrapped.read_messages():
            self.log("READ", message)
            yield message

    async def close(self) -> None:
        """Close wrapped transport with logging."""
        self.log("CLOSE", {"action": "closing"})
        await self.wrapped.close()
        self.log("CLOSE", {"action": "closed"})

    def is_ready(self) -> bool:
        """Check if wrapped transport is ready."""
        return self.wrapped.is_ready()

    async def end_input(self) -> None:
        """End input on wrapped transport with logging."""
        self.log("END_INPUT", {"action": "ending_input"})
        await self.wrapped.end_input()

# Usage
async def main():
    # Wrap any existing transport with debug logging
    # (NetworkTransport is the class from the Custom Network Transport example)
    base_transport = NetworkTransport("localhost", 8080)
    debug_transport = DebugTransport(base_transport, "claude_debug.log")

    async for message in query(
        prompt="Debug this communication",
        transport=debug_transport
    ):
        print(message)
```

### Mock Transport for Testing

```python
import asyncio
import json
from typing import AsyncIterator, Any
from claude_code_sdk import Transport, query

class MockTransport(Transport):
    """Mock transport for testing that returns predefined responses."""

    def __init__(self, responses: list[dict[str, Any]]):
        self.responses = responses
        self.response_index = 0
        self.connected = False
        self.requests: list[dict[str, Any]] = []

    async def connect(self) -> None:
        """Mock connection."""
        self.connected = True

    async def write(self, data: str) -> None:
        """Record requests."""
        if not self.connected:
            raise RuntimeError("Transport not connected")

        try:
            request = json.loads(data.strip())
            self.requests.append(request)
        except json.JSONDecodeError:
            # Non-JSON payloads are not recorded
            pass

    async def read_messages(self) -> AsyncIterator[dict[str, Any]]:
        """Return predefined responses."""
        while self.connected and self.response_index < len(self.responses):
            await asyncio.sleep(0.1)  # Simulate delay
            response = self.responses[self.response_index]
            self.response_index += 1
            yield response

    async def close(self) -> None:
        """Mock close."""
        self.connected = False

    def is_ready(self) -> bool:
        """Mock ready check."""
        return self.connected

    async def end_input(self) -> None:
        """Mock end input."""
        pass

    def get_requests(self) -> list[dict[str, Any]]:
        """Get recorded requests for testing."""
        return self.requests.copy()

# Usage in tests
async def test_query():
    mock_responses = [
        {
            "type": "assistant",
            "message": {
                "role": "assistant",
                "content": [{"type": "text", "text": "Hello! I'm a mock response."}]
            }
        },
        {
            "type": "result",
            "subtype": "result",
            "duration_ms": 100,
            "duration_api_ms": 50,
            "is_error": False,
            "num_turns": 1,
            "session_id": "test",
            "total_cost_usd": 0.01
        }
    ]

    transport = MockTransport(mock_responses)

    messages = []
    async for message in query(
        prompt="Test message",
        transport=transport
    ):
        messages.append(message)

    # Verify requests were recorded
    requests = transport.get_requests()
    assert len(requests) == 1
    assert requests[0]["message"]["content"] == "Test message"

    # Verify responses were received
    assert len(messages) == 2
```

## Transport Interface Requirements

### Connection Management

- `connect()`: Establish connection and prepare for communication
- `close()`: Clean up resources and close connection
- `is_ready()`: Return current connection status
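
The lifecycle described by these three methods can be sketched as an async context manager that connects on entry and always closes on exit. This is a minimal illustration, not part of the SDK: `DemoTransport`, `opened`, and `lifecycle_states` are hypothetical names, and the stand-in class implements only the lifecycle methods.

```python
import asyncio
from contextlib import asynccontextmanager
from typing import Any, AsyncIterator


class DemoTransport:
    """Hypothetical stand-in exposing only the three lifecycle methods."""

    def __init__(self) -> None:
        self._ready = False

    async def connect(self) -> None:
        self._ready = True

    async def close(self) -> None:
        self._ready = False

    def is_ready(self) -> bool:
        return self._ready


@asynccontextmanager
async def opened(transport: Any) -> AsyncIterator[Any]:
    """Connect on entry and always close on exit, even if the body raises."""
    await transport.connect()
    try:
        yield transport
    finally:
        await transport.close()


async def lifecycle_states() -> list[bool]:
    """Record is_ready() before, during, and after the managed block."""
    transport = DemoTransport()
    states = [transport.is_ready()]
    async with opened(transport):
        states.append(transport.is_ready())
    states.append(transport.is_ready())
    return states


print(asyncio.run(lifecycle_states()))
```

Keeping `close()` in a `finally` clause means the transport is released even when the code using it fails partway through.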

### Communication

- `write(data)`: Send raw string data (usually JSON + newline)
- `read_messages()`: Return async iterator of parsed JSON messages
- `end_input()`: Signal end of input stream
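
To make the "JSON + newline" convention concrete, the sketch below frames one message for `write()` and reassembles messages from arbitrary text chunks, as a `read_messages()` implementation might have to do when a stream delivers partial lines. `frame` and `parse_chunks` are hypothetical helper names, and the message shape is borrowed from the examples in this document.

```python
import json
from typing import Any, Iterable, Iterator


def frame(message: dict[str, Any]) -> str:
    """Serialize one message as a single JSON line terminated by a newline."""
    return json.dumps(message) + "\n"


def parse_chunks(chunks: Iterable[str]) -> Iterator[dict[str, Any]]:
    """Reassemble text chunks into complete lines and parse each as JSON."""
    buffer = ""
    for chunk in chunks:
        buffer += chunk
        # Emit every complete line; keep any trailing partial line buffered.
        while "\n" in buffer:
            line, buffer = buffer.split("\n", 1)
            if line.strip():
                yield json.loads(line)


wire = frame({"type": "user", "message": {"role": "user", "content": "hi"}})
# Split the framed text mid-message to mimic a stream delivering partial data.
messages = list(parse_chunks([wire[:10], wire[10:]]))
print(messages)
```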

### Error Handling

Transports should handle errors appropriately:

- Connection failures in `connect()`
- I/O errors in `write()` and `read_messages()`
- Resource cleanup in `close()`
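
One way to handle connection failures is to catch the low-level `OSError` and re-raise it with the target named, chaining the original exception. The sketch below assumes a hypothetical `TransportConnectError` type and an injected `opener` callable (standing in for something like `asyncio.open_connection`); neither is part of the SDK.

```python
import asyncio
from typing import Awaitable, Callable


class TransportConnectError(ConnectionError):
    """Hypothetical error type used by this sketch; not part of the SDK."""


async def connect_with_context(
    opener: Callable[[], Awaitable[object]], label: str
) -> object:
    """Run opener() and re-raise low-level OS errors with the target named."""
    try:
        return await opener()
    except OSError as exc:
        # Chain the original exception so the root cause stays in the traceback.
        raise TransportConnectError(f"Failed to connect to {label}: {exc}") from exc


async def failing_opener() -> object:
    """Stand-in for a connection attempt that hits a closed port."""
    raise ConnectionRefusedError("connection refused")


async def demo() -> str:
    try:
        await connect_with_context(failing_opener, "localhost:8080")
    except TransportConnectError as exc:
        return f"{exc} (cause: {type(exc.__cause__).__name__})"
    return "connected"


print(asyncio.run(demo()))
```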

### Message Format

**Outgoing messages** (to transport):

- JSON strings ending with newline
- Usually contain `type`, `message`, `session_id` fields
- Control messages for SDK features

**Incoming messages** (from transport):

- JSON objects with parsed message data
- Various types: user, assistant, system, result, stream events
- Processed by internal message parser
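
As a rough illustration of these shapes, the sketch below builds one outgoing line and inspects two incoming dictionaries by their `type` field. The payloads reuse the fields shown in the examples in this document; the `session_id` value and the `describe` helper are assumptions for illustration, and real messages may carry additional fields.

```python
import json
from typing import Any

# Outgoing: a JSON string ending with a newline (field values are illustrative).
outgoing = json.dumps(
    {
        "type": "user",
        "message": {"role": "user", "content": "Hello"},
        "session_id": "example-session",
    }
) + "\n"

# Incoming: already-parsed dictionaries, as read_messages() would yield them.
incoming: list[dict[str, Any]] = [
    {
        "type": "assistant",
        "message": {
            "role": "assistant",
            "content": [{"type": "text", "text": "Hello! I'm a mock response."}],
        },
    },
    {"type": "result", "is_error": False, "num_turns": 1, "session_id": "test"},
]


def describe(message: dict[str, Any]) -> str:
    """Summarize a message based on its type field."""
    kind = message.get("type")
    if kind == "assistant":
        blocks = message["message"]["content"]
        texts = [b["text"] for b in blocks if b.get("type") == "text"]
        return "assistant: " + " ".join(texts)
    if kind == "result":
        return f"result: {message['num_turns']} turn(s), error={message['is_error']}"
    return f"other: {kind}"


summaries = [describe(m) for m in incoming]
print(summaries)
```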

## Integration with SDK

### Default Transport

The SDK automatically selects the appropriate transport:

- `SubprocessCLITransport`: Default subprocess transport
- Custom transport via `transport` parameter

### Transport Configuration

```python
from claude_code_sdk import ClaudeSDKClient, Transport, query


async def main(custom_transport: Transport) -> None:
    # With query function
    async for message in query(
        prompt="Hello",
        transport=custom_transport
    ):
        print(message)

    # With ClaudeSDKClient
    client = ClaudeSDKClient()
    await client.connect()  # Uses default transport

    # Custom transport would be configured differently
    # (SDK client doesn't currently support custom transports in constructor)
```

### Query vs Client Integration

- `query()` function: Accepts custom transport via parameter
- `ClaudeSDKClient`: Uses internal transport selection (primarily subprocess)

## Important Warnings

### API Stability

The Transport interface is marked as internal and may change:

- Interface methods may be added, removed, or modified
- Custom implementations must be updated with SDK releases
- Not covered by semantic versioning guarantees
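
Because the interface is an abstract base class, Python itself reports when a subclass has fallen behind: a class that does not implement every abstract method cannot be instantiated. The sketch below demonstrates this with a local stand-in (`TransportLike`, a hypothetical class mirroring two of the documented methods) rather than the SDK's own `Transport`.

```python
from abc import ABC, abstractmethod


class TransportLike(ABC):
    """Local stand-in mirroring two of the documented abstract methods."""

    @abstractmethod
    async def connect(self) -> None: ...

    @abstractmethod
    async def end_input(self) -> None: ...


class OutdatedTransport(TransportLike):
    """Implements connect() only, as if written before end_input() was added."""

    async def connect(self) -> None:
        return None


def missing_methods(cls: type) -> list[str]:
    """List the abstract methods a class has not implemented yet."""
    return sorted(getattr(cls, "__abstractmethods__", ()))


try:
    OutdatedTransport()
    instantiated = True
except TypeError:
    # Raised because at least one abstract method is still unimplemented.
    instantiated = False

print(missing_methods(OutdatedTransport), instantiated)
```

A check like `missing_methods` could run in a test suite so that an SDK upgrade that adds a method surfaces as a failing test rather than a runtime error.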

### Thread Safety

- Transport implementations should be async-safe
- Multiple concurrent operations may occur
- Proper synchronization required for shared resources
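
One common way to synchronize concurrent writers in asyncio code is an `asyncio.Lock` held for the duration of each write. The sketch below is a minimal illustration under that assumption; `SerializedWriter` is a hypothetical helper, and the in-memory list stands in for a real socket or pipe.

```python
import asyncio


class SerializedWriter:
    """Hypothetical helper that guards a shared sink with an asyncio.Lock."""

    def __init__(self) -> None:
        self.sink: list[str] = []
        self._lock = asyncio.Lock()

    async def write(self, data: str) -> None:
        # Without the lock, the await between the two halves would let
        # another task append in the middle of this message.
        async with self._lock:
            half = len(data) // 2
            self.sink.append(data[:half])
            await asyncio.sleep(0)  # yield control, as real I/O would
            self.sink.append(data[half:])


async def demo() -> str:
    writer = SerializedWriter()
    await asyncio.gather(writer.write("aaaa"), writer.write("bbbb"))
    return "".join(writer.sink)


print(asyncio.run(demo()))
```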

### Resource Management

- Implement proper cleanup in `close()`
- Handle connection failures gracefully
- Avoid resource leaks in long-running applications
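
A simple guard against leaks is to make `close()` idempotent and to call it from a `finally` clause, so cleanup runs exactly once even when an operation fails. The sketch below uses a hypothetical `TrackedResource` class that counts releases; it is an illustration of the pattern, not SDK code.

```python
import asyncio


class TrackedResource:
    """Hypothetical resource that counts how many times it was released."""

    def __init__(self) -> None:
        self.release_count = 0
        self._open = True

    async def close(self) -> None:
        # Idempotent: a second close() is a no-op rather than an error.
        if not self._open:
            return
        self._open = False
        self.release_count += 1


async def use_and_cleanup(resource: TrackedResource) -> str:
    """Fail during use, then confirm cleanup still happens exactly once."""
    try:
        raise RuntimeError("simulated I/O failure")
    except RuntimeError as exc:
        outcome = f"handled: {exc}"
    finally:
        await resource.close()
    await resource.close()  # harmless repeat
    return outcome


resource = TrackedResource()
outcome = asyncio.run(use_and_cleanup(resource))
print(outcome, resource.release_count)
```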


For integration with other SDK components, see [Configuration and Options](./configuration-options.md) and [Simple Queries](./simple-queries.md).