or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

authentication-and-security.md · classic-mode.md · cli-tools.md · connection-factory.md · index.md · registry-and-discovery.md · servers.md · services-protocols.md · streams-and-channels.md · utilities.md

docs/streams-and-channels.md

0

# Streams and Channels

1

2

Low-level transport mechanisms and communication channels for RPyC connections. These classes provide the underlying transport layer for different communication methods including TCP sockets, SSL, pipes, and custom channels.

3

4

## Capabilities

5

6

### Stream Classes

7

8

Low-level stream implementations for different transport mechanisms.

9

10

```python { .api }

11

class SocketStream:
    """
    TCP socket-based stream for network communication.
    """

    def __init__(self, sock):
        """
        Initialize socket stream.

        Parameters:
        - sock: TCP socket object
        """

    @classmethod
    def connect(cls, host, port, ipv6=False, keepalive=False):
        """
        Create connection to remote host.

        Parameters:
        - host (str): Remote hostname or IP
        - port (int): Remote port
        - ipv6 (bool): Use IPv6 if True
        - keepalive (bool): Enable TCP keepalive

        Returns:
        SocketStream: Connected socket stream
        """

    def close(self):
        """Close the socket stream"""

    def read(self, count):
        """
        Read data from stream.

        Parameters:
        - count (int): Number of bytes to read

        Returns:
        bytes: Data read from stream
        """

    def write(self, data):
        """
        Write data to stream.

        Parameters:
        - data (bytes): Data to write
        """

    @property
    def closed(self) -> bool:
        """True if stream is closed"""

64

65

class TunneledSocketStream(SocketStream):
    """
    Socket stream that operates through a tunnel (SSH, proxy, etc.).
    """

    def __init__(self, sock):
        """
        Initialize tunneled socket stream.

        Parameters:
        - sock: Tunneled socket object
        """

77

78

class PipeStream:
    """
    Stream implementation using pipes for inter-process communication.
    """

    def __init__(self, input, output):
        """
        Initialize pipe stream.

        Parameters:
        - input: Input pipe/file object
        - output: Output pipe/file object
        """

    @classmethod
    def create_pair(cls):
        """
        Create pair of connected pipe streams.

        Returns:
        tuple: (stream1, stream2) connected pipe streams
        """

    def close(self):
        """Close pipe stream"""

    def read(self, count):
        """
        Read data from input pipe.

        Parameters:
        - count (int): Number of bytes to read

        Returns:
        bytes: Data read from pipe
        """

    def write(self, data):
        """
        Write data to output pipe.

        Parameters:
        - data (bytes): Data to write
        """

    @property
    def closed(self) -> bool:
        """True if stream is closed"""

126

```

127

128

### Channel Classes

129

130

Higher-level channel abstractions built on top of streams.

131

132

```python { .api }

133

class Channel:
    """
    Bidirectional communication channel with buffering and framing.
    """

    def __init__(self, stream):
        """
        Initialize channel.

        Parameters:
        - stream: Underlying stream object
        """

    def close(self):
        """Close the channel"""

    def send(self, data):
        """
        Send data through channel.

        Parameters:
        - data (bytes): Data to send
        """

    def recv(self):
        """
        Receive data from channel.

        Returns:
        bytes: Received data
        """

    def poll(self, timeout=0):
        """
        Poll for available data.

        Parameters:
        - timeout (float): Timeout in seconds

        Returns:
        bool: True if data is available
        """

    @property
    def closed(self) -> bool:
        """True if channel is closed"""

179

180

class CompressedChannel(Channel):
    """
    Channel with data compression for reduced bandwidth usage.
    """

    def __init__(self, stream, compression_level=6):
        """
        Initialize compressed channel.

        Parameters:
        - stream: Underlying stream object
        - compression_level (int): Compression level (0-9)
        """

193

194

class EncryptedChannel(Channel):
    """
    Channel with encryption for secure communication.
    """

    def __init__(self, stream, key):
        """
        Initialize encrypted channel.

        Parameters:
        - stream: Underlying stream object
        - key (bytes): Encryption key
        """

207

```

208

209

### Stream Factories

210

211

Factory functions for creating different types of streams and channels.

212

213

```python { .api }

214

def create_socket_stream(host, port, ipv6=False, keepalive=False):
    """
    Create TCP socket stream.

    Parameters:
    - host (str): Remote hostname
    - port (int): Remote port
    - ipv6 (bool): Use IPv6
    - keepalive (bool): Enable keepalive

    Returns:
    SocketStream: Connected socket stream
    """

227

228

def create_ssl_stream(host, port, keyfile=None, certfile=None, ca_certs=None, **kwargs):
    """
    Create SSL-encrypted socket stream.

    Parameters:
    - host (str): Remote hostname
    - port (int): Remote port
    - keyfile (str): Private key file path
    - certfile (str): Certificate file path
    - ca_certs (str): CA certificates file path
    - kwargs: Additional SSL parameters

    Returns:
    SocketStream: SSL-encrypted socket stream
    """

243

244

def create_pipe_stream(cmd_args):
    """
    Create pipe stream to subprocess.

    Parameters:
    - cmd_args (list): Command and arguments for subprocess

    Returns:
    PipeStream: Pipe stream to subprocess
    """

254

255

def create_unix_stream(path):
    """
    Create Unix domain socket stream.

    Parameters:
    - path (str): Unix socket file path

    Returns:
    SocketStream: Unix socket stream
    """

265

```

266

267

### Buffering and Performance

268

269

Classes for optimizing stream performance through buffering and batching.

270

271

```python { .api }

272

class BufferedStream:
    """
    Stream wrapper with read/write buffering for performance optimization.
    """

    def __init__(self, stream, buffer_size=8192):
        """
        Initialize buffered stream.

        Parameters:
        - stream: Underlying stream object
        - buffer_size (int): Buffer size in bytes
        """

    def flush(self):
        """Flush write buffer"""

    def read(self, count):
        """Read with buffering"""

    def write(self, data):
        """Write with buffering"""

294

295

class BatchingChannel(Channel):
    """
    Channel that batches small messages for improved throughput.
    """

    def __init__(self, stream, batch_size=10, batch_timeout=0.1):
        """
        Initialize batching channel.

        Parameters:
        - stream: Underlying stream object
        - batch_size (int): Maximum messages per batch
        - batch_timeout (float): Maximum batch wait time
        """

309

```

310

311

## Examples

312

313

### Basic Stream Usage

314

315

```python

316

import rpyc
from rpyc.core import SocketStream, Channel

# Create socket stream
stream = SocketStream.connect('localhost', 12345)

# Create channel on top of stream
channel = Channel(stream)

try:
    # Send and receive data
    channel.send(b'Hello RPyC')
    response = channel.recv()
    print("Response:", response)
finally:
    # Cleanup runs even if send/recv raises, so the socket is not leaked
    channel.close()

332

```

333

334

### SSL Stream Connection

335

336

```python

337

import rpyc  # needed for rpyc.Connection / rpyc.VoidService below
from rpyc.core import create_ssl_stream, Channel

# Create SSL stream with client certificates
ssl_stream = create_ssl_stream(
    'secure-server.com', 18821,
    keyfile='client.key',
    certfile='client.crt',
    ca_certs='ca.crt',
)

# Use with RPyC connection
channel = Channel(ssl_stream)
conn = rpyc.Connection(rpyc.VoidService, channel)

try:
    # Use connection
    result = conn.root.some_function()
finally:
    # Always release the connection, even if the remote call fails
    conn.close()

354

```

355

356

### Pipe Stream to Subprocess

357

358

```python

359

from rpyc.core import PipeStream
import rpyc

# Create pipe stream to subprocess running RPyC server.
# create_pair() returns a tuple of two connected streams, so unpack it:
# one end stays here, the other end belongs to the peer.
client_stream, server_stream = PipeStream.create_pair()

# In real usage, you'd spawn subprocess with other end of pipe
# (server_stream) and serve RPyC on it.
# For demo, create connection directly
channel = rpyc.Channel(client_stream)
conn = rpyc.Connection(rpyc.ClassicService, channel)

try:
    # Use connection
    conn.execute('print("Hello from subprocess")')
finally:
    conn.close()
    # NOTE(review): in this demo nothing else owns the peer end, so it is
    # closed here; drop this line if the peer end was handed to another owner.
    server_stream.close()

373

```

374

375

### Custom Stream Implementation

376

377

```python

378

import socket
import ssl

import rpyc
from rpyc.core import Channel

381

382

class CustomSecureStream:
    """Custom stream with additional security features.

    Opens a TLS client connection to ``host:port``, sends ``auth_token`` and
    expects the literal reply ``b'AUTH_OK'``. The object then exposes the
    ``read`` / ``write`` / ``close`` / ``closed`` members used by the other
    stream classes in this document.
    """

    def __init__(self, host, port, auth_token):
        """
        Connect, wrap the socket in TLS and authenticate.

        Parameters:
        - host (str): Remote hostname (also used for TLS hostname checking)
        - port (int): Remote port
        - auth_token (str): Token sent to the server right after the handshake

        Raises:
        ConnectionError: If the server does not answer with b'AUTH_OK'
        """
        # Create SSL socket
        context = ssl.create_default_context()
        sock = socket.create_connection((host, port))
        try:
            self.ssl_sock = context.wrap_socket(sock, server_hostname=host)
        except Exception:
            # Do not leak the plain socket when the TLS handshake fails
            sock.close()
            raise
        self._closed = False

        # Send authentication token
        try:
            # sendall() keeps writing until the whole token is sent;
            # send() may transmit only part of the buffer.
            self.ssl_sock.sendall(auth_token.encode())
            response = self.ssl_sock.recv(100)
        except Exception:
            self.close()
            raise
        if response != b'AUTH_OK':
            # Release the socket before reporting the failure
            self.close()
            raise ConnectionError("Authentication failed")

    def read(self, count):
        """Read up to ``count`` bytes from the TLS socket."""
        return self.ssl_sock.recv(count)

    def write(self, data):
        """Write all of ``data`` to the TLS socket."""
        # sendall() avoids silently dropping the tail on a partial send
        self.ssl_sock.sendall(data)

    def close(self):
        """Close the TLS socket and mark the stream as closed."""
        self._closed = True
        self.ssl_sock.close()

    @property
    def closed(self):
        """True once close() has been called on this stream."""
        # Tracked locally instead of reading the private socket._closed
        return self._closed

409

410

# Use custom stream

411

custom_stream = CustomSecureStream('secure-host.com', 12345, 'secret_token')
channel = Channel(custom_stream)
conn = rpyc.Connection(rpyc.VoidService, channel)

try:
    # Use connection normally
    result = conn.root.some_method()
finally:
    # Always release the connection, even if the remote call fails
    conn.close()

418

```

419

420

### Compressed Communication

421

422

```python

423

from rpyc.core import SocketStream, CompressedChannel
import rpyc

# Create compressed channel for bandwidth efficiency
stream = SocketStream.connect('remote-host', 12345)
compressed_channel = CompressedChannel(stream, compression_level=9)

# Create connection with compression
conn = rpyc.Connection(rpyc.VoidService, compressed_channel)

try:
    # Large data transfers will be compressed automatically
    large_data = list(range(10000))
    result = conn.root.process_large_data(large_data)
finally:
    # Always release the connection, even if the remote call fails
    conn.close()

438

```

439

440

### Performance Optimized Streaming

441

442

```python

443

from rpyc.core import SocketStream, BufferedStream, BatchingChannel
import rpyc

# Create performance-optimized connection
base_stream = SocketStream.connect('high-throughput-server', 12345)
buffered_stream = BufferedStream(base_stream, buffer_size=32768)  # 32KB buffer
batching_channel = BatchingChannel(buffered_stream, batch_size=20, batch_timeout=0.05)

# Create connection
conn = rpyc.Connection(rpyc.VoidService, batching_channel)

try:
    # High-frequency operations benefit from batching and buffering
    for i in range(1000):
        result = conn.root.quick_operation(i)
finally:
    # Always release the connection, even if a remote call fails mid-loop
    conn.close()

459

```

460

461

### Unix Domain Socket Connection

462

463

```python

464

from rpyc.core import create_unix_stream, Channel
import rpyc

# Create Unix socket stream for local IPC
unix_stream = create_unix_stream('/tmp/rpyc.sock')
channel = Channel(unix_stream)

# Create connection
conn = rpyc.Connection(rpyc.ClassicService, channel)

try:
    # Use for local inter-process communication
    local_files = conn.modules.os.listdir('/tmp')
    print("Local temp files:", local_files)
finally:
    # Always release the connection, even if the remote call fails
    conn.close()

479

```

480

481

### Multi-threaded Stream Pool

482

483

```python

484

from rpyc.core import SocketStream, Channel

485

import rpyc

486

import threading

487

import queue

488

489

class StreamPool:
    """Pool of reusable connections for connection management.

    The pool keeps *open connections* in a thread-safe queue. Handing a
    connection back does not close it, so the next caller reuses the same
    socket instead of receiving a stream that has already been closed.
    """

    def __init__(self, host, port, pool_size=10):
        """
        Create the pool and eagerly open ``pool_size`` connections.

        Parameters:
        - host (str): Remote hostname
        - port (int): Remote port
        - pool_size (int): Number of connections opened up front
        """
        self.host = host
        self.port = port
        self.pool = queue.Queue()

        # Create initial pool
        for _ in range(pool_size):
            self.pool.put(self._new_connection())

    def _new_connection(self):
        """Open a fresh socket stream and wrap it in a connection."""
        stream = SocketStream.connect(self.host, self.port)
        return rpyc.Connection(rpyc.VoidService, Channel(stream))

    def get_connection(self):
        """Get connection from pool (blocks until one is available)"""
        return self.pool.get()

    def return_connection(self, conn):
        """Return connection to pool"""
        # A connection that died while checked out is replaced, so the pool
        # never hands out a closed connection. If the reconnect itself fails
        # the exception propagates and the pool shrinks by one.
        if conn.closed:
            conn = self._new_connection()
        self.pool.put(conn)

512

513

# Use stream pool for efficient connection reuse

514

pool = StreamPool('server.example.com', 12345, pool_size=20)


def worker_thread(thread_id):
    """Worker thread using pooled connections"""
    for i in range(100):
        conn = pool.get_connection()
        try:
            result = conn.root.work_function(thread_id, i)
            print(f"Thread {thread_id}, task {i}: {result}")
        finally:
            # Hand the connection back even if the remote call raised
            pool.return_connection(conn)


# Start multiple worker threads
threads = []
for tid in range(10):
    t = threading.Thread(target=worker_thread, args=(tid,))
    t.start()
    threads.append(t)

# Wait for completion
for t in threads:
    t.join()

536

```

537

538

### Stream Monitoring and Debugging

539

540

```python

541

from rpyc.core import SocketStream, Channel

542

import rpyc

543

import time

544

545

class MonitoredStream:
    """Stream wrapper that monitors traffic for debugging.

    Delegates every operation to the wrapped stream, counts the bytes that
    pass through and prints one line per read/write plus a summary on close.
    """

    def __init__(self, stream):
        """
        Wrap ``stream`` and start the traffic counters.

        Parameters:
        - stream: Underlying stream object (needs read/write/close/closed)
        """
        self.stream = stream
        self.bytes_sent = 0
        self.bytes_received = 0
        self.start_time = time.time()

    def read(self, count):
        """Read from the wrapped stream and record how many bytes arrived."""
        data = self.stream.read(count)
        self.bytes_received += len(data)
        print(f"READ: {len(data)} bytes (total: {self.bytes_received})")
        return data

    def write(self, data):
        """Write to the wrapped stream and record how many bytes were sent."""
        self.stream.write(data)
        self.bytes_sent += len(data)
        print(f"WRITE: {len(data)} bytes (total: {self.bytes_sent})")

    def close(self):
        """Print the traffic summary, then close the wrapped stream."""
        try:
            duration = time.time() - self.start_time
            print(f"Stream closed after {duration:.2f}s")
            print(f"Total sent: {self.bytes_sent} bytes")
            print(f"Total received: {self.bytes_received} bytes")
        finally:
            # The wrapped stream must be released even if reporting fails
            self.stream.close()

    @property
    def closed(self):
        """Closed state of the wrapped stream."""
        return self.stream.closed

575

576

# Use monitored stream for debugging

577

base_stream = SocketStream.connect('localhost', 12345)
monitored_stream = MonitoredStream(base_stream)
channel = Channel(monitored_stream)

conn = rpyc.Connection(rpyc.VoidService, channel)

try:
    # Operations will show traffic monitoring
    result = conn.root.some_operation()
finally:
    conn.close()  # Shows traffic summary

586

```

587

588

## Constants

589

590

```python { .api }

591

DEFAULT_BUFFER_SIZE = 8192  # Default stream buffer size
MAX_BUFFER_SIZE = 1048576  # Maximum buffer size (1MB)
DEFAULT_BATCH_SIZE = 10  # Default message batch size
DEFAULT_BATCH_TIMEOUT = 0.1  # Default batch timeout (seconds)
DEFAULT_COMPRESSION_LEVEL = 6  # Default compression level
STREAM_CHUNK_SIZE = 64000  # Default chunk size for large transfers

597

```

598

599

## Exceptions

600

601

```python { .api }

602

class StreamError(Exception):
    """Base exception for stream operations"""


class ConnectionLostError(StreamError):
    """Raised when stream connection is lost"""


class BufferOverflowError(StreamError):
    """Raised when buffer size limits are exceeded"""


class CompressionError(StreamError):
    """Raised when compression/decompression fails"""


class EncryptionError(StreamError):
    """Raised when encryption/decryption fails"""

616

```