or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

catalog-apis.mddata-source-v2-apis.mddistributions-api.mdexpression-apis.mdindex.mdlegacy-data-source-v1.mdmetrics-api.mdstreaming-apis.mdutilities-helpers.mdvectorized-processing.md

vectorized-processing.mddocs/

0

# Vectorized Processing APIs

1

2

The Vectorized Processing APIs in Apache Spark Catalyst enable high-performance columnar data processing. By operating on batches of column values instead of individual rows, they support efficient batch operations, reduce per-row memory overhead, and improve CPU utilization through vectorized execution and in-memory columnar formats such as Apache Arrow.

3

4

## Core Vectorized Interfaces

5

6

### ColumnVector

7

8

Abstract base class representing a single column of data in columnar form:

9

10

```java { .api }

11

package org.apache.spark.sql.vectorized;

12

13

/**
 * One column of a {@link ColumnarBatch}, accessed by row index.
 *
 * <p>A ColumnVector does not expose a row count: the number of valid rows is
 * {@link ColumnarBatch#numRows()} of the batch that owns the vector, so code that loops
 * over a single vector must obtain that count from its batch.
 */
public abstract class ColumnVector implements AutoCloseable {
    /**
     * Returns the data type of the values stored in this column.
     */
    public abstract DataType dataType();

    /**
     * Returns the number of null values in this column.
     */
    public abstract int numNulls();

    /**
     * Returns true if this column contains at least one null value.
     */
    public abstract boolean hasNull();

    /**
     * Returns true if the value at {@code rowId} is null. Check this before calling a typed
     * getter: the value a getter returns for a null slot is implementation-defined.
     */
    public abstract boolean isNullAt(int rowId);

    /**
     * Releases the resources (for example off-heap memory) held by this column.
     */
    @Override
    public abstract void close();

    // Type-specific getters; use the getter that matches dataType().
    public boolean getBoolean(int rowId) { ... }
    public byte getByte(int rowId) { ... }
    public short getShort(int rowId) { ... }
    public int getInt(int rowId) { ... }
    public long getLong(int rowId) { ... }
    public float getFloat(int rowId) { ... }
    public double getDouble(int rowId) { ... }
    public UTF8String getUTF8String(int rowId) { ... }
    public byte[] getBinary(int rowId) { ... }
    public Decimal getDecimal(int rowId, int precision, int scale) { ... }

    // Complex type getters

    /** Returns the child vector at {@code ordinal} of a nested (e.g. struct) column. */
    public ColumnVector getChild(int ordinal) { ... }

    /** Returns the array value stored at {@code rowId} of an ARRAY column. */
    public ColumnarArray getArray(int rowId) { ... }

    /** Returns the map value stored at {@code rowId} of a MAP column. */
    public ColumnarMap getMap(int rowId) { ... }

    // NOTE(review): in Spark, getArrayLength/getArrayOffset are declared on
    // WritableColumnVector rather than on ColumnVector; against the base class, read
    // arrays through getArray(rowId). Verify against the targeted Spark version.
    public int getArrayLength(int rowId) { ... }
    public int getArrayOffset(int rowId) { ... }
}

57

```

58

59

### ColumnarBatch

60

61

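A collection of ColumnVector objects that together represent one batch of rows. The batch, not the individual vectors, records the row count (`numRows()`), so code that iterates over a single ColumnVector must take that count from the owning batch: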

62

63

```java { .api }

64

public final class ColumnarBatch implements AutoCloseable {

65

/**

66

* Number of columns in this batch

67

*/

68

public int numCols();

69

70

/**

71

* Number of rows in this batch

72

*/

73

public int numRows();

74

75

/**

76

* Set number of rows (for dynamically sized batches)

77

*/

78

public void setNumRows(int numRows);

79

80

/**

81

* Get column vector at given ordinal

82

*/

83

public ColumnVector column(int ordinal);

84

85

/**

86

* Get row at given index

87

*/

88

public ColumnarBatchRow getRow(int rowId);

89

90

/**

91

* Iterator over rows in this batch

92

*/

93

public Iterator<InternalRow> rowIterator();

94

95

@Override

96

public void close();

97

}

98

```

99

100

### ArrowColumnVector

101

102

ColumnVector implementation backed by Apache Arrow:

103

104

```java { .api }

105

public final class ArrowColumnVector extends ColumnVector {

106

/**

107

* Create ArrowColumnVector from Arrow ValueVector

108

*/

109

public ArrowColumnVector(ValueVector vector);

110

111

// Implements all ColumnVector methods with Arrow-optimized access

112

}

113

```

114

115

## Implementing Vectorized Data Sources

116

117

### Vectorized Partition Reader

118

119

```java

120

public class MyVectorizedPartitionReader implements PartitionReader<ColumnarBatch> {

121

private final StructType schema;

122

private final String dataPath;

123

private final int batchSize;

124

private ColumnVector[] columns;

125

private int currentBatchRows;

126

private boolean hasNextBatch = true;

127

128

public MyVectorizedPartitionReader(StructType schema, String dataPath, int batchSize) {

129

this.schema = schema;

130

this.dataPath = dataPath;

131

this.batchSize = batchSize;

132

this.columns = createColumnVectors(schema, batchSize);

133

}

134

135

@Override

136

public boolean next() throws IOException {

137

if (!hasNextBatch) {

138

return false;

139

}

140

141

// Load next batch of data into column vectors

142

currentBatchRows = loadNextBatch();

143

hasNextBatch = currentBatchRows > 0;

144

145

return hasNextBatch;

146

}

147

148

@Override

149

public ColumnarBatch get() {

150

ColumnarBatch batch = new ColumnarBatch(columns, currentBatchRows);

151

return batch;

152

}

153

154

@Override

155

public void close() throws IOException {

156

if (columns != null) {

157

for (ColumnVector column : columns) {

158

column.close();

159

}

160

}

161

}

162

163

private ColumnVector[] createColumnVectors(StructType schema, int capacity) {

164

ColumnVector[] vectors = new ColumnVector[schema.length()];

165

for (int i = 0; i < schema.length(); i++) {

166

StructField field = schema.fields()[i];

167

vectors[i] = createColumnVector(field.dataType(), capacity);

168

}

169

return vectors;

170

}

171

172

private ColumnVector createColumnVector(DataType dataType, int capacity) {

173

if (dataType instanceof IntegerType) {

174

return new OnHeapColumnVector(capacity, IntegerType);

175

} else if (dataType instanceof LongType) {

176

return new OnHeapColumnVector(capacity, LongType);

177

} else if (dataType instanceof StringType) {

178

return new OnHeapColumnVector(capacity, StringType);

179

}

180

// Handle other data types...

181

throw new UnsupportedOperationException("Unsupported type: " + dataType);

182

}

183

184

private int loadNextBatch() throws IOException {

185

// Implementation-specific batch loading

186

// This would typically:

187

// 1. Read data from external source

188

// 2. Populate column vectors efficiently

189

// 3. Return number of rows loaded

190

return loadDataIntoVectors(columns);

191

}

192

}

193

```

194

195

### Custom ColumnVector Implementation

196

197

```java

198

public class MyOnHeapColumnVector extends ColumnVector {

199

private final DataType type;

200

private final int capacity;

201

private int numRows;

202

203

// Storage arrays for different types

204

private boolean[] booleanData;

205

private int[] intData;

206

private long[] longData;

207

private double[] doubleData;

208

private byte[][] binaryData;

209

210

// Null tracking

211

private boolean[] nulls;

212

private int numNulls;

213

214

public MyOnHeapColumnVector(int capacity, DataType type) {

215

this.capacity = capacity;

216

this.type = type;

217

this.nulls = new boolean[capacity];

218

219

// Allocate type-specific storage

220

if (type instanceof IntegerType) {

221

intData = new int[capacity];

222

} else if (type instanceof LongType) {

223

longData = new long[capacity];

224

} else if (type instanceof DoubleType) {

225

doubleData = new double[capacity];

226

} else if (type instanceof BooleanType) {

227

booleanData = new boolean[capacity];

228

} else if (type instanceof StringType || type instanceof BinaryType) {

229

binaryData = new byte[capacity][];

230

}

231

}

232

233

@Override

234

public DataType dataType() {

235

return type;

236

}

237

238

@Override

239

public int numNulls() {

240

return numNulls;

241

}

242

243

@Override

244

public boolean hasNull() {

245

return numNulls > 0;

246

}

247

248

@Override

249

public boolean isNullAt(int rowId) {

250

return nulls[rowId];

251

}

252

253

@Override

254

public int getInt(int rowId) {

255

if (nulls[rowId]) return 0;

256

return intData[rowId];

257

}

258

259

@Override

260

public long getLong(int rowId) {

261

if (nulls[rowId]) return 0L;

262

return longData[rowId];

263

}

264

265

@Override

266

public double getDouble(int rowId) {

267

if (nulls[rowId]) return 0.0;

268

return doubleData[rowId];

269

}

270

271

@Override

272

public boolean getBoolean(int rowId) {

273

if (nulls[rowId]) return false;

274

return booleanData[rowId];

275

}

276

277

@Override

278

public byte[] getBinary(int rowId) {

279

if (nulls[rowId]) return null;

280

return binaryData[rowId];

281

}

282

283

// Write methods for populating the vector

284

public void putInt(int rowId, int value) {

285

intData[rowId] = value;

286

nulls[rowId] = false;

287

}

288

289

public void putLong(int rowId, long value) {

290

longData[rowId] = value;

291

nulls[rowId] = false;

292

}

293

294

public void putDouble(int rowId, double value) {

295

doubleData[rowId] = value;

296

nulls[rowId] = false;

297

}

298

299

public void putNull(int rowId) {

300

nulls[rowId] = true;

301

numNulls++;

302

}

303

304

public void setNumRows(int numRows) {

305

this.numRows = numRows;

306

}

307

308

@Override

309

public void close() {

310

// Clean up arrays

311

booleanData = null;

312

intData = null;

313

longData = null;

314

doubleData = null;

315

binaryData = null;

316

nulls = null;

317

}

318

}

319

```

320

321

## Arrow Integration

322

323

### Arrow-Based Vectorized Reader

324

325

```java

326

public class ArrowVectorizedReader implements PartitionReader<ColumnarBatch> {

327

private final VectorSchemaRoot root;

328

private final ArrowFileReader arrowReader;

329

private final ColumnVector[] columns;

330

private boolean hasNext = true;

331

332

public ArrowVectorizedReader(String arrowFilePath, StructType schema)

333

throws IOException {

334

FileInputStream fis = new FileInputStream(arrowFilePath);

335

this.arrowReader = new ArrowFileReader(

336

new SeekableReadChannel(fis.getChannel()),

337

new RootAllocator()

338

);

339

340

this.root = arrowReader.getVectorSchemaRoot();

341

this.columns = createArrowColumnVectors();

342

}

343

344

@Override

345

public boolean next() throws IOException {

346

if (!hasNext) {

347

return false;

348

}

349

350

hasNext = arrowReader.loadNextBatch();

351

return hasNext;

352

}

353

354

@Override

355

public ColumnarBatch get() {

356

int numRows = root.getRowCount();

357

return new ColumnarBatch(columns, numRows);

358

}

359

360

@Override

361

public void close() throws IOException {

362

root.close();

363

arrowReader.close();

364

}

365

366

private ColumnVector[] createArrowColumnVectors() {

367

List<FieldVector> vectors = root.getFieldVectors();

368

ColumnVector[] columnVectors = new ColumnVector[vectors.size()];

369

370

for (int i = 0; i < vectors.size(); i++) {

371

columnVectors[i] = new ArrowColumnVector(vectors.get(i));

372

}

373

374

return columnVectors;

375

}

376

}

377

```

378

379

### Arrow Data Conversion

380

381

```java

382

public class ArrowDataConverter {

383

private final BufferAllocator allocator;

384

385

public ArrowDataConverter() {

386

this.allocator = new RootAllocator();

387

}

388

389

public VectorSchemaRoot convertToArrow(ColumnarBatch batch, StructType schema) {

390

List<Field> fields = new ArrayList<>();

391

List<FieldVector> vectors = new ArrayList<>();

392

393

for (int i = 0; i < schema.length(); i++) {

394

StructField field = schema.fields()[i];

395

ColumnVector sparkVector = batch.column(i);

396

397

Field arrowField = convertField(field);

398

FieldVector arrowVector = convertVector(sparkVector, arrowField);

399

400

fields.add(arrowField);

401

vectors.add(arrowVector);

402

}

403

404

Schema arrowSchema = new Schema(fields);

405

VectorSchemaRoot root = new VectorSchemaRoot(arrowSchema, vectors);

406

root.setRowCount(batch.numRows());

407

408

return root;

409

}

410

411

private Field convertField(StructField sparkField) {

412

ArrowType arrowType = convertDataType(sparkField.dataType());

413

return new Field(sparkField.name(),

414

new FieldType(sparkField.nullable(), arrowType, null),

415

null);

416

}

417

418

private ArrowType convertDataType(DataType sparkType) {

419

if (sparkType instanceof IntegerType) {

420

return new ArrowType.Int(32, true);

421

} else if (sparkType instanceof LongType) {

422

return new ArrowType.Int(64, true);

423

} else if (sparkType instanceof DoubleType) {

424

return new ArrowType.FloatingPoint(FloatingPointPrecision.DOUBLE);

425

} else if (sparkType instanceof StringType) {

426

return new ArrowType.Utf8();

427

}

428

throw new UnsupportedOperationException("Unsupported type: " + sparkType);

429

}

430

431

private FieldVector convertVector(ColumnVector sparkVector, Field field) {

432

FieldVector arrowVector = field.createVector(allocator);

433

arrowVector.allocateNew();

434

435

int numRows = sparkVector.numRows();

436

437

if (sparkVector.dataType() instanceof IntegerType) {

438

IntVector intVector = (IntVector) arrowVector;

439

for (int i = 0; i < numRows; i++) {

440

if (sparkVector.isNullAt(i)) {

441

intVector.setNull(i);

442

} else {

443

intVector.set(i, sparkVector.getInt(i));

444

}

445

}

446

}

447

// Handle other types similarly...

448

449

arrowVector.setValueCount(numRows);

450

return arrowVector;

451

}

452

}

453

```

454

455

## Vectorized Aggregations

456

457

### Vectorized Aggregate Functions

458

459

```java

460

public class VectorizedAggregateFunction {

461

462

public static long sum(ColumnVector vector) {

463

long result = 0;

464

int numRows = vector.numRows();

465

466

if (vector.dataType() instanceof IntegerType) {

467

for (int i = 0; i < numRows; i++) {

468

if (!vector.isNullAt(i)) {

469

result += vector.getInt(i);

470

}

471

}

472

} else if (vector.dataType() instanceof LongType) {

473

for (int i = 0; i < numRows; i++) {

474

if (!vector.isNullAt(i)) {

475

result += vector.getLong(i);

476

}

477

}

478

}

479

480

return result;

481

}

482

483

public static double average(ColumnVector vector) {

484

long sum = 0;

485

int count = 0;

486

int numRows = vector.numRows();

487

488

for (int i = 0; i < numRows; i++) {

489

if (!vector.isNullAt(i)) {

490

if (vector.dataType() instanceof IntegerType) {

491

sum += vector.getInt(i);

492

} else if (vector.dataType() instanceof LongType) {

493

sum += vector.getLong(i);

494

} else if (vector.dataType() instanceof DoubleType) {

495

sum += vector.getDouble(i);

496

}

497

count++;

498

}

499

}

500

501

return count > 0 ? (double) sum / count : 0.0;

502

}

503

504

public static Object min(ColumnVector vector) {

505

Object min = null;

506

int numRows = vector.numRows();

507

508

for (int i = 0; i < numRows; i++) {

509

if (!vector.isNullAt(i)) {

510

Object value = getValue(vector, i);

511

if (min == null || compareValues(value, min, vector.dataType()) < 0) {

512

min = value;

513

}

514

}

515

}

516

517

return min;

518

}

519

520

public static Object max(ColumnVector vector) {

521

Object max = null;

522

int numRows = vector.numRows();

523

524

for (int i = 0; i < numRows; i++) {

525

if (!vector.isNullAt(i)) {

526

Object value = getValue(vector, i);

527

if (max == null || compareValues(value, max, vector.dataType()) > 0) {

528

max = value;

529

}

530

}

531

}

532

533

return max;

534

}

535

536

private static Object getValue(ColumnVector vector, int rowId) {

537

DataType type = vector.dataType();

538

if (type instanceof IntegerType) {

539

return vector.getInt(rowId);

540

} else if (type instanceof LongType) {

541

return vector.getLong(rowId);

542

} else if (type instanceof DoubleType) {

543

return vector.getDouble(rowId);

544

} else if (type instanceof StringType) {

545

return vector.getUTF8String(rowId);

546

}

547

return null;

548

}

549

550

@SuppressWarnings("unchecked")

551

private static int compareValues(Object v1, Object v2, DataType dataType) {

552

if (v1 instanceof Comparable && v2 instanceof Comparable) {

553

return ((Comparable<Object>) v1).compareTo(v2);

554

}

555

return 0;

556

}

557

}

558

```

559

560

### Vectorized Filter Operations

561

562

```java

563

public class VectorizedFilters {

564

565

public static boolean[] equalTo(ColumnVector vector, Object value) {

566

int numRows = vector.numRows();

567

boolean[] result = new boolean[numRows];

568

569

for (int i = 0; i < numRows; i++) {

570

if (vector.isNullAt(i)) {

571

result[i] = false;

572

} else {

573

Object vectorValue = getValue(vector, i);

574

result[i] = Objects.equals(vectorValue, value);

575

}

576

}

577

578

return result;

579

}

580

581

public static boolean[] greaterThan(ColumnVector vector, Object value) {

582

int numRows = vector.numRows();

583

boolean[] result = new boolean[numRows];

584

585

for (int i = 0; i < numRows; i++) {

586

if (vector.isNullAt(i)) {

587

result[i] = false;

588

} else {

589

Object vectorValue = getValue(vector, i);

590

result[i] = compareValues(vectorValue, value, vector.dataType()) > 0;

591

}

592

}

593

594

return result;

595

}

596

597

public static boolean[] and(boolean[] left, boolean[] right) {

598

boolean[] result = new boolean[left.length];

599

for (int i = 0; i < left.length; i++) {

600

result[i] = left[i] && right[i];

601

}

602

return result;

603

}

604

605

public static boolean[] or(boolean[] left, boolean[] right) {

606

boolean[] result = new boolean[left.length];

607

for (int i = 0; i < left.length; i++) {

608

result[i] = left[i] || right[i];

609

}

610

return result;

611

}

612

613

public static ColumnarBatch filter(ColumnarBatch batch, boolean[] mask) {

614

int selectedRows = 0;

615

for (boolean selected : mask) {

616

if (selected) selectedRows++;

617

}

618

619

if (selectedRows == 0) {

620

return new ColumnarBatch(new ColumnVector[0], 0);

621

}

622

623

ColumnVector[] filteredColumns = new ColumnVector[batch.numCols()];

624

for (int colIdx = 0; colIdx < batch.numCols(); colIdx++) {

625

filteredColumns[colIdx] = filterColumn(batch.column(colIdx), mask, selectedRows);

626

}

627

628

return new ColumnarBatch(filteredColumns, selectedRows);

629

}

630

631

private static ColumnVector filterColumn(ColumnVector source, boolean[] mask, int resultRows) {

632

DataType dataType = source.dataType();

633

ColumnVector result = createColumnVector(dataType, resultRows);

634

635

int destIdx = 0;

636

for (int srcIdx = 0; srcIdx < mask.length; srcIdx++) {

637

if (mask[srcIdx]) {

638

copyValue(source, srcIdx, result, destIdx);

639

destIdx++;

640

}

641

}

642

643

return result;

644

}

645

646

private static void copyValue(ColumnVector source, int srcIdx,

647

ColumnVector dest, int destIdx) {

648

if (source.isNullAt(srcIdx)) {

649

((MyOnHeapColumnVector) dest).putNull(destIdx);

650

} else {

651

DataType type = source.dataType();

652

if (type instanceof IntegerType) {

653

((MyOnHeapColumnVector) dest).putInt(destIdx, source.getInt(srcIdx));

654

} else if (type instanceof LongType) {

655

((MyOnHeapColumnVector) dest).putLong(destIdx, source.getLong(srcIdx));

656

} else if (type instanceof DoubleType) {

657

((MyOnHeapColumnVector) dest).putDouble(destIdx, source.getDouble(srcIdx));

658

}

659

// Handle other types...

660

}

661

}

662

}

663

```

664

665

## Memory Management and Performance

666

667

### Off-Heap Column Vector

668

669

```java

670

public class OffHeapColumnVector extends ColumnVector {

671

private final DataType dataType;

672

private final long capacity;

673

private final long dataAddress;

674

private final long nullsAddress;

675

private int numRows;

676

677

public OffHeapColumnVector(DataType dataType, int capacity) {

678

this.dataType = dataType;

679

this.capacity = capacity;

680

681

// Allocate off-heap memory

682

int typeSize = getTypeSize(dataType);

683

this.dataAddress = PlatformDependent.allocateMemory(capacity * typeSize);

684

this.nullsAddress = PlatformDependent.allocateMemory(capacity); // 1 byte per null flag

685

686

// Initialize memory

687

PlatformDependent.setMemory(dataAddress, capacity * typeSize, (byte) 0);

688

PlatformDependent.setMemory(nullsAddress, capacity, (byte) 0);

689

}

690

691

@Override

692

public DataType dataType() {

693

return dataType;

694

}

695

696

@Override

697

public boolean isNullAt(int rowId) {

698

return PlatformDependent.getByte(nullsAddress + rowId) == 1;

699

}

700

701

@Override

702

public int getInt(int rowId) {

703

if (isNullAt(rowId)) return 0;

704

return PlatformDependent.getInt(dataAddress + rowId * 4);

705

}

706

707

@Override

708

public long getLong(int rowId) {

709

if (isNullAt(rowId)) return 0L;

710

return PlatformDependent.getLong(dataAddress + rowId * 8);

711

}

712

713

public void putInt(int rowId, int value) {

714

PlatformDependent.putInt(dataAddress + rowId * 4, value);

715

PlatformDependent.putByte(nullsAddress + rowId, (byte) 0);

716

}

717

718

public void putLong(int rowId, long value) {

719

PlatformDependent.putLong(dataAddress + rowId * 8, value);

720

PlatformDependent.putByte(nullsAddress + rowId, (byte) 0);

721

}

722

723

public void putNull(int rowId) {

724

PlatformDependent.putByte(nullsAddress + rowId, (byte) 1);

725

}

726

727

@Override

728

public void close() {

729

if (dataAddress != 0) {

730

PlatformDependent.freeMemory(dataAddress);

731

}

732

if (nullsAddress != 0) {

733

PlatformDependent.freeMemory(nullsAddress);

734

}

735

}

736

737

private int getTypeSize(DataType dataType) {

738

if (dataType instanceof IntegerType) return 4;

739

if (dataType instanceof LongType) return 8;

740

if (dataType instanceof DoubleType) return 8;

741

if (dataType instanceof FloatType) return 4;

742

return 8; // Default size

743

}

744

}

745

```

746

747

### Batch Size Optimization

748

749

```java

750

public class BatchSizeOptimizer {

751

private static final int MIN_BATCH_SIZE = 1024;

752

private static final int MAX_BATCH_SIZE = 8192;

753

private static final long TARGET_BATCH_MEMORY = 64 * 1024 * 1024; // 64MB

754

755

public static int calculateOptimalBatchSize(StructType schema) {

756

long rowSize = calculateRowSize(schema);

757

int calculatedBatchSize = (int) (TARGET_BATCH_MEMORY / rowSize);

758

759

// Clamp to reasonable bounds

760

return Math.max(MIN_BATCH_SIZE, Math.min(MAX_BATCH_SIZE, calculatedBatchSize));

761

}

762

763

private static long calculateRowSize(StructType schema) {

764

long totalSize = 0;

765

for (StructField field : schema.fields()) {

766

totalSize += getDataTypeSize(field.dataType());

767

}

768

return totalSize;

769

}

770

771

private static long getDataTypeSize(DataType dataType) {

772

if (dataType instanceof BooleanType) return 1;

773

if (dataType instanceof ByteType) return 1;

774

if (dataType instanceof ShortType) return 2;

775

if (dataType instanceof IntegerType) return 4;

776

if (dataType instanceof LongType) return 8;

777

if (dataType instanceof FloatType) return 4;

778

if (dataType instanceof DoubleType) return 8;

779

if (dataType instanceof StringType) return 20; // Average string size estimate

780

if (dataType instanceof BinaryType) return 16; // Average binary size estimate

781

return 8; // Default estimate

782

}

783

}

784

```

785

786

### Vectorized Expression Evaluation

787

788

```java

789

public class VectorizedExpressionEvaluator {

790

791

public static ColumnVector evaluateExpression(Expression expr, ColumnarBatch input) {

792

if (expr instanceof Literal) {

793

return evaluateLiteral((Literal) expr, input.numRows());

794

} else if (expr instanceof NamedReference) {

795

return evaluateColumnReference((NamedReference) expr, input);

796

} else if (expr instanceof BinaryExpression) {

797

return evaluateBinaryExpression((BinaryExpression) expr, input);

798

}

799

800

throw new UnsupportedOperationException("Unsupported expression: " + expr);

801

}

802

803

private static ColumnVector evaluateLiteral(Literal literal, int numRows) {

804

Object value = literal.value();

805

DataType dataType = inferDataType(value);

806

ColumnVector result = createColumnVector(dataType, numRows);

807

808

// Fill all rows with the literal value

809

for (int i = 0; i < numRows; i++) {

810

setColumnValue(result, i, value);

811

}

812

813

return result;

814

}

815

816

private static ColumnVector evaluateColumnReference(NamedReference ref, ColumnarBatch input) {

817

String[] fieldNames = ref.fieldNames();

818

String columnName = fieldNames[0]; // Simplified - assume single-level reference

819

820

// Find column index by name

821

// This would require schema information in a real implementation

822

int columnIndex = findColumnIndex(columnName, input);

823

return input.column(columnIndex);

824

}

825

826

private static ColumnVector evaluateBinaryExpression(BinaryExpression expr, ColumnarBatch input) {

827

ColumnVector left = evaluateExpression(expr.left(), input);

828

ColumnVector right = evaluateExpression(expr.right(), input);

829

830

if (expr instanceof Add) {

831

return vectorizedAdd(left, right);

832

} else if (expr instanceof Subtract) {

833

return vectorizedSubtract(left, right);

834

} else if (expr instanceof Multiply) {

835

return vectorizedMultiply(left, right);

836

}

837

838

throw new UnsupportedOperationException("Unsupported binary expression: " + expr);

839

}

840

841

private static ColumnVector vectorizedAdd(ColumnVector left, ColumnVector right) {

842

int numRows = Math.min(left.numRows(), right.numRows());

843

DataType resultType = promoteTypes(left.dataType(), right.dataType());

844

ColumnVector result = createColumnVector(resultType, numRows);

845

846

for (int i = 0; i < numRows; i++) {

847

if (left.isNullAt(i) || right.isNullAt(i)) {

848

((MyOnHeapColumnVector) result).putNull(i);

849

} else {

850

if (resultType instanceof IntegerType) {

851

int sum = left.getInt(i) + right.getInt(i);

852

((MyOnHeapColumnVector) result).putInt(i, sum);

853

} else if (resultType instanceof LongType) {

854

long sum = left.getLong(i) + right.getLong(i);

855

((MyOnHeapColumnVector) result).putLong(i, sum);

856

} else if (resultType instanceof DoubleType) {

857

double sum = left.getDouble(i) + right.getDouble(i);

858

((MyOnHeapColumnVector) result).putDouble(i, sum);

859

}

860

}

861

}

862

863

return result;

864

}

865

}

866

```

867

868

## Integration with Spark SQL

869

870

### Vectorized Data Source

871

872

```java

873

public class VectorizedDataSource implements Table, SupportsRead {

874

875

@Override

876

public ScanBuilder newScanBuilder(CaseInsensitiveStringMap options) {

877

return new VectorizedScanBuilder(schema, options);

878

}

879

880

@Override

881

public Set<TableCapability> capabilities() {

882

return Set.of(

883

TableCapability.BATCH_READ,

884

TableCapability.ACCEPT_ANY_SCHEMA

885

);

886

}

887

}

888

889

public class VectorizedScan implements Scan, SupportsReportStatistics {

890

891

@Override

892

public Batch toBatch() {

893

return new VectorizedBatch(schema, paths);

894

}

895

896

@Override

897

public Statistics estimateStatistics() {

898

// Provide statistics for query optimization

899

return new Statistics() {

900

@Override

901

public OptionalLong sizeInBytes() {

902

return OptionalLong.of(calculateDataSize());

903

}

904

905

@Override

906

public OptionalLong numRows() {

907

return OptionalLong.of(estimateRowCount());

908

}

909

};

910

}

911

}

912

```

913

914

The Vectorized Processing APIs provide the foundation for high-performance analytical workloads in Spark, enabling efficient processing of large datasets through columnar operations and optimized memory layouts.