# Apache Spark Catalyst Metrics API

The Apache Spark Catalyst Metrics API provides a framework for custom metrics collection in data sources. It lets a data source define, collect, and aggregate its own metrics during query execution, giving insight into data source performance and behavior.
## Core Metrics Interfaces

### CustomMetric

The base interface for defining custom metrics. Task-level values are aggregated into a final value at the driver:

```java { .api }
package org.apache.spark.sql.connector.metric;

public interface CustomMetric {
  /**
   * Returns the name of the custom metric.
   */
  String name();

  /**
   * Returns the description of the custom metric.
   */
  String description();

  /**
   * The initial value of this metric.
   */
  long initialValue = 0L;

  /**
   * Given an array of task metric values, returns the aggregated final metric value.
   */
  String aggregateTaskMetrics(long[] taskMetrics);
}
```
### CustomTaskMetric

Task-level metric representation, collected on the executor side:

```java { .api }
package org.apache.spark.sql.connector.metric;

public interface CustomTaskMetric {
  /**
   * Returns the name of the custom task metric.
   */
  String name();

  /**
   * Returns the long value of the custom task metric.
   */
  long value();
}
```
## Built-in Metric Implementations

### CustomSumMetric

Abstract base class for metrics that sum values across tasks:

```java { .api }
package org.apache.spark.sql.connector.metric;

public abstract class CustomSumMetric implements CustomMetric {
  @Override
  public String aggregateTaskMetrics(long[] taskMetrics) {
    long sum = 0L;
    for (long taskMetric : taskMetrics) {
      sum += taskMetric;
    }
    return String.valueOf(sum);
  }
}
```
### CustomAvgMetric

Abstract base class for metrics that compute an average across tasks:

```java { .api }
package org.apache.spark.sql.connector.metric;

import java.text.DecimalFormat;

public abstract class CustomAvgMetric implements CustomMetric {
  @Override
  public String aggregateTaskMetrics(long[] taskMetrics) {
    if (taskMetrics.length > 0) {
      long sum = 0L;
      for (long taskMetric : taskMetrics) {
        sum += taskMetric;
      }
      double average = ((double) sum) / taskMetrics.length;
      return new DecimalFormat("#0.000").format(average);
    } else {
      return "0";
    }
  }
}
```
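The averaging above can be exercised on its own. The following sketch copies the aggregation logic out of `CustomAvgMetric` (the class and method names below are local to the sketch, so it compiles without Spark on the classpath) to show the string the driver would ultimately display:

```java
import java.text.DecimalFormat;

// Standalone sketch mirroring CustomAvgMetric.aggregateTaskMetrics;
// illustration only, not the Spark class itself.
public class AvgAggregationDemo {
  // Mean of the per-task values, rendered with three decimal places.
  static String aggregate(long[] taskMetrics) {
    if (taskMetrics.length == 0) {
      return "0";
    }
    long sum = 0L;
    for (long v : taskMetrics) {
      sum += v;
    }
    return new DecimalFormat("#0.000").format((double) sum / taskMetrics.length);
  }

  public static void main(String[] args) {
    // Three tasks reporting 100, 200 and 250 ms collapse to one driver-side string.
    System.out.println(aggregate(new long[]{100, 200, 250}));
  }
}
```

Note that `DecimalFormat` uses the JVM's default locale for the decimal separator, exactly as the base class does.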
## Streaming Metrics Interfaces

### ReportsSourceMetrics

Interface for streaming data sources to report metrics:

```java { .api }
package org.apache.spark.sql.connector.read.streaming;

import java.util.Map;
import java.util.Optional;

public interface ReportsSourceMetrics extends SparkDataStream {
  /**
   * Returns the metrics reported by the streaming source with respect to
   * the latest consumed offset.
   */
  Map<String, String> metrics(Optional<Offset> latestConsumedOffset);
}
```
### ReportsSinkMetrics

Interface for streaming sinks to report metrics:

```java { .api }
package org.apache.spark.sql.connector.read.streaming;

import java.util.Map;

public interface ReportsSinkMetrics {
  /**
   * Returns the metrics reported by the sink for this micro-batch.
   */
  Map<String, String> metrics();
}
```
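A sink simply surfaces whatever per-batch counters it tracks as a string map. A minimal sketch, with the interface stubbed locally so it compiles without Spark (a real sink would implement the Spark interface above; `CountingSink` and its counters are hypothetical names):

```java
import java.util.HashMap;
import java.util.Map;

public class SinkMetricsDemo {
  // Local stand-in for ReportsSinkMetrics so this sketch is self-contained.
  interface ReportsSinkMetrics {
    Map<String, String> metrics();
  }

  // Hypothetical sink that counts the rows and batches it has committed.
  static class CountingSink implements ReportsSinkMetrics {
    private long rowsWritten = 0;
    private long batchesCommitted = 0;

    void commitBatch(long rows) {
      rowsWritten += rows;
      batchesCommitted++;
    }

    @Override
    public Map<String, String> metrics() {
      Map<String, String> m = new HashMap<>();
      m.put("rowsWritten", String.valueOf(rowsWritten));
      m.put("batchesCommitted", String.valueOf(batchesCommitted));
      return m;
    }
  }

  public static void main(String[] args) {
    CountingSink sink = new CountingSink();
    sink.commitBatch(500);
    sink.commitBatch(300);
    System.out.println(sink.metrics());
  }
}
```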
## Integration with Data Source APIs

### Scan Interface Integration

Data sources expose their metrics through the Scan interface:

```java { .api }
package org.apache.spark.sql.connector.read;

public interface Scan {
  /**
   * Returns custom metrics that this scan supports.
   */
  default CustomMetric[] supportedCustomMetrics() {
    return new CustomMetric[]{};
  }

  /**
   * Returns custom task metrics reported from the driver side.
   * Note that these metrics must be included in the supported custom metrics
   * reported by supportedCustomMetrics.
   */
  default CustomTaskMetric[] reportDriverMetrics() {
    return new CustomTaskMetric[]{};
  }
}
```
### PartitionReader Integration

Partition readers report task-level metrics:

```java { .api }
package org.apache.spark.sql.connector.read;

import java.io.Closeable;

public interface PartitionReader<T> extends Closeable {
  /**
   * Returns the current custom task metric values.
   */
  default CustomTaskMetric[] currentMetricsValues() {
    CustomTaskMetric[] NO_METRICS = {};
    return NO_METRICS;
  }
}
```
### DataWriter Integration

Data writers can also report task-level metrics:

```java { .api }
package org.apache.spark.sql.connector.write;

public interface DataWriter<T> {
  /**
   * Returns the current custom task metric values.
   */
  default CustomTaskMetric[] currentMetricsValues() {
    return new CustomTaskMetric[]{};
  }
}
```
## Complete Implementation Examples

### Custom Sum Metric Implementation

```java
import org.apache.spark.sql.connector.metric.CustomSumMetric;

public class RecordsProcessedMetric extends CustomSumMetric {
  @Override
  public String name() {
    return "recordsProcessed";
  }

  @Override
  public String description() {
    return "Total number of records processed across all tasks";
  }
}
```
### Custom Average Metric Implementation

```java
import org.apache.spark.sql.connector.metric.CustomAvgMetric;

public class ProcessingTimeAvgMetric extends CustomAvgMetric {
  @Override
  public String name() {
    return "avgProcessingTime";
  }

  @Override
  public String description() {
    return "Average processing time per task in milliseconds";
  }
}
```
### Task Metric Implementation

```java
import org.apache.spark.sql.connector.metric.CustomTaskMetric;

public class TaskRecordsProcessed implements CustomTaskMetric {
  private final long recordCount;

  public TaskRecordsProcessed(long recordCount) {
    this.recordCount = recordCount;
  }

  @Override
  public String name() {
    return "recordsProcessed";
  }

  @Override
  public long value() {
    return recordCount;
  }
}
```
### Complete Data Source with Metrics

```java
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.connector.catalog.*;
import org.apache.spark.sql.connector.metric.*;
import org.apache.spark.sql.connector.read.*;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.util.CaseInsensitiveStringMap;
import java.io.IOException;
import java.util.Set;

public class MyDataSource implements Table, SupportsRead {
  private final StructType schema;

  public MyDataSource(StructType schema) {
    this.schema = schema;
  }

  @Override
  public String name() {
    return "my-data-source";
  }

  @Override
  public StructType schema() {
    return schema;
  }

  @Override
  public Set<TableCapability> capabilities() {
    return Set.of(TableCapability.BATCH_READ);
  }

  @Override
  public ScanBuilder newScanBuilder(CaseInsensitiveStringMap options) {
    return new MyScanBuilder(schema);
  }

  private static class MyScanBuilder implements ScanBuilder {
    private final StructType schema;

    public MyScanBuilder(StructType schema) {
      this.schema = schema;
    }

    @Override
    public Scan build() {
      return new MyScan(schema);
    }
  }

  private static class MyScan implements Scan {
    private final StructType schema;

    public MyScan(StructType schema) {
      this.schema = schema;
    }

    @Override
    public StructType readSchema() {
      return schema;
    }

    @Override
    public CustomMetric[] supportedCustomMetrics() {
      return new CustomMetric[]{
        new RecordsProcessedMetric(),
        new ProcessingTimeAvgMetric()
      };
    }

    @Override
    public Batch toBatch() {
      return new MyBatch(schema);
    }
  }

  private static class MyBatch implements Batch {
    private final StructType schema;

    public MyBatch(StructType schema) {
      this.schema = schema;
    }

    @Override
    public InputPartition[] planInputPartitions() {
      return new InputPartition[]{new MyInputPartition()};
    }

    @Override
    public PartitionReaderFactory createReaderFactory() {
      return new MyReaderFactory(schema);
    }
  }

  private static class MyReaderFactory implements PartitionReaderFactory {
    private final StructType schema;

    public MyReaderFactory(StructType schema) {
      this.schema = schema;
    }

    @Override
    public PartitionReader<InternalRow> createReader(InputPartition partition) {
      return new MyPartitionReader();
    }
  }

  private static class MyPartitionReader implements PartitionReader<InternalRow> {
    private long recordsProcessed = 0;
    private final long startTime = System.currentTimeMillis();

    @Override
    public boolean next() throws IOException {
      // Advance to the next record; return false once the input is exhausted.
      if (!hasMoreRecords()) {
        return false;
      }
      recordsProcessed++;
      return true;
    }

    @Override
    public InternalRow get() {
      // Return the current record.
      return getCurrentRecord();
    }

    @Override
    public CustomTaskMetric[] currentMetricsValues() {
      long processingTime = System.currentTimeMillis() - startTime;
      return new CustomTaskMetric[]{
        new TaskRecordsProcessed(recordsProcessed),
        new TaskProcessingTime(processingTime)
      };
    }

    @Override
    public void close() throws IOException {
      // Clean up resources.
    }

    private boolean hasMoreRecords() {
      // Implementation-specific logic.
      return false;
    }

    private InternalRow getCurrentRecord() {
      // Implementation-specific logic.
      return null;
    }
  }

  private static class TaskProcessingTime implements CustomTaskMetric {
    private final long processingTime;

    public TaskProcessingTime(long processingTime) {
      this.processingTime = processingTime;
    }

    @Override
    public String name() {
      return "avgProcessingTime";
    }

    @Override
    public long value() {
      return processingTime;
    }
  }

  private static class MyInputPartition implements InputPartition {
    // Partition implementation.
  }
}
```
### Streaming Data Source with Metrics

```java
import org.apache.spark.sql.connector.read.streaming.*;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// ReportsSourceMetrics already extends SparkDataStream.
public class MyStreamingSource implements ReportsSourceMetrics {
  private long totalRecordsRead = 0;
  private long lastBatchRecords = 0;

  @Override
  public Map<String, String> metrics(Optional<Offset> latestConsumedOffset) {
    Map<String, String> metrics = new HashMap<>();
    metrics.put("totalRecordsRead", String.valueOf(totalRecordsRead));
    metrics.put("lastBatchRecords", String.valueOf(lastBatchRecords));
    metrics.put("avgRecordsPerBatch", calculateAverageRecordsPerBatch());
    return metrics;
  }

  private String calculateAverageRecordsPerBatch() {
    // Calculate the average from historical batch data.
    return "1000";
  }

  // Other SparkDataStream methods (initialOffset, commit, stop, ...)
}
```
## Special Metric Names

The metrics API recognizes certain special metric names that integrate with Spark's built-in task metrics:

- **`bytesWritten`**: Updates the corresponding task metric for bytes written
- **`recordsWritten`**: Updates the corresponding task metric for records written

When data sources define custom metrics with these names, the values are automatically propagated to Spark's internal task metrics system.
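The special names are plain strings, so opting in is just a matter of returning the right `name()`. A minimal sketch, with `CustomTaskMetric` stubbed locally so it compiles without Spark (`BytesWrittenMetric` is a hypothetical name):

```java
public class SpecialNamesDemo {
  // Local stand-in for org.apache.spark.sql.connector.metric.CustomTaskMetric.
  interface CustomTaskMetric {
    String name();
    long value();
  }

  // A task metric whose name matches Spark's special "bytesWritten" key; when
  // reported from a DataWriter, Spark would also fold the value into its
  // built-in bytes-written task metric.
  static class BytesWrittenMetric implements CustomTaskMetric {
    private final long bytes;

    BytesWrittenMetric(long bytes) {
      this.bytes = bytes;
    }

    @Override public String name() { return "bytesWritten"; }
    @Override public long value() { return bytes; }
  }

  public static void main(String[] args) {
    CustomTaskMetric m = new BytesWrittenMetric(4096);
    System.out.println(m.name() + " = " + m.value()); // bytesWritten = 4096
  }
}
```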
## Key Features

### Automatic Aggregation
- Spark automatically collects task metrics from all partitions
- Driver-side aggregation using the `aggregateTaskMetrics` method
- Built-in support for sum and average aggregations

### Reflection-Based Instantiation
- Custom metric classes must have a no-argument constructor
- Spark uses reflection to instantiate metric classes during aggregation
- Thread-safe aggregation across distributed tasks
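The no-argument-constructor requirement exists because the driver recreates metric objects from their class alone. The sketch below reproduces that pattern with plain reflection (the `GoodMetric` class and `instantiate` helper are illustrative, not Spark APIs):

```java
public class ReflectionDemo {
  // A metric-like class with the required public no-arg constructor.
  public static class GoodMetric {
    public GoodMetric() {}

    public String name() {
      return "goodMetric";
    }
  }

  // Mirrors the reflective instantiation Spark performs during driver-side
  // aggregation; fails loudly if the class lacks a usable no-arg constructor.
  static Object instantiate(Class<?> metricClass) {
    try {
      return metricClass.getDeclaredConstructor().newInstance();
    } catch (ReflectiveOperationException e) {
      throw new IllegalArgumentException(
          "metric class needs a public no-arg constructor: " + metricClass, e);
    }
  }

  public static void main(String[] args) {
    Object metric = instantiate(GoodMetric.class);
    System.out.println(metric.getClass().getSimpleName()); // GoodMetric
  }
}
```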
### UI Integration
- Final aggregated metrics appear in the Spark UI
- Integrated with data source scan operators
- Streaming metrics available per micro-batch

### Extensibility
- Easy to create custom aggregation logic
- Support for complex metric calculations
- Integration with both batch and streaming workloads
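Custom aggregation beyond the built-in sum and average just means writing a different `aggregateTaskMetrics`. A sketch of a "slowest task" aggregation, with the logic kept standalone so it runs without Spark (the names are illustrative):

```java
public class MaxAggregationDemo {
  // Mirrors the shape of CustomMetric.aggregateTaskMetrics, but reports the
  // maximum per-task value instead of a sum or average.
  static String aggregateMax(long[] taskMetrics) {
    long max = 0L;
    for (long v : taskMetrics) {
      max = Math.max(max, v);
    }
    return max + " ms (slowest task)";
  }

  public static void main(String[] args) {
    // Three task durations collapse to the single value shown in the UI.
    System.out.println(aggregateMax(new long[]{120, 340, 95})); // 340 ms (slowest task)
  }
}
```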
## Import Statements

To use the Metrics API in your data source implementation, include these imports:

```java
// Core metric interfaces
import org.apache.spark.sql.connector.metric.CustomMetric;
import org.apache.spark.sql.connector.metric.CustomTaskMetric;

// Built-in metric implementations
import org.apache.spark.sql.connector.metric.CustomSumMetric;
import org.apache.spark.sql.connector.metric.CustomAvgMetric;

// Streaming metrics
import org.apache.spark.sql.connector.read.streaming.ReportsSourceMetrics;
import org.apache.spark.sql.connector.read.streaming.ReportsSinkMetrics;

// Reader integration
import org.apache.spark.sql.connector.read.PartitionReader;
import org.apache.spark.sql.connector.read.Scan;

// Writer integration
import org.apache.spark.sql.connector.write.DataWriter;

// Utility imports
import java.util.Map;
import java.util.Optional;
import java.text.DecimalFormat;
```