
# Utilities and Advanced Usage

Helper functions for URI handling, byte ranges, parallel processing, and custom transport development. These utilities provide advanced functionality and extensibility for power users and library developers.

## Capabilities

### URI and URL Utilities

Advanced URI parsing and manipulation functions.

```python { .api }
def safe_urlsplit(url):
    """URL split that handles question marks in S3/GS URLs.

    Parameters:
        url: str - URL to split; may contain question marks in the path

    Returns:
        urllib.parse.SplitResult - Parsed URL components

    Notes:
        Handles the special case where S3/GCS object keys contain '?'
        characters, which would normally be interpreted as query string
        delimiters.
    """

def make_range_string(start=None, stop=None):
    """Create an HTTP byte range specifier string.

    Parameters:
        start: int - Starting byte position (inclusive)
        stop: int - Ending byte position (inclusive)

    Returns:
        str - Range string like 'bytes=0-1023' or 'bytes=1000-'

    Notes:
        Used for HTTP Range requests and partial content retrieval.
    """

def parse_content_range(content_range):
    """Parse an HTTP Content-Range header value.

    Parameters:
        content_range: str - Content-Range header value

    Returns:
        tuple - (units, start, stop, length) parsed from the header

    Example:
        parse_content_range('bytes 0-1023/2048') -> ('bytes', 0, 1023, 2048)
    """
```

### Utility Constants

```python { .api }
# URL schemes requiring special handling
WORKAROUND_SCHEMES = ['s3', 's3n', 's3u', 's3a', 'gs']

# Placeholder for question marks in URLs
QUESTION_MARK_PLACEHOLDER = '///smart_open.utils.QUESTION_MARK_PLACEHOLDER///'
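The placeholder exists so that literal `'?'` characters can be masked before the URL is handed to `urllib.parse.urlsplit`, then restored afterwards. A minimal sketch of that masking technique, illustrative only and not the library's exact implementation (`sketch_safe_urlsplit` and the local `PLACEHOLDER` are hypothetical names):

```python
import urllib.parse

PLACEHOLDER = '///QUESTION_MARK_PLACEHOLDER///'  # stand-in for the real constant

def sketch_safe_urlsplit(url):
    # Mask literal '?' so urlsplit does not treat the object key as a
    # query string, then restore them in the parsed path.
    masked = url.replace('?', PLACEHOLDER)
    parts = urllib.parse.urlsplit(masked)
    return parts._replace(path=parts.path.replace(PLACEHOLDER, '?'))

parts = sketch_safe_urlsplit('s3://bucket/file?with?questions.txt')
print(parts.path)  # the '?' characters survive in the path
```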

### Function Introspection

Utilities for examining and validating function parameters.

```python { .api }
def inspect_kwargs(callable_obj):
    """Inspect a function signature for supported keyword arguments.

    Parameters:
        callable_obj: callable - Function or method to inspect

    Returns:
        dict - Mapping of argument names to their default values

    Notes:
        Used internally to validate transport_params against function signatures.
    """

def check_kwargs(callable_obj, kwargs):
    """Filter kwargs to only the supported parameters, warning about the rest.

    Parameters:
        callable_obj: callable - Function to check against
        kwargs: dict - Keyword arguments to filter

    Returns:
        dict - Filtered kwargs containing only supported parameters

    Notes:
        Logs a warning for each unsupported kwarg that is filtered out.
    """

def clamp(value, minval=0, maxval=None):
    """Clamp a numeric value to the specified range.

    Parameters:
        value: number - Value to clamp
        minval: number - Minimum allowed value
        maxval: number - Maximum allowed value (None for no upper limit)

    Returns:
        number - Clamped value within the [minval, maxval] range
    """
```
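The clamping behavior is the usual min/max sandwich; the sketch below (`clamp_sketch` is a hypothetical re-implementation mirroring the documented semantics, not the library's source) shows what callers can expect:

```python
def clamp_sketch(value, minval=0, maxval=None):
    # Raise the value to minval, then cap it at maxval when one is given.
    value = max(value, minval)
    if maxval is not None:
        value = min(value, maxval)
    return value

print(clamp_sketch(-5))           # 0
print(clamp_sketch(150, 0, 100))  # 100
print(clamp_sketch(50, 0, 100))   # 50 (already in range)
```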

### Enhanced I/O Classes

Advanced file-like object wrappers with additional functionality.

```python { .api }
class TextIOWrapper(io.TextIOWrapper):
    """Enhanced TextIOWrapper with improved exception handling.

    Provides better error reporting and handling for text mode operations
    over binary file objects from various transport layers.
    """

class FileLikeProxy(wrapt.ObjectProxy):
    """Proxy that manages the relationship between inner and outer file objects.

    Coordinates operations between compression layers and transport layers,
    ensuring proper resource management and method delegation.
    """
```
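The wrapper follows the standard pattern of layering text decoding over a binary stream. The stdlib equivalent looks like this, using `io.BytesIO` as a stand-in for a transport-layer binary file object:

```python
import io

# A binary stream, as a transport layer would return for mode 'rb'
binary_stream = io.BytesIO('héllo wörld\n'.encode('utf-8'))

# Layer text decoding over the binary object, as happens internally
# when a text mode ('r'/'w') is requested.
text_stream = io.TextIOWrapper(binary_stream, encoding='utf-8')
line = text_stream.readline()
print(line.strip())  # héllo wörld
```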

### Concurrency Utilities

Parallel processing support for I/O operations.

```python { .api }
def create_pool(processes=1):
    """Create a process or thread pool for parallel operations.

    Parameters:
        processes: int - Number of worker processes/threads

    Returns:
        Pool object with an imap_unordered() method for parallel iteration

    Notes:
        Automatically selects between multiprocessing and threading based on
        availability. Falls back to DummyPool (sequential, single-process)
        when multiprocessing is unavailable.
    """

class DummyPool:
    """Fallback pool implementation when multiprocessing is unavailable.

    Provides the same interface as multiprocessing.Pool but executes
    sequentially.
    """

    def imap_unordered(self, func, iterable):
        """Sequential map implementation."""

class ConcurrentFuturesPool:
    """Thread-based pool using concurrent.futures.ThreadPoolExecutor.

    Alternative to multiprocessing for I/O-bound parallel operations.
    """

    def imap_unordered(self, func, iterable):
        """Parallel map using the thread pool."""
```

### Byte Buffer Operations

Efficient byte buffer for network I/O operations.

```python { .api }
class ByteBuffer:
    """Efficient byte buffer for streaming network I/O.

    Provides a buffering layer between network reads and application
    consumption, optimizing for both small reads and bulk operations.
    """

    def fill(self, reader):
        """Fill the buffer from a reader function.

        Parameters:
            reader: callable - Function that returns bytes when called
        """

    def read(self, size=-1):
        """Read bytes from the buffer.

        Parameters:
            size: int - Number of bytes to read (-1 for all available)

        Returns:
            bytes - Data read from the buffer
        """

    def peek(self):
        """Peek at the buffer contents without consuming them.

        Returns:
            bytes - Current buffer contents
        """

    def readline(self, terminator=b'\n'):
        """Read a line from the buffer up to the terminator.

        Parameters:
            terminator: bytes - Line terminator to search for

        Returns:
            bytes - Line including the terminator
        """

    def empty(self):
        """Empty the buffer, discarding all contents."""
```

### Transport System

Transport registration and management system.

```python { .api }
def register_transport(submodule):
    """Register a transport module for its URI schemes.

    Parameters:
        submodule: module or str - Transport module, or module name, to register

    Notes:
        The module must have a SCHEME/SCHEMES attribute and open, open_uri,
        and parse_uri functions. Import errors for optional dependencies are
        handled automatically.
    """

def get_transport(scheme):
    """Get the transport module for a URI scheme.

    Parameters:
        scheme: str - URI scheme (e.g. 's3', 'http', 'ftp')

    Returns:
        module - Transport module implementing the scheme

    Raises:
        ImportError - If required dependencies for the scheme are missing
        NotImplementedError - If the scheme is not supported
    """

# Transport registry constants
NO_SCHEME = ''  # Used for local file operations
SUPPORTED_SCHEMES = tuple(sorted(_REGISTRY.keys()))  # All registered schemes
```

### Core Constants

Shared constants used throughout the library.

```python { .api }
# Binary mode constants
READ_BINARY = 'rb'
WRITE_BINARY = 'wb'
BINARY_MODES = (READ_BINARY, WRITE_BINARY)
BINARY_NEWLINE = b'\n'

# Seek operation constants
WHENCE_START = 0    # Seek from beginning of file
WHENCE_CURRENT = 1  # Seek from current position
WHENCE_END = 2      # Seek from end of file
WHENCE_CHOICES = (WHENCE_START, WHENCE_CURRENT, WHENCE_END)
```
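The whence values match the stdlib's `os.SEEK_*` constants, so they behave exactly as with any seekable Python stream. A quick illustration using `io.BytesIO` (the constants are re-declared locally so the snippet is self-contained):

```python
import io

WHENCE_START, WHENCE_CURRENT, WHENCE_END = 0, 1, 2  # as defined above

buf = io.BytesIO(b'0123456789')
buf.seek(-3, WHENCE_END)    # three bytes before the end of the stream
tail = buf.read()           # b'789'
buf.seek(2, WHENCE_START)   # absolute position 2
middle = buf.read(3)        # b'234'
print(tail, middle)
```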

## Usage Examples

### URI Manipulation

```python
from smart_open.utils import safe_urlsplit, make_range_string, parse_content_range

# Handle URLs with question marks in the path (possible in S3/GCS object keys)
problematic_url = 's3://bucket/file?with?questions.txt'
parsed = safe_urlsplit(problematic_url)
print(f"Scheme: {parsed.scheme}, Path: {parsed.path}")

# Create HTTP range requests
range_header = make_range_string(0, 1023)    # "bytes=0-1023"
range_header_open = make_range_string(1000)  # "bytes=1000-"

# Parse Content-Range responses
content_range = "bytes 0-1023/2048"
units, start, stop, total = parse_content_range(content_range)
print(f"Retrieved bytes {start}-{stop} of {total} total")
```

### Parameter Validation

```python
from smart_open.utils import inspect_kwargs, check_kwargs

def my_transport_function(uri, mode, custom_param=None, buffer_size=8192):
    """Example transport function."""
    pass

# Inspect the function signature
supported_params = inspect_kwargs(my_transport_function)
print(f"Supported parameters: {list(supported_params.keys())}")

# Filter user-provided parameters
user_params = {
    'custom_param': 'value',
    'buffer_size': 4096,
    'unsupported_param': 'ignored',  # This will be filtered out
}
valid_params = check_kwargs(my_transport_function, user_params)
# valid_params == {'custom_param': 'value', 'buffer_size': 4096}
```

### Parallel Processing

```python
from smart_open.concurrency import create_pool
from smart_open import open

def process_file(uri):
    """Process a single file and return a result."""
    with open(uri, 'rb') as f:
        content = f.read()
    return len(content)  # Example: return the file size

file_uris = [
    's3://bucket/file1.txt',
    's3://bucket/file2.txt',
    'gs://bucket/file3.txt',
    'azure://container/file4.txt',
]

# Process files in parallel
with create_pool(processes=4) as pool:
    results = list(pool.imap_unordered(process_file, file_uris))

print(f"Processed {len(results)} files, total size: {sum(results)} bytes")

# Sequential fallback when multiprocessing is unavailable
from smart_open.concurrency import DummyPool

with DummyPool() as pool:
    results = list(pool.imap_unordered(process_file, file_uris))
```

### Byte Buffer Usage

```python
from smart_open.bytebuffer import ByteBuffer
import socket

def read_from_socket(sock):
    """Yield decoded lines read from a socket via a ByteBuffer."""
    buffer = ByteBuffer()

    def socket_reader():
        try:
            return sock.recv(4096)
        except socket.timeout:
            return b''

    # Fill the buffer from the socket
    buffer.fill(socket_reader)

    # Read lines from the buffer
    while True:
        line = buffer.readline()
        if not line:
            break
        yield line.decode('utf-8').strip()

# Usage with smart-open
from smart_open import open

with open('http://example.com/stream-data.txt', 'rb') as f:
    buffer = ByteBuffer()

    # Fill the buffer from the HTTP stream
    buffer.fill(lambda: f.read(8192))

    # Process line by line
    while True:
        line = buffer.readline()
        if not line:
            break
        process_line(line)  # process_line is application-defined
```

### Custom Transport Development

```python
from smart_open.transport import register_transport

# Create a custom transport (a class with the required attributes serves
# as a stand-in for a real transport module here)
class CustomTransport:
    SCHEME = 'custom'

    @staticmethod
    def parse_uri(uri_as_string):
        """Parse the custom URI format."""
        # Implementation specific to the custom scheme
        return {'scheme': 'custom', 'path': uri_as_string[9:]}  # Strip 'custom://'

    @staticmethod
    def open_uri(uri, mode, transport_params):
        """Open a custom URI with transport parameters."""
        parsed = CustomTransport.parse_uri(uri)
        return CustomTransport.open(parsed['path'], mode)

    @staticmethod
    def open(path, mode):
        """Open the custom resource."""
        if 'r' in mode:
            return CustomReader(path)
        else:
            return CustomWriter(path)

class CustomReader:
    def __init__(self, path):
        self.path = path
        self._closed = False

    def read(self, size=-1):
        """Read from the custom source."""
        return f"Data from {self.path}".encode()

    def close(self):
        self._closed = True

class CustomWriter:
    def __init__(self, path):
        self.path = path
        self._closed = False

    def write(self, data):
        """Write to the custom destination."""
        print(f"Writing to {self.path}: {data}")
        return len(data)

    def close(self):
        self._closed = True

# Register the custom transport
register_transport(CustomTransport)

# Now custom:// URLs work with smart-open
from smart_open import open

with open('custom://my-resource', 'rb') as f:
    data = f.read()

with open('custom://output-resource', 'wb') as f:
    f.write(b'Hello custom transport!')
```

### Advanced HTTP Operations

```python
from smart_open.utils import make_range_string
from smart_open import open
import requests

# Partial file downloads using Range requests
def download_file_range(url, start_byte, end_byte):
    """Download a specific byte range from an HTTP resource."""
    range_header = make_range_string(start_byte, end_byte)
    transport_params = {
        'headers': {'Range': range_header}
    }

    with open(url, 'rb', transport_params=transport_params) as f:
        return f.read()

# Download a file in chunks
def chunked_download(url, chunk_size=1024 * 1024):
    """Download a large file in chunks to avoid memory issues."""
    # First, get the file size
    response = requests.head(url)
    content_length = int(response.headers.get('Content-Length', 0))

    chunks = []
    for start in range(0, content_length, chunk_size):
        end = min(start + chunk_size - 1, content_length - 1)
        chunks.append(download_file_range(url, start, end))

    return b''.join(chunks)

# Usage
large_file_data = chunked_download('https://example.com/large-file.dat')
```

### Error Handling and Debugging

```python
from smart_open.utils import check_kwargs
from smart_open.transport import get_transport
import logging

# Enable debug logging for transport operations
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger('smart_open.transport')

def debug_transport_params(uri, transport_params):
    """Debug transport parameter compatibility."""
    from urllib.parse import urlparse

    scheme = urlparse(uri).scheme or ''

    try:
        transport = get_transport(scheme)

        # Check whether transport_params are compatible with transport.open
        if hasattr(transport, 'open'):
            valid_params = check_kwargs(transport.open, transport_params)
            invalid_params = set(transport_params) - set(valid_params)

            if invalid_params:
                logger.warning(f"Invalid transport_params for {scheme}: {invalid_params}")

            return valid_params
    except Exception as e:
        logger.error(f"Transport parameter validation failed: {e}")
        return transport_params

# Usage
uri = 's3://my-bucket/file.txt'
params = {
    'buffer_size': 1024 * 1024,
    'multipart_upload': True,
    'invalid_param': 'ignored',
}

valid_params = debug_transport_params(uri, params)
print(f"Valid parameters: {valid_params}")
```

### Performance Monitoring

```python
import time
import functools
from smart_open import open

def timing_decorator(func):
    """Decorator that measures function execution time."""
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        start_time = time.time()
        result = func(*args, **kwargs)
        end_time = time.time()
        print(f"{func.__name__} took {end_time - start_time:.2f} seconds")
        return result
    return wrapper

@timing_decorator
def benchmark_read(uri, chunk_size=8192):
    """Benchmark file reading performance."""
    total_bytes = 0
    with open(uri, 'rb') as f:
        while True:
            chunk = f.read(chunk_size)
            if not chunk:
                break
            total_bytes += len(chunk)
    return total_bytes

# Compare different chunk sizes
uris_to_test = [
    's3://test-bucket/large-file.dat',
    'gs://test-bucket/large-file.dat',
    'azure://container/large-file.dat',
]

chunk_sizes = [4096, 8192, 16384, 32768, 65536]

for uri in uris_to_test:
    print(f"\nTesting {uri}:")
    for chunk_size in chunk_sizes:
        total_bytes = benchmark_read(uri, chunk_size)
        print(f"  Chunk size {chunk_size}: {total_bytes} bytes")
```

## Best Practices

### Transport Parameter Validation

Always validate transport parameters to avoid runtime surprises:

594

595

```python

596

from smart_open.utils import check_kwargs

597

from smart_open.s3 import open as s3_open

598

599

# Validate parameters before use

600

proposed_params = {

601

'buffer_size': 1024*1024,

602

'multipart_upload': True,

603

'typo_in_parameter_name': 'ignored'

604

}

605

606

valid_params = check_kwargs(s3_open, proposed_params)

607

# Use valid_params instead of proposed_params

608

```

### URI Handling

Use `safe_urlsplit` for URLs that might contain special characters:

613

614

```python

615

from smart_open.utils import safe_urlsplit

616

617

# Safer than urllib.parse.urlsplit for cloud storage URLs

618

uri = 's3://bucket/file?with?questions.txt'

619

parsed = safe_urlsplit(uri)

620

```

### Parallel Processing

Choose the appropriate kind of parallelism for the workload:

625

626

```python

627

# CPU-bound: Use multiprocessing

628

with create_pool(processes=cpu_count()) as pool:

629

results = list(pool.imap_unordered(cpu_intensive_func, items))

630

631

# I/O-bound: Use threading

632

from smart_open.concurrency import ConcurrentFuturesPool

633

with ConcurrentFuturesPool(max_workers=20) as pool:

634

results = list(pool.imap_unordered(io_intensive_func, items))

635

```