0
# Streams and Channels
1
2
Low-level transport mechanisms and communication channels for RPyC connections. These classes provide the underlying transport layer for different communication methods including TCP sockets, SSL, pipes, and custom channels.
3
4
## Capabilities
5
6
### Stream Classes
7
8
Low-level stream implementations for different transport mechanisms.
9
10
```python { .api }
11
class SocketStream:
12
"""
13
TCP socket-based stream for network communication.
14
"""
15
16
def __init__(self, sock):
17
"""
18
Initialize socket stream.
19
20
Parameters:
21
- sock: TCP socket object
22
"""
23
24
@classmethod
25
def connect(cls, host, port, ipv6=False, keepalive=False):
26
"""
27
Create connection to remote host.
28
29
Parameters:
30
- host (str): Remote hostname or IP
31
- port (int): Remote port
32
- ipv6 (bool): Use IPv6 if True
33
- keepalive (bool): Enable TCP keepalive
34
35
Returns:
36
SocketStream: Connected socket stream
37
"""
38
39
def close(self):
40
"""Close the socket stream"""
41
42
def read(self, count):
43
"""
44
Read data from stream.
45
46
Parameters:
47
- count (int): Number of bytes to read
48
49
Returns:
50
bytes: Data read from stream
51
"""
52
53
def write(self, data):
54
"""
55
Write data to stream.
56
57
Parameters:
58
- data (bytes): Data to write
59
"""
60
61
@property
62
def closed(self) -> bool:
63
"""True if stream is closed"""
64
65
class TunneledSocketStream(SocketStream):
66
"""
67
Socket stream that operates through a tunnel (SSH, proxy, etc.).
68
"""
69
70
def __init__(self, sock):
71
"""
72
Initialize tunneled socket stream.
73
74
Parameters:
75
- sock: Tunneled socket object
76
"""
77
78
class PipeStream:
79
"""
80
Stream implementation using pipes for inter-process communication.
81
"""
82
83
def __init__(self, input, output):
84
"""
85
Initialize pipe stream.
86
87
Parameters:
88
- input: Input pipe/file object
89
- output: Output pipe/file object
90
"""
91
92
@classmethod
93
def create_pair(cls):
94
"""
95
Create pair of connected pipe streams.
96
97
Returns:
98
tuple: (stream1, stream2) connected pipe streams
99
"""
100
101
def close(self):
102
"""Close pipe stream"""
103
104
def read(self, count):
105
"""
106
Read data from input pipe.
107
108
Parameters:
109
- count (int): Number of bytes to read
110
111
Returns:
112
bytes: Data read from pipe
113
"""
114
115
def write(self, data):
116
"""
117
Write data to output pipe.
118
119
Parameters:
120
- data (bytes): Data to write
121
"""
122
123
@property
124
def closed(self) -> bool:
125
"""True if stream is closed"""
126
```
127
128
### Channel Classes
129
130
Higher-level channel abstractions built on top of streams.
131
132
```python { .api }
133
class Channel:
134
"""
135
Bidirectional communication channel with buffering and framing.
136
"""
137
138
def __init__(self, stream):
139
"""
140
Initialize channel.
141
142
Parameters:
143
- stream: Underlying stream object
144
"""
145
146
def close(self):
147
"""Close the channel"""
148
149
def send(self, data):
150
"""
151
Send data through channel.
152
153
Parameters:
154
- data (bytes): Data to send
155
"""
156
157
def recv(self):
158
"""
159
Receive data from channel.
160
161
Returns:
162
bytes: Received data
163
"""
164
165
def poll(self, timeout=0):
166
"""
167
Poll for available data.
168
169
Parameters:
170
- timeout (float): Timeout in seconds
171
172
Returns:
173
bool: True if data is available
174
"""
175
176
@property
177
def closed(self) -> bool:
178
"""True if channel is closed"""
179
180
class CompressedChannel(Channel):
181
"""
182
Channel with data compression for reduced bandwidth usage.
183
"""
184
185
def __init__(self, stream, compression_level=6):
186
"""
187
Initialize compressed channel.
188
189
Parameters:
190
- stream: Underlying stream object
191
- compression_level (int): Compression level (0-9)
192
"""
193
194
class EncryptedChannel(Channel):
195
"""
196
Channel with encryption for secure communication.
197
"""
198
199
def __init__(self, stream, key):
200
"""
201
Initialize encrypted channel.
202
203
Parameters:
204
- stream: Underlying stream object
205
- key (bytes): Encryption key
206
"""
207
```
208
209
### Stream Factories
210
211
Factory functions for creating different types of streams and channels.
212
213
```python { .api }
214
def create_socket_stream(host, port, ipv6=False, keepalive=False):
215
"""
216
Create TCP socket stream.
217
218
Parameters:
219
- host (str): Remote hostname
220
- port (int): Remote port
221
- ipv6 (bool): Use IPv6
222
- keepalive (bool): Enable keepalive
223
224
Returns:
225
SocketStream: Connected socket stream
226
"""
227
228
def create_ssl_stream(host, port, keyfile=None, certfile=None, ca_certs=None, **kwargs):
229
"""
230
Create SSL-encrypted socket stream.
231
232
Parameters:
233
- host (str): Remote hostname
234
- port (int): Remote port
235
- keyfile (str): Private key file path
236
- certfile (str): Certificate file path
237
- ca_certs (str): CA certificates file path
238
- kwargs: Additional SSL parameters
239
240
Returns:
241
SocketStream: SSL-encrypted socket stream
242
"""
243
244
def create_pipe_stream(cmd_args):
245
"""
246
Create pipe stream to subprocess.
247
248
Parameters:
249
- cmd_args (list): Command and arguments for subprocess
250
251
Returns:
252
PipeStream: Pipe stream to subprocess
253
"""
254
255
def create_unix_stream(path):
256
"""
257
Create Unix domain socket stream.
258
259
Parameters:
260
- path (str): Unix socket file path
261
262
Returns:
263
SocketStream: Unix socket stream
264
"""
265
```
266
267
### Buffering and Performance
268
269
Classes for optimizing stream performance through buffering and batching.
270
271
```python { .api }
272
class BufferedStream:
273
"""
274
Stream wrapper with read/write buffering for performance optimization.
275
"""
276
277
def __init__(self, stream, buffer_size=8192):
278
"""
279
Initialize buffered stream.
280
281
Parameters:
282
- stream: Underlying stream object
283
- buffer_size (int): Buffer size in bytes
284
"""
285
286
def flush(self):
287
"""Flush write buffer"""
288
289
def read(self, count):
290
"""Read with buffering"""
291
292
def write(self, data):
293
"""Write with buffering"""
294
295
class BatchingChannel(Channel):
296
"""
297
Channel that batches small messages for improved throughput.
298
"""
299
300
def __init__(self, stream, batch_size=10, batch_timeout=0.1):
301
"""
302
Initialize batching channel.
303
304
Parameters:
305
- stream: Underlying stream object
306
- batch_size (int): Maximum messages per batch
307
- batch_timeout (float): Maximum batch wait time
308
"""
309
```
310
311
## Examples
312
313
### Basic Stream Usage
314
315
```python
316
import rpyc
317
from rpyc.core import SocketStream, Channel
318
319
# Create socket stream
320
stream = SocketStream.connect('localhost', 12345)
321
322
# Create channel on top of stream
323
channel = Channel(stream)
324
325
# Send and receive data
326
channel.send(b'Hello RPyC')
327
response = channel.recv()
328
print("Response:", response)
329
330
# Cleanup
331
channel.close()
332
```
333
334
### SSL Stream Connection
335
336
```python
337
from rpyc.core import create_ssl_stream, Channel
import rpyc  # needed for rpyc.Connection / rpyc.VoidService below

# Create SSL stream with client certificates
ssl_stream = create_ssl_stream(
    'secure-server.com', 18821,
    keyfile='client.key',
    certfile='client.crt',
    ca_certs='ca.crt',
)

# Use with RPyC connection
channel = Channel(ssl_stream)
conn = rpyc.Connection(rpyc.VoidService, channel)

# Use connection; close it even if the remote call raises so the
# SSL socket is not leaked
try:
    result = conn.root.some_function()
finally:
    conn.close()
354
```
355
356
### Pipe Stream to Subprocess
357
358
```python
359
from rpyc.core import PipeStream
import rpyc

# create_pair() returns BOTH ends of the pipe as a (stream1, stream2)
# tuple, so unpack it instead of passing the tuple to Channel
local_stream, remote_stream = PipeStream.create_pair()

# In real usage, you'd spawn subprocess with other end of pipe
# (remote_stream) and have it serve the connection.
# NOTE(review): the calls below presumably need a peer answering on
# remote_stream -- confirm before running this demo as-is.
# For demo, create connection directly
channel = rpyc.Channel(local_stream)
conn = rpyc.Connection(rpyc.ClassicService, channel)

# Use connection; always release both pipe ends afterwards
try:
    conn.execute('print("Hello from subprocess")')
finally:
    conn.close()
    remote_stream.close()
373
```
374
375
### Custom Stream Implementation
376
377
```python
378
from rpyc.core import Channel
import rpyc  # the usage section of this example calls rpyc.Connection / rpyc.VoidService
import socket
import ssl
381
382
class CustomSecureStream:
    """Custom stream with additional security features.

    Wraps a TLS client socket and performs a token-based authentication
    exchange before the stream is handed to a channel.
    """

    def __init__(self, host, port, auth_token):
        """
        Connect over TLS and authenticate with the server.

        Parameters:
        - host (str): Remote hostname (also used as TLS server_hostname)
        - port (int): Remote port
        - auth_token (str): Token sent to the server after the TLS handshake

        Raises:
        ConnectionError: If the server's reply is not b'AUTH_OK'
        """
        # Create SSL socket
        context = ssl.create_default_context()
        sock = socket.create_connection((host, port))
        try:
            self.ssl_sock = context.wrap_socket(sock, server_hostname=host)
        except BaseException:
            # Handshake failed: make sure the TCP socket is released
            sock.close()
            raise

        try:
            # Send authentication token; sendall() keeps writing until the
            # whole token is out, whereas send() may stop part-way
            self.ssl_sock.sendall(auth_token.encode())
            response = self.ssl_sock.recv(100)
            if response != b'AUTH_OK':
                raise ConnectionError("Authentication failed")
        except BaseException:
            # Do not leak the connected socket when authentication fails
            self.ssl_sock.close()
            raise

    def read(self, count):
        """
        Read exactly `count` bytes from the socket.

        A single recv() may return fewer bytes than requested, so keep
        reading until the request is satisfied.

        Parameters:
        - count (int): Number of bytes to read

        Returns:
        bytes: Exactly `count` bytes

        Raises:
        EOFError: If the peer closes the connection before `count` bytes arrive
        """
        chunks = []
        remaining = count
        while remaining > 0:
            chunk = self.ssl_sock.recv(remaining)
            if not chunk:
                # recv() returning b'' means the peer closed the connection
                raise EOFError("connection closed by peer")
            chunks.append(chunk)
            remaining -= len(chunk)
        return b"".join(chunks)

    def write(self, data):
        """
        Write all of `data` to the socket.

        Parameters:
        - data (bytes): Data to write
        """
        self.ssl_sock.sendall(data)

    def close(self):
        """Close the underlying SSL socket"""
        self.ssl_sock.close()

    @property
    def closed(self):
        """True if stream is closed"""
        # fileno() reports -1 once the socket has been closed; this avoids
        # depending on the socket's private _closed attribute
        return self.ssl_sock.fileno() == -1
409
410
# Use custom stream
411
custom_stream = CustomSecureStream('secure-host.com', 12345, 'secret_token')
412
channel = Channel(custom_stream)
413
conn = rpyc.Connection(rpyc.VoidService, channel)
414
415
# Use connection normally
416
result = conn.root.some_method()
417
conn.close()
418
```
419
420
### Compressed Communication
421
422
```python
423
from rpyc.core import SocketStream, CompressedChannel
424
import rpyc
425
426
# Create compressed channel for bandwidth efficiency
427
stream = SocketStream.connect('remote-host', 12345)
428
compressed_channel = CompressedChannel(stream, compression_level=9)
429
430
# Create connection with compression
431
conn = rpyc.Connection(rpyc.VoidService, compressed_channel)
432
433
# Large data transfers will be compressed automatically
434
large_data = list(range(10000))
435
result = conn.root.process_large_data(large_data)
436
437
conn.close()
438
```
439
440
### Performance Optimized Streaming
441
442
```python
443
from rpyc.core import SocketStream, BufferedStream, BatchingChannel
444
import rpyc
445
446
# Create performance-optimized connection
447
base_stream = SocketStream.connect('high-throughput-server', 12345)
448
buffered_stream = BufferedStream(base_stream, buffer_size=32768) # 32KB buffer
449
batching_channel = BatchingChannel(buffered_stream, batch_size=20, batch_timeout=0.05)
450
451
# Create connection
452
conn = rpyc.Connection(rpyc.VoidService, batching_channel)
453
454
# High-frequency operations benefit from batching and buffering
455
for i in range(1000):
456
result = conn.root.quick_operation(i)
457
458
conn.close()
459
```
460
461
### Unix Domain Socket Connection
462
463
```python
464
from rpyc.core import create_unix_stream, Channel
465
import rpyc
466
467
# Create Unix socket stream for local IPC
468
unix_stream = create_unix_stream('/tmp/rpyc.sock')
469
channel = Channel(unix_stream)
470
471
# Create connection
472
conn = rpyc.Connection(rpyc.ClassicService, channel)
473
474
# Use for local inter-process communication
475
local_files = conn.modules.os.listdir('/tmp')
476
print("Local temp files:", local_files)
477
478
conn.close()
479
```
480
481
### Multi-threaded Stream Pool
482
483
```python
484
from rpyc.core import SocketStream, Channel
485
import rpyc
486
import threading
487
import queue
488
489
class StreamPool:
    """Pool of reusable connections for connection management.

    Every pooled item is a live connection built once on its own socket
    stream. Connections are handed out and taken back WITHOUT being closed:
    closing a connection also closes its stream, so a stream recovered from
    a closed connection could not be reused.
    """

    def __init__(self, host, port, pool_size=10):
        """
        Open `pool_size` connections to the server.

        Parameters:
        - host (str): Remote hostname or IP
        - port (int): Remote port
        - pool_size (int): Number of connections kept in the pool
        """
        self.host = host
        self.port = port
        self.pool = queue.Queue()  # holds idle, still-open connections

        # Create initial pool
        try:
            for _ in range(pool_size):
                self.pool.put(self._new_connection())
        except BaseException:
            # Partial failure: release the connections opened so far
            self.close_all()
            raise

    def _new_connection(self):
        """Open a fresh stream and wrap it in a channel and connection"""
        stream = SocketStream.connect(self.host, self.port)
        channel = Channel(stream)
        return rpyc.Connection(rpyc.VoidService, channel)

    def get_connection(self):
        """Get connection from pool (blocks until one is available)"""
        return self.pool.get()

    def return_connection(self, conn):
        """
        Return connection to pool.

        The connection is kept open for the next caller. If it was closed
        while checked out, a replacement is opened so the pool keeps its size.

        Parameters:
        - conn: Connection previously obtained from get_connection()
        """
        if conn.closed:
            conn = self._new_connection()
        self.pool.put(conn)

    def close_all(self):
        """Close every idle connection currently held by the pool"""
        while True:
            try:
                conn = self.pool.get_nowait()
            except queue.Empty:
                break
            conn.close()
512
513
# Use stream pool for efficient connection reuse
514
pool = StreamPool('server.example.com', 12345, pool_size=20)
515
516
def worker_thread(thread_id):
517
"""Worker thread using pooled connections"""
518
for i in range(100):
519
conn = pool.get_connection()
520
try:
521
result = conn.root.work_function(thread_id, i)
522
print(f"Thread {thread_id}, task {i}: {result}")
523
finally:
524
pool.return_connection(conn)
525
526
# Start multiple worker threads
527
threads = []
528
for tid in range(10):
529
t = threading.Thread(target=worker_thread, args=(tid,))
530
t.start()
531
threads.append(t)
532
533
# Wait for completion
534
for t in threads:
535
t.join()
536
```
537
538
### Stream Monitoring and Debugging
539
540
```python
541
from rpyc.core import SocketStream, Channel
542
import rpyc
543
import time
544
545
class MonitoredStream:
    """Stream wrapper that monitors traffic for debugging.

    Counts the bytes passing through read() and write() and prints a line
    per operation plus a summary on close(). Any attribute not defined here
    is forwarded to the wrapped stream, so the wrapper does not hide the
    rest of the wrapped stream's interface.
    """

    def __init__(self, stream):
        """
        Parameters:
        - stream: Underlying stream object to monitor
        """
        self.stream = stream
        self.bytes_sent = 0
        self.bytes_received = 0
        self.start_time = time.time()

    def __getattr__(self, name):
        # Only invoked for attributes missing on the wrapper itself.
        # Guard 'stream' so a partially initialised instance raises a clean
        # AttributeError instead of recursing forever.
        if name == "stream":
            raise AttributeError(name)
        return getattr(self.stream, name)

    def read(self, count):
        """
        Read from the wrapped stream and record the number of bytes received.

        Parameters:
        - count (int): Number of bytes to read

        Returns:
        bytes: Data returned by the wrapped stream
        """
        data = self.stream.read(count)
        self.bytes_received += len(data)
        print(f"READ: {len(data)} bytes (total: {self.bytes_received})")
        return data

    def write(self, data):
        """
        Write to the wrapped stream and record the number of bytes sent.

        Parameters:
        - data (bytes): Data to write
        """
        self.stream.write(data)
        # Count only after the write succeeded
        self.bytes_sent += len(data)
        print(f"WRITE: {len(data)} bytes (total: {self.bytes_sent})")

    def close(self):
        """Print the traffic summary, then close the wrapped stream"""
        duration = time.time() - self.start_time
        print(f"Stream closed after {duration:.2f}s")
        print(f"Total sent: {self.bytes_sent} bytes")
        print(f"Total received: {self.bytes_received} bytes")
        self.stream.close()

    @property
    def closed(self):
        """True if the wrapped stream is closed"""
        return self.stream.closed
575
576
# Use monitored stream for debugging
577
base_stream = SocketStream.connect('localhost', 12345)
578
monitored_stream = MonitoredStream(base_stream)
579
channel = Channel(monitored_stream)
580
581
conn = rpyc.Connection(rpyc.VoidService, channel)
582
583
# Operations will show traffic monitoring
584
result = conn.root.some_operation()
585
conn.close() # Shows traffic summary
586
```
587
588
## Constants
589
590
```python { .api }
591
DEFAULT_BUFFER_SIZE = 8192 # Default stream buffer size
592
MAX_BUFFER_SIZE = 1048576 # Maximum buffer size (1MB)
593
DEFAULT_BATCH_SIZE = 10 # Default message batch size
594
DEFAULT_BATCH_TIMEOUT = 0.1 # Default batch timeout (seconds)
595
DEFAULT_COMPRESSION_LEVEL = 6 # Default compression level
596
STREAM_CHUNK_SIZE = 64000 # Default chunk size for large transfers
597
```
598
599
## Exceptions
600
601
```python { .api }
602
class StreamError(Exception):
603
"""Base exception for stream operations"""
604
605
class ConnectionLostError(StreamError):
606
"""Raised when stream connection is lost"""
607
608
class BufferOverflowError(StreamError):
609
"""Raised when buffer size limits are exceeded"""
610
611
class CompressionError(StreamError):
612
"""Raised when compression/decompression fails"""
613
614
class EncryptionError(StreamError):
615
"""Raised when encryption/decryption fails"""
616
```