or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

configuration.mdcore-metrics.mdimplementations.mdindex.mdmetric-groups.mdreporters.mdspecialized-groups.mdtracing.md

implementations.mddocs/

0

# Metric Implementations

1

2

Concrete implementations of metric interfaces providing both thread-safe and performance-optimized variants. These implementations offer ready-to-use metric types with different performance characteristics for various use cases.

3

4

## Capabilities

5

6

### SimpleCounter Implementation

7

8

Basic non-thread-safe counter implementation optimized for single-threaded scenarios with minimal overhead.

9

10

```java { .api }

11

/**

12

* A simple low-overhead Counter that is not thread-safe.

13

*/

14

@Internal

15

public class SimpleCounter implements Counter {

16

17

/** Increment the current count by 1. */

18

@Override

19

public void inc() { /* implementation */ }

20

21

/**

22

* Increment the current count by the given value.

23

* @param n value to increment the current count by

24

*/

25

@Override

26

public void inc(long n) { /* implementation */ }

27

28

/** Decrement the current count by 1. */

29

@Override

30

public void dec() { /* implementation */ }

31

32

/**

33

* Decrement the current count by the given value.

34

* @param n value to decrement the current count by

35

*/

36

@Override

37

public void dec(long n) { /* implementation */ }

38

39

/**

40

* Returns the current count.

41

* @return current count

42

*/

43

@Override

44

public long getCount() { /* implementation */ }

45

}

46

```

47

48

**Usage Examples:**

49

50

```java

51

// Create simple counter for single-threaded use

52

Counter simpleCounter = new SimpleCounter();

53

54

// Basic operations

55

simpleCounter.inc(); // Fast increment

56

simpleCounter.inc(10); // Fast bulk increment

57

simpleCounter.dec(); // Fast decrement

58

simpleCounter.dec(5); // Fast bulk decrement

59

60

long count = simpleCounter.getCount(); // Fast read

61

62

// Use case: single-threaded operators

63

public class SingleThreadedOperator {

64

private final Counter processedRecords = new SimpleCounter();

65

66

public void processRecord(Record record) {

67

// Process record...

68

processedRecords.inc(); // No synchronization overhead

69

}

70

}

71

```

72

73

### ThreadSafeSimpleCounter Implementation

74

75

Thread-safe counter implementation using atomic operations, suitable for multi-threaded scenarios.

76

77

```java { .api }

78

/**

79

* A simple low-overhead Counter that is thread-safe.

80

*/

81

@Internal

82

public class ThreadSafeSimpleCounter implements Counter {

83

84

/** Increment the current count by 1. */

85

@Override

86

public void inc() { /* uses LongAdder.increment() */ }

87

88

/**

89

* Increment the current count by the given value.

90

* @param n value to increment the current count by

91

*/

92

@Override

93

public void inc(long n) { /* uses LongAdder.add(n) */ }

94

95

/** Decrement the current count by 1. */

96

@Override

97

public void dec() { /* uses LongAdder.decrement() */ }

98

99

/**

100

* Decrement the current count by the given value.

101

* @param n value to decrement the current count by

102

*/

103

@Override

104

public void dec(long n) { /* uses LongAdder.add(-n) */ }

105

106

/**

107

* Returns the current count.

108

* @return current count

109

*/

110

@Override

111

public long getCount() { /* uses LongAdder.longValue() */ }

112

}

113

```

114

115

**Usage Examples:**

116

117

```java

118

// Create thread-safe counter for multi-threaded use

119

Counter threadSafeCounter = new ThreadSafeSimpleCounter();

120

121

// Safe for concurrent access

122

ExecutorService executor = Executors.newFixedThreadPool(10);

123

for (int i = 0; i < 100; i++) {

124

executor.submit(() -> {

125

threadSafeCounter.inc(); // Thread-safe increment

126

});

127

}

128

129

// Use case: shared counters across multiple threads

130

public class MultiThreadedProcessor {

131

private final Counter totalProcessed = new ThreadSafeSimpleCounter();

132

133

// Called from multiple threads

134

public void processInParallel(List<Record> records) {

135

records.parallelStream().forEach(record -> {

136

processRecord(record);

137

totalProcessed.inc(); // Safe concurrent increment

138

});

139

}

140

}

141

```

142

143

### MeterView Implementation

144

145

Meter implementation that provides average rate calculations over a specified time window using a circular buffer approach.

146

147

```java { .api }

148

/**

149

* A MeterView provides an average rate of events per second over a given time period.

150

* The primary advantage is that the rate is neither updated by the computing thread

151

* nor for every event. Instead, a history of counts is maintained that is updated

152

* in regular intervals by a background thread.

153

*/

154

@Internal

155

public class MeterView implements Meter, View {

156

157

/**

158

* Creates a MeterView with specified time span.

159

* @param timeSpanInSeconds time span over which to calculate average

160

*/

161

public MeterView(int timeSpanInSeconds) { /* implementation */ }

162

163

/**

164

* Creates a MeterView with a custom counter and default time span.

165

* @param counter the underlying counter

166

*/

167

public MeterView(Counter counter) { /* implementation */ }

168

169

/**

170

* Creates a MeterView with custom counter and time span.

171

* @param counter the underlying counter

172

* @param timeSpanInSeconds time span over which to calculate average

173

*/

174

public MeterView(Counter counter, int timeSpanInSeconds) { /* implementation */ }

175

176

/**

177

* Creates a MeterView from a gauge that returns numeric values.

178

* @param numberGauge gauge providing numeric values

179

*/

180

public MeterView(Gauge<? extends Number> numberGauge) { /* implementation */ }

181

182

/** Mark occurrence of an event. */

183

@Override

184

public void markEvent() { /* delegates to counter.inc() */ }

185

186

/**

187

* Mark occurrence of multiple events.

188

* @param n number of events occurred

189

*/

190

@Override

191

public void markEvent(long n) { /* delegates to counter.inc(n) */ }

192

193

/**

194

* Returns the current rate of events per second.

195

* @return current rate of events per second

196

*/

197

@Override

198

public double getRate() { /* returns calculated rate */ }

199

200

/**

201

* Get number of events marked on the meter.

202

* @return number of events marked on the meter

203

*/

204

@Override

205

public long getCount() { /* delegates to counter.getCount() */ }

206

207

/**

208

* Called periodically to update the rate calculation.

209

* This is part of the View interface.

210

*/

211

@Override

212

public void update() { /* updates internal rate calculation */ }

213

}

214

```

215

216

**Usage Examples:**

217

218

```java

219

// Create meter with default 60-second window

220

MeterView throughputMeter = new MeterView(60);

221

222

// Create meter with custom counter

223

Counter customCounter = new ThreadSafeSimpleCounter();

224

MeterView customMeter = new MeterView(customCounter, 30); // 30-second window

225

226

// Create meter from gauge

227

Gauge<Long> queueSizeGauge = () -> messageQueue.size();

228

MeterView queueGrowthRate = new MeterView(queueSizeGauge);

229

230

// Usage in streaming context

231

public class StreamProcessor {

232

private final MeterView processingRate = new MeterView(30);

233

234

public void processMessage(Message message) {

235

// Process the message...

236

237

processingRate.markEvent(); // Update rate calculation

238

239

// Can also mark batch events

240

if (message.isBatch()) {

241

processingRate.markEvent(message.getBatchSize());

242

}

243

}

244

245

public void reportMetrics() {

246

double currentRate = processingRate.getRate(); // events/second

247

long totalEvents = processingRate.getCount(); // total processed

248

249

System.out.printf("Processing %.2f events/sec, %d total%n",

250

currentRate, totalEvents);

251

}

252

}

253

254

// Integration with View update system

255

public class MetricUpdater {

256

private final List<View> views = new ArrayList<>();

257

private final ScheduledExecutorService scheduler =

258

Executors.newScheduledThreadPool(1);

259

260

public void registerView(View view) {

261

views.add(view);

262

}

263

264

public void startUpdating() {

265

scheduler.scheduleAtFixedRate(

266

() -> views.forEach(View::update),

267

0,

268

View.UPDATE_INTERVAL_SECONDS,

269

TimeUnit.SECONDS

270

);

271

}

272

}

273

```

274

275

### View Interface for Background Updates

276

277

Interface for metrics that require periodic background updates, such as time-windowed calculations.

278

279

```java { .api }

280

/**

281

* An interface for metrics which should be updated in regular intervals

282

* by a background thread.

283

*/

284

public interface View {

285

/** The interval in which metrics are updated. */

286

int UPDATE_INTERVAL_SECONDS = 5;

287

288

/** This method will be called regularly to update the metric. */

289

void update();

290

}

291

```

292

293

**Usage Examples:**

294

295

```java

296

// Custom view implementation

297

public class MovingAverageGauge implements Gauge<Double>, View {

298

private final Queue<Double> values = new LinkedList<>();

299

private final int windowSize;

300

private double currentAverage = 0.0;

301

302

public MovingAverageGauge(int windowSize) {

303

this.windowSize = windowSize;

304

}

305

306

public void addValue(double value) {

307

synchronized (values) {

308

values.offer(value);

309

if (values.size() > windowSize) {

310

values.poll();

311

}

312

}

313

}

314

315

@Override

316

public Double getValue() {

317

return currentAverage;

318

}

319

320

@Override

321

public void update() {

322

synchronized (values) {

323

if (!values.isEmpty()) {

324

currentAverage = values.stream()

325

.mapToDouble(Double::doubleValue)

326

.average()

327

.orElse(0.0);

328

}

329

}

330

}

331

}

332

333

// Register view for automatic updates

334

MovingAverageGauge avgLatency = new MovingAverageGauge(100);

335

metricGroup.gauge("average-latency", avgLatency);

336

337

// The Flink metrics system will automatically call update() every 5 seconds

338

```

339

340

### Performance Characteristics

341

342

Understanding when to use each implementation:

343

344

**SimpleCounter:**

345

- **Use when**: Single-threaded access, maximum performance needed

346

- **Performance**: Fastest, no synchronization overhead

347

- **Thread safety**: Not thread-safe

348

- **Memory**: Minimal memory footprint

349

350

**ThreadSafeSimpleCounter:**

351

- **Use when**: Multi-threaded access, good performance needed

352

- **Performance**: High performance with low contention

353

- **Thread safety**: Thread-safe using LongAdder

354

- **Memory**: Low memory footprint, optimized for concurrent access

355

356

**MeterView:**

357

- **Use when**: Need rate calculations over time

358

- **Performance**: Background calculation, minimal impact on hot path

359

- **Thread safety**: Thread-safe for event marking

360

- **Memory**: Memory usage scales with time window size

361

362

```java

363

// Performance comparison example

364

public class CounterPerformanceTest {

365

366

@Test

367

public void singleThreadedPerformance() {

368

Counter simple = new SimpleCounter();

369

Counter threadSafe = new ThreadSafeSimpleCounter();

370

371

// SimpleCounter is faster for single-threaded use

372

long start = System.nanoTime();

373

for (int i = 0; i < 1_000_000; i++) {

374

simple.inc();

375

}

376

long simpleTime = System.nanoTime() - start;

377

378

start = System.nanoTime();

379

for (int i = 0; i < 1_000_000; i++) {

380

threadSafe.inc();

381

}

382

long threadSafeTime = System.nanoTime() - start;

383

384

// simpleTime < threadSafeTime for single-threaded access

385

}

386

387

@Test

388

public void multiThreadedPerformance() {

389

Counter threadSafe = new ThreadSafeSimpleCounter();

390

391

// ThreadSafeSimpleCounter scales well with multiple threads

392

ExecutorService executor = Executors.newFixedThreadPool(8);

393

394

long start = System.nanoTime();

395

List<Future<?>> futures = new ArrayList<>();

396

for (int thread = 0; thread < 8; thread++) {

397

futures.add(executor.submit(() -> {

398

for (int i = 0; i < 125_000; i++) { // 1M total

399

threadSafe.inc();

400

}

401

}));

402

}

403

404

futures.forEach(f -> {

405

try { f.get(); } catch (Exception e) { /* handle */ }

406

});

407

long multiThreadTime = System.nanoTime() - start;

408

409

assertEquals(1_000_000L, threadSafe.getCount());

410

}

411

}

412

```