# Apache Spark Catalyst Metrics API

The Apache Spark Catalyst Metrics API provides a framework for custom metrics collection in data sources. It lets data sources define, collect, and aggregate custom metrics during query execution, giving insight into data source performance and behavior.

## Core Metrics Interfaces

### CustomMetric

The base interface for defining custom metrics; task-level metric values are aggregated into a final value at the driver:

```java { .api }
package org.apache.spark.sql.connector.metric;

public interface CustomMetric {
  /**
   * Returns the name of the custom metric.
   */
  String name();

  /**
   * Returns the description of the custom metric.
   */
  String description();

  /**
   * The initial value of this metric.
   */
  long initialValue = 0L;

  /**
   * Given an array of task metric values, returns the aggregated final metric value.
   */
  String aggregateTaskMetrics(long[] taskMetrics);
}
```

### CustomTaskMetric

A task-level metric collected on the executor side:

```java { .api }
package org.apache.spark.sql.connector.metric;

public interface CustomTaskMetric {
  /**
   * Returns the name of the custom task metric.
   */
  String name();

  /**
   * Returns the long value of the custom task metric.
   */
  long value();
}
```

## Built-in Metric Implementations

### CustomSumMetric

An abstract base class for metrics that sum values across tasks:

```java { .api }
package org.apache.spark.sql.connector.metric;

public abstract class CustomSumMetric implements CustomMetric {
  @Override
  public String aggregateTaskMetrics(long[] taskMetrics) {
    long sum = 0L;
    for (long taskMetric : taskMetrics) {
      sum += taskMetric;
    }
    return String.valueOf(sum);
  }
}
```

### CustomAvgMetric

An abstract base class for metrics that compute averages across tasks:

```java { .api }
package org.apache.spark.sql.connector.metric;

import java.text.DecimalFormat;

public abstract class CustomAvgMetric implements CustomMetric {
  @Override
  public String aggregateTaskMetrics(long[] taskMetrics) {
    if (taskMetrics.length > 0) {
      long sum = 0L;
      for (long taskMetric : taskMetrics) {
        sum += taskMetric;
      }
      double average = ((double) sum) / taskMetrics.length;
      return new DecimalFormat("#0.000").format(average);
    } else {
      return "0";
    }
  }
}
```
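To make the two aggregation strategies concrete, here is a small standalone sketch that mirrors their `aggregateTaskMetrics` logic (no Spark dependency; the locale is pinned so the decimal separator is stable, which the real `CustomAvgMetric` does not do):

```java
import java.text.DecimalFormat;
import java.text.DecimalFormatSymbols;
import java.util.Arrays;
import java.util.Locale;

public class AggregationDemo {
    // Mirrors CustomSumMetric.aggregateTaskMetrics: sum all task values
    static String sum(long[] taskMetrics) {
        return String.valueOf(Arrays.stream(taskMetrics).sum());
    }

    // Mirrors CustomAvgMetric.aggregateTaskMetrics: average with "#0.000" format
    static String avg(long[] taskMetrics) {
        if (taskMetrics.length == 0) {
            return "0";
        }
        double average = ((double) Arrays.stream(taskMetrics).sum()) / taskMetrics.length;
        return new DecimalFormat("#0.000", DecimalFormatSymbols.getInstance(Locale.US))
            .format(average);
    }

    public static void main(String[] args) {
        long[] perTask = {120, 80, 100}; // one value per task
        System.out.println(sum(perTask)); // 300
        System.out.println(avg(perTask)); // 100.000
    }
}
```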

## Streaming Metrics Interfaces

### ReportsSourceMetrics

Interface for streaming data sources to report metrics:

```java { .api }
package org.apache.spark.sql.connector.read.streaming;

import java.util.Map;
import java.util.Optional;

public interface ReportsSourceMetrics extends SparkDataStream {
  /**
   * Returns the metrics reported by the streaming source with respect to
   * the latest consumed offset.
   */
  Map<String, String> metrics(Optional<Offset> latestConsumedOffset);
}
```

### ReportsSinkMetrics

Interface for streaming sinks to report metrics:

```java { .api }
package org.apache.spark.sql.connector.read.streaming;

import java.util.Map;

public interface ReportsSinkMetrics {
  /**
   * Returns the metrics reported by the sink for this micro-batch.
   */
  Map<String, String> metrics();
}
```
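As an illustration, a micro-batch sink that counts written rows might report metrics like this (a standalone sketch: the interface is re-declared locally so the snippet compiles without Spark, and `CountingSink` is a hypothetical name):

```java
import java.util.Map;

public class SinkMetricsDemo {
    // Local stand-in for org.apache.spark.sql.connector.read.streaming.ReportsSinkMetrics
    interface ReportsSinkMetrics {
        Map<String, String> metrics();
    }

    // Hypothetical sink reporting rows written in the current micro-batch
    static final class CountingSink implements ReportsSinkMetrics {
        private long rowsWritten = 0;

        void write(int batchSize) { rowsWritten += batchSize; }

        @Override
        public Map<String, String> metrics() {
            return Map.of("rowsWritten", String.valueOf(rowsWritten));
        }
    }

    public static void main(String[] args) {
        CountingSink sink = new CountingSink();
        sink.write(500);
        sink.write(250);
        System.out.println(sink.metrics()); // {rowsWritten=750}
    }
}
```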

## Integration with Data Source APIs

### Scan Interface Integration

Data sources declare and report metrics through the Scan interface:

```java { .api }
package org.apache.spark.sql.connector.read;

public interface Scan {
  /**
   * Returns the custom metrics that this scan supports.
   */
  default CustomMetric[] supportedCustomMetrics() {
    return new CustomMetric[]{};
  }

  /**
   * Returns custom task metrics reported from the driver side.
   * Note that these metrics must be included in the supported custom metrics
   * returned by supportedCustomMetrics.
   */
  default CustomTaskMetric[] reportDriverMetrics() {
    return new CustomTaskMetric[]{};
  }
}
```

### PartitionReader Integration

Partition readers report task-level metrics:

```java { .api }
package org.apache.spark.sql.connector.read;

public interface PartitionReader<T> extends Closeable {
  /**
   * Returns the current values of the custom task metrics.
   */
  default CustomTaskMetric[] currentMetricsValues() {
    CustomTaskMetric[] NO_METRICS = {};
    return NO_METRICS;
  }
}
```

### DataWriter Integration

Data writers can also report task-level metrics:

```java { .api }
package org.apache.spark.sql.connector.write;

public interface DataWriter<T> {
  /**
   * Returns the current values of the custom task metrics.
   */
  default CustomTaskMetric[] currentMetricsValues() {
    return new CustomTaskMetric[]{};
  }
}
```

## Complete Implementation Examples

### Custom Sum Metric Implementation

```java
import org.apache.spark.sql.connector.metric.CustomSumMetric;

public class RecordsProcessedMetric extends CustomSumMetric {
  @Override
  public String name() {
    return "recordsProcessed";
  }

  @Override
  public String description() {
    return "Total number of records processed across all tasks";
  }
}
```

### Custom Average Metric Implementation

```java
import org.apache.spark.sql.connector.metric.CustomAvgMetric;

public class ProcessingTimeAvgMetric extends CustomAvgMetric {
  @Override
  public String name() {
    return "avgProcessingTime";
  }

  @Override
  public String description() {
    return "Average processing time per task in milliseconds";
  }
}
```

### Task Metric Implementation

```java
import org.apache.spark.sql.connector.metric.CustomTaskMetric;

public class TaskRecordsProcessed implements CustomTaskMetric {
  private final long recordCount;

  public TaskRecordsProcessed(long recordCount) {
    this.recordCount = recordCount;
  }

  @Override
  public String name() {
    // Must match RecordsProcessedMetric.name() so the values are aggregated together
    return "recordsProcessed";
  }

  @Override
  public long value() {
    return recordCount;
  }
}
```

### Complete Data Source with Metrics

```java
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.connector.catalog.SupportsRead;
import org.apache.spark.sql.connector.catalog.Table;
import org.apache.spark.sql.connector.catalog.TableCapability;
import org.apache.spark.sql.connector.metric.*;
import org.apache.spark.sql.connector.read.*;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.util.CaseInsensitiveStringMap;

import java.io.IOException;
import java.util.Set;

public class MyDataSource implements Table, SupportsRead {
  private final StructType schema;

  public MyDataSource(StructType schema) {
    this.schema = schema;
  }

  @Override
  public String name() {
    return "my-data-source";
  }

  @Override
  public StructType schema() {
    return schema;
  }

  @Override
  public Set<TableCapability> capabilities() {
    return Set.of(TableCapability.BATCH_READ);
  }

  @Override
  public ScanBuilder newScanBuilder(CaseInsensitiveStringMap options) {
    return new MyScanBuilder(schema);
  }

  private static class MyScanBuilder implements ScanBuilder {
    private final StructType schema;

    public MyScanBuilder(StructType schema) {
      this.schema = schema;
    }

    @Override
    public Scan build() {
      return new MyScan(schema);
    }
  }

  private static class MyScan implements Scan {
    private final StructType schema;

    public MyScan(StructType schema) {
      this.schema = schema;
    }

    @Override
    public StructType readSchema() {
      return schema;
    }

    @Override
    public CustomMetric[] supportedCustomMetrics() {
      return new CustomMetric[]{
        new RecordsProcessedMetric(),
        new ProcessingTimeAvgMetric()
      };
    }

    @Override
    public Batch toBatch() {
      return new MyBatch(schema);
    }
  }

  private static class MyBatch implements Batch {
    private final StructType schema;

    public MyBatch(StructType schema) {
      this.schema = schema;
    }

    @Override
    public InputPartition[] planInputPartitions() {
      return new InputPartition[]{new MyInputPartition()};
    }

    @Override
    public PartitionReaderFactory createReaderFactory() {
      return new MyReaderFactory(schema);
    }
  }

  private static class MyReaderFactory implements PartitionReaderFactory {
    private final StructType schema;

    public MyReaderFactory(StructType schema) {
      this.schema = schema;
    }

    @Override
    public PartitionReader<InternalRow> createReader(InputPartition partition) {
      return new MyPartitionReader();
    }
  }

  private static class MyPartitionReader implements PartitionReader<InternalRow> {
    private long recordsProcessed = 0;
    private final long startTime = System.currentTimeMillis();

    @Override
    public boolean next() throws IOException {
      // Advance to the next record; only count records that actually exist
      if (!hasMoreRecords()) {
        return false;
      }
      recordsProcessed++;
      return true;
    }

    @Override
    public InternalRow get() {
      // Return the current record
      return getCurrentRecord();
    }

    @Override
    public CustomTaskMetric[] currentMetricsValues() {
      long processingTime = System.currentTimeMillis() - startTime;
      return new CustomTaskMetric[]{
        new TaskRecordsProcessed(recordsProcessed),
        new TaskProcessingTime(processingTime)
      };
    }

    @Override
    public void close() throws IOException {
      // Clean up resources
    }

    private boolean hasMoreRecords() {
      // Implementation-specific logic
      return false;
    }

    private InternalRow getCurrentRecord() {
      // Implementation-specific logic
      return null;
    }
  }

  private static class TaskProcessingTime implements CustomTaskMetric {
    private final long processingTime;

    public TaskProcessingTime(long processingTime) {
      this.processingTime = processingTime;
    }

    @Override
    public String name() {
      // Must match ProcessingTimeAvgMetric.name()
      return "avgProcessingTime";
    }

    @Override
    public long value() {
      return processingTime;
    }
  }

  private static class MyInputPartition implements InputPartition {
    // Partition implementation
  }
}
```

### Streaming Data Source with Metrics

```java
import org.apache.spark.sql.connector.read.streaming.*;

import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// ReportsSourceMetrics already extends SparkDataStream
public class MyStreamingSource implements ReportsSourceMetrics {
  private long totalRecordsRead = 0;
  private long lastBatchRecords = 0;

  @Override
  public Map<String, String> metrics(Optional<Offset> latestConsumedOffset) {
    Map<String, String> metrics = new HashMap<>();
    metrics.put("totalRecordsRead", String.valueOf(totalRecordsRead));
    metrics.put("lastBatchRecords", String.valueOf(lastBatchRecords));
    metrics.put("avgRecordsPerBatch", calculateAverageRecordsPerBatch());
    return metrics;
  }

  private String calculateAverageRecordsPerBatch() {
    // Calculate the average from historical batch data
    return "1000";
  }

  // Other SparkDataStream methods...
}
```

## Special Metric Names

The metrics API recognizes certain special metric names that integrate with Spark's built-in task metrics:

- **`bytesWritten`**: updates the built-in task metric for bytes written
- **`recordsWritten`**: updates the built-in task metric for records written

When data sources define custom metrics with these names, the values are automatically propagated to Spark's internal task metrics system.
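For instance, a writer-side task metric only needs to use the reserved name to feed the built-in bytes-written counter. The sketch below is standalone (the `CustomTaskMetric` interface is re-declared locally and `TaskBytesWritten` is an illustrative name, not a Spark class):

```java
public class BytesWrittenDemo {
    // Local stand-in for org.apache.spark.sql.connector.metric.CustomTaskMetric
    interface CustomTaskMetric {
        String name();
        long value();
    }

    // Using the reserved name "bytesWritten" lets Spark propagate the value
    // to its built-in bytes-written task metric
    static final class TaskBytesWritten implements CustomTaskMetric {
        private final long bytes;

        TaskBytesWritten(long bytes) { this.bytes = bytes; }

        @Override public String name() { return "bytesWritten"; }
        @Override public long value() { return bytes; }
    }

    public static void main(String[] args) {
        CustomTaskMetric m = new TaskBytesWritten(4096);
        System.out.println(m.name() + "=" + m.value()); // bytesWritten=4096
    }
}
```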

## Key Features

### Automatic Aggregation

- Spark automatically collects task metrics from all partitions
- Driver-side aggregation uses the `aggregateTaskMetrics` method
- Built-in support for sum and average aggregations

### Reflection-Based Instantiation

- Custom metric classes must have a no-argument constructor
- Spark uses reflection to instantiate metric classes during aggregation
- Aggregation is thread-safe across distributed tasks
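The reflection-based instantiation can be demonstrated in isolation: the sketch below loads a class by name and creates it through its no-argument constructor, roughly as Spark does during aggregation (class names are illustrative; no Spark dependency):

```java
public class ReflectionDemo {
    // A metric-like class with the required no-argument constructor
    public static class GoodMetric {
        public GoodMetric() {}
        public String name() { return "good"; }
    }

    public static void main(String[] args) throws Exception {
        // Load the class by its fully qualified name and instantiate it;
        // this fails with NoSuchMethodException if no no-arg constructor exists
        Class<?> cls = Class.forName(GoodMetric.class.getName());
        Object metric = cls.getDeclaredConstructor().newInstance();
        System.out.println(metric.getClass().getSimpleName()); // GoodMetric
    }
}
```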

### UI Integration

- Final aggregated metrics appear in the Spark UI
- Integrated with data source scan operators
- Streaming metrics are available per micro-batch

### Extensibility

- Easy to create custom aggregation logic
- Support for complex metric calculations
- Integration with both batch and streaming workloads

## Import Statements

To use the Metrics API in your data source implementation, include these imports:

```java
// Core metric interfaces
import org.apache.spark.sql.connector.metric.CustomMetric;
import org.apache.spark.sql.connector.metric.CustomTaskMetric;

// Built-in metric implementations
import org.apache.spark.sql.connector.metric.CustomSumMetric;
import org.apache.spark.sql.connector.metric.CustomAvgMetric;

// Streaming metrics
import org.apache.spark.sql.connector.read.streaming.ReportsSourceMetrics;
import org.apache.spark.sql.connector.read.streaming.ReportsSinkMetrics;

// Reader integration
import org.apache.spark.sql.connector.read.PartitionReader;
import org.apache.spark.sql.connector.read.Scan;

// Writer integration
import org.apache.spark.sql.connector.write.DataWriter;

// Utility imports
import java.util.Map;
import java.util.Optional;
import java.text.DecimalFormat;
```