Python library for the Apache Arrow columnar memory format and its computing libraries.
Specialized functionality including CUDA GPU support, Substrait query integration, execution engine operations, and data interchange protocols for advanced use cases and system integration scenarios.
GPU memory management and operations for high-performance computing workloads using NVIDIA CUDA.
class Context:
    """
    CUDA context wrapper for device operations.

    Attributes:
    - device_number: CUDA device number
    - handle: CUDA context handle
    """

    def __init__(self, device_number: int = 0): ...

    def memory_manager(self):
        """Get the CUDA memory manager associated with this context."""

    def synchronize(self):
        """Synchronize CUDA operations (block until queued work completes)."""

    @property
    def device_number(self) -> int:
        """Get device number."""
class CudaBuffer:
    """
    GPU memory buffer.

    Attributes:
    - context: CUDA context
    - size: Buffer size in bytes
    - address: GPU memory address
    - is_mutable: Whether buffer is mutable
    """

    def copy_to_host(self, position: int = 0, nbytes=None, memory_pool=None):
        """Copy data from GPU to host memory.

        Copies starting at `position`; `nbytes` of None presumably means the
        whole remainder — verify.  Host allocation uses `memory_pool` if given.
        """

    def copy_from_host(self, data, position: int = 0):
        """Copy host `data` (bytes-like) into GPU memory at `position`."""

    def copy_from_device(self, buf, position: int = 0, source_position: int = 0, nbytes=None):
        """Copy data from another GPU buffer (device-to-device)."""

    def slice(self, offset: int, length=None):
        """Create buffer slice starting at `offset`."""

    def equals(self, other) -> bool:
        """Check buffer equality."""

    def export_for_ipc(self):
        """Export buffer for inter-process communication (see IpcMemHandle)."""
class HostBuffer:
    """
    Pinned (page-locked) host memory buffer for efficient GPU transfers.

    Attributes:
    - size: Buffer size in bytes
    - address: Host memory address
    """
class IpcMemHandle:
    """
    Inter-process communication memory handle for sharing GPU buffers.

    Attributes:
    - handle: IPC handle bytes
    """

    def open(self, context):
        """Open IPC handle in the given CUDA `context`."""

    def serialize(self):
        """Serialize handle for IPC (transfer to another process)."""

    @classmethod
    def from_buffer(cls, buf):
        """Create handle from a CUDA buffer."""
class BufferReader:
    """File-like reader over a CUDA buffer."""

    def __init__(self, buffer): ...

    def read(self, nbytes=None):
        """Read data from buffer (`nbytes` of None presumably reads the rest)."""

    def seek(self, position: int):
        """Seek to absolute `position`."""

    def tell(self) -> int:
        """Get current position."""
class BufferWriter:
    """File-like writer over a CUDA buffer."""

    def __init__(self, buffer): ...

    def write(self, data):
        """Write bytes-like `data` to the buffer at the current position."""

    def seek(self, position: int):
        """Seek to absolute `position`."""

    def tell(self) -> int:
        """Get current position."""
def new_host_buffer(size: int, device_number: int = 0):
    """
    Create new pinned (page-locked) host buffer.

    Parameters:
    - size: int, buffer size in bytes
    - device_number: int, CUDA device number
    Returns:
    HostBuffer: Pinned host buffer
    """
def serialize_record_batch(batch, ctx):
    """
    Serialize record batch for CUDA transfer.

    Parameters:
    - batch: RecordBatch, batch to serialize
    - ctx: Context, CUDA context
    Returns:
    bytes: Serialized batch

    NOTE(review): upstream pyarrow.cuda.serialize_record_batch returns a
    device-side CudaBuffer rather than host bytes — verify against the
    installed pyarrow version.
    """
def read_message(source, memory_pool=None):
    """
    Read CUDA IPC message.

    Parameters:
    - source: file-like, message source
    - memory_pool: MemoryPool, memory pool for allocation
    Returns:
    Message: CUDA message
    """
def read_record_batch(message, schema, memory_pool=None):
    """
    Read record batch from CUDA message.

    Parameters:
    - message: Message, CUDA message
    - schema: Schema, batch schema
    - memory_pool: MemoryPool, memory pool for allocation
    Returns:
    RecordBatch: Record batch
    """
Integration with Substrait for standardized query representation and cross-system compatibility.
def run_query(plan, table_provider=None):
    """
    Execute a serialized Substrait query plan.

    Parameters:
    - plan: bytes, serialized Substrait plan
    - table_provider: callable, function to provide tables by name
    Returns:
    Table: Query result table
    """
def get_supported_functions():
    """
    Get list of Substrait functions supported by this build.

    Returns:
    list of str: Supported function names
    """
def deserialize_expressions(data: bytes, schema):
    """
    Deserialize Substrait expressions against an input schema.

    Parameters:
    - data: bytes, serialized Substrait expressions
    - schema: Schema, input schema
    Returns:
    BoundExpressions: Bound expressions with Arrow types
    """
def serialize_expressions(expressions, names, schema) -> bytes:
    """
    Serialize Arrow expressions to Substrait.

    Parameters:
    - expressions: list of Expression, Arrow expressions
    - names: list of str, expression names (parallel to `expressions`)
    - schema: Schema, input schema
    Returns:
    bytes: Serialized Substrait expressions
    """
def deserialize_schema(data: bytes):
    """
    Deserialize a Substrait schema.

    Parameters:
    - data: bytes, serialized Substrait schema
    Returns:
    SubstraitSchema: Substrait schema representation
    """
def serialize_schema(schema) -> bytes:
    """
    Serialize an Arrow schema to Substrait.

    Parameters:
    - schema: Schema, Arrow schema
    Returns:
    bytes: Serialized Substrait schema
    """
class BoundExpressions:
    """
    Bound Substrait expressions with Arrow types.

    Attributes:
    - expressions: List of bound expressions
    - schema: Input schema
    """

    def evaluate(self, batch):
        """Evaluate the bound expressions on a record batch."""
class SubstraitSchema:
    """
    Substrait schema representation.

    Attributes:
    - names: Field names
    - types: Field types
    """

    def to_arrow_schema(self):
        """Convert to an Arrow schema."""
Low-level execution engine operations for building custom query processing pipelines.
class Declaration:
    """
    Execution plan node declaration (a node in an Acero plan graph).

    Attributes:
    - factory_name: Node factory name (e.g. "table_source", "filter")
    - options: Node options
    - inputs: Input declarations
    """

    def __init__(self, factory_name: str, options, inputs=None): ...
class ExecNodeOptions:
    """Base class for execution node options."""
class TableSourceNodeOptions(ExecNodeOptions):
    """
    Table source node configuration.

    Attributes:
    - table: Source table
    """

    def __init__(self, table): ...
class FilterNodeOptions(ExecNodeOptions):
    """
    Filter node configuration.

    Attributes:
    - filter_expression: Filter expression (rows where it is true are kept)
    """

    def __init__(self, filter_expression): ...
class ProjectNodeOptions(ExecNodeOptions):
    """
    Projection node configuration.

    Attributes:
    - expressions: Projection expressions
    - names: Output field names (parallel to `expressions`)
    """

    def __init__(self, expressions, names=None): ...
class AggregateNodeOptions(ExecNodeOptions):
    """
    Aggregation node configuration.

    Attributes:
    - aggregates: Aggregate functions
    - keys: Grouping keys
    """

    def __init__(self, aggregates, keys=None): ...
class OrderByNodeOptions(ExecNodeOptions):
    """
    Sorting node configuration.

    Attributes:
    - sort_keys: Sort key expressions
    - ordering: Sort ordering (ascending/descending)
    """

    def __init__(self, sort_keys, ordering=None): ...
class HashJoinNodeOptions(ExecNodeOptions):
    """
    Hash join node configuration.

    Attributes:
    - join_type: Type of join (e.g. "inner")
    - left_keys: Left join keys
    - right_keys: Right join keys
    - filter: Optional join filter
    """

    def __init__(self, join_type, left_keys, right_keys, filter=None): ...
class AsofJoinNodeOptions(ExecNodeOptions):
    """
    As-of (temporal) join node configuration.

    Attributes:
    - left_keys: Left join keys
    - right_keys: Right join keys
    - on_key: Temporal join key
    - tolerance: Join tolerance
    """

    def __init__(self, left_keys, right_keys, on_key, tolerance=None): ...
class ScanNodeOptions(ExecNodeOptions):
    """
    Dataset scan node configuration.

    Attributes:
    - dataset: Dataset to scan
    - filter: Scan filter
    - projection: Column projection
    """

    def __init__(self, dataset, filter=None, projection=None): ...
Support for data interchange protocols enabling interoperability with other data systems.
def from_dataframe(df, preserve_index=None, types_mapper=None):
    """
    Convert a dataframe interchange object to an Arrow Table.

    Parameters:
    - df: object implementing the dataframe interchange protocol (__dataframe__)
    - preserve_index: bool, preserve dataframe index
    - types_mapper: callable, custom type mapping function
    Returns:
    Table: Arrow table from dataframe interchange object
    """
Integration with Java Virtual Machine for interoperability with Java-based systems.
def set_default_jvm_path(path: str):
    """
    Set default JVM shared-library path.

    Parameters:
    - path: str, path to JVM library
    """
def get_default_jvm_path() -> str:
    """
    Get default JVM shared-library path.

    Returns:
    str: JVM library path
    """
def set_default_jvm_options(options):
    """
    Set default JVM startup options.

    Parameters:
    - options: list of str, JVM startup options (e.g. "-Xmx1g")
    """
def get_default_jvm_options():
    """
    Get default JVM startup options.

    Returns:
    list of str: JVM startup options
    """
Global configuration and environment management for PyArrow behavior.
def get_include() -> str:
    """
    Get Arrow C++ include directory path (for building extensions).

    Returns:
    str: Include directory path
    """
def get_libraries():
    """
    Get list of libraries to link against when building extensions.

    Returns:
    list of str: Library names
    """
def get_library_dirs():
    """
    Get library directories for linking.

    Returns:
    list of str: Library directory paths
    """
def create_library_symlinks():
    """Create library symlinks for wheel installations."""
def set_timezone_db_path(path: str):
    """
    Set timezone database path.

    Parameters:
    - path: str, path to timezone database
    """
def cpu_count() -> int:
    """
    Get number of CPU cores Arrow uses for computations.

    Returns:
    int: Number of CPU cores
    """
def set_cpu_count(count: int):
    """
    Set CPU core count for computations.

    Parameters:
    - count: int, number of CPU cores to use
    """
def io_thread_count() -> int:
    """
    Get I/O thread count.

    Returns:
    int: Number of I/O threads
    """
def set_io_thread_count(count: int):
    """
    Set I/O thread count.

    Parameters:
    - count: int, number of I/O threads to use
    """
def enable_signal_handlers(enable: bool):
    """
    Enable/disable Arrow's signal handling.

    Parameters:
    - enable: bool, whether to enable signal handlers
    """
import pyarrow as pa
import pyarrow as pa

# Check if CUDA is available
try:
    import pyarrow.cuda as cuda
    print("CUDA support available")
except ImportError:
    print("CUDA support not available")
    # `exit()` is injected by the `site` module and may be missing when the
    # script runs with `python -S`; raising SystemExit is the portable form.
    raise SystemExit

# Create CUDA context
ctx = cuda.Context(device_number=0)
print(f"CUDA device: {ctx.device_number}")

# Create pinned host buffer
host_data = b"Hello, CUDA!" * 1000
host_buffer = cuda.new_host_buffer(len(host_data))
# Copy data to host buffer (conceptual - actual API may differ)
# host_buffer.copy_from_host(host_data)

# Allocate a GPU buffer.  Context.new_buffer(size) is the documented
# allocation entry point in pyarrow.cuda; the memory manager exposes no
# public allocate() method.
gpu_buffer = ctx.new_buffer(len(host_data))
# Copy from host to GPU
gpu_buffer.copy_from_host(host_data)
# Copy back to host
result_buffer = gpu_buffer.copy_to_host()
print(f"GPU round-trip successful: {len(result_buffer)} bytes")

# Create Arrow array on GPU (conceptual)
cpu_array = pa.array([1, 2, 3, 4, 5])
# Note: Actual GPU array creation would require more setup

# IPC with GPU buffers
ipc_handle = cuda.IpcMemHandle.from_buffer(gpu_buffer)
serialized_handle = ipc_handle.serialize()
print(f"Serialized IPC handle: {len(serialized_handle)} bytes")

# Clean up: wait for all queued device work to finish
ctx.synchronize()
import pyarrow as pa
import pyarrow as pa
import pyarrow.substrait as substrait
import pyarrow.compute as pc

# Check supported Substrait functions
supported_functions = substrait.get_supported_functions()
print(f"Supported functions: {len(supported_functions)}")
print(f"First 10: {supported_functions[:10]}")

# Create sample data.  Every column must have exactly 100 values:
# ['A', 'B', 'C'] * 34 yields 102 entries, which makes pa.table() raise a
# column-length mismatch, so truncate the cycle to 100.
table = pa.table({
    'id': range(100),
    'category': (['A', 'B', 'C'] * 34)[:100],  # cycling through categories
    'value': [i * 1.5 for i in range(100)]
})

# Define table provider for Substrait
def table_provider(names):
    """Provide tables by name for Substrait execution."""
    # NOTE(review): newer pyarrow versions invoke the provider as
    # table_provider(names, expected_schema) -- confirm against the
    # installed pyarrow version.
    if names == ['main_table']:
        return table
    else:
        raise ValueError(f"Unknown table: {names}")

# Example: Simple filter query (conceptual)
# In practice, you would create or receive a Substrait plan
# This is a simplified example showing the concept

# Create expressions and serialize to Substrait
expressions = [
    pc.field('value'),
    pc.greater(pc.field('value'), pc.scalar(50))
]
names = ['value', 'filter_condition']
try:
    # Serialize expressions to Substrait format
    serialized_expressions = substrait.serialize_expressions(
        expressions, names, table.schema
    )
    print(f"Serialized expressions: {len(serialized_expressions)} bytes")
    # Deserialize expressions back
    bound_expressions = substrait.deserialize_expressions(
        serialized_expressions, table.schema
    )
    print(f"Bound expressions: {bound_expressions}")
except Exception as e:
    print(f"Substrait operations not fully available: {e}")

# Schema serialization round-trip
try:
    serialized_schema = substrait.serialize_schema(table.schema)
    print(f"Serialized schema: {len(serialized_schema)} bytes")
    deserialized_schema = substrait.deserialize_schema(serialized_schema)
    print(f"Deserialized schema: {deserialized_schema}")
except Exception as e:
    print(f"Schema serialization not available: {e}")
import pyarrow as pa
import pyarrow as pa
import pyarrow.acero as acero
import pyarrow.compute as pc

# Create sample tables
table1 = pa.table({
    'id': [1, 2, 3, 4, 5],
    'name': ['Alice', 'Bob', 'Charlie', 'Diana', 'Eve'],
    'dept_id': [10, 20, 10, 30, 20]
})
table2 = pa.table({
    'dept_id': [10, 20, 30],
    'dept_name': ['Engineering', 'Sales', 'Marketing']
})

# Create execution plan declarations
source1 = acero.Declaration(
    "table_source",
    acero.TableSourceNodeOptions(table1)
)
source2 = acero.Declaration(
    "table_source",
    acero.TableSourceNodeOptions(table2)
)

# Filter declaration
filter_decl = acero.Declaration(
    "filter",
    acero.FilterNodeOptions(pc.greater(pc.field('id'), pc.scalar(2))),
    inputs=[source1]
)

# Projection declaration
project_decl = acero.Declaration(
    "project",
    acero.ProjectNodeOptions([
        pc.field('id'),
        pc.field('name'),
        pc.field('dept_id')
    ]),
    inputs=[filter_decl]
)

# Join declaration.  HashJoinNodeOptions takes key *field names* (or
# FieldRefs), not pc.field() compute Expressions.
join_decl = acero.Declaration(
    "hashjoin",
    acero.HashJoinNodeOptions(
        join_type="inner",
        left_keys="dept_id",
        right_keys="dept_id"
    ),
    inputs=[project_decl, source2]
)
print("Created execution plan with filter, projection, and join")
print("Note: Actual execution requires Acero runtime")

# Aggregation node.  AggregateNodeOptions expects 4-tuples of
# (target column, function name, FunctionOptions, output field name);
# grouped aggregation uses the hash_* kernel variants, and `keys`
# are field names.
agg_decl = acero.Declaration(
    "aggregate",
    acero.AggregateNodeOptions(
        aggregates=[
            ("id", "hash_count", None, "id_count"),
            ("id", "hash_mean", None, "id_mean")
        ],
        keys=["dept_name"]
    ),
    inputs=[join_decl]
)
print("Added aggregation node to execution plan")
import pyarrow as pa
import pyarrow.interchange as interchange
# Create a mock dataframe-like object that implements interchange protocol
class MockDataFrame:
    """Minimal stand-in dataframe exposing dataframe-interchange hooks."""

    def __init__(self, data):
        # Keep the raw column mapping and derive an Arrow schema by
        # inferring each column's type from its values.
        self.data = data
        inferred_fields = [
            pa.field(col, pa.infer_type(values))
            for col, values in data.items()
        ]
        self._schema = pa.schema(inferred_fields)

    def __dataframe__(self, nan_as_null=False, allow_copy=True):
        """Implement dataframe interchange protocol."""
        # Simplified mock -- a real implementation returns a dedicated
        # protocol object rather than the frame itself.
        return self

    def select_columns(self, indices):
        """Select columns by indices."""
        chosen = {
            col: values
            for pos, (col, values) in enumerate(self.data.items())
            if pos in indices
        }
        return MockDataFrame(chosen)

    def get_chunks(self, n_chunks=None):
        """Get data chunks."""
        # Simplified -- the whole frame is a single chunk.
        return [self]

    def to_arrow_table(self):
        """Convert to Arrow table."""
        return pa.table(self.data, schema=self._schema)
# Build a mock dataframe and convert it via the interchange protocol,
# falling back to direct table construction when that path fails.
sample_columns = {
    'integers': [1, 2, 3, 4, 5],
    'floats': [1.1, 2.2, 3.3, 4.4, 5.5],
    'strings': ['a', 'b', 'c', 'd', 'e']
}
mock_frame = MockDataFrame(sample_columns)
try:
    table = interchange.from_dataframe(mock_frame)
    print(f"Converted table: {table.schema}")
    print(f"Rows: {len(table)}")
except Exception as e:
    print(f"Interchange conversion failed: {e}")
    # Fall back to building the table directly from the mock's columns.
    table = mock_frame.to_arrow_table()
    print(f"Direct conversion: {table.schema}")

# Repeat the exercise with a real pandas DataFrame when pandas is installed.
try:
    import pandas as pd
    frame = pd.DataFrame({
        'x': range(10),
        'y': [i ** 2 for i in range(10)],
        'category': ['A', 'B'] * 5
    })
    table_from_pandas = interchange.from_dataframe(frame)
    print(f"Pandas conversion: {table_from_pandas.schema}")
    print(f"Rows: {len(table_from_pandas)}")
except ImportError:
    print("Pandas not available for interchange demo")
except Exception as e:
    print(f"Pandas interchange failed: {e}")
import pyarrow as pa
# JVM integration (conceptual example)
import glob
import platform

try:
    # Locate a JVM shared library (platform-specific).  The candidate paths
    # contain '*' wildcards, which pyarrow does NOT expand -- resolve them
    # with glob before handing over a concrete path.
    if platform.system() == "Linux":
        jvm_pattern = "/usr/lib/jvm/default/lib/server/libjvm.so"
    elif platform.system() == "Darwin":  # macOS
        jvm_pattern = "/Library/Java/JavaVirtualMachines/*/Contents/Home/lib/server/libjvm.dylib"
    elif platform.system() == "Windows":
        jvm_pattern = "C:\\Program Files\\Java\\*\\bin\\server\\jvm.dll"
    else:
        jvm_pattern = None

    jvm_path = None
    if jvm_pattern:
        matches = glob.glob(jvm_pattern)
        # Fall back to the literal pattern so fixed (wildcard-free) paths
        # still work even when the file does not exist yet.
        jvm_path = matches[0] if matches else jvm_pattern

    if jvm_path:
        pa.set_default_jvm_path(jvm_path)
        current_path = pa.get_default_jvm_path()
        print(f"JVM path set to: {current_path}")

    # Set JVM startup options
    jvm_options = [
        "-Xmx1g",                    # Maximum heap size
        "-XX:+UseG1GC",              # Use G1 garbage collector
        "-Djava.awt.headless=true"   # Headless mode
    ]
    pa.set_default_jvm_options(jvm_options)
    current_options = pa.get_default_jvm_options()
    print(f"JVM options: {current_options}")
except AttributeError:
    # These helpers are not part of every pyarrow build.
    print("JVM integration functions not available")
import pyarrow as pa
import time

# System information
print("=== PyArrow System Information ===")
pa.show_versions()
print()
print("=== Runtime Information ===")
pa.show_info()
print()

# CPU configuration: remember the defaults so they can be restored later.
saved_cpu_count = pa.cpu_count()
print(f"Original CPU count: {saved_cpu_count}")
# Temporarily halve the CPU count (never below one core).
pa.set_cpu_count(max(1, saved_cpu_count // 2))
print(f"Reduced CPU count: {pa.cpu_count()}")

# I/O thread configuration
saved_io_threads = pa.io_thread_count()
print(f"Original I/O threads: {saved_io_threads}")
pa.set_io_thread_count(4)
print(f"Set I/O threads: {pa.io_thread_count()}")

# Memory monitoring
initial_memory = pa.total_allocated_bytes()
print(f"Initial memory: {initial_memory} bytes")
# Allocate a handful of arrays so the allocation counter moves.
large_arrays = [pa.array(range(100000)) for _ in range(5)]
peak_memory = pa.total_allocated_bytes()
print(f"Peak memory: {peak_memory} bytes")
print(f"Memory increase: {peak_memory - initial_memory} bytes")

# Drop the references and force a collection so the pool can shrink.
large_arrays.clear()
import gc
gc.collect()
final_memory = pa.total_allocated_bytes()
print(f"Final memory: {final_memory} bytes")

# Restore original settings
pa.set_cpu_count(saved_cpu_count)
pa.set_io_thread_count(saved_io_threads)
print(f"Restored CPU count: {pa.cpu_count()}")
print(f"Restored I/O threads: {pa.io_thread_count()}")

# Signal handling
pa.enable_signal_handlers(True)
print("Signal handlers enabled")

# Library information for development
print(f"Include directory: {pa.get_include()}")
print(f"Libraries: {pa.get_libraries()}")
print(f"Library directories: {pa.get_library_dirs()[:3]}...")  # first three only
import pyarrow as pa
import pyarrow.compute as pc
import pyarrow.dataset as ds
import tempfile
import os
def advanced_processing_pipeline():
    """Demonstrate advanced PyArrow features in a processing pipeline.

    Builds a 1000-row table with timestamp, list and struct columns, writes
    it as a partitioned Parquet dataset, then reads it back with a timestamp
    filter and computes statistics over the nested list column.

    Returns:
        pyarrow.Table: the filtered table (id, timestamp, values columns).
    """
    # Create sample data with complex types
    data = pa.table({
        'id': range(1000),
        'timestamp': pa.array([
            f'2023-{(i % 12) + 1:02d}-{(i % 28) + 1:02d} {(i % 24):02d}:00:00'
            for i in range(1000)
        ], type=pa.timestamp('s')),
        'values': [
            [float(j) for j in range(i % 5 + 1)]
            for i in range(1000)
        ],
        'metadata': [
            {'source': f'sensor_{i % 10}', 'quality': (i % 100) / 100.0}
            for i in range(1000)
        ]
    })
    with tempfile.TemporaryDirectory() as tmpdir:
        # Write partitioned dataset.  `compression` is not a keyword of
        # ds.write_dataset(); Parquet-specific options must go through
        # `file_options`.
        # NOTE(review): partitioning on 'id' (1000 distinct values) yields one
        # directory per value -- acceptable for a demo, poor real layout.
        ds.write_dataset(
            data,
            tmpdir,
            format='parquet',
            partitioning=['id'],
            max_rows_per_file=100,
            file_options=ds.ParquetFileFormat().make_write_options(
                compression='snappy'
            )
        )
        # Read as dataset
        dataset = ds.dataset(tmpdir, format='parquet')
        print(f"Dataset schema: {dataset.schema}")
        print(f"Dataset files: {len(list(dataset.get_fragments()))}")
        # Complex filtering and computation:
        # keep only rows in the [2023-06-01, 2023-09-01) timestamp window.
        filtered_data = dataset.to_table(
            filter=(
                pc.greater(pc.field('timestamp'),
                           pc.strptime(['2023-06-01'], format='%Y-%m-%d', unit='s')[0]) &
                pc.less(pc.field('timestamp'),
                        pc.strptime(['2023-09-01'], format='%Y-%m-%d', unit='s')[0])
            ),
            columns=['id', 'timestamp', 'values']
        )
        print(f"Filtered data: {len(filtered_data)} rows")
        # Compute statistics on the list column
        list_lengths = pc.list_size(filtered_data['values'])
        avg_list_length = pc.mean(list_lengths)
        print(f"Average list length: {avg_list_length}")
        # Flatten list column and compute aggregate
        flattened_values = pc.list_flatten(filtered_data['values'])
        total_sum = pc.sum(flattened_values)
        print(f"Sum of all flattened values: {total_sum}")
        return filtered_data
# Run advanced pipeline
try:
result = advanced_processing_pipeline()
print(f"Pipeline completed successfully: {len(result)} rows processed")
except Exception as e:
print(f"Pipeline error: {e}")Install with Tessl CLI
npx tessl i tessl/pypi-pyarrow