or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

advanced-features.mdarrow-flight.mdcompute-functions.mdcore-data-structures.mddata-types.mddataset-operations.mdfile-formats.mdindex.mdmemory-io.md

advanced-features.mddocs/

0

# Advanced Features

1

2

Specialized functionality including CUDA GPU support, Substrait query integration, execution engine operations, and data interchange protocols for advanced use cases and system integration scenarios.

3

4

## Capabilities

5

6

### CUDA GPU Support

7

8

GPU memory management and operations for high-performance computing workloads using NVIDIA CUDA.

9

10

```python { .api }

11

class Context:

12

"""

13

CUDA context wrapper for device operations.

14

15

Attributes:

16

- device_number: CUDA device number

17

- handle: CUDA context handle

18

"""

19

20

def __init__(self, device_number=0): ...

21

22

def memory_manager(self):

23

"""Get CUDA memory manager."""

24

25

def synchronize(self):

26

"""Synchronize CUDA operations."""

27

28

@property

29

def device_number(self):

30

"""Get device number."""

31

32

class CudaBuffer:

33

"""

34

GPU memory buffer.

35

36

Attributes:

37

- context: CUDA context

38

- size: Buffer size in bytes

39

- address: GPU memory address

40

- is_mutable: Whether buffer is mutable

41

"""

42

43

def copy_to_host(self, position=0, nbytes=None, memory_pool=None):

44

"""Copy data from GPU to host memory."""

45

46

def copy_from_host(self, data, position=0):

47

"""Copy data from host to GPU memory."""

48

49

def copy_from_device(self, buf, position=0, source_position=0, nbytes=None):

50

"""Copy data from another GPU buffer."""

51

52

def slice(self, offset, length=None):

53

"""Create buffer slice."""

54

55

def equals(self, other):

56

"""Check buffer equality."""

57

58

def export_for_ipc(self):

59

"""Export buffer for inter-process communication."""

60

61

class HostBuffer:

62

"""

63

Pinned host memory buffer for efficient GPU transfers.

64

65

Attributes:

66

- size: Buffer size in bytes

67

- address: Host memory address

68

"""

69

70

class IpcMemHandle:

71

"""

72

Inter-process communication memory handle.

73

74

Attributes:

75

- handle: IPC handle bytes

76

"""

77

78

def open(self, context):

79

"""Open IPC handle in context."""

80

81

def serialize(self):

82

"""Serialize handle for IPC."""

83

84

@classmethod

85

def from_buffer(cls, buf):

86

"""Create handle from CUDA buffer."""

87

88

class BufferReader:

89

"""Reader for CUDA buffers."""

90

91

def __init__(self, buffer): ...

92

93

def read(self, nbytes=None):

94

"""Read data from buffer."""

95

96

def seek(self, position):

97

"""Seek to position."""

98

99

def tell(self):

100

"""Get current position."""

101

102

class BufferWriter:

103

"""Writer for CUDA buffers."""

104

105

def __init__(self, buffer): ...

106

107

def write(self, data):

108

"""Write data to buffer."""

109

110

def seek(self, position):

111

"""Seek to position."""

112

113

def tell(self):

114

"""Get current position."""

115

116

def new_host_buffer(size, device_number=0):

117

"""

118

Create new pinned host buffer.

119

120

Parameters:

121

- size: int, buffer size in bytes

122

- device_number: int, CUDA device number

123

124

Returns:

125

HostBuffer: Pinned host buffer

126

"""

127

128

def serialize_record_batch(batch, ctx):

129

"""

130

Serialize record batch for CUDA transfer.

131

132

Parameters:

133

- batch: RecordBatch, batch to serialize

134

- ctx: Context, CUDA context

135

136

Returns:

137

bytes: Serialized batch

138

"""

139

140

def read_message(source, memory_pool=None):

141

"""

142

Read CUDA IPC message.

143

144

Parameters:

145

- source: file-like, message source

146

- memory_pool: MemoryPool, memory pool for allocation

147

148

Returns:

149

Message: CUDA message

150

"""

151

152

def read_record_batch(message, schema, memory_pool=None):

153

"""

154

Read record batch from CUDA message.

155

156

Parameters:

157

- message: Message, CUDA message

158

- schema: Schema, batch schema

159

- memory_pool: MemoryPool, memory pool for allocation

160

161

Returns:

162

RecordBatch: Record batch

163

"""

164

```

165

166

### Substrait Query Integration

167

168

Integration with Substrait for standardized query representation and cross-system compatibility.

169

170

```python { .api }

171

def run_query(plan, table_provider=None):

172

"""

173

Execute Substrait query plan.

174

175

Parameters:

176

- plan: bytes, serialized Substrait plan

177

- table_provider: callable, function to provide tables by name

178

179

Returns:

180

Table: Query result table

181

"""

182

183

def get_supported_functions():

184

"""

185

Get list of supported Substrait functions.

186

187

Returns:

188

list of str: Supported function names

189

"""

190

191

def deserialize_expressions(data, schema):

192

"""

193

Deserialize Substrait expressions.

194

195

Parameters:

196

- data: bytes, serialized Substrait expressions

197

- schema: Schema, input schema

198

199

Returns:

200

BoundExpressions: Bound expressions with Arrow types

201

"""

202

203

def serialize_expressions(expressions, names, schema):

204

"""

205

Serialize Arrow expressions to Substrait.

206

207

Parameters:

208

- expressions: list of Expression, Arrow expressions

209

- names: list of str, expression names

210

- schema: Schema, input schema

211

212

Returns:

213

bytes: Serialized Substrait expressions

214

"""

215

216

def deserialize_schema(data):

217

"""

218

Deserialize Substrait schema.

219

220

Parameters:

221

- data: bytes, serialized Substrait schema

222

223

Returns:

224

SubstraitSchema: Substrait schema representation

225

"""

226

227

def serialize_schema(schema):

228

"""

229

Serialize Arrow schema to Substrait.

230

231

Parameters:

232

- schema: Schema, Arrow schema

233

234

Returns:

235

bytes: Serialized Substrait schema

236

"""

237

238

class BoundExpressions:

239

"""

240

Bound Substrait expressions with Arrow types.

241

242

Attributes:

243

- expressions: List of bound expressions

244

- schema: Input schema

245

"""

246

247

def evaluate(self, batch):

248

"""Evaluate expressions on record batch."""

249

250

class SubstraitSchema:

251

"""

252

Substrait schema representation.

253

254

Attributes:

255

- names: Field names

256

- types: Field types

257

"""

258

259

def to_arrow_schema(self):

260

"""Convert to Arrow schema."""

261

```

262

263

### Acero Execution Engine

264

265

Low-level execution engine operations for building custom query processing pipelines.

266

267

```python { .api }

268

class Declaration:

269

"""

270

Execution plan node declaration.

271

272

Attributes:

273

- factory_name: Node factory name

274

- options: Node options

275

- inputs: Input declarations

276

"""

277

278

def __init__(self, factory_name, options, inputs=None): ...

279

280

class ExecNodeOptions:

281

"""Base execution node options."""

282

283

class TableSourceNodeOptions(ExecNodeOptions):

284

"""

285

Table source node configuration.

286

287

Attributes:

288

- table: Source table

289

"""

290

291

def __init__(self, table): ...

292

293

class FilterNodeOptions(ExecNodeOptions):

294

"""

295

Filter node configuration.

296

297

Attributes:

298

- filter_expression: Filter expression

299

"""

300

301

def __init__(self, filter_expression): ...

302

303

class ProjectNodeOptions(ExecNodeOptions):

304

"""

305

Projection node configuration.

306

307

Attributes:

308

- expressions: Projection expressions

309

- names: Output field names

310

"""

311

312

def __init__(self, expressions, names=None): ...

313

314

class AggregateNodeOptions(ExecNodeOptions):

315

"""

316

Aggregation node configuration.

317

318

Attributes:

319

- aggregates: Aggregate functions

320

- keys: Grouping keys

321

"""

322

323

def __init__(self, aggregates, keys=None): ...

324

325

class OrderByNodeOptions(ExecNodeOptions):

326

"""

327

Sorting node configuration.

328

329

Attributes:

330

- sort_keys: Sort key expressions

331

- ordering: Sort ordering (ascending/descending)

332

"""

333

334

def __init__(self, sort_keys, ordering=None): ...

335

336

class HashJoinNodeOptions(ExecNodeOptions):

337

"""

338

Hash join node configuration.

339

340

Attributes:

341

- join_type: Type of join

342

- left_keys: Left join keys

343

- right_keys: Right join keys

344

- filter: Optional join filter

345

"""

346

347

def __init__(self, join_type, left_keys, right_keys, filter=None): ...

348

349

class AsofJoinNodeOptions(ExecNodeOptions):

350

"""

351

As-of join node configuration.

352

353

Attributes:

354

- left_keys: Left join keys

355

- right_keys: Right join keys

356

- on_key: Temporal join key

357

- tolerance: Join tolerance

358

"""

359

360

def __init__(self, left_keys, right_keys, on_key, tolerance=None): ...

361

362

class ScanNodeOptions(ExecNodeOptions):

363

"""

364

Dataset scan node configuration.

365

366

Attributes:

367

- dataset: Dataset to scan

368

- filter: Scan filter

369

- projection: Column projection

370

"""

371

372

def __init__(self, dataset, filter=None, projection=None): ...

373

```

374

375

### Data Interchange Protocol

376

377

Support for data interchange protocols enabling interoperability with other data systems.

378

379

```python { .api }

380

def from_dataframe(df, preserve_index=None, types_mapper=None):

381

"""

382

Convert dataframe interchange object to Arrow Table.

383

384

Parameters:

385

- df: object implementing dataframe interchange protocol

386

- preserve_index: bool, preserve dataframe index

387

- types_mapper: callable, custom type mapping function

388

389

Returns:

390

Table: Arrow table from dataframe interchange object

391

"""

392

```

393

394

### JVM Integration

395

396

Integration with Java Virtual Machine for interoperability with Java-based systems.

397

398

```python { .api }

399

def set_default_jvm_path(path):

400

"""

401

Set default JVM path.

402

403

Parameters:

404

- path: str, path to JVM library

405

"""

406

407

def get_default_jvm_path():

408

"""

409

Get default JVM path.

410

411

Returns:

412

str: JVM library path

413

"""

414

415

def set_default_jvm_options(options):

416

"""

417

Set default JVM options.

418

419

Parameters:

420

- options: list of str, JVM startup options

421

"""

422

423

def get_default_jvm_options():

424

"""

425

Get default JVM options.

426

427

Returns:

428

list of str: JVM startup options

429

"""

430

```

431

432

### Configuration and Environment

433

434

Global configuration and environment management for PyArrow behavior.

435

436

```python { .api }

437

def get_include():

438

"""

439

Get Arrow C++ include directory path.

440

441

Returns:

442

str: Include directory path

443

"""

444

445

def get_libraries():

446

"""

447

Get list of libraries for linking.

448

449

Returns:

450

list of str: Library names

451

"""

452

453

def get_library_dirs():

454

"""

455

Get library directories for linking.

456

457

Returns:

458

list of str: Library directory paths

459

"""

460

461

def create_library_symlinks():

462

"""Create library symlinks for wheel installations."""

463

464

def set_timezone_db_path(path):

465

"""

466

Set timezone database path.

467

468

Parameters:

469

- path: str, path to timezone database

470

"""

471

472

def cpu_count():

473

"""

474

Get number of CPU cores.

475

476

Returns:

477

int: Number of CPU cores

478

"""

479

480

def set_cpu_count(count):

481

"""

482

Set CPU core count for computations.

483

484

Parameters:

485

- count: int, number of CPU cores to use

486

"""

487

488

def io_thread_count():

489

"""

490

Get I/O thread count.

491

492

Returns:

493

int: Number of I/O threads

494

"""

495

496

def set_io_thread_count(count):

497

"""

498

Set I/O thread count.

499

500

Parameters:

501

- count: int, number of I/O threads to use

502

"""

503

504

def enable_signal_handlers(enable):

505

"""

506

Enable/disable signal handling.

507

508

Parameters:

509

- enable: bool, whether to enable signal handlers

510

"""

511

```

512

513

## Usage Examples

514

515

### CUDA GPU Operations

516

517

```python

518

import pyarrow as pa

519

520

# Check if CUDA is available

521

try:

522

import pyarrow.cuda as cuda

523

print("CUDA support available")

524

except ImportError:

525

print("CUDA support not available")

526

raise SystemExit # stop the example early; exit() is a site-module convenience not meant for scripts

527

528

# Create CUDA context

529

ctx = cuda.Context(device_number=0)

530

print(f"CUDA device: {ctx.device_number}")

531

532

# Create host buffer

533

host_data = b"Hello, CUDA!" * 1000

534

host_buffer = cuda.new_host_buffer(len(host_data))

535

536

# Copy data to host buffer (conceptual - actual API may differ)

537

# host_buffer.copy_from_host(host_data)

538

539

# Create GPU buffer

540

gpu_buffer = ctx.new_buffer(len(host_data))

541

542

# Copy from host to GPU

543

gpu_buffer.copy_from_host(host_data)

544

545

# Copy back to host

546

result_buffer = gpu_buffer.copy_to_host()

547

print(f"GPU round-trip successful: {len(result_buffer)} bytes")

548

549

# Create Arrow array on GPU (conceptual)

550

cpu_array = pa.array([1, 2, 3, 4, 5])

551

# Note: Actual GPU array creation would require more setup

552

553

# IPC with GPU buffers

554

ipc_handle = cuda.IpcMemHandle.from_buffer(gpu_buffer)

555

serialized_handle = ipc_handle.serialize()

556

print(f"Serialized IPC handle: {len(serialized_handle)} bytes")

557

558

# Clean up

559

ctx.synchronize()

560

```

561

562

### Substrait Query Integration

563

564

```python

565

import pyarrow as pa

566

import pyarrow.substrait as substrait

567

import pyarrow.compute as pc

568

569

# Check supported Substrait functions

570

supported_functions = substrait.get_supported_functions()

571

print(f"Supported functions: {len(supported_functions)}")

572

print(f"First 10: {supported_functions[:10]}")

573

574

# Create sample data

575

table = pa.table({

576

'id': range(100),

577

'category': (['A', 'B', 'C'] * 34)[:100], # Cycling through categories, truncated to 100 to match the other columns

578

'value': [i * 1.5 for i in range(100)]

579

})

580

581

# Define table provider for Substrait

582

def table_provider(names):

583

"""Provide tables by name for Substrait execution."""

584

if names == ['main_table']:

585

return table

586

else:

587

raise ValueError(f"Unknown table: {names}")

588

589

# Example: Simple filter query (conceptual)

590

# In practice, you would create or receive a Substrait plan

591

# This is a simplified example showing the concept

592

593

# Create expressions and serialize to Substrait

594

expressions = [

595

pc.field('value'),

596

pc.greater(pc.field('value'), pc.scalar(50))

597

]

598

names = ['value', 'filter_condition']

599

600

try:

601

# Serialize expressions to Substrait format

602

serialized_expressions = substrait.serialize_expressions(

603

expressions, names, table.schema

604

)

605

print(f"Serialized expressions: {len(serialized_expressions)} bytes")

606

607

# Deserialize expressions back

608

bound_expressions = substrait.deserialize_expressions(

609

serialized_expressions, table.schema

610

)

611

print(f"Bound expressions: {bound_expressions}")

612

613

except Exception as e:

614

print(f"Substrait operations not fully available: {e}")

615

616

# Schema serialization

617

try:

618

serialized_schema = substrait.serialize_schema(table.schema)

619

print(f"Serialized schema: {len(serialized_schema)} bytes")

620

621

deserialized_schema = substrait.deserialize_schema(serialized_schema)

622

print(f"Deserialized schema: {deserialized_schema}")

623

624

except Exception as e:

625

print(f"Schema serialization not available: {e}")

626

```

627

628

### Acero Execution Engine

629

630

```python

631

import pyarrow as pa

632

import pyarrow.acero as acero

633

import pyarrow.compute as pc

634

635

# Create sample tables

636

table1 = pa.table({

637

'id': [1, 2, 3, 4, 5],

638

'name': ['Alice', 'Bob', 'Charlie', 'Diana', 'Eve'],

639

'dept_id': [10, 20, 10, 30, 20]

640

})

641

642

table2 = pa.table({

643

'dept_id': [10, 20, 30],

644

'dept_name': ['Engineering', 'Sales', 'Marketing']

645

})

646

647

# Create execution plan declarations

648

source1 = acero.Declaration(

649

"table_source",

650

acero.TableSourceNodeOptions(table1)

651

)

652

653

source2 = acero.Declaration(

654

"table_source",

655

acero.TableSourceNodeOptions(table2)

656

)

657

658

# Filter declaration

659

filter_decl = acero.Declaration(

660

"filter",

661

acero.FilterNodeOptions(pc.greater(pc.field('id'), pc.scalar(2))),

662

inputs=[source1]

663

)

664

665

# Projection declaration

666

project_decl = acero.Declaration(

667

"project",

668

acero.ProjectNodeOptions([

669

pc.field('id'),

670

pc.field('name'),

671

pc.field('dept_id')

672

]),

673

inputs=[filter_decl]

674

)

675

676

# Join declaration

677

join_decl = acero.Declaration(

678

"hashjoin",

679

acero.HashJoinNodeOptions(

680

join_type="inner",

681

left_keys=[pc.field('dept_id')],

682

right_keys=[pc.field('dept_id')]

683

),

684

inputs=[project_decl, source2]

685

)

686

687

print("Created execution plan with filter, projection, and join")

688

print("Note: Actual execution requires Acero runtime")

689

690

# Example of aggregation node

691

agg_decl = acero.Declaration(

692

"aggregate",

693

acero.AggregateNodeOptions(

694

aggregates=[

695

("count", pc.field('id')),

696

("mean", pc.field('id'))

697

],

698

keys=[pc.field('dept_name')]

699

),

700

inputs=[join_decl]

701

)

702

703

print("Added aggregation node to execution plan")

704

```

705

706

### Data Interchange Protocol

707

708

```python

709

import pyarrow as pa

710

import pyarrow.interchange as interchange

711

712

# Create a mock dataframe-like object that implements interchange protocol

713

class MockDataFrame:

714

"""Mock dataframe implementing interchange protocol."""

715

716

def __init__(self, data):

717

self.data = data

718

self._schema = pa.schema([

719

pa.field(name, pa.infer_type(column))

720

for name, column in data.items()

721

])

722

723

def __dataframe__(self, nan_as_null=False, allow_copy=True):

724

"""Implement dataframe interchange protocol."""

725

# This is a simplified mock - real implementation would be more complex

726

return self

727

728

def select_columns(self, indices):

729

"""Select columns by indices."""

730

selected_data = {}

731

for i, (name, column) in enumerate(self.data.items()):

732

if i in indices:

733

selected_data[name] = column

734

return MockDataFrame(selected_data)

735

736

def get_chunks(self, n_chunks=None):

737

"""Get data chunks."""

738

# Simplified - return single chunk

739

return [self]

740

741

def to_arrow_table(self):

742

"""Convert to Arrow table."""

743

return pa.table(self.data, schema=self._schema)

744

745

# Create mock dataframe

746

mock_df_data = {

747

'integers': [1, 2, 3, 4, 5],

748

'floats': [1.1, 2.2, 3.3, 4.4, 5.5],

749

'strings': ['a', 'b', 'c', 'd', 'e']

750

}

751

mock_df = MockDataFrame(mock_df_data)

752

753

try:

754

# Convert using interchange protocol

755

table = interchange.from_dataframe(mock_df)

756

print(f"Converted table: {table.schema}")

757

print(f"Rows: {len(table)}")

758

759

except Exception as e:

760

print(f"Interchange conversion failed: {e}")

761

# Fallback to direct conversion

762

table = mock_df.to_arrow_table()

763

print(f"Direct conversion: {table.schema}")

764

765

# Work with real pandas DataFrame (if available)

766

try:

767

import pandas as pd

768

769

# Create pandas DataFrame

770

df = pd.DataFrame({

771

'x': range(10),

772

'y': [i ** 2 for i in range(10)],

773

'category': ['A', 'B'] * 5

774

})

775

776

# Convert using interchange protocol

777

table_from_pandas = interchange.from_dataframe(df)

778

print(f"Pandas conversion: {table_from_pandas.schema}")

779

print(f"Rows: {len(table_from_pandas)}")

780

781

except ImportError:

782

print("Pandas not available for interchange demo")

783

except Exception as e:

784

print(f"Pandas interchange failed: {e}")

785

```

786

787

### JVM Integration

788

789

```python

790

import pyarrow as pa

791

792

# JVM integration (conceptual example)

793

try:

794

# Set JVM path (platform-specific)

795

import platform

796

if platform.system() == "Linux":

797

jvm_path = "/usr/lib/jvm/default/lib/server/libjvm.so"

798

elif platform.system() == "Darwin": # macOS

799

jvm_path = "/Library/Java/JavaVirtualMachines/*/Contents/Home/lib/server/libjvm.dylib"

800

elif platform.system() == "Windows":

801

jvm_path = "C:\\Program Files\\Java\\*\\bin\\server\\jvm.dll"

802

else:

803

jvm_path = None

804

805

if jvm_path:

806

pa.set_default_jvm_path(jvm_path)

807

current_path = pa.get_default_jvm_path()

808

print(f"JVM path set to: {current_path}")

809

810

# Set JVM options

811

jvm_options = [

812

"-Xmx1g", # Maximum heap size

813

"-XX:+UseG1GC", # Use G1 garbage collector

814

"-Djava.awt.headless=true" # Headless mode

815

]

816

pa.set_default_jvm_options(jvm_options)

817

current_options = pa.get_default_jvm_options()

818

print(f"JVM options: {current_options}")

819

820

except AttributeError:

821

print("JVM integration functions not available")

822

```

823

824

### Performance Monitoring and Configuration

825

826

```python

827

import pyarrow as pa

828

import time

829

830

# System information

831

print("=== PyArrow System Information ===")

832

pa.show_versions()

833

print()

834

835

print("=== Runtime Information ===")

836

pa.show_info()

837

print()

838

839

# CPU configuration

840

original_cpu_count = pa.cpu_count()

841

print(f"Original CPU count: {original_cpu_count}")

842

843

# Set lower CPU count for testing

844

pa.set_cpu_count(max(1, original_cpu_count // 2))

845

print(f"Reduced CPU count: {pa.cpu_count()}")

846

847

# I/O thread configuration

848

original_io_threads = pa.io_thread_count()

849

print(f"Original I/O threads: {original_io_threads}")

850

851

pa.set_io_thread_count(4)

852

print(f"Set I/O threads: {pa.io_thread_count()}")

853

854

# Memory monitoring

855

initial_memory = pa.total_allocated_bytes()

856

print(f"Initial memory: {initial_memory} bytes")

857

858

# Create some data to test memory tracking

859

large_arrays = []

860

for i in range(5):

861

arr = pa.array(range(100000))

862

large_arrays.append(arr)

863

864

peak_memory = pa.total_allocated_bytes()

865

print(f"Peak memory: {peak_memory} bytes")

866

print(f"Memory increase: {peak_memory - initial_memory} bytes")

867

868

# Clear arrays

869

large_arrays.clear()

870

import gc

871

gc.collect()

872

873

final_memory = pa.total_allocated_bytes()

874

print(f"Final memory: {final_memory} bytes")

875

876

# Restore original settings

877

pa.set_cpu_count(original_cpu_count)

878

pa.set_io_thread_count(original_io_threads)

879

print(f"Restored CPU count: {pa.cpu_count()}")

880

print(f"Restored I/O threads: {pa.io_thread_count()}")

881

882

# Signal handling

883

pa.enable_signal_handlers(True)

884

print("Signal handlers enabled")

885

886

# Library information for development

887

print(f"Include directory: {pa.get_include()}")

888

print(f"Libraries: {pa.get_libraries()}")

889

print(f"Library directories: {pa.get_library_dirs()[:3]}...") # First 3

890

```

891

892

### Advanced Data Processing Pipeline

893

894

```python

895

import pyarrow as pa

896

import pyarrow.compute as pc

897

import pyarrow.dataset as ds

898

import tempfile

899

import os

900

901

def advanced_processing_pipeline():

902

"""Demonstrate advanced PyArrow features in a processing pipeline."""

903

904

# Create sample data with complex types

905

data = pa.table({

906

'id': range(1000),

907

'timestamp': pa.array([

908

f'2023-{(i % 12) + 1:02d}-{(i % 28) + 1:02d} {(i % 24):02d}:00:00'

909

for i in range(1000)

910

], type=pa.timestamp('s')),

911

'values': [

912

[float(j) for j in range(i % 5 + 1)]

913

for i in range(1000)

914

],

915

'metadata': [

916

{'source': f'sensor_{i % 10}', 'quality': (i % 100) / 100.0}

917

for i in range(1000)

918

]

919

})

920

921

with tempfile.TemporaryDirectory() as tmpdir:

922

# Write partitioned dataset

923

ds.write_dataset(

924

data,

925

tmpdir,

926

format='parquet',

927

partitioning=['id'], # Partition by each distinct id value (one directory per id)

928

max_rows_per_file=100,

929

compression='snappy'

930

)

931

932

# Read as dataset

933

dataset = ds.dataset(tmpdir, format='parquet')

934

935

print(f"Dataset schema: {dataset.schema}")

936

print(f"Dataset files: {len(list(dataset.get_fragments()))}")

937

938

# Complex filtering and computation

939

# Filter by timestamp and compute statistics on nested data

940

filtered_data = dataset.to_table(

941

filter=(

942

pc.greater(pc.field('timestamp'),

943

pc.strptime(['2023-06-01'], format='%Y-%m-%d', unit='s')[0]) &

944

pc.less(pc.field('timestamp'),

945

pc.strptime(['2023-09-01'], format='%Y-%m-%d', unit='s')[0])

946

),

947

columns=['id', 'timestamp', 'values']

948

)

949

950

print(f"Filtered data: {len(filtered_data)} rows")

951

952

# Compute statistics on list column

953

list_lengths = pc.list_size(filtered_data['values'])

954

avg_list_length = pc.mean(list_lengths)

955

956

print(f"Average list length: {avg_list_length}")

957

958

# Flatten list column and compute aggregate

959

flattened_values = pc.list_flatten(filtered_data['values'])

960

total_sum = pc.sum(flattened_values)

961

962

print(f"Sum of all flattened values: {total_sum}")

963

964

return filtered_data

965

966

# Run advanced pipeline

967

try:

968

result = advanced_processing_pipeline()

969

print(f"Pipeline completed successfully: {len(result)} rows processed")

970

except Exception as e:

971

print(f"Pipeline error: {e}")

972

```