# Advanced Features

Specialized functionality including CUDA GPU support, Substrait query integration, execution engine operations, and data interchange protocols for advanced use cases and system integration scenarios.

## Capabilities

### CUDA GPU Support

GPU memory management and operations for high-performance computing workloads using NVIDIA CUDA.

```python { .api }
class Context:
    """
    CUDA context wrapper for device operations.

    Attributes:
    - device_number: CUDA device number
    - handle: CUDA context handle
    """

    def __init__(self, device_number=0): ...

    def memory_manager(self):
        """Get CUDA memory manager."""

    def synchronize(self):
        """Synchronize CUDA operations."""

    @property
    def device_number(self):
        """Get device number."""

class CudaBuffer:
    """
    GPU memory buffer.

    Attributes:
    - context: CUDA context
    - size: Buffer size in bytes
    - address: GPU memory address
    - is_mutable: Whether buffer is mutable
    """

    def copy_to_host(self, position=0, nbytes=None, memory_pool=None):
        """Copy data from GPU to host memory."""

    def copy_from_host(self, data, position=0):
        """Copy data from host to GPU memory."""

    def copy_from_device(self, buf, position=0, source_position=0, nbytes=None):
        """Copy data from another GPU buffer."""

    def slice(self, offset, length=None):
        """Create buffer slice."""

    def equals(self, other):
        """Check buffer equality."""

    def export_for_ipc(self):
        """Export buffer for inter-process communication."""

class HostBuffer:
    """
    Pinned host memory buffer for efficient GPU transfers.

    Attributes:
    - size: Buffer size in bytes
    - address: Host memory address
    """

class IpcMemHandle:
    """
    Inter-process communication memory handle.

    Attributes:
    - handle: IPC handle bytes
    """

    def open(self, context):
        """Open IPC handle in context."""

    def serialize(self):
        """Serialize handle for IPC."""

    @classmethod
    def from_buffer(cls, buf):
        """Create handle from CUDA buffer."""

class BufferReader:
    """Reader for CUDA buffers."""

    def __init__(self, buffer): ...

    def read(self, nbytes=None):
        """Read data from buffer."""

    def seek(self, position):
        """Seek to position."""

    def tell(self):
        """Get current position."""

class BufferWriter:
    """Writer for CUDA buffers."""

    def __init__(self, buffer): ...

    def write(self, data):
        """Write data to buffer."""

    def seek(self, position):
        """Seek to position."""

    def tell(self):
        """Get current position."""

def new_host_buffer(size, device_number=0):
    """
    Create new pinned host buffer.

    Parameters:
    - size: int, buffer size in bytes
    - device_number: int, CUDA device number

    Returns:
    HostBuffer: Pinned host buffer
    """

def serialize_record_batch(batch, ctx):
    """
    Serialize record batch for CUDA transfer.

    Parameters:
    - batch: RecordBatch, batch to serialize
    - ctx: Context, CUDA context

    Returns:
    bytes: Serialized batch
    """

def read_message(source, memory_pool=None):
    """
    Read CUDA IPC message.

    Parameters:
    - source: file-like, message source
    - memory_pool: MemoryPool, memory pool for allocation

    Returns:
    Message: CUDA message
    """

def read_record_batch(message, schema, memory_pool=None):
    """
    Read record batch from CUDA message.

    Parameters:
    - message: Message, CUDA message
    - schema: Schema, batch schema
    - memory_pool: MemoryPool, memory pool for allocation

    Returns:
    RecordBatch: Record batch
    """
```

### Substrait Query Integration

Integration with Substrait for standardized query representation and cross-system compatibility.

```python { .api }
def run_query(plan, table_provider=None):
    """
    Execute Substrait query plan.

    Parameters:
    - plan: bytes, serialized Substrait plan
    - table_provider: callable, function to provide tables by name

    Returns:
    Table: Query result table
    """

def get_supported_functions():
    """
    Get list of supported Substrait functions.

    Returns:
    list of str: Supported function names
    """

def deserialize_expressions(data, schema):
    """
    Deserialize Substrait expressions.

    Parameters:
    - data: bytes, serialized Substrait expressions
    - schema: Schema, input schema

    Returns:
    BoundExpressions: Bound expressions with Arrow types
    """

def serialize_expressions(expressions, names, schema):
    """
    Serialize Arrow expressions to Substrait.

    Parameters:
    - expressions: list of Expression, Arrow expressions
    - names: list of str, expression names
    - schema: Schema, input schema

    Returns:
    bytes: Serialized Substrait expressions
    """

def deserialize_schema(data):
    """
    Deserialize Substrait schema.

    Parameters:
    - data: bytes, serialized Substrait schema

    Returns:
    SubstraitSchema: Substrait schema representation
    """

def serialize_schema(schema):
    """
    Serialize Arrow schema to Substrait.

    Parameters:
    - schema: Schema, Arrow schema

    Returns:
    bytes: Serialized Substrait schema
    """

class BoundExpressions:
    """
    Bound Substrait expressions with Arrow types.

    Attributes:
    - expressions: List of bound expressions
    - schema: Input schema
    """

    def evaluate(self, batch):
        """Evaluate expressions on record batch."""

class SubstraitSchema:
    """
    Substrait schema representation.

    Attributes:
    - names: Field names
    - types: Field types
    """

    def to_arrow_schema(self):
        """Convert to Arrow schema."""
```

### Acero Execution Engine

Low-level execution engine operations for building custom query processing pipelines.

```python { .api }
class Declaration:
    """
    Execution plan node declaration.

    Attributes:
    - factory_name: Node factory name
    - options: Node options
    - inputs: Input declarations
    """

    def __init__(self, factory_name, options, inputs=None): ...

class ExecNodeOptions:
    """Base execution node options."""

class TableSourceNodeOptions(ExecNodeOptions):
    """
    Table source node configuration.

    Attributes:
    - table: Source table
    """

    def __init__(self, table): ...

class FilterNodeOptions(ExecNodeOptions):
    """
    Filter node configuration.

    Attributes:
    - filter_expression: Filter expression
    """

    def __init__(self, filter_expression): ...

class ProjectNodeOptions(ExecNodeOptions):
    """
    Projection node configuration.

    Attributes:
    - expressions: Projection expressions
    - names: Output field names
    """

    def __init__(self, expressions, names=None): ...

class AggregateNodeOptions(ExecNodeOptions):
    """
    Aggregation node configuration.

    Attributes:
    - aggregates: Aggregate functions
    - keys: Grouping keys
    """

    def __init__(self, aggregates, keys=None): ...

class OrderByNodeOptions(ExecNodeOptions):
    """
    Sorting node configuration.

    Attributes:
    - sort_keys: Sort key expressions
    - ordering: Sort ordering (ascending/descending)
    """

    def __init__(self, sort_keys, ordering=None): ...

class HashJoinNodeOptions(ExecNodeOptions):
    """
    Hash join node configuration.

    Attributes:
    - join_type: Type of join
    - left_keys: Left join keys
    - right_keys: Right join keys
    - filter: Optional join filter
    """

    def __init__(self, join_type, left_keys, right_keys, filter=None): ...

class AsofJoinNodeOptions(ExecNodeOptions):
    """
    As-of join node configuration.

    Attributes:
    - left_keys: Left join keys
    - right_keys: Right join keys
    - on_key: Temporal join key
    - tolerance: Join tolerance
    """

    def __init__(self, left_keys, right_keys, on_key, tolerance=None): ...

class ScanNodeOptions(ExecNodeOptions):
    """
    Dataset scan node configuration.

    Attributes:
    - dataset: Dataset to scan
    - filter: Scan filter
    - projection: Column projection
    """

    def __init__(self, dataset, filter=None, projection=None): ...
```

### Data Interchange Protocol

Support for data interchange protocols enabling interoperability with other data systems.

```python { .api }
def from_dataframe(df, preserve_index=None, types_mapper=None):
    """
    Convert dataframe interchange object to Arrow Table.

    Parameters:
    - df: object implementing dataframe interchange protocol
    - preserve_index: bool, preserve dataframe index
    - types_mapper: callable, custom type mapping function

    Returns:
    Table: Arrow table from dataframe interchange object
    """
```

### JVM Integration

Integration with Java Virtual Machine for interoperability with Java-based systems.

```python { .api }
def set_default_jvm_path(path):
    """
    Set default JVM path.

    Parameters:
    - path: str, path to JVM library
    """

def get_default_jvm_path():
    """
    Get default JVM path.

    Returns:
    str: JVM library path
    """

def set_default_jvm_options(options):
    """
    Set default JVM options.

    Parameters:
    - options: list of str, JVM startup options
    """

def get_default_jvm_options():
    """
    Get default JVM options.

    Returns:
    list of str: JVM startup options
    """
```

### Configuration and Environment

Global configuration and environment management for PyArrow behavior.

```python { .api }
def get_include():
    """
    Get Arrow C++ include directory path.

    Returns:
    str: Include directory path
    """

def get_libraries():
    """
    Get list of libraries for linking.

    Returns:
    list of str: Library names
    """

def get_library_dirs():
    """
    Get library directories for linking.

    Returns:
    list of str: Library directory paths
    """

def create_library_symlinks():
    """Create library symlinks for wheel installations."""

def set_timezone_db_path(path):
    """
    Set timezone database path.

    Parameters:
    - path: str, path to timezone database
    """

def cpu_count():
    """
    Get number of CPU cores.

    Returns:
    int: Number of CPU cores
    """

def set_cpu_count(count):
    """
    Set CPU core count for computations.

    Parameters:
    - count: int, number of CPU cores to use
    """

def io_thread_count():
    """
    Get I/O thread count.

    Returns:
    int: Number of I/O threads
    """

def set_io_thread_count(count):
    """
    Set I/O thread count.

    Parameters:
    - count: int, number of I/O threads to use
    """

def enable_signal_handlers(enable):
    """
    Enable/disable signal handling.

    Parameters:
    - enable: bool, whether to enable signal handlers
    """
```

## Usage Examples

### CUDA GPU Operations

```python
import pyarrow as pa

# Check if CUDA is available
try:
    import pyarrow.cuda as cuda
    print("CUDA support available")
except ImportError:
    print("CUDA support not available")
    exit()

# Create CUDA context
ctx = cuda.Context(device_number=0)
print(f"CUDA device: {ctx.device_number}")

# Create host buffer
host_data = b"Hello, CUDA!" * 1000
host_buffer = cuda.new_host_buffer(len(host_data))

# Copy data to host buffer (conceptual - actual API may differ)
# host_buffer.copy_from_host(host_data)

# Create GPU buffer
gpu_buffer = ctx.memory_manager().allocate(len(host_data))

# Copy from host to GPU
gpu_buffer.copy_from_host(host_data)

# Copy back to host
result_buffer = gpu_buffer.copy_to_host()
print(f"GPU round-trip successful: {len(result_buffer)} bytes")

# Create Arrow array on GPU (conceptual)
cpu_array = pa.array([1, 2, 3, 4, 5])
# Note: Actual GPU array creation would require more setup

# IPC with GPU buffers
ipc_handle = cuda.IpcMemHandle.from_buffer(gpu_buffer)
serialized_handle = ipc_handle.serialize()
print(f"Serialized IPC handle: {len(serialized_handle)} bytes")

# Clean up
ctx.synchronize()
```

### Substrait Query Integration

```python
import pyarrow as pa
import pyarrow.substrait as substrait
import pyarrow.compute as pc

# Check supported Substrait functions
supported_functions = substrait.get_supported_functions()
print(f"Supported functions: {len(supported_functions)}")
print(f"First 10: {supported_functions[:10]}")

# Create sample data
table = pa.table({
    'id': range(100),
    'category': ['A', 'B', 'C'] * 34,  # Cycling through categories
    'value': [i * 1.5 for i in range(100)]
})

# Define table provider for Substrait
def table_provider(names):
    """Provide tables by name for Substrait execution."""
    if names == ['main_table']:
        return table
    else:
        raise ValueError(f"Unknown table: {names}")

# Example: Simple filter query (conceptual)
# In practice, you would create or receive a Substrait plan
# This is a simplified example showing the concept

# Create expressions and serialize to Substrait
expressions = [
    pc.field('value'),
    pc.greater(pc.field('value'), pc.scalar(50))
]
names = ['value', 'filter_condition']

try:
    # Serialize expressions to Substrait format
    serialized_expressions = substrait.serialize_expressions(
        expressions, names, table.schema
    )
    print(f"Serialized expressions: {len(serialized_expressions)} bytes")

    # Deserialize expressions back
    bound_expressions = substrait.deserialize_expressions(
        serialized_expressions, table.schema
    )
    print(f"Bound expressions: {bound_expressions}")

except Exception as e:
    print(f"Substrait operations not fully available: {e}")

# Schema serialization
try:
    serialized_schema = substrait.serialize_schema(table.schema)
    print(f"Serialized schema: {len(serialized_schema)} bytes")

    deserialized_schema = substrait.deserialize_schema(serialized_schema)
    print(f"Deserialized schema: {deserialized_schema}")

except Exception as e:
    print(f"Schema serialization not available: {e}")
```

### Acero Execution Engine

```python
import pyarrow as pa
import pyarrow.acero as acero
import pyarrow.compute as pc

# Create sample tables
table1 = pa.table({
    'id': [1, 2, 3, 4, 5],
    'name': ['Alice', 'Bob', 'Charlie', 'Diana', 'Eve'],
    'dept_id': [10, 20, 10, 30, 20]
})

table2 = pa.table({
    'dept_id': [10, 20, 30],
    'dept_name': ['Engineering', 'Sales', 'Marketing']
})

# Create execution plan declarations
source1 = acero.Declaration(
    "table_source",
    acero.TableSourceNodeOptions(table1)
)

source2 = acero.Declaration(
    "table_source",
    acero.TableSourceNodeOptions(table2)
)

# Filter declaration
filter_decl = acero.Declaration(
    "filter",
    acero.FilterNodeOptions(pc.greater(pc.field('id'), pc.scalar(2))),
    inputs=[source1]
)

# Projection declaration
project_decl = acero.Declaration(
    "project",
    acero.ProjectNodeOptions([
        pc.field('id'),
        pc.field('name'),
        pc.field('dept_id')
    ]),
    inputs=[filter_decl]
)

# Join declaration
join_decl = acero.Declaration(
    "hashjoin",
    acero.HashJoinNodeOptions(
        join_type="inner",
        left_keys=[pc.field('dept_id')],
        right_keys=[pc.field('dept_id')]
    ),
    inputs=[project_decl, source2]
)

print("Created execution plan with filter, projection, and join")
print("Note: Actual execution requires Acero runtime")

# Example of aggregation node
agg_decl = acero.Declaration(
    "aggregate",
    acero.AggregateNodeOptions(
        aggregates=[
            ("count", pc.field('id')),
            ("mean", pc.field('id'))
        ],
        keys=[pc.field('dept_name')]
    ),
    inputs=[join_decl]
)

print("Added aggregation node to execution plan")
```

### Data Interchange Protocol

```python
import pyarrow as pa
import pyarrow.interchange as interchange

# Create a mock dataframe-like object that implements interchange protocol
class MockDataFrame:
    """Mock dataframe implementing interchange protocol."""

    def __init__(self, data):
        self.data = data
        self._schema = pa.schema([
            pa.field(name, pa.infer_type(column))
            for name, column in data.items()
        ])

    def __dataframe__(self, nan_as_null=False, allow_copy=True):
        """Implement dataframe interchange protocol."""
        # This is a simplified mock - real implementation would be more complex
        return self

    def select_columns(self, indices):
        """Select columns by indices."""
        selected_data = {}
        for i, (name, column) in enumerate(self.data.items()):
            if i in indices:
                selected_data[name] = column
        return MockDataFrame(selected_data)

    def get_chunks(self, n_chunks=None):
        """Get data chunks."""
        # Simplified - return single chunk
        return [self]

    def to_arrow_table(self):
        """Convert to Arrow table."""
        return pa.table(self.data, schema=self._schema)

# Create mock dataframe
mock_df_data = {
    'integers': [1, 2, 3, 4, 5],
    'floats': [1.1, 2.2, 3.3, 4.4, 5.5],
    'strings': ['a', 'b', 'c', 'd', 'e']
}
mock_df = MockDataFrame(mock_df_data)

try:
    # Convert using interchange protocol
    table = interchange.from_dataframe(mock_df)
    print(f"Converted table: {table.schema}")
    print(f"Rows: {len(table)}")

except Exception as e:
    print(f"Interchange conversion failed: {e}")
    # Fallback to direct conversion
    table = mock_df.to_arrow_table()
    print(f"Direct conversion: {table.schema}")

# Work with real pandas DataFrame (if available)
try:
    import pandas as pd

    # Create pandas DataFrame
    df = pd.DataFrame({
        'x': range(10),
        'y': [i ** 2 for i in range(10)],
        'category': ['A', 'B'] * 5
    })

    # Convert using interchange protocol
    table_from_pandas = interchange.from_dataframe(df)
    print(f"Pandas conversion: {table_from_pandas.schema}")
    print(f"Rows: {len(table_from_pandas)}")

except ImportError:
    print("Pandas not available for interchange demo")
except Exception as e:
    print(f"Pandas interchange failed: {e}")
```

### JVM Integration

```python
import pyarrow as pa

# JVM integration (conceptual example)
try:
    # Set JVM path (platform-specific)
    import platform
    if platform.system() == "Linux":
        jvm_path = "/usr/lib/jvm/default/lib/server/libjvm.so"
    elif platform.system() == "Darwin":  # macOS
        jvm_path = "/Library/Java/JavaVirtualMachines/*/Contents/Home/lib/server/libjvm.dylib"
    elif platform.system() == "Windows":
        jvm_path = "C:\\Program Files\\Java\\*\\bin\\server\\jvm.dll"
    else:
        jvm_path = None

    if jvm_path:
        pa.set_default_jvm_path(jvm_path)
        current_path = pa.get_default_jvm_path()
        print(f"JVM path set to: {current_path}")

    # Set JVM options
    jvm_options = [
        "-Xmx1g",  # Maximum heap size
        "-XX:+UseG1GC",  # Use G1 garbage collector
        "-Djava.awt.headless=true"  # Headless mode
    ]
    pa.set_default_jvm_options(jvm_options)
    current_options = pa.get_default_jvm_options()
    print(f"JVM options: {current_options}")

except AttributeError:
    print("JVM integration functions not available")
```

### Performance Monitoring and Configuration

```python
import pyarrow as pa
import time

# System information
print("=== PyArrow System Information ===")
pa.show_versions()
print()

print("=== Runtime Information ===")
pa.show_info()
print()

# CPU configuration
original_cpu_count = pa.cpu_count()
print(f"Original CPU count: {original_cpu_count}")

# Set lower CPU count for testing
pa.set_cpu_count(max(1, original_cpu_count // 2))
print(f"Reduced CPU count: {pa.cpu_count()}")

# I/O thread configuration
original_io_threads = pa.io_thread_count()
print(f"Original I/O threads: {original_io_threads}")

pa.set_io_thread_count(4)
print(f"Set I/O threads: {pa.io_thread_count()}")

# Memory monitoring
initial_memory = pa.total_allocated_bytes()
print(f"Initial memory: {initial_memory} bytes")

# Create some data to test memory tracking
large_arrays = []
for i in range(5):
    arr = pa.array(range(100000))
    large_arrays.append(arr)

peak_memory = pa.total_allocated_bytes()
print(f"Peak memory: {peak_memory} bytes")
print(f"Memory increase: {peak_memory - initial_memory} bytes")

# Clear arrays
large_arrays.clear()
import gc
gc.collect()

final_memory = pa.total_allocated_bytes()
print(f"Final memory: {final_memory} bytes")

# Restore original settings
pa.set_cpu_count(original_cpu_count)
pa.set_io_thread_count(original_io_threads)
print(f"Restored CPU count: {pa.cpu_count()}")
print(f"Restored I/O threads: {pa.io_thread_count()}")

# Signal handling
pa.enable_signal_handlers(True)
print("Signal handlers enabled")

# Library information for development
print(f"Include directory: {pa.get_include()}")
print(f"Libraries: {pa.get_libraries()}")
print(f"Library directories: {pa.get_library_dirs()[:3]}...")  # First 3
```

### Advanced Data Processing Pipeline

```python
import pyarrow as pa
import pyarrow.compute as pc
import pyarrow.dataset as ds
import tempfile
import os

def advanced_processing_pipeline():
    """Demonstrate advanced PyArrow features in a processing pipeline."""

    # Create sample data with complex types
    data = pa.table({
        'id': range(1000),
        'timestamp': pa.array([
            f'2023-{(i % 12) + 1:02d}-{(i % 28) + 1:02d} {(i % 24):02d}:00:00'
            for i in range(1000)
        ], type=pa.timestamp('s')),
        'values': [
            [float(j) for j in range(i % 5 + 1)]
            for i in range(1000)
        ],
        'metadata': [
            {'source': f'sensor_{i % 10}', 'quality': (i % 100) / 100.0}
            for i in range(1000)
        ]
    })

    with tempfile.TemporaryDirectory() as tmpdir:
        # Write partitioned dataset
        ds.write_dataset(
            data,
            tmpdir,
            format='parquet',
            partitioning=['id'],  # Partition by id ranges
            max_rows_per_file=100,
            compression='snappy'
        )

        # Read as dataset
        dataset = ds.dataset(tmpdir, format='parquet')

        print(f"Dataset schema: {dataset.schema}")
        print(f"Dataset files: {len(list(dataset.get_fragments()))}")

        # Complex filtering and computation
        # Filter by timestamp and compute statistics on nested data
        filtered_data = dataset.to_table(
            filter=(
                pc.greater(pc.field('timestamp'),
                           pc.strptime(['2023-06-01'], format='%Y-%m-%d', unit='s')[0]) &
                pc.less(pc.field('timestamp'),
                        pc.strptime(['2023-09-01'], format='%Y-%m-%d', unit='s')[0])
            ),
            columns=['id', 'timestamp', 'values']
        )

        print(f"Filtered data: {len(filtered_data)} rows")

        # Compute statistics on list column
        list_lengths = pc.list_size(filtered_data['values'])
        avg_list_length = pc.mean(list_lengths)

        print(f"Average list length: {avg_list_length}")

        # Flatten list column and compute aggregate
        flattened_values = pc.list_flatten(filtered_data['values'])
        total_sum = pc.sum(flattened_values)

        print(f"Sum of all flattened values: {total_sum}")

    return filtered_data

# Run advanced pipeline
try:
    result = advanced_processing_pipeline()
    print(f"Pipeline completed successfully: {len(result)} rows processed")
except Exception as e:
    print(f"Pipeline error: {e}")
```