# Utilities and Advanced Usage

Helper functions for URI handling, byte ranges, parallel processing, and custom transport development. These utilities provide advanced functionality and extensibility for power users and library developers.

## Capabilities

### URI and URL Utilities

Advanced URI parsing and manipulation functions.
```python { .api }
def safe_urlsplit(url):
    """URL split that handles question marks in S3/GCS URLs.

    Parameters:
        url: str - URL to split; may contain question marks in the path

    Returns:
        urllib.parse.SplitResult - Parsed URL components

    Notes:
        Handles the special case where S3/GCS object keys contain '?'
        characters, which would normally be interpreted as query string
        delimiters.
    """

def make_range_string(start=None, stop=None):
    """Create an HTTP byte range specifier string.

    Parameters:
        start: int - Starting byte position (inclusive)
        stop: int - Ending byte position (inclusive)

    Returns:
        str - Range string like 'bytes=0-1023' or 'bytes=1000-'

    Notes:
        Used for HTTP Range requests and partial content retrieval.
    """

def parse_content_range(content_range):
    """Parse an HTTP Content-Range header value.

    Parameters:
        content_range: str - Content-Range header value

    Returns:
        tuple - (units, start, stop, length) parsed from the header

    Example:
        parse_content_range('bytes 0-1023/2048') -> ('bytes', 0, 1023, 2048)
    """
```
### Utility Constants

```python { .api }
# URL schemes requiring special handling
WORKAROUND_SCHEMES = ['s3', 's3n', 's3u', 's3a', 'gs']

# Placeholder for question marks in URLs
QUESTION_MARK_PLACEHOLDER = '///smart_open.utils.QUESTION_MARK_PLACEHOLDER///'
```
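
To see why the placeholder exists, compare how the standard library's `urlsplit` mishandles an object key containing `?`. The following stdlib-only sketch illustrates the substitute-split-restore idea; the hard-coded `PLACEHOLDER` string here is illustrative, not the library constant itself.

```python
from urllib.parse import urlsplit

# Stock urlsplit treats the first '?' as the query delimiter,
# truncating an S3/GCS object key that legitimately contains one.
parsed = urlsplit('s3://bucket/data?set.csv')
print(parsed.path)   # '/data'
print(parsed.query)  # 'set.csv'

# The workaround: substitute a placeholder, split, then substitute back.
PLACEHOLDER = '///QUESTION_MARK_PLACEHOLDER///'  # illustrative stand-in
url = 's3://bucket/data?set.csv'.replace('?', PLACEHOLDER)
path = urlsplit(url).path.replace(PLACEHOLDER, '?')
print(path)  # '/data?set.csv'
```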
### Function Introspection

Utilities for examining and validating function parameters.

```python { .api }
def inspect_kwargs(callable_obj):
    """Inspect a function signature for supported keyword arguments.

    Parameters:
        callable_obj: callable - Function or method to inspect

    Returns:
        dict - Mapping of argument names to their default values

    Notes:
        Used internally to validate transport_params against function
        signatures.
    """

def check_kwargs(callable_obj, kwargs):
    """Filter kwargs down to supported parameters, warning about the rest.

    Parameters:
        callable_obj: callable - Function to check against
        kwargs: dict - Keyword arguments to filter

    Returns:
        dict - Filtered kwargs containing only supported parameters

    Notes:
        Logs a warning for each unsupported kwarg that is filtered out.
    """

def clamp(value, minval=0, maxval=None):
    """Clamp a numeric value to the specified range.

    Parameters:
        value: number - Value to clamp
        minval: number - Minimum allowed value
        maxval: number - Maximum allowed value (None for no upper limit)

    Returns:
        number - Clamped value within the [minval, maxval] range
    """
```
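
For illustration, here is a minimal re-implementation of the documented `clamp` semantics -- a sketch of the behavior described above, not the library source:

```python
def clamp(value, minval=0, maxval=None):
    # Bound value below by minval and above by maxval (None = no upper bound).
    if maxval is not None:
        value = min(value, maxval)
    return max(value, minval)

print(clamp(5, 0, 10))   # 5
print(clamp(-3, 0, 10))  # 0
print(clamp(99, 0, 10))  # 10
print(clamp(99, 0))      # 99 (no upper bound)
```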
### Enhanced I/O Classes

Advanced file-like object wrappers with additional functionality.

```python { .api }
class TextIOWrapper(io.TextIOWrapper):
    """Enhanced TextIOWrapper with improved exception handling.

    Provides better error reporting and handling for text-mode operations
    over binary file objects from the various transport layers.
    """

class FileLikeProxy(wrapt.ObjectProxy):
    """Proxy that manages the relationship between inner and outer file objects.

    Coordinates operations between compression layers and transport layers,
    ensuring proper resource management and method delegation.
    """
```
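
The base-class behavior the enhanced wrapper builds on can be demonstrated with the standard library alone: `io.TextIOWrapper` layers text decoding over any binary file-like object, which is the same pattern applied here to transport-layer streams.

```python
import io

# Layer UTF-8 text decoding over an in-memory binary stream,
# just as the enhanced wrapper layers text mode over a transport stream.
binary = io.BytesIO('héllo wörld\n'.encode('utf-8'))
text = io.TextIOWrapper(binary, encoding='utf-8')
print(repr(text.readline()))  # 'héllo wörld\n'
```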
### Concurrency Utilities

Parallel processing support for I/O operations.

```python { .api }
def create_pool(processes=1):
    """Create a process or thread pool for parallel operations.

    Parameters:
        processes: int - Number of worker processes/threads

    Returns:
        Pool object with an imap_unordered() method for parallel iteration

    Notes:
        Automatically selects between multiprocessing and threading based on
        availability. Falls back to DummyPool for sequential execution when
        multiprocessing is unavailable.
    """

class DummyPool:
    """Fallback pool implementation when multiprocessing is unavailable.

    Provides the same interface as multiprocessing.Pool but executes
    sequentially.
    """

    def imap_unordered(self, func, iterable):
        """Sequential map implementation."""

class ConcurrentFuturesPool:
    """Thread-based pool using concurrent.futures.ThreadPoolExecutor.

    An alternative to multiprocessing for I/O-bound parallel operations.
    """

    def imap_unordered(self, func, iterable):
        """Parallel map using the thread pool."""
```
165
166
### Byte Buffer Operations
167
168
Efficient byte buffer for network I/O operations.
169
170
```python { .api }
171
class ByteBuffer:
172
"""Efficient byte buffer for streaming network I/O.
173
174
Provides buffering layer between network reads and application consumption,
175
optimizing for both small reads and bulk operations.
176
"""
177
178
def fill(self, reader):
179
"""Fill buffer from reader function.
180
181
Parameters:
182
reader: callable - Function that returns bytes when called
183
"""
184
185
def read(self, size=-1):
186
"""Read bytes from buffer.
187
188
Parameters:
189
size: int - Number of bytes to read (-1 for all available)
190
191
Returns:
192
bytes - Data read from buffer
193
"""
194
195
def peek(self):
196
"""Peek at buffer contents without consuming.
197
198
Returns:
199
bytes - Current buffer contents
200
"""
201
202
def readline(self, terminator=b'\n'):
203
"""Read line from buffer up to terminator.
204
205
Parameters:
206
terminator: bytes - Line terminator to search for
207
208
Returns:
209
bytes - Line including terminator
210
"""
211
212
def empty(self):
213
"""Empty the buffer, discarding all contents."""
214
```
215
216
### Transport System
217
218
Transport registration and management system.
219
220
```python { .api }
221
def register_transport(submodule):
222
"""Register transport module for URI schemes.
223
224
Parameters:
225
submodule: module or str - Transport module or module name to register
226
227
Notes:
228
Module must have SCHEME/SCHEMES attribute and open, open_uri, parse_uri functions
229
Automatically handles import errors for optional dependencies
230
"""
231
232
def get_transport(scheme):
233
"""Get transport module for URI scheme.
234
235
Parameters:
236
scheme: str - URI scheme (e.g., 's3', 'http', 'ftp')
237
238
Returns:
239
module - Transport module implementing the scheme
240
241
Raises:
242
ImportError - If required dependencies for scheme are missing
243
NotImplementedError - If scheme is not supported
244
"""
245
246
# Transport registry constants
247
NO_SCHEME = '' # Used for local file operations
248
SUPPORTED_SCHEMES = tuple(sorted(_REGISTRY.keys())) # All registered schemes
249
```
### Core Constants

Shared constants used throughout the library.

```python { .api }
# Binary mode constants
READ_BINARY = 'rb'
WRITE_BINARY = 'wb'
BINARY_MODES = (READ_BINARY, WRITE_BINARY)
BINARY_NEWLINE = b'\n'

# Seek operation constants
WHENCE_START = 0    # Seek from beginning of file
WHENCE_CURRENT = 1  # Seek from current position
WHENCE_END = 2      # Seek from end of file
WHENCE_CHOICES = (WHENCE_START, WHENCE_CURRENT, WHENCE_END)
```
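
The whence constants match the values used by `io.IOBase.seek`, so their effect can be demonstrated with an in-memory stream:

```python
import io

WHENCE_START, WHENCE_CURRENT, WHENCE_END = 0, 1, 2  # as defined above

buf = io.BytesIO(b'0123456789')
buf.seek(-3, WHENCE_END)     # 3 bytes before the end -> position 7
print(buf.read())            # b'789'
buf.seek(2, WHENCE_START)    # absolute position 2
buf.seek(3, WHENCE_CURRENT)  # 2 + 3 -> position 5
print(buf.tell())            # 5
```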
## Usage Examples

### URI Manipulation

```python
from smart_open.utils import safe_urlsplit, make_range_string, parse_content_range

# Handle URLs with question marks in the path (common in S3/GCS)
problematic_url = 's3://bucket/file?with?questions.txt'
parsed = safe_urlsplit(problematic_url)
print(f"Scheme: {parsed.scheme}, Path: {parsed.path}")

# Create HTTP range requests
range_header = make_range_string(0, 1023)    # "bytes=0-1023"
range_header_open = make_range_string(1000)  # "bytes=1000-"

# Parse Content-Range responses
content_range = "bytes 0-1023/2048"
units, start, stop, total = parse_content_range(content_range)
print(f"Retrieved bytes {start}-{stop} of {total} total")
```
### Parameter Validation

```python
from smart_open.utils import inspect_kwargs, check_kwargs

def my_transport_function(uri, mode, custom_param=None, buffer_size=8192):
    """Example transport function."""
    pass

# Inspect the function signature
supported_params = inspect_kwargs(my_transport_function)
print(f"Supported parameters: {list(supported_params.keys())}")

# Filter user-provided parameters
user_params = {
    'custom_param': 'value',
    'buffer_size': 4096,
    'unsupported_param': 'ignored',  # This will be filtered out
}
valid_params = check_kwargs(my_transport_function, user_params)
# valid_params == {'custom_param': 'value', 'buffer_size': 4096}
```
### Parallel Processing

```python
from smart_open.concurrency import create_pool
from smart_open import open

def process_file(uri):
    """Process a single file and return a result."""
    with open(uri, 'rb') as f:
        content = f.read()
    return len(content)  # Example: return the file size

# Parallel file processing
file_uris = [
    's3://bucket/file1.txt',
    's3://bucket/file2.txt',
    'gs://bucket/file3.txt',
    'azure://container/file4.txt',
]

# Process files in parallel
with create_pool(processes=4) as pool:
    results = list(pool.imap_unordered(process_file, file_uris))

print(f"Processed {len(results)} files, total size: {sum(results)} bytes")

# Sequential fallback when multiprocessing is unavailable
from smart_open.concurrency import DummyPool

with DummyPool() as pool:
    results = list(pool.imap_unordered(process_file, file_uris))
```
### Byte Buffer Usage

```python
from smart_open.bytebuffer import ByteBuffer
import socket

# Network reading with buffering
def read_lines_from_socket(sock):
    """Yield decoded lines buffered from a socket."""
    buffer = ByteBuffer()

    # Fill the buffer from the socket
    def socket_reader():
        try:
            return sock.recv(4096)
        except socket.timeout:
            return b''

    buffer.fill(socket_reader)

    # Read lines from the buffer
    while True:
        line = buffer.readline()
        if not line:
            break
        yield line.decode('utf-8').strip()

# Usage with smart-open
from smart_open import open

with open('http://example.com/stream-data.txt', 'rb') as f:
    buffer = ByteBuffer()

    # Fill the buffer from the HTTP stream
    buffer.fill(lambda: f.read(8192))

    # Process line by line
    while True:
        line = buffer.readline()
        if not line:
            break
        process_line(line)
```
392
393
### Custom Transport Development
394
395
```python
396
from smart_open.transport import register_transport
397
398
# Create custom transport module
399
class CustomTransport:
400
SCHEME = 'custom'
401
402
@staticmethod
403
def parse_uri(uri_as_string):
404
"""Parse custom URI format."""
405
# Implementation specific to custom scheme
406
return {'scheme': 'custom', 'path': uri_as_string[9:]} # Remove 'custom://'
407
408
@staticmethod
409
def open_uri(uri, mode, transport_params):
410
"""Open custom URI with transport parameters."""
411
parsed = CustomTransport.parse_uri(uri)
412
return CustomTransport.open(parsed['path'], mode)
413
414
@staticmethod
415
def open(path, mode):
416
"""Open custom resource."""
417
# Custom implementation
418
if 'r' in mode:
419
return CustomReader(path)
420
else:
421
return CustomWriter(path)
422
423
class CustomReader:
424
def __init__(self, path):
425
self.path = path
426
self._closed = False
427
428
def read(self, size=-1):
429
"""Read from custom source."""
430
return f"Data from {self.path}".encode()
431
432
def close(self):
433
self._closed = True
434
435
class CustomWriter:
436
def __init__(self, path):
437
self.path = path
438
self._closed = False
439
440
def write(self, data):
441
"""Write to custom destination."""
442
print(f"Writing to {self.path}: {data}")
443
return len(data)
444
445
def close(self):
446
self._closed = True
447
448
# Register the custom transport
449
register_transport(CustomTransport)
450
451
# Now custom:// URLs work with smart-open
452
from smart_open import open
453
454
with open('custom://my-resource', 'rb') as f:
455
data = f.read()
456
457
with open('custom://output-resource', 'wb') as f:
458
f.write(b'Hello custom transport!')
459
```
460
461
### Advanced HTTP Operations
462
463
```python
464
from smart_open.utils import make_range_string
465
from smart_open import open
466
import requests
467
468
# Partial file downloads using Range requests
469
def download_file_range(url, start_byte, end_byte):
470
"""Download specific byte range from HTTP resource."""
471
range_header = make_range_string(start_byte, end_byte)
472
transport_params = {
473
'headers': {'Range': range_header}
474
}
475
476
with open(url, 'rb', transport_params=transport_params) as f:
477
return f.read()
478
479
# Download file in chunks
480
def chunked_download(url, chunk_size=1024*1024):
481
"""Download large file in chunks to avoid memory issues."""
482
# First, get file size
483
response = requests.head(url)
484
content_length = int(response.headers.get('Content-Length', 0))
485
486
chunks = []
487
for start in range(0, content_length, chunk_size):
488
end = min(start + chunk_size - 1, content_length - 1)
489
chunk = download_file_range(url, start, end)
490
chunks.append(chunk)
491
492
return b''.join(chunks)
493
494
# Usage
495
large_file_data = chunked_download('https://example.com/large-file.dat')
496
```
### Error Handling and Debugging

```python
from smart_open.utils import check_kwargs
from smart_open.transport import get_transport
import logging

# Enable debug logging for transport operations
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger('smart_open.transport')

def debug_transport_params(uri, transport_params):
    """Debug transport parameter compatibility."""
    from urllib.parse import urlparse

    scheme = urlparse(uri).scheme or ''

    try:
        transport = get_transport(scheme)

        # Check whether transport_params are compatible with transport.open
        if hasattr(transport, 'open'):
            valid_params = check_kwargs(transport.open, transport_params)
            invalid_params = set(transport_params) - set(valid_params)

            if invalid_params:
                logger.warning(f"Invalid transport_params for {scheme}: {invalid_params}")

            return valid_params

        return transport_params
    except Exception as e:
        logger.error(f"Transport parameter validation failed: {e}")
        return transport_params

# Usage
uri = 's3://my-bucket/file.txt'
params = {
    'buffer_size': 1024 * 1024,
    'multipart_upload': True,
    'invalid_param': 'ignored',
}

valid_params = debug_transport_params(uri, params)
print(f"Valid parameters: {valid_params}")
```
542
543
### Performance Monitoring
544
545
```python
546
import time
547
import functools
548
from smart_open import open
549
550
def timing_decorator(func):
551
"""Decorator to measure function execution time."""
552
@functools.wraps(func)
553
def wrapper(*args, **kwargs):
554
start_time = time.time()
555
result = func(*args, **kwargs)
556
end_time = time.time()
557
print(f"{func.__name__} took {end_time - start_time:.2f} seconds")
558
return result
559
return wrapper
560
561
@timing_decorator
562
def benchmark_read(uri, chunk_size=8192):
563
"""Benchmark file reading performance."""
564
total_bytes = 0
565
with open(uri, 'rb') as f:
566
while True:
567
chunk = f.read(chunk_size)
568
if not chunk:
569
break
570
total_bytes += len(chunk)
571
return total_bytes
572
573
# Compare different chunk sizes
574
uris_to_test = [
575
's3://test-bucket/large-file.dat',
576
'gs://test-bucket/large-file.dat',
577
'azure://container/large-file.dat'
578
]
579
580
chunk_sizes = [4096, 8192, 16384, 32768, 65536]
581
582
for uri in uris_to_test:
583
print(f"\nTesting {uri}:")
584
for chunk_size in chunk_sizes:
585
total_bytes = benchmark_read(uri, chunk_size)
586
print(f" Chunk size {chunk_size}: {total_bytes} bytes")
587
```
588
589
## Best Practices
590
591
### Transport Parameter Validation
592
593
Always validate transport parameters to avoid runtime surprises:
594
595
```python
596
from smart_open.utils import check_kwargs
597
from smart_open.s3 import open as s3_open
598
599
# Validate parameters before use
600
proposed_params = {
601
'buffer_size': 1024*1024,
602
'multipart_upload': True,
603
'typo_in_parameter_name': 'ignored'
604
}
605
606
valid_params = check_kwargs(s3_open, proposed_params)
607
# Use valid_params instead of proposed_params
608
```
609
610
### URI Handling
611
612
Use safe_urlsplit for URLs that might contain special characters:
613
614
```python
615
from smart_open.utils import safe_urlsplit
616
617
# Safer than urllib.parse.urlsplit for cloud storage URLs
618
uri = 's3://bucket/file?with?questions.txt'
619
parsed = safe_urlsplit(uri)
620
```
621
622
### Parallel Processing
623
624
Choose appropriate parallelism based on I/O characteristics:
625
626
```python
627
# CPU-bound: Use multiprocessing
628
with create_pool(processes=cpu_count()) as pool:
629
results = list(pool.imap_unordered(cpu_intensive_func, items))
630
631
# I/O-bound: Use threading
632
from smart_open.concurrency import ConcurrentFuturesPool
633
with ConcurrentFuturesPool(max_workers=20) as pool:
634
results = list(pool.imap_unordered(io_intensive_func, items))
635
```