# Metrics Testing

Fluent assertions and a metric listener for validating Flink metrics in tests. `MetricAssertions` provides AssertJ-style checks for counters and gauges, and `MetricListener` captures registered counters, gauges, histograms, and meters so that tests can retrieve and verify them.

## Capabilities

### Metric Assertions

AssertJ-style fluent assertions for Flink metrics. Both entry points accept any `Metric` and return an assert object specific to the metric type, so failed checks produce readable error messages.

```java { .api }
public class MetricAssertions {
    public static CounterAssert assertThatCounter(Metric actual);
    public static <T> GaugeAssert<T> assertThatGauge(Metric actual);
}
```

#### Counter Assertions

Assertions for `Counter` metrics. `isEqualTo` compares the expected value against the counter's current count.

```java { .api }
public static class CounterAssert extends AbstractAssert<CounterAssert, Counter> {
    public CounterAssert isEqualTo(Object expected);
}
```

#### Gauge Assertions

Assertions for `Gauge` metrics, supporting exact value comparison and, for `long`-valued gauges, tolerance-based comparison via `isCloseTo(value, epsilon)`.

```java { .api }
public static class GaugeAssert<T> extends AbstractAssert<GaugeAssert<T>, Gauge<T>> {
    public GaugeAssert<T> isEqualTo(Object expected);
    public GaugeAssert<T> isCloseTo(long value, long epsilon);
}
```
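
To make the idea of a tolerance-based comparison concrete, here is a minimal, self-contained sketch of an epsilon check on `long` values. It is not Flink's implementation: the class `ToleranceCheck`, the method `isWithin`, and the choice of inclusive bounds are assumptions made for illustration, so consult the library source for the exact boundary behavior of `isCloseTo`.

```java
/** Hypothetical helper illustrating a tolerance-based comparison; not part of Flink. */
final class ToleranceCheck {

    private ToleranceCheck() {}

    /**
     * Returns true when actual lies in [expected - epsilon, expected + epsilon].
     * Inclusive bounds are an assumption of this sketch.
     */
    static boolean isWithin(long actual, long expected, long epsilon) {
        if (epsilon < 0) {
            throw new IllegalArgumentException("epsilon must be non-negative");
        }
        // Subtract the smaller value from the larger one so the difference is
        // never negative; subtractExact throws instead of silently overflowing.
        long diff = actual >= expected
                ? Math.subtractExact(actual, expected)
                : Math.subtractExact(expected, actual);
        return diff <= epsilon;
    }
}
```

Under these assumptions, `isWithin(1000L, 1024L, 50L)` holds because the difference is 24, while `isWithin(900L, 1024L, 50L)` does not because the difference is 124.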

#### Usage Example

```java
import org.apache.flink.metrics.testutils.MetricAssertions;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Gauge;

// getCounterMetric and getGaugeMetric are placeholder helpers that return
// a metric registered by the code under test.

// Test counter metrics
Counter recordsProcessed = getCounterMetric("records_processed");
MetricAssertions.assertThatCounter(recordsProcessed)
    .isEqualTo(1000L);

// Test gauge metrics with exact value
Gauge<Double> cpuUsage = getGaugeMetric("cpu_usage");
MetricAssertions.assertThatGauge(cpuUsage)
    .isEqualTo(0.75);

// Test gauge metrics with tolerance (isCloseTo takes long values)
Gauge<Long> memoryUsage = getGaugeMetric("memory_usage");
MetricAssertions.assertThatGauge(memoryUsage)
    .isCloseTo(1024L, 50L);
```

### Metric Listener

A listener that records the metrics registered under its root metric group (obtained via `getMetricGroup()`) and provides typed retrieval methods for counters, gauges, histograms, and meters.

```java { .api }
public class MetricListener {
    public MetricListener();

    public MetricGroup getMetricGroup();

    public <T extends Metric> Optional<T> getMetric(Class<T> metricType, String... identifier);
    public Optional<Meter> getMeter(String... identifier);
    public Optional<Counter> getCounter(String... identifier);
    public Optional<Histogram> getHistogram(String... identifier);
    public <T> Optional<Gauge<T>> getGauge(String... identifier);
}
```
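
The retrieval methods can be pictured as typed lookups into a map keyed by the metric's identifier path. The sketch below models that idea with plain JDK types only. `SimpleMetricStore`, its `register` method, and the dot-joined key format are hypothetical names and choices for illustration, not Flink's actual implementation.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/** Hypothetical, simplified model of identifier-based metric lookup; not Flink code. */
final class SimpleMetricStore {

    private final Map<String, Object> metrics = new HashMap<>();

    /** Stores a metric under the dot-joined identifier path, e.g. "job.task.records". */
    void register(Object metric, String... identifier) {
        metrics.put(String.join(".", identifier), metric);
    }

    /**
     * Looks up a metric by path and casts it using the class token.
     * Returns an empty Optional when nothing is registered under the path;
     * throws ClassCastException when the stored metric has a different type.
     */
    <T> Optional<T> get(Class<T> type, String... identifier) {
        Object metric = metrics.get(String.join(".", identifier));
        return Optional.ofNullable(metric).map(type::cast);
    }
}
```

In this model a missing path and a type mismatch are distinct outcomes: the former yields an empty `Optional`, the latter an exception.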

#### Usage Example

```java
import org.apache.flink.metrics.testutils.MetricListener;
import org.apache.flink.metrics.*;

import java.util.Optional;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;

// Create the listener; it records metrics registered under its root group
MetricListener metricListener = new MetricListener();

// Register metrics under the listener's group (normally done by the code
// under test, which receives the group from the listener)
MetricGroup taskGroup = metricListener.getMetricGroup().addGroup("job").addGroup("task");
Counter recordsProcessed = taskGroup.counter("records_processed");
recordsProcessed.inc(1000L);

// Retrieve specific metric types; a getter returns an empty Optional
// when no metric was registered under the given identifier
Optional<Counter> counter = metricListener.getCounter("job", "task", "records_processed");
if (counter.isPresent()) {
    long count = counter.get().getCount();
    assertEquals(1000L, count);
}

Optional<Gauge<Double>> gauge = metricListener.getGauge("job", "task", "cpu_usage");
if (gauge.isPresent()) {
    double usage = gauge.get().getValue();
    assertTrue(usage >= 0.0 && usage <= 1.0);
}

Optional<Histogram> histogram = metricListener.getHistogram("job", "task", "latency");
if (histogram.isPresent()) {
    long count = histogram.get().getCount();
    assertTrue(count > 0);
}

Optional<Meter> meter = metricListener.getMeter("job", "task", "throughput");
if (meter.isPresent()) {
    double rate = meter.get().getRate();
    assertTrue(rate > 0.0);
}
```

### Generic Metric Retrieval

Generic method for retrieving a metric of any type. The `Class<T>` argument fixes the static type of the returned `Optional<T>`, so callers need no casts.

```java { .api }
public <T extends Metric> Optional<T> getMetric(Class<T> metricType, String... identifier);
```

#### Usage Example

```java
// Retrieve custom metric types
// (MyCustomMetric is a placeholder for your own Metric implementation)
Optional<MyCustomMetric> customMetric =
    metricListener.getMetric(MyCustomMetric.class, "job", "custom");

// Retrieve standard metrics with type safety
Optional<Counter> typedCounter =
    metricListener.getMetric(Counter.class, "records", "processed");
```

## Integration Patterns

### Test Case Integration

Example of a JUnit 5 test that runs a job on a mini cluster and verifies a counter afterwards.

```java
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.testutils.MetricAssertions;
import org.apache.flink.metrics.testutils.MetricListener;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.test.junit5.MiniClusterExtension;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

import java.util.Optional;

import static org.junit.jupiter.api.Assertions.assertTrue;

public class MetricsIntegrationTest {

    @RegisterExtension
    static final MiniClusterExtension MINI_CLUSTER = new MiniClusterExtension();

    @Test
    void testJobMetrics() throws Exception {
        MetricListener metricListener = new MetricListener();

        // MiniClusterExtension registers the mini cluster as the context environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Create streaming job. CountingMapFunction is a placeholder for your own
        // function. The listener only records metrics registered under
        // metricListener.getMetricGroup(); how the job's metrics are routed to
        // that group depends on your setup and is not shown here.
        env.fromElements(1, 2, 3, 4, 5)
            .map(new CountingMapFunction())
            .print();

        // Execute job
        JobExecutionResult result = env.execute("Metrics Test Job");

        // Verify metrics were captured
        Optional<Counter> processedRecords =
            metricListener.getCounter("job", "map", "records_processed");
        assertTrue(processedRecords.isPresent());

        MetricAssertions.assertThatCounter(processedRecords.get())
            .isEqualTo(5L);
    }
}
```

### Custom Metric Verification

Pattern for testing custom business metrics in Flink applications.

```java
@Test
void testCustomBusinessMetrics() throws Exception {
    MetricListener listener = new MetricListener();

    // Execute job with custom metrics (placeholder helper; it must register
    // its metrics under listener.getMetricGroup() for the lookups below to succeed)
    executeJobWithCustomMetrics();

    // Verify business-specific metrics
    Optional<Gauge<Long>> orderCount = listener.getGauge("business", "orders", "total");
    Optional<Counter> errorCount = listener.getCounter("business", "errors", "validation");
    Optional<Histogram> processingTime = listener.getHistogram("business", "processing", "duration");

    assertTrue(orderCount.isPresent());
    assertTrue(errorCount.isPresent());
    assertTrue(processingTime.isPresent());

    // Verify metric values
    assertTrue(orderCount.get().getValue() > 0);
    assertEquals(0L, errorCount.get().getCount()); // No errors expected
    assertTrue(processingTime.get().getCount() > 0);
}
```

### Metric Group Access

`getMetricGroup()` returns the listener's root metric group, which tests can use to register metrics and to verify how they are organized into nested groups.

```java
@Test
void testMetricGroupStructure() {
    MetricListener listener = new MetricListener();
    MetricGroup rootGroup = listener.getMetricGroup();

    // Verify metric group hierarchy
    assertNotNull(rootGroup);

    // Register a metric in a nested group and look it up by its path
    // relative to the root group (group and metric names are illustrative)
    rootGroup.addGroup("job").addGroup("task").counter("records_processed");
    assertTrue(listener.getCounter("job", "task", "records_processed").isPresent());
}
```

## Error Handling and Debugging

Metrics tests typically fail for one of the following reasons:

- The metric is not found at the specified identifier path (the `MetricListener` getters then return an empty `Optional`)
- The metric's type doesn't match the expected type
- The asserted value doesn't match the actual metric value
- A tolerance-based comparison fails for a numeric gauge

Distinguishing these cases makes metrics-related test failures quicker to diagnose and helps ensure that your Flink applications expose and update their metrics correctly during execution.
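
One pattern that helps with the missing-metric case: a check wrapped in `if (metric.isPresent())` is skipped silently when the metric is absent, so the test passes without asserting anything. A small helper that unwraps the `Optional` or fails with the full identifier path avoids this. The sketch below is self-contained and uses only the JDK; `MetricLookup` and `require` are hypothetical names, not part of the Flink test utilities.

```java
import java.util.Optional;

/** Hypothetical test helper; not part of the Flink test utilities. */
final class MetricLookup {

    private MetricLookup() {}

    /** Returns the metric, or fails the test with a message naming the missing path. */
    static <T> T require(Optional<T> metric, String... identifier) {
        return metric.orElseThrow(
                () -> new AssertionError(
                        "No metric registered at " + String.join(".", identifier)));
    }
}
```

In a test, the lookup result would be passed straight through, for example `MetricLookup.require(listener.getCounter("job", "task", "records_processed"), "job", "task", "records_processed")`, so a missing metric fails the test at the lookup instead of being skipped.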