0
# Memory and I/O Management
1
2
This section covers memory pool management, buffer operations, compression codecs, and file system abstractions. These APIs provide fine-grained control over memory allocation, efficient I/O operations, and support for various storage systems, including local files, cloud storage, and in-memory buffers.
3
4
## Capabilities
5
6
### Memory Management
7
8
Control memory allocation through pluggable memory pools, allocation tracking, and interchangeable backend implementations (system, jemalloc, mimalloc).
9
10
```python { .api }
11
def default_memory_pool():
    """
    Get the current default memory pool.

    Returns:
    MemoryPool: Default memory pool instance (the pool most recently
    installed with set_memory_pool, if any)
    """


def system_memory_pool():
    """
    Get the system memory pool.

    Returns:
    MemoryPool: System memory pool using malloc/free
    """


def jemalloc_memory_pool():
    """
    Get the jemalloc memory pool (if available).

    Check that 'jemalloc' appears in supported_memory_backends() before
    calling.

    Returns:
    MemoryPool: jemalloc-based memory pool

    Raises:
    NotImplementedError: if jemalloc support is not enabled in this build
    """


def mimalloc_memory_pool():
    """
    Get the mimalloc memory pool (if available).

    Check that 'mimalloc' appears in supported_memory_backends() before
    calling.

    Returns:
    MemoryPool: mimalloc-based memory pool

    Raises:
    NotImplementedError: if mimalloc support is not enabled in this build
    """


def set_memory_pool(pool):
    """
    Set the default memory pool.

    Allocations that do not pass an explicit memory_pool use this pool.

    Parameters:
    - pool: MemoryPool, memory pool to use as default
    """


def total_allocated_bytes():
    """
    Get the number of bytes currently allocated from the default memory pool.

    Allocations made from other (non-default) pools may not be counted.

    Returns:
    int: Bytes currently allocated by the default pool
    """


def supported_memory_backends():
    """
    List supported memory pool backends.

    Returns:
    list of str: Available memory backends (e.g. 'system', 'jemalloc')
    """


def log_memory_allocations(enable=True):
    """
    Enable or disable memory allocation logging (for debugging).

    Parameters:
    - enable: bool, default True; pass False to disable logging
    """


def jemalloc_set_decay_ms(decay_ms):
    """
    Set the jemalloc decay time.

    Parameters:
    - decay_ms: int, decay time in milliseconds
    """


class MemoryPool:
    """
    Abstract memory pool interface.

    Attributes:
    - backend_name: Name of memory pool backend

    Allocation statistics are exposed as methods, not attributes:
    bytes_allocated(), max_memory() and total_bytes_allocated().
    """

    def bytes_allocated(self):
        """Return the number of bytes currently allocated from this pool."""

    def max_memory(self):
        """Return the peak (high-water mark) bytes allocated from this pool."""

    def total_bytes_allocated(self):
        """Return the total number of bytes allocated from this pool."""

    # NOTE(review): pyarrow's Python-level MemoryPool may not expose the raw
    # allocate/reallocate/free hooks below; confirm against the installed
    # pyarrow version before relying on them.
    def allocate(self, size):
        """Allocate memory buffer."""

    def reallocate(self, buffer, old_size, new_size):
        """Reallocate memory buffer."""

    def free(self, buffer, size):
        """Free memory buffer."""


class LoggingMemoryPool(MemoryPool):
    """Memory pool wrapper with allocation logging."""


class ProxyMemoryPool(MemoryPool):
    """Memory pool proxy for delegation."""


def logging_memory_pool(pool):
    """
    Create a logging wrapper for a memory pool.

    Parameters:
    - pool: MemoryPool, pool to wrap

    Returns:
    LoggingMemoryPool: Logging memory pool wrapper
    """


def proxy_memory_pool(pool):
    """
    Create a proxy wrapper for a memory pool.

    Parameters:
    - pool: MemoryPool, pool to proxy

    Returns:
    ProxyMemoryPool: Proxy memory pool wrapper
    """
130
```
131
132
### Buffer Operations
133
134
Low-level memory buffer operations for efficient, zero-copy data handling.
135
136
```python { .api }
137
def allocate_buffer(size, alignment=None, memory_pool=None, resizable=False):
    """
    Allocate a new buffer.

    Pass memory_pool and resizable by keyword.

    Parameters:
    - size: int, buffer size in bytes
    - alignment: int, memory alignment
    - memory_pool: MemoryPool, memory pool to use (default pool if None)
    - resizable: bool, whether buffer is resizable

    Returns:
    Buffer or ResizableBuffer: Allocated buffer (ResizableBuffer when
    resizable=True)
    """


def foreign_buffer(address, size, base=None):
    """
    Create a buffer from foreign memory.

    Parameters:
    - address: int, memory address
    - size: int, buffer size in bytes
    - base: object, object owning the memory; the buffer keeps a reference
      to it so the memory stays valid

    Returns:
    Buffer: Buffer wrapping foreign memory
    """


def py_buffer(obj):
    """
    Create a buffer from a Python buffer-protocol object.

    Parameters:
    - obj: object implementing buffer protocol

    Returns:
    Buffer: Buffer wrapping Python object
    """


class Buffer:
    """
    Memory buffer; may be mutable or immutable (see is_mutable).

    Attributes:
    - address: Memory address
    - is_mutable: Whether buffer is mutable
    - size: Buffer size in bytes
    """

    def __len__(self): ...
    def __getitem__(self, key): ...

    def equals(self, other):
        """Check buffer equality."""

    def slice(self, offset=0, length=None):
        """Create buffer slice starting at offset (to the end if length is None)."""

    def to_pybytes(self):
        """Convert to Python bytes."""

    def hex(self):
        """Hexadecimal representation."""


class ResizableBuffer(Buffer):
    """
    Mutable resizable memory buffer.

    Attributes:
    - capacity: Buffer capacity
    """

    # NOTE(review): pyarrow's Python ResizableBuffer may expose only
    # resize(); confirm `capacity` and `reserve` exist in the installed
    # version before relying on them.

    def resize(self, new_size, shrink_to_fit=False):
        """
        Resize buffer to new_size bytes.

        If shrink_to_fit is False (the default), the buffer is never
        shrunk; pass True to shrink it when new_size is smaller than the
        current size.
        """

    def reserve(self, capacity):
        """Reserve buffer capacity."""
213
```
214
215
### Compression
216
217
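Compression and decompression with support for multiple codecs and configurable compression levels. Decompression requires the size of the uncompressed output, so record the original length when compressing.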
218
219
```python { .api }
220
def compress(data, codec='lz4', asbytes=False, memory_pool=None):
    """
    Compress data.

    Parameters:
    - data: bytes-like, data to compress
    - codec: str or Codec, compression codec (default 'lz4')
    - asbytes: bool, return bytes instead of Buffer
    - memory_pool: MemoryPool, memory pool for allocation

    Returns:
    Buffer or bytes: Compressed data
    """


def decompress(data, decompressed_size=None, codec='lz4', memory_pool=None):
    """
    Decompress data.

    Parameters:
    - data: bytes-like, compressed data
    - decompressed_size: int, size of the decompressed result in bytes.
      Required in practice: pyarrow raises ValueError when it is omitted,
      so record the original length at compression time.
    - codec: str or Codec, compression codec (default 'lz4'); must match
      the codec used to compress
    - memory_pool: MemoryPool, memory pool for allocation

    Returns:
    Buffer: Decompressed data
    """


class Codec:
    """
    Compression codec interface.

    Construct with Codec(compression, compression_level=None), e.g.
    Codec('zstd').

    Attributes:
    - name: Codec name
    - compression_level: Compression level
    """

    @staticmethod
    def is_available(codec_name):
        """
        Check if codec is available.

        Parameters:
        - codec_name: str, codec name

        Returns:
        bool: Whether codec is available
        """

    def compress(self, data, memory_pool=None):
        """Compress data."""

    def decompress(self, data, decompressed_size=None, memory_pool=None):
        """
        Decompress data.

        decompressed_size (the original, uncompressed length) must be
        provided; pyarrow raises ValueError when it is omitted.
        """
274
```
275
276
### File I/O
277
278
File interfaces and streaming I/O operations with support for various file types and compression.
279
280
```python { .api }
281
def input_stream(source, compression='detect', buffer_size=None):
    """
    Create an input stream from various sources.

    Parameters:
    - source: str file path, or file-like object
    - compression: str, compression type, or 'detect' to infer it from the
      file extension when source is a path
    - buffer_size: int, buffer size for reading (None disables buffering)

    Returns:
    InputStream: Input stream for reading
    """


def output_stream(source, compression='detect', buffer_size=None):
    """
    Create an output stream.

    Parameters:
    - source: str file path, or file-like object
    - compression: str, compression type, or 'detect' to infer it from the
      file extension when source is a path; None writes uncompressed data
    - buffer_size: int, buffer size for writing (None disables buffering)

    Returns:
    OutputStream: Output stream for writing
    """


def memory_map(path, mode='r'):
    """
    Open an existing file as a memory map.

    Parameters:
    - path: str, file path
    - mode: str, access mode ('r', 'r+', 'w')

    Returns:
    MemoryMappedFile: Memory mapped file
    """


def create_memory_map(path, size):
    """
    Create a new file of the given size and memory-map it.

    Parameters:
    - path: str, file path
    - size: int, file size in bytes

    Returns:
    MemoryMappedFile: New memory mapped file
    """


class NativeFile:
    """
    Abstract base for native file implementations.

    Attributes:
    - closed: Whether file is closed
    - mode: File access mode
    """

    def close(self): ...

    @property
    def closed(self): ...

    def fileno(self): ...
    def flush(self): ...
    def isatty(self): ...
    def readable(self): ...
    def seekable(self): ...
    def writable(self): ...

    def read(self, nbytes=None): ...
    def read1(self, nbytes=None): ...
    def readall(self): ...
    def readinto(self, b): ...
    def readline(self, size=None): ...
    def readlines(self, hint=None): ...

    def seek(self, pos, whence=0): ...
    def tell(self): ...
    def truncate(self, size=None): ...

    def write(self, data): ...
    def writelines(self, lines): ...


class PythonFile(NativeFile):
    """Wrapper for Python file-like objects."""


class OSFile(NativeFile):
    """
    Operating-system file backed by a regular file descriptor.

    Use MemoryMappedFile (memory_map / create_memory_map) for memory mapping.

    Attributes:
    - size: File size in bytes
    """


class MemoryMappedFile(NativeFile):
    """
    Memory-mapped file interface.

    Attributes:
    - size: File size in bytes
    """

    def resize(self, new_size):
        """Resize memory mapped file."""


class BufferedInputStream:
    """Buffered input stream wrapper."""

    def __init__(self, stream, buffer_size=None, memory_pool=None): ...


class BufferedOutputStream:
    """Buffered output stream wrapper."""

    def __init__(self, stream, buffer_size=None, memory_pool=None): ...


class CompressedInputStream:
    """Input stream that decompresses data read from the wrapped stream."""

    def __init__(self, stream, compression, memory_pool=None): ...


class CompressedOutputStream:
    """Output stream that compresses data written to the wrapped stream."""

    def __init__(self, stream, compression, memory_pool=None): ...


class TransformInputStream:
    """Input stream that applies a custom transform function to its data."""

    def __init__(self, stream, transform_func): ...


def transcoding_input_stream(stream, src_encoding, dest_encoding):
    """
    Create a character-encoding transform stream.

    Parameters:
    - stream: input stream
    - src_encoding: str, source encoding
    - dest_encoding: str, destination encoding

    Returns:
    TransformInputStream: Transcoding stream
    """


class FixedSizeBufferWriter:
    """Writer to fixed-size buffer."""

    def __init__(self, buffer): ...


class BufferReader:
    """Reader from buffer."""

    def __init__(self, buffer): ...


class BufferOutputStream:
    """Output stream to growable buffer."""

    def __init__(self, memory_pool=None): ...

    def getvalue(self):
        """Return the written contents as a Buffer."""


class MockOutputStream:
    """Mock output stream for testing."""


class CacheOptions:
    """Caching configuration for streams."""


def have_libhdfs():
    """
    Check if HDFS support is available.

    Returns:
    bool: Whether libhdfs is available
    """
454
```
455
456
### Device Support
457
458
Device abstraction for CPU and GPU memory management in heterogeneous computing environments.
459
460
```python { .api }
461
class Device:
    """
    Abstraction over a device (e.g. CPU or GPU) that holds memory.

    Attributes:
    - device_type: Type of device
    - device_id: Device identifier
    """

    def __str__(self): ...

    def equals(self, other):
        """Return whether this device equals other."""


class DeviceAllocationType:
    """Enumeration of the kinds of device allocation."""


class MemoryManager:
    """
    Interface for managing memory across devices.
    """

    def allocate(self, size):
        """Allocate size bytes of device memory."""

    def copy_non_owned(self, data, device=None, memory_pool=None):
        """Copy data onto the device."""


def default_cpu_memory_manager():
    """
    Return the default memory manager for the CPU.

    Returns:
    MemoryManager: CPU memory manager
    """
496
```
497
498
## Usage Examples
499
500
### Memory Pool Management
501
502
```python
503
import pyarrow as pa

# Which allocator backends were compiled into this Arrow build?
available_backends = pa.supported_memory_backends()
print(f"Available backends: {available_backends}")

# Inspect the pool Arrow currently allocates from
current_pool = pa.default_memory_pool()
print(f"Backend: {current_pool.backend_name}")
print(f"Allocated: {current_pool.bytes_allocated()} bytes")
print(f"Max memory: {current_pool.max_memory()} bytes")

# Switch to jemalloc only when this build provides it
if 'jemalloc' in available_backends:
    pa.set_memory_pool(pa.jemalloc_memory_pool())
    print("Switched to jemalloc")

# Wrap the current default pool in a logging pool and install the wrapper
pa.set_memory_pool(pa.logging_memory_pool(pa.default_memory_pool()))

# Turn on allocation logging
pa.log_memory_allocations(True)

# Allocate a sizeable array so the log has entries to show
big_array = pa.array(range(1000000))
print(f"Created array with {len(big_array)} elements")

# Report the bytes currently allocated
print(f"Total allocated: {pa.total_allocated_bytes()} bytes")
535
```
536
537
### Buffer Operations
538
539
```python
540
import ctypes

import pyarrow as pa

# Allocate a fixed-size buffer
buffer = pa.allocate_buffer(1024)
print(f"Buffer size: {buffer.size}")
print(f"Buffer address: {buffer.address}")

# Create a resizable buffer. ResizableBuffer exposes `size` (there is no
# `capacity` attribute in pyarrow), so report size before and after resizing.
resizable = pa.allocate_buffer(512, resizable=True)
print(f"Initial size: {resizable.size}")

# Grow the buffer in place
resizable.resize(1024)
print(f"New size: {resizable.size}")

# Create a buffer that wraps an existing bytes object
data = b"Hello, Arrow!"
hello_buf = pa.py_buffer(data)
print(f"Buffer from bytes: {hello_buf.to_pybytes()}")

# Buffer slicing: offset 7, length 5
slice_buffer = hello_buf.slice(7, 5)  # b"Arrow"
print(f"Sliced buffer: {slice_buffer.to_pybytes()}")

# Foreign buffer (advanced usage): wrap memory owned by a ctypes array
c_array = (ctypes.c_byte * 10)(*range(10))
foreign = pa.foreign_buffer(
    ctypes.addressof(c_array),
    ctypes.sizeof(c_array),
    base=c_array,  # keep a reference so the memory outlives the buffer
)
print(f"Foreign buffer: {foreign.to_pybytes()}")
574
```
575
576
### Compression
577
578
```python
579
import pyarrow as pa

# Check which codecs this Arrow build supports
codecs = ['gzip', 'snappy', 'lz4', 'zstd', 'brotli']
available = [codec for codec in codecs if pa.Codec.is_available(codec)]
print(f"Available codecs: {available}")

# Data to compress
data = b"This is some test data to compress. " * 100
original_size = len(data)

for codec_name in available:
    # Compress
    compressed = pa.compress(data, codec=codec_name)
    compressed_size = compressed.size

    # Decompress: pyarrow requires the uncompressed length
    # (omitting decompressed_size raises ValueError)
    decompressed = pa.decompress(
        compressed, decompressed_size=original_size, codec=codec_name
    )

    # Verify the round trip
    assert decompressed.to_pybytes() == data

    compression_ratio = original_size / compressed_size
    print(f"{codec_name}: {original_size} -> {compressed_size} "
          f"(ratio: {compression_ratio:.2f})")

# Use the Codec class directly (only if snappy was built in)
if 'snappy' in available:
    codec = pa.Codec('snappy')
    compressed = codec.compress(data)
    decompressed = codec.decompress(compressed, decompressed_size=original_size)
    print(f"Codec class: {len(decompressed.to_pybytes())} bytes")
610
```
611
612
### File I/O Operations
613
614
```python
615
import os
import tempfile

import pyarrow as pa

with tempfile.TemporaryDirectory() as temp_dir:
    # Memory mapped file
    mmap_path = os.path.join(temp_dir, 'mmap_test.bin')

    # Create an 8192-byte memory mapped file and write into it
    with pa.create_memory_map(mmap_path, 8192) as mmap_file:
        data = b"Memory mapped data " * 100
        mmap_file.write(data)
        mmap_file.flush()

    # Read it back. The map has a fixed size, so read() returns all 8192
    # bytes, including the unwritten tail after `data`.
    with pa.memory_map(mmap_path, 'r') as mmap_file:
        read_data = mmap_file.read()
        print(f"Memory mapped read: {len(read_data)} bytes")

    # Compressed I/O
    compressed_path = os.path.join(temp_dir, 'compressed.gz')

    # Write compressed
    with pa.output_stream(compressed_path, compression='gzip') as out:
        out.write(b"Compressed data " * 1000)

    # Read compressed
    with pa.input_stream(compressed_path, compression='gzip') as inp:
        read_compressed = inp.read()
        print(f"Compressed read: {len(read_compressed)} bytes")

# Buffer I/O
buffer_stream = pa.BufferOutputStream()
buffer_stream.write(b"Buffer stream data")
buffer_contents = buffer_stream.getvalue()
print(f"Buffer stream: {buffer_contents.to_pybytes()}")

# Read from buffer
buffer_reader = pa.BufferReader(buffer_contents)
read_from_buffer = buffer_reader.read()
print(f"Buffer reader: {read_from_buffer}")

# Transcoding example
text_data = "Hello, 世界! 🌍".encode('utf-8')
utf8_stream = pa.BufferReader(pa.py_buffer(text_data))

# Transcode UTF-8 to Latin-1. This fails here because 世, 界 and 🌍 have
# no Latin-1 representation (Latin-1 covers only U+0000-U+00FF).
try:
    transcoded_stream = pa.transcoding_input_stream(
        utf8_stream, 'utf-8', 'latin-1'
    )
    transcoded_data = transcoded_stream.read()
    print(f"Transcoded: {transcoded_data}")
except Exception as e:
    print(f"Transcoding failed: {e}")
677
```
678
679
### Advanced Memory Management
680
681
```python
682
import gc

import pyarrow as pa


def memory_usage_demo():
    """
    Print bytes allocated from the default pool while ten arrays are live,
    and again after all references to them are dropped.
    """
    initial_memory = pa.total_allocated_bytes()

    # Create large arrays. A comprehension is used on purpose: a for-loop
    # variable would keep the last array alive after arrays.clear() and
    # inflate the final measurement.
    arrays = [pa.array(range(100000)) for _ in range(10)]

    peak_memory = pa.total_allocated_bytes()
    print(f"Peak memory usage: {peak_memory - initial_memory} bytes")

    # Drop every reference, then force a collection pass
    arrays.clear()
    gc.collect()

    final_memory = pa.total_allocated_bytes()
    print(f"Final memory usage: {final_memory - initial_memory} bytes")


# Custom memory pool example
class TrackingMemoryPool(pa.MemoryPool):
    """
    Example custom memory pool that tracks allocations.

    Counts calls to its own allocate()/free() and forwards every request to
    base_pool. Conceptual only: Arrow's internal allocations are presumably
    not routed through these Python-level methods; confirm before relying
    on the counters.
    """

    def __init__(self, base_pool):
        self.base_pool = base_pool
        self.allocation_count = 0
        self.deallocation_count = 0

    @property
    def backend_name(self):
        return f"tracking_{self.base_pool.backend_name}"

    def bytes_allocated(self):
        return self.base_pool.bytes_allocated()

    def max_memory(self):
        return self.base_pool.max_memory()

    def allocate(self, size):
        self.allocation_count += 1
        return self.base_pool.allocate(size)

    def free(self, buffer, size):
        self.deallocation_count += 1
        return self.base_pool.free(buffer, size)


# Use custom memory pool
base_pool = pa.default_memory_pool()
tracking_pool = TrackingMemoryPool(base_pool)

# Note: Custom pools in Python have limitations
# This is more of a conceptual example
print(f"Custom pool backend: {tracking_pool.backend_name}")

# Run memory usage demo
memory_usage_demo()
742
```