or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

configuration.mdcore-metrics.mdimplementations.mdindex.mdmetric-groups.mdreporters.mdspecialized-groups.mdtracing.md

reporters.mddocs/

0

# Reporter Framework

1

2

Pluggable system for exporting metrics to external monitoring systems. The reporter framework enables integration with various monitoring backends through configurable metric exporters that can operate in both push and pull patterns.

3

4

## Capabilities

5

6

### MetricReporter Interface

7

8

Core interface for implementing metric reporters that export metrics to external systems. Provides lifecycle management and metric notification callbacks.

9

10

```java { .api }

11

/**

12

* Metric reporters are used to export Metrics to an external backend.

13

* Metric reporters are instantiated via a MetricReporterFactory.

14

*/

15

public interface MetricReporter {

16

17

/**

18

* Configures this reporter. If the reporter was instantiated generically

19

* and hence parameter-less, this method is the place where the reporter

20

* sets its basic fields based on configuration values.

21

* This method is always called first on a newly instantiated reporter.

22

* @param config A properties object that contains all parameters set for this reporter

23

*/

24

void open(MetricConfig config);

25

26

/**

27

* Closes this reporter. Should be used to close channels, streams and release resources.

28

*/

29

void close();

30

31

/**

32

* Called when a new Metric was added.

33

* @param metric the metric that was added

34

* @param metricName the name of the metric

35

* @param group the group that contains the metric

36

*/

37

void notifyOfAddedMetric(Metric metric, String metricName, MetricGroup group);

38

39

/**

40

* Called when a Metric was removed.

41

* @param metric the metric that should be removed

42

* @param metricName the name of the metric

43

* @param group the group that contains the metric

44

*/

45

void notifyOfRemovedMetric(Metric metric, String metricName, MetricGroup group);

46

}

47

```

48

49

**Usage Examples:**

50

51

```java

52

// Custom reporter implementation

53

public class ConsoleMetricReporter implements MetricReporter {

54

private boolean isOpen = false;

55

56

@Override

57

public void open(MetricConfig config) {

58

this.isOpen = true;

59

System.out.println("Console reporter started");

60

}

61

62

@Override

63

public void close() {

64

this.isOpen = false;

65

System.out.println("Console reporter stopped");

66

}

67

68

@Override

69

public void notifyOfAddedMetric(Metric metric, String metricName, MetricGroup group) {

70

String identifier = group.getMetricIdentifier(metricName);

71

System.out.println("Added metric: " + identifier + " of type " + metric.getMetricType());

72

}

73

74

@Override

75

public void notifyOfRemovedMetric(Metric metric, String metricName, MetricGroup group) {

76

String identifier = group.getMetricIdentifier(metricName);

77

System.out.println("Removed metric: " + identifier);

78

}

79

}

80

81

// Handle different metric types in reporter

82

@Override

83

public void notifyOfAddedMetric(Metric metric, String metricName, MetricGroup group) {

84

String identifier = group.getMetricIdentifier(metricName);

85

86

switch (metric.getMetricType()) {

87

case COUNTER:

88

Counter counter = (Counter) metric;

89

registerCounter(identifier, counter);

90

break;

91

case GAUGE:

92

Gauge<?> gauge = (Gauge<?>) metric;

93

registerGauge(identifier, gauge);

94

break;

95

case METER:

96

Meter meter = (Meter) metric;

97

registerMeter(identifier, meter);

98

break;

99

case HISTOGRAM:

100

Histogram histogram = (Histogram) metric;

101

registerHistogram(identifier, histogram);

102

break;

103

}

104

}

105

```

106

107

### MetricReporterFactory Interface

108

109

Factory interface for creating metric reporters, enabling plugin-based reporter loading and configuration.

110

111

```java { .api }

112

/**

113

* MetricReporter factory. Metric reporters that can be instantiated with

114

* a factory automatically qualify for being loaded as a plugin, so long as

115

* the reporter jar is self-contained and contains a

116

* META-INF/services/org.apache.flink.metrics.reporter.MetricReporterFactory

117

* file containing the qualified class name of the factory.

118

*/

119

public interface MetricReporterFactory {

120

/**

121

* Creates a new metric reporter.

122

* @param properties configured properties for the reporter

123

* @return created metric reporter

124

*/

125

MetricReporter createMetricReporter(Properties properties);

126

}

127

```

128

129

**Usage Examples:**

130

131

```java

132

// Factory implementation

133

public class ConsoleReporterFactory implements MetricReporterFactory {

134

@Override

135

public MetricReporter createMetricReporter(Properties properties) {

136

return new ConsoleMetricReporter();

137

}

138

}

139

140

// Factory with configuration

141

public class ConfigurableReporterFactory implements MetricReporterFactory {

142

@Override

143

public MetricReporter createMetricReporter(Properties properties) {

144

String endpoint = properties.getProperty("endpoint", "localhost:8080");

145

int interval = Integer.parseInt(properties.getProperty("interval", "60"));

146

147

ConfigurableReporter reporter = new ConfigurableReporter();

148

reporter.setEndpoint(endpoint);

149

reporter.setReportInterval(interval);

150

151

return reporter;

152

}

153

}

154

155

// Plugin registration (in META-INF/services file)

156

// META-INF/services/org.apache.flink.metrics.reporter.MetricReporterFactory

157

// com.example.ConsoleReporterFactory

158

// com.example.ConfigurableReporterFactory

159

```

160

161

### Scheduled Reporter Interface

162

163

Interface for reporters that actively send out data periodically rather than only responding to metric addition/removal events.

164

165

```java { .api }

166

/**

167

* Interface for reporters that actively send out data periodically.

168

*/

169

public interface Scheduled {

170

/**

171

* Report the current measurements. This method is called periodically

172

* by the metrics registry that uses the reporter. This method must not

173

* block for a significant amount of time, any reporter needing more time

174

* should instead run the operation asynchronously.

175

*/

176

void report();

177

}

178

```

179

180

**Usage Examples:**

181

182

```java

183

// Scheduled reporter implementation

184

public class PeriodicReporter implements MetricReporter, Scheduled {

185

private final Map<String, Metric> registeredMetrics = new ConcurrentHashMap<>();

186

private final HttpClient httpClient = HttpClient.newHttpClient();

187

private String endpoint;

188

189

@Override

190

public void open(MetricConfig config) {

191

this.endpoint = config.getString("endpoint", "http://localhost:8080/metrics");

192

}

193

194

@Override

195

public void notifyOfAddedMetric(Metric metric, String metricName, MetricGroup group) {

196

String identifier = group.getMetricIdentifier(metricName);

197

registeredMetrics.put(identifier, metric);

198

}

199

200

@Override

201

public void notifyOfRemovedMetric(Metric metric, String metricName, MetricGroup group) {

202

String identifier = group.getMetricIdentifier(metricName);

203

registeredMetrics.remove(identifier);

204

}

205

206

// Called periodically by Flink

207

@Override

208

public void report() {

209

Map<String, Object> metrics = new HashMap<>();

210

211

for (Map.Entry<String, Metric> entry : registeredMetrics.entrySet()) {

212

String name = entry.getKey();

213

Metric metric = entry.getValue();

214

215

Object value = extractValue(metric);

216

if (value != null) {

217

metrics.put(name, value);

218

}

219

}

220

221

// Send metrics asynchronously to avoid blocking

222

CompletableFuture.runAsync(() -> sendMetrics(metrics));

223

}

224

225

private Object extractValue(Metric metric) {

226

switch (metric.getMetricType()) {

227

case COUNTER:

228

return ((Counter) metric).getCount();

229

case GAUGE:

230

return ((Gauge<?>) metric).getValue();

231

case METER:

232

Meter meter = (Meter) metric;

233

Map<String, Object> meterData = new HashMap<>();

234

meterData.put("rate", meter.getRate());

235

meterData.put("count", meter.getCount());

236

return meterData;

237

case HISTOGRAM:

238

Histogram histogram = (Histogram) metric;

239

HistogramStatistics stats = histogram.getStatistics();

240

Map<String, Object> histogramData = new HashMap<>();

241

histogramData.put("count", histogram.getCount());

242

histogramData.put("mean", stats.getMean());

243

histogramData.put("p95", stats.getQuantile(0.95));

244

histogramData.put("max", stats.getMax());

245

histogramData.put("min", stats.getMin());

246

return histogramData;

247

default:

248

return null;

249

}

250

}

251

252

@Override

253

public void close() {

254

// Clean up resources

255

}

256

}

257

```

258

259

### AbstractReporter Base Class

260

261

Base implementation providing common functionality for metric reporters.

262

263

```java { .api }

264

/**

265

* Base implementation for metric reporters.

266

*/

267

public abstract class AbstractReporter implements MetricReporter {

268

// Provides common functionality and convenience methods for reporter implementations

269

}

270

```

271

272

### Reporter Annotations

273

274

Annotations for controlling reporter instantiation behavior.

275

276

```java { .api }

277

/**

278

* Marker annotation for factory-instantiated reporters.

279

*/

280

public @interface InstantiateViaFactory {

281

}

282

283

/**

284

* Marker annotation for reflection-based instantiation.

285

*/

286

public @interface InterceptInstantiationViaReflection {

287

}

288

```

289

290

**Usage Examples:**

291

292

```java

293

// Factory-based reporter

294

@InstantiateViaFactory

295

public class MyFactoryReporter implements MetricReporter {

296

// Will be instantiated via MetricReporterFactory

297

}

298

299

// Reflection-based reporter

300

@InterceptInstantiationViaReflection

301

public class MyReflectionReporter implements MetricReporter {

302

// Will be instantiated via reflection with default constructor

303

}

304

```

305

306

### Reporter Configuration Patterns

307

308

Common patterns for configuring reporters using MetricConfig.

309

310

**Configuration Handling:**

311

312

```java

313

public class DatabaseReporter implements MetricReporter {

314

private String jdbcUrl;

315

private String username;

316

private String password;

317

private int batchSize;

318

private long flushInterval;

319

320

@Override

321

public void open(MetricConfig config) {

322

// Required configuration

323

this.jdbcUrl = config.getString("jdbc.url", null);

324

if (jdbcUrl == null) {

325

throw new IllegalArgumentException("jdbc.url is required");

326

}

327

328

// Optional configuration with defaults

329

this.username = config.getString("jdbc.username", "metrics");

330

this.password = config.getString("jdbc.password", "");

331

this.batchSize = config.getInteger("batch.size", 100);

332

this.flushInterval = config.getLong("flush.interval", 30000); // 30 seconds

333

334

// Initialize database connection

335

initializeConnection();

336

}

337

}

338

```

339

340

**Error Handling in Reporters:**

341

342

```java

343

public class RobustReporter implements MetricReporter, Scheduled {

344

private final AtomicBoolean isHealthy = new AtomicBoolean(true);

345

346

@Override

347

public void report() {

348

if (!isHealthy.get()) {

349

return; // Skip reporting if unhealthy

350

}

351

352

try {

353

doReport();

354

} catch (Exception e) {

355

isHealthy.set(false);

356

scheduleHealthCheck();

357

log.warn("Reporter became unhealthy", e);

358

}

359

}

360

361

private void scheduleHealthCheck() {

362

CompletableFuture.delayedExecutor(30, TimeUnit.SECONDS)

363

.execute(() -> {

364

try {

365

if (checkHealth()) {

366

isHealthy.set(true);

367

log.info("Reporter recovered");

368

}

369

} catch (Exception e) {

370

// Will retry on next scheduled check

371

}

372

});

373

}

374

}

375

```