or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

data-collection.mdindex.mdmetrics-testing.mdsecurity-testing.mdtest-data-providers.mdtest-environments.md

metrics-testing.mddocs/

0

# Metrics Testing

1

2

Comprehensive utilities for testing metrics reporting in Flink applications, including metric collection, validation, and assertion utilities using Hamcrest matchers.

3

4

## Capabilities

5

6

### Metric Listener

7

8

`MetricListener` captures and stores metrics registered under a root metric group for retrieval and testing.

9

10

```java { .api }

11

/**

12

* Listens for metric and group registration under a root metric group and stores them for retrieval

13

*/

14

public class MetricListener {

15

/**

16

* Delimiter for metric identifiers

17

*/

18

public static final String DELIMITER = ".";

19

20

/**

21

* Name of root metric group

22

*/

23

public static final String ROOT_METRIC_GROUP_NAME = "rootMetricGroup";

24

25

/**

26

* Creates new metric listener with testing registry

27

*/

28

public MetricListener();

29

30

/**

31

* Returns the root metric group for registering metrics

32

* @return MetricGroup for metric registration

33

*/

34

public MetricGroup getMetricGroup();

35

36

/**

37

* Gets registered metric by type and identifier

38

* @param metricType Class of the metric type to retrieve

39

* @param identifier Metric identifier components

40

* @return Optional containing the metric if found

41

*/

42

public <T extends Metric> Optional<T> getMetric(Class<T> metricType, String... identifier);

43

44

/**

45

* Gets registered Meter metric

46

* @param identifier Metric identifier components

47

* @return Optional containing the Meter if found

48

*/

49

public Optional<Meter> getMeter(String... identifier);

50

51

/**

52

* Gets registered Counter metric

53

* @param identifier Metric identifier components

54

* @return Optional containing the Counter if found

55

*/

56

public Optional<Counter> getCounter(String... identifier);

57

58

/**

59

* Gets registered Histogram metric

60

* @param identifier Metric identifier components

61

* @return Optional containing the Histogram if found

62

*/

63

public Optional<Histogram> getHistogram(String... identifier);

64

65

/**

66

* Gets registered Gauge metric

67

* @param identifier Metric identifier components

68

* @return Optional containing the Gauge if found

69

*/

70

public <T> Optional<Gauge<T>> getGauge(String... identifier);

71

}

72

```

73

74

**Usage Example:**

75

76

```java

77

import org.apache.flink.metrics.testutils.MetricListener;

78

import org.apache.flink.metrics.Counter;

79

import org.apache.flink.metrics.MetricGroup;

80

81

@Test

82

public void testMetricsReporting() throws Exception {

83

// Create metric listener

84

MetricListener metricListener = new MetricListener();

85

MetricGroup metricGroup = metricListener.getMetricGroup();

86

87

// Register a counter metric

88

Counter myCounter = metricGroup.counter("test", "myCounter");

89

myCounter.inc(5);

90

91

// Retrieve and verify the metric

92

Optional<Counter> retrievedCounter = metricListener.getCounter("test", "myCounter");

93

assertTrue(retrievedCounter.isPresent());

94

assertEquals(5L, retrievedCounter.get().getCount());

95

96

// Test with subgroups

97

MetricGroup subGroup = metricGroup.addGroup("subgroup");

98

Counter subCounter = subGroup.counter("subCounter");

99

subCounter.inc(10);

100

101

Optional<Counter> retrievedSubCounter = metricListener.getCounter("subgroup", "subCounter");

102

assertTrue(retrievedSubCounter.isPresent());

103

assertEquals(10L, retrievedSubCounter.get().getCount());

104

}

105

```

106

107

### Metric Matchers

108

109

`MetricMatchers` provides Hamcrest matchers for metric assertions in tests.

110

111

```java { .api }

112

/**

113

* Provides Hamcrest matchers for metric assertions in tests

114

*/

115

public class MetricMatchers {

116

117

/**

118

* Creates matcher for Gauge metrics

119

* @param valueMatcher Matcher for the gauge value

120

* @return Matcher that checks if metric is a Gauge with matching value

121

*/

122

public static <V, T extends Metric> Matcher<T> isGauge(Matcher<V> valueMatcher);

123

124

/**

125

* Creates matcher for Counter metrics

126

* @param valueMatcher Matcher for the counter value

127

* @return Matcher that checks if metric is a Counter with matching value

128

*/

129

public static <T extends Metric> Matcher<T> isCounter(Matcher<Long> valueMatcher);

130

}

131

```

132

133

**Usage Example:**

134

135

```java

136

import org.apache.flink.metrics.testutils.MetricListener;

137

import org.apache.flink.metrics.testutils.MetricMatchers;

138

import org.apache.flink.metrics.Counter;

139

import org.apache.flink.metrics.Gauge;

140

import org.apache.flink.metrics.MetricGroup;

141

import static org.hamcrest.MatcherAssert.assertThat;

142

import static org.hamcrest.Matchers.*;

143

144

@Test

145

public void testMetricMatchers() throws Exception {

146

MetricListener metricListener = new MetricListener();

147

MetricGroup metricGroup = metricListener.getMetricGroup();

148

149

// Test Counter matcher

150

Counter counter = metricGroup.counter("testCounter");

151

counter.inc(42);

152

153

Optional<Counter> retrievedCounter = metricListener.getCounter("testCounter");

154

assertTrue(retrievedCounter.isPresent());

155

156

// Use metric matcher to assert counter value

157

assertThat(retrievedCounter.get(), MetricMatchers.isCounter(equalTo(42L)));

158

assertThat(retrievedCounter.get(), MetricMatchers.isCounter(greaterThan(40L)));

159

160

// Test Gauge matcher

161

Gauge<Integer> gauge = metricGroup.gauge("testGauge", () -> 100);

162

163

Optional<Gauge<Integer>> retrievedGauge = metricListener.getGauge("testGauge");

164

assertTrue(retrievedGauge.isPresent());

165

166

// Use metric matcher to assert gauge value

167

assertThat(retrievedGauge.get(), MetricMatchers.isGauge(equalTo(100)));

168

assertThat(retrievedGauge.get(), MetricMatchers.isGauge(greaterThanOrEqualTo(100)));

169

}

170

```

171

172

## Complete Metrics Testing Example

173

174

Here's a comprehensive example showing how to test metrics in a Flink application:

175

176

```java

177

import org.apache.flink.api.common.functions.RichMapFunction;

178

import org.apache.flink.configuration.Configuration;

179

import org.apache.flink.metrics.Counter;

180

import org.apache.flink.metrics.Gauge;

181

import org.apache.flink.metrics.MetricGroup;

182

import org.apache.flink.metrics.testutils.MetricListener;

183

import org.apache.flink.metrics.testutils.MetricMatchers;

184

import org.apache.flink.streaming.api.datastream.DataStream;

185

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

186

import org.apache.flink.test.util.AbstractTestBase;

187

188

public class MetricsTestExample extends AbstractTestBase {

189

190

@Test

191

public void testJobMetrics() throws Exception {

192

MetricListener metricListener = new MetricListener();

193

194

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

195

env.setParallelism(1);

196

197

// Create a function that reports metrics

198

DataStream<String> input = env.fromElements("a", "b", "c", "d", "e");

199

DataStream<String> result = input.map(new MetricsReportingMapFunction(metricListener));

200

201

result.print();

202

env.execute("Metrics Test Job");

203

204

// Verify metrics were reported

205

Optional<Counter> processedCounter = metricListener.getCounter("processed_elements");

206

assertTrue("Counter should be present", processedCounter.isPresent());

207

assertThat(processedCounter.get(), MetricMatchers.isCounter(equalTo(5L)));

208

209

Optional<Gauge<Long>> lastProcessedTime = metricListener.getGauge("last_processed_time");

210

assertTrue("Gauge should be present", lastProcessedTime.isPresent());

211

assertThat(lastProcessedTime.get(), MetricMatchers.isGauge(greaterThan(0L)));

212

}

213

214

private static class MetricsReportingMapFunction extends RichMapFunction<String, String> {

215

private final MetricListener metricListener;

216

private Counter processedElements;

217

private volatile long lastProcessedTime = 0;

218

219

public MetricsReportingMapFunction(MetricListener metricListener) {

220

this.metricListener = metricListener;

221

}

222

223

@Override

224

public void open(Configuration parameters) {

225

MetricGroup metricGroup = metricListener.getMetricGroup();

226

processedElements = metricGroup.counter("processed_elements");

227

metricGroup.gauge("last_processed_time", () -> lastProcessedTime);

228

}

229

230

@Override

231

public String map(String value) {

232

processedElements.inc();

233

lastProcessedTime = System.currentTimeMillis();

234

return value.toUpperCase();

235

}

236

}

237

}

238

```

239

240

## Testing Different Metric Types

241

242

### Testing Histograms

243

244

```java

245

import org.apache.flink.metrics.Histogram;

246

247

@Test

248

public void testHistogramMetrics() throws Exception {

249

MetricListener metricListener = new MetricListener();

250

MetricGroup metricGroup = metricListener.getMetricGroup();

251

252

// Register histogram

253

Histogram histogram = metricGroup.histogram("response_time");

254

255

// Report some values

256

histogram.update(100);

257

histogram.update(200);

258

histogram.update(150);

259

260

// Retrieve and verify

261

Optional<Histogram> retrievedHistogram = metricListener.getHistogram("response_time");

262

assertTrue(retrievedHistogram.isPresent());

263

assertEquals(3, retrievedHistogram.get().getCount());

264

}

265

```

266

267

### Testing Meters

268

269

```java

270

import org.apache.flink.metrics.Meter;

271

272

@Test

273

public void testMeterMetrics() throws Exception {

274

MetricListener metricListener = new MetricListener();

275

MetricGroup metricGroup = metricListener.getMetricGroup();

276

277

// Register meter

278

Meter meter = metricGroup.meter("requests_per_second");

279

280

// Mark some events

281

meter.markEvent();

282

meter.markEvent(5);

283

284

// Retrieve and verify

285

Optional<Meter> retrievedMeter = metricListener.getMeter("requests_per_second");

286

assertTrue(retrievedMeter.isPresent());

287

assertEquals(6, retrievedMeter.get().getCount());

288

}

289

```