Python library for the Apache Arrow columnar memory format and its computing libraries.

Memory pool management, buffer operations, compression codecs, and file-system abstraction. Provides fine-grained control over memory allocation, efficient I/O operations, and support for various storage systems including local files, cloud storage, and in-memory buffers.
Control over memory allocation with pluggable memory pools, allocation tracking, and multiple backend implementations.
def default_memory_pool():
    """
    Get the process-wide default memory pool.
    Returns:
    MemoryPool: Default memory pool instance
    """
def system_memory_pool():
    """
    Get the system memory pool.
    Returns:
    MemoryPool: System memory pool backed by malloc/free
    """
def jemalloc_memory_pool():
    """
    Get the jemalloc memory pool (if the jemalloc backend is available).
    Returns:
    MemoryPool: jemalloc-based memory pool
    """
def mimalloc_memory_pool():
    """
    Get the mimalloc memory pool (if the mimalloc backend is available).
    Returns:
    MemoryPool: mimalloc-based memory pool
    """
def set_memory_pool(pool):
    """
    Set the process-wide default memory pool.
    Parameters:
    - pool: MemoryPool, memory pool to use as default
    """
def total_allocated_bytes():
    """
    Get the total number of bytes currently allocated across all pools.
    Returns:
    int: Total allocated bytes
    """
def supported_memory_backends():
    """
    List the memory pool backends supported by this build.
    Returns:
    list of str: Available memory backends
    """
def log_memory_allocations(enable):
    """
    Enable or disable memory allocation logging.
    Parameters:
    - enable: bool, whether to enable logging
    """
def jemalloc_set_decay_ms(decay_ms):
    """
    Set the jemalloc page decay time.
    Parameters:
    - decay_ms: int, decay time in milliseconds
    """
class MemoryPool:
    """
    Abstract memory pool interface.
    Attributes:
    - backend_name: Name of memory pool backend
    - bytes_allocated: Current allocated bytes
    - max_memory: Maximum memory allocation
    - total_bytes_allocated: Total bytes allocated
    """
    def allocate(self, size):
        """Allocate a memory buffer of *size* bytes."""
    def reallocate(self, buffer, old_size, new_size):
        """Resize a previously allocated buffer from *old_size* to *new_size*."""
    def free(self, buffer, size):
        """Release a previously allocated buffer."""
class LoggingMemoryPool(MemoryPool):
    """Memory pool wrapper that logs allocation activity."""

class ProxyMemoryPool(MemoryPool):
    """Memory pool proxy that delegates operations to a wrapped pool."""
def logging_memory_pool(pool):
    """
    Create a logging wrapper around a memory pool.
    Parameters:
    - pool: MemoryPool, pool to wrap
    Returns:
    LoggingMemoryPool: Logging memory pool wrapper
    """
def proxy_memory_pool(pool):
"""
Create proxy wrapper for memory pool.
Parameters:
- pool: MemoryPool, pool to proxy
Returns:
ProxyMemoryPool: Proxy memory pool wrapper
"""Low-level memory buffer operations for efficient data handling and zero-copy operations.
def allocate_buffer(size, alignment=None, memory_pool=None, resizable=False):
    """
    Allocate a new buffer.
    Parameters:
    - size: int, buffer size in bytes
    - alignment: int, memory alignment
    - memory_pool: MemoryPool, memory pool to use
    - resizable: bool, whether buffer is resizable
    Returns:
    Buffer or ResizableBuffer: Allocated buffer
    """
def foreign_buffer(address, size, base=None):
    """
    Create a buffer wrapping externally owned memory.
    Parameters:
    - address: int, memory address
    - size: int, buffer size
    - base: object, object holding the memory reference alive
    Returns:
    Buffer: Buffer wrapping foreign memory
    """
def py_buffer(obj):
    """
    Create a buffer from a Python buffer-protocol object.
    Parameters:
    - obj: object implementing the buffer protocol
    Returns:
    Buffer: Buffer wrapping the Python object
    """
class Buffer:
    """
    Immutable memory buffer.
    Attributes:
    - address: Memory address
    - is_mutable: Whether buffer is mutable
    - size: Buffer size in bytes
    """
    def __len__(self): ...
    def __getitem__(self, key): ...
    def equals(self, other):
        """Check buffer equality."""
    def slice(self, offset=0, length=None):
        """Create a zero-copy buffer slice."""
    def to_pybytes(self):
        """Convert to Python bytes."""
    def hex(self):
        """Hexadecimal representation of the buffer contents."""
class ResizableBuffer(Buffer):
"""
Mutable resizable memory buffer.
Attributes:
- capacity: Buffer capacity
"""
def resize(self, new_size, shrink_to_fit=True):
"""Resize buffer."""
def reserve(self, capacity):
"""Reserve buffer capacity."""Compression and decompression with support for multiple codecs and configurable compression levels.
def compress(data, codec=None, asbytes=False, memory_pool=None):
    """
    Compress data with the given codec.
    Parameters:
    - data: bytes-like, data to compress
    - codec: str or Codec, compression codec
    - asbytes: bool, return bytes instead of Buffer
    - memory_pool: MemoryPool, memory pool for allocation
    Returns:
    Buffer or bytes: Compressed data
    """
def decompress(data, decompressed_size=None, codec=None, memory_pool=None):
    """
    Decompress data with the given codec.
    Parameters:
    - data: bytes-like, compressed data
    - decompressed_size: int, expected decompressed size
    - codec: str or Codec, compression codec
    - memory_pool: MemoryPool, memory pool for allocation
    Returns:
    Buffer: Decompressed data
    """
class Codec:
"""
Compression codec interface.
Attributes:
- name: Codec name
- compression_level: Compression level
"""
@staticmethod
def is_available(codec_name):
"""
Check if codec is available.
Parameters:
- codec_name: str, codec name
Returns:
bool: Whether codec is available
"""
def compress(self, data, memory_pool=None):
"""Compress data."""
def decompress(self, data, decompressed_size=None, memory_pool=None):
"""Decompress data."""File interfaces and streaming I/O operations with support for various file types and compression.
def input_stream(source, compression='detect', buffer_size=None):
    """
    Create an input stream from various sources.
    Parameters:
    - source: str, file path, or file-like object
    - compression: str, compression type or 'detect'
    - buffer_size: int, buffer size for reading
    Returns:
    InputStream: Input stream for reading
    """
def output_stream(source, compression=None, buffer_size=None):
    """
    Create an output stream.
    Parameters:
    - source: str, file path, or file-like object
    - compression: str, compression type
    - buffer_size: int, buffer size for writing
    Returns:
    OutputStream: Output stream for writing
    """
def memory_map(path, mode='r'):
    """
    Open an existing file as a memory map.
    Parameters:
    - path: str, file path
    - mode: str, access mode ('r', 'r+', 'w')
    Returns:
    MemoryMappedFile: Memory mapped file
    """
def create_memory_map(path, size):
    """
    Create a new memory-mapped file of the given size.
    Parameters:
    - path: str, file path
    - size: int, file size
    Returns:
    MemoryMappedFile: New memory mapped file
    """
class NativeFile:
    """
    Abstract base for native file implementations.
    Attributes:
    - closed: Whether file is closed
    - mode: File access mode
    """
    def close(self): ...
    def closed(self): ...
    def fileno(self): ...
    def flush(self): ...
    def isatty(self): ...
    def readable(self): ...
    def seekable(self): ...
    def writable(self): ...
    def read(self, nbytes=None): ...
    def read1(self, nbytes=None): ...
    def readall(self): ...
    def readinto(self, b): ...
    def readline(self, size=None): ...
    def readlines(self, hint=None): ...
    def seek(self, pos, whence=0): ...
    def tell(self): ...
    def truncate(self, size=None): ...
    def write(self, data): ...
    def writelines(self, lines): ...
class PythonFile(NativeFile):
    """Wrapper exposing a Python file-like object as a NativeFile."""

class OSFile(NativeFile):
    """
    Operating system file with memory mapping support.
    Attributes:
    - size: File size in bytes
    """

class MemoryMappedFile(NativeFile):
    """
    Memory-mapped file interface.
    Attributes:
    - size: File size in bytes
    """
    def resize(self, new_size):
        """Resize memory mapped file."""
class BufferedInputStream:
    """Buffered wrapper around an input stream."""
    def __init__(self, stream, buffer_size=None, memory_pool=None): ...

class BufferedOutputStream:
    """Buffered wrapper around an output stream."""
    def __init__(self, stream, buffer_size=None, memory_pool=None): ...

class CompressedInputStream:
    """Input stream that decompresses data on read."""
    def __init__(self, stream, compression, memory_pool=None): ...

class CompressedOutputStream:
    """Output stream that compresses data on write."""
    def __init__(self, stream, compression, memory_pool=None): ...

class TransformInputStream:
    """Input stream that applies a custom transform function to data read."""
    def __init__(self, stream, transform_func): ...
def transcoding_input_stream(stream, src_encoding, dest_encoding):
    """
    Create a stream that transcodes between character encodings on read.
    Parameters:
    - stream: input stream
    - src_encoding: str, source encoding
    - dest_encoding: str, destination encoding
    Returns:
    TransformInputStream: Transcoding stream
    """
class FixedSizeBufferWriter:
    """Writer into a pre-allocated fixed-size buffer."""
    def __init__(self, buffer): ...

class BufferReader:
    """Zero-copy reader over an in-memory buffer."""
    def __init__(self, buffer): ...

class BufferOutputStream:
    """Output stream writing into a growable in-memory buffer."""
    def __init__(self, memory_pool=None): ...
    def getvalue(self):
        """Return the accumulated buffer contents."""

class MockOutputStream:
    """Mock output stream for testing."""

class CacheOptions:
    """Caching configuration for streams."""
def have_libhdfs():
"""
Check if HDFS support is available.
Returns:
bool: Whether libhdfs is available
"""Device abstraction for CPU and GPU memory management in heterogeneous computing environments.
class Device:
    """
    Device abstraction for CPU/GPU memory.
    Attributes:
    - device_type: Type of device
    - device_id: Device identifier
    """
    def equals(self, other):
        """Check device equality."""
    def __str__(self): ...
class DeviceAllocationType:
    """Enumeration of device allocation types."""

class MemoryManager:
    """
    Cross-device memory manager interface.
    """
    def allocate(self, size):
        """Allocate device memory."""
    def copy_non_owned(self, data, device=None, memory_pool=None):
        """Copy data to the target device."""
def default_cpu_memory_manager():
    """
    Get the default CPU memory manager.
    Returns:
    MemoryManager: CPU memory manager
    """

import pyarrow as pa
# Example: inspecting and switching memory pools.

# Check available memory backends
backends = pa.supported_memory_backends()
print(f"Available backends: {backends}")

# Get default memory pool info
pool = pa.default_memory_pool()
print(f"Backend: {pool.backend_name}")
print(f"Allocated: {pool.bytes_allocated()} bytes")
print(f"Max memory: {pool.max_memory()} bytes")

# Use different memory pools
if 'jemalloc' in backends:
    jemalloc_pool = pa.jemalloc_memory_pool()
    pa.set_memory_pool(jemalloc_pool)
    print("Switched to jemalloc")

# Create logging memory pool
logging_pool = pa.logging_memory_pool(pa.default_memory_pool())
pa.set_memory_pool(logging_pool)

# Enable memory allocation logging
pa.log_memory_allocations(True)

# Create large array to see logging
large_array = pa.array(range(1000000))
print(f"Created array with {len(large_array)} elements")

# Check total allocation
total = pa.total_allocated_bytes()
print(f"Total allocated: {total} bytes")

import pyarrow as pa
# Example: buffer allocation, resizing, slicing, and foreign memory.

# Allocate buffer
buffer = pa.allocate_buffer(1024)
print(f"Buffer size: {buffer.size}")
print(f"Buffer address: {buffer.address}")

# Create resizable buffer
resizable = pa.allocate_buffer(512, resizable=True)
print(f"Initial capacity: {resizable.capacity}")

# Resize buffer
resizable.resize(1024)
print(f"New size: {resizable.size}")
print(f"New capacity: {resizable.capacity}")

# Create buffer from bytes
data = b"Hello, Arrow!"
py_buffer = pa.py_buffer(data)
print(f"Buffer from bytes: {py_buffer.to_pybytes()}")

# Buffer slicing
slice_buffer = py_buffer.slice(7, 5)  # "Arrow"
print(f"Sliced buffer: {slice_buffer.to_pybytes()}")

# Foreign buffer (advanced usage)
import ctypes
c_array = (ctypes.c_byte * 10)(*range(10))
foreign = pa.foreign_buffer(
    ctypes.addressof(c_array),
    ctypes.sizeof(c_array),
    base=c_array  # Keep reference so the memory stays alive
)
print(f"Foreign buffer: {foreign.to_pybytes()}")

import pyarrow as pa
# Example: compressing and decompressing data with the available codecs.

# Check available codecs
codecs = ['gzip', 'snappy', 'lz4', 'zstd', 'brotli']
available = [codec for codec in codecs if pa.Codec.is_available(codec)]
print(f"Available codecs: {available}")

# Compress data
data = b"This is some test data to compress. " * 100
original_size = len(data)
for codec_name in available:
    # Compress
    compressed = pa.compress(data, codec=codec_name)
    compressed_size = compressed.size
    # Decompress
    decompressed = pa.decompress(compressed, codec=codec_name)
    # Verify the round-trip restores the original bytes
    assert decompressed.to_pybytes() == data
    compression_ratio = original_size / compressed_size
    print(f"{codec_name}: {original_size} -> {compressed_size} "
          f"(ratio: {compression_ratio:.2f})")

# Use Codec class directly
codec = pa.Codec('snappy')
compressed = codec.compress(data)
decompressed = codec.decompress(compressed)
print(f"Codec class: {len(decompressed.to_pybytes())} bytes")

import pyarrow as pa
# Example: memory-mapped files, compressed streams, buffer I/O, transcoding.
import tempfile
import os

# Create sample data
table = pa.table({
    'id': range(1000),
    'value': [x * 1.5 for x in range(1000)]
})

with tempfile.TemporaryDirectory() as temp_dir:
    # Memory mapped file
    mmap_path = os.path.join(temp_dir, 'mmap_test.bin')

    # Create memory mapped file
    with pa.create_memory_map(mmap_path, 8192) as mmap_file:
        # Write data
        data = b"Memory mapped data " * 100
        mmap_file.write(data)
        mmap_file.flush()

    # Read memory mapped file
    with pa.memory_map(mmap_path, 'r') as mmap_file:
        read_data = mmap_file.read()
        print(f"Memory mapped read: {len(read_data)} bytes")

    # Compressed I/O
    compressed_path = os.path.join(temp_dir, 'compressed.gz')

    # Write compressed
    with pa.output_stream(compressed_path, compression='gzip') as out:
        out.write(b"Compressed data " * 1000)

    # Read compressed
    with pa.input_stream(compressed_path, compression='gzip') as inp:
        read_compressed = inp.read()
        print(f"Compressed read: {len(read_compressed)} bytes")

# Buffer I/O
buffer_stream = pa.BufferOutputStream()
buffer_stream.write(b"Buffer stream data")
buffer_contents = buffer_stream.getvalue()
print(f"Buffer stream: {buffer_contents.to_pybytes()}")

# Read from buffer
buffer_reader = pa.BufferReader(buffer_contents)
read_from_buffer = buffer_reader.read()
print(f"Buffer reader: {read_from_buffer}")

# Transcoding example
text_data = "Hello, 世界! 🌍".encode('utf-8')
utf8_stream = pa.BufferReader(pa.py_buffer(text_data))

# Transcode UTF-8 to Latin-1 (this will fail for non-ASCII)
try:
    transcoded_stream = pa.transcoding_input_stream(
        utf8_stream, 'utf-8', 'latin-1'
    )
    transcoded_data = transcoded_stream.read()
    print(f"Transcoded: {transcoded_data}")
except Exception as e:
    print(f"Transcoding failed: {e}")

import pyarrow as pa
import gc
def memory_usage_demo():
# Track memory usage
initial_memory = pa.total_allocated_bytes()
# Create large arrays
arrays = []
for i in range(10):
arr = pa.array(range(100000))
arrays.append(arr)
peak_memory = pa.total_allocated_bytes()
print(f"Peak memory usage: {peak_memory - initial_memory} bytes")
# Clear arrays
arrays.clear()
gc.collect() # Force garbage collection
final_memory = pa.total_allocated_bytes()
print(f"Final memory usage: {final_memory - initial_memory} bytes")
# Custom memory pool example
class TrackingMemoryPool(pa.MemoryPool):
"""Example custom memory pool that tracks allocations."""
def __init__(self, base_pool):
self.base_pool = base_pool
self.allocation_count = 0
self.deallocation_count = 0
@property
def backend_name(self):
return f"tracking_{self.base_pool.backend_name}"
def bytes_allocated(self):
return self.base_pool.bytes_allocated()
def max_memory(self):
return self.base_pool.max_memory()
def allocate(self, size):
self.allocation_count += 1
return self.base_pool.allocate(size)
def free(self, buffer, size):
self.deallocation_count += 1
return self.base_pool.free(buffer, size)
# Use custom memory pool
base_pool = pa.default_memory_pool()
tracking_pool = TrackingMemoryPool(base_pool)
# Note: Custom pools in Python have limitations
# This is more of a conceptual example
print(f"Custom pool backend: {tracking_pool.backend_name}")
# Run memory usage demo
memory_usage_demo()Install with Tessl CLI
npx tessl i tessl/pypi-pyarrow