or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

configuration.mdcore-metrics.mdimplementations.mdindex.mdmetric-groups.mdreporters.mdspecialized-groups.mdtracing.md

tracing.mddocs/

# Tracing Support

Experimental distributed tracing support for capturing execution spans across Flink's distributed runtime. It provides span creation, attribute attachment, and pluggable trace reporting for observability in distributed stream processing applications.

## Capabilities

### Span Interface

Core interface representing a tracing span: a self-contained record of something that happened in Flink at a certain point in time, such as a checkpoint or a recovery.

```java { .api }

11

/**

12

* Span represents something that happened in Flink at certain point of time,

13

* that will be reported to a TraceReporter. Currently we don't support traces

14

* with multiple spans. Each span is self-contained and represents things like

15

* a checkpoint or recovery.

16

*/

17

@Experimental

18

public interface Span {

19

20

/**

21

* Creates a new SpanBuilder for constructing Span instances.

22

* @param classScope Flink's convention is that the scope of each Span is

23

* defined by the class that is creating it

24

* @param name Human readable name of this span

25

* @return SpanBuilder for constructing the span

26

*/

27

static SpanBuilder builder(Class<?> classScope, String name);

28

29

/**

30

* Returns the scope of this span.

31

* @return scope string (typically class canonical name)

32

*/

33

String getScope();

34

35

/**

36

* Returns the name of this span.

37

* @return human readable span name

38

*/

39

String getName();

40

41

/**

42

* Returns the start timestamp in milliseconds.

43

* @return start timestamp in milliseconds since epoch

44

*/

45

long getStartTsMillis();

46

47

/**

48

* Returns the end timestamp in milliseconds.

49

* @return end timestamp in milliseconds since epoch

50

*/

51

long getEndTsMillis();

52

53

/**

54

* Returns the attributes attached to this span.

55

* Currently returned values can be of type String, Long or Double,

56

* however more types can be added in the future.

57

* @return map of attribute names to values

58

*/

59

Map<String, Object> getAttributes();

60

}

61

```

**Usage Examples:**

```java

66

// Basic span creation

67

public class CheckpointCoordinator {

68

69

public void triggerCheckpoint() {

70

Span checkpointSpan = Span.builder(CheckpointCoordinator.class, "checkpoint")

71

.setStartTsMillis(System.currentTimeMillis())

72

.setAttribute("checkpoint-id", checkpointId)

73

.setAttribute("num-tasks", numberOfTasks)

74

.build();

75

76

try {

77

performCheckpoint();

78

79

// Update span with completion info

80

checkpointSpan = Span.builder(CheckpointCoordinator.class, "checkpoint")

81

.setStartTsMillis(startTime)

82

.setEndTsMillis(System.currentTimeMillis())

83

.setAttribute("checkpoint-id", checkpointId)

84

.setAttribute("num-tasks", numberOfTasks)

85

.setAttribute("success", true)

86

.setAttribute("duration-ms", System.currentTimeMillis() - startTime)

87

.build();

88

89

} catch (Exception e) {

90

// Span for failed checkpoint

91

checkpointSpan = Span.builder(CheckpointCoordinator.class, "checkpoint")

92

.setStartTsMillis(startTime)

93

.setEndTsMillis(System.currentTimeMillis())

94

.setAttribute("checkpoint-id", checkpointId)

95

.setAttribute("success", false)

96

.setAttribute("error", e.getMessage())

97

.build();

98

}

99

100

// Report span

101

metricGroup.addSpan(checkpointSpan.builder(CheckpointCoordinator.class, "checkpoint"));

102

}

103

}

104

105

// Span for operator lifecycle events

106

public class StreamOperatorLifecycle {

107

108

public void open() throws Exception {

109

long startTime = System.currentTimeMillis();

110

111

try {

112

performOpen();

113

114

Span openSpan = Span.builder(this.getClass(), "operator-open")

115

.setStartTsMillis(startTime)

116

.setEndTsMillis(System.currentTimeMillis())

117

.setAttribute("operator-name", getOperatorName())

118

.setAttribute("parallelism", getParallelism())

119

.setAttribute("subtask-index", getSubtaskIndex())

120

.setAttribute("success", true)

121

.build();

122

123

reportSpan(openSpan);

124

125

} catch (Exception e) {

126

Span failedOpenSpan = Span.builder(this.getClass(), "operator-open")

127

.setStartTsMillis(startTime)

128

.setEndTsMillis(System.currentTimeMillis())

129

.setAttribute("operator-name", getOperatorName())

130

.setAttribute("success", false)

131

.setAttribute("error", e.getMessage())

132

.build();

133

134

reportSpan(failedOpenSpan);

135

throw e;

136

}

137

}

138

}

139

```

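### SpanBuilder Class

Builder for constructing Span instances with a fluent API for setting timestamps and attributes. Builders are obtained through `Span.builder(Class, String)`. Attribute values can be `String`, `long` or `double`. There is no boolean overload, so flags such as success/failure must be encoded as strings (for example `"true"`/`"false"`).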

```java { .api }

146

/**

147

* Builder used to construct Span instances.

148

*/

149

@Experimental

150

public class SpanBuilder {

151

/**

152

* Constructor for SpanBuilder.

153

* @param classScope Flink's convention is that the scope of each Span is

154

* defined by the class that is creating it. If you are

155

* building the Span in your class MyClass, as the classScope

156

* you should pass MyClass.class.

157

* @param name Human readable name of this span, that describes what the

158

* built Span will represent.

159

*/

160

SpanBuilder(Class<?> classScope, String name);

161

162

/**

163

* Builds the Span instance.

164

* @return constructed Span

165

*/

166

public Span build();

167

168

/**

169

* Optionally you can manually set the Span's startTs. If not specified,

170

* System.currentTimeMillis() will be used.

171

* @param startTsMillis start timestamp in milliseconds

172

* @return this SpanBuilder for method chaining

173

*/

174

public SpanBuilder setStartTsMillis(long startTsMillis);

175

176

/**

177

* Optionally you can manually set the Span's endTs. If not specified,

178

* startTsMillis will be used.

179

* @param endTsMillis end timestamp in milliseconds

180

* @return this SpanBuilder for method chaining

181

*/

182

public SpanBuilder setEndTsMillis(long endTsMillis);

183

184

/**

185

* Additional string attribute to be attached to this Span.

186

* @param key attribute key

187

* @param value string attribute value

188

* @return this SpanBuilder for method chaining

189

*/

190

public SpanBuilder setAttribute(String key, String value);

191

192

/**

193

* Additional long attribute to be attached to this Span.

194

* @param key attribute key

195

* @param value long attribute value

196

* @return this SpanBuilder for method chaining

197

*/

198

public SpanBuilder setAttribute(String key, long value);

199

200

/**

201

* Additional double attribute to be attached to this Span.

202

* @param key attribute key

203

* @param value double attribute value

204

* @return this SpanBuilder for method chaining

205

*/

206

public SpanBuilder setAttribute(String key, double value);

207

}

208

```

**Usage Examples:**

```java

213

// Comprehensive span creation

214

public class TaskExecutor {

215

216

public void executeTask(Task task) {

217

SpanBuilder spanBuilder = Span.builder(TaskExecutor.class, "task-execution")

218

.setAttribute("task-id", task.getId())

219

.setAttribute("task-type", task.getType())

220

.setAttribute("parallelism", task.getParallelism())

221

.setAttribute("operator-chain-length", task.getOperatorChain().size());

222

223

long startTime = System.currentTimeMillis();

224

spanBuilder.setStartTsMillis(startTime);

225

226

try {

227

Object result = task.execute();

228

229

long endTime = System.currentTimeMillis();

230

long duration = endTime - startTime;

231

232

Span successSpan = spanBuilder

233

.setEndTsMillis(endTime)

234

.setAttribute("success", true)

235

.setAttribute("duration-ms", duration)

236

.setAttribute("result-size", getResultSize(result))

237

.build();

238

239

reportSpan(successSpan);

240

241

} catch (Exception e) {

242

long endTime = System.currentTimeMillis();

243

244

Span failureSpan = spanBuilder

245

.setEndTsMillis(endTime)

246

.setAttribute("success", false)

247

.setAttribute("error-type", e.getClass().getSimpleName())

248

.setAttribute("error-message", e.getMessage())

249

.setAttribute("duration-ms", endTime - startTime)

250

.build();

251

252

reportSpan(failureSpan);

253

throw e;

254

}

255

}

256

}

257

258

// Network operation tracing

259

public class NetworkClient {

260

261

public void sendData(byte[] data, String destination) {

262

Span networkSpan = Span.builder(NetworkClient.class, "network-send")

263

.setStartTsMillis(System.currentTimeMillis())

264

.setAttribute("destination", destination)

265

.setAttribute("data-size", data.length)

266

.setAttribute("protocol", "tcp")

267

.build();

268

269

// For immediate operations, start and end can be the same

270

reportSpan(networkSpan);

271

}

272

273

public CompletableFuture<Response> sendRequestAsync(Request request) {

274

long startTime = System.currentTimeMillis();

275

276

return sendAsync(request)

277

.whenComplete((response, throwable) -> {

278

long endTime = System.currentTimeMillis();

279

280

SpanBuilder spanBuilder = Span.builder(NetworkClient.class, "async-request")

281

.setStartTsMillis(startTime)

282

.setEndTsMillis(endTime)

283

.setAttribute("request-type", request.getType())

284

.setAttribute("request-size", request.getSize())

285

.setAttribute("duration-ms", endTime - startTime);

286

287

if (throwable == null) {

288

Span successSpan = spanBuilder

289

.setAttribute("success", true)

290

.setAttribute("response-size", response.getSize())

291

.setAttribute("status-code", response.getStatusCode())

292

.build();

293

reportSpan(successSpan);

294

} else {

295

Span errorSpan = spanBuilder

296

.setAttribute("success", false)

297

.setAttribute("error", throwable.getMessage())

298

.build();

299

reportSpan(errorSpan);

300

}

301

});

302

}

303

}

304

305

// State backend operation tracing

306

public class StateBackendTracing {

307

308

public void checkpoint(CheckpointId checkpointId) {

309

SpanBuilder checkpointSpan = Span.builder(StateBackendTracing.class, "state-checkpoint")

310

.setAttribute("checkpoint-id", checkpointId.getValue())

311

.setAttribute("backend-type", getBackendType());

312

313

long startTime = System.currentTimeMillis();

314

315

try {

316

long stateSize = performCheckpoint(checkpointId);

317

long endTime = System.currentTimeMillis();

318

319

Span completedSpan = checkpointSpan

320

.setStartTsMillis(startTime)

321

.setEndTsMillis(endTime)

322

.setAttribute("state-size-bytes", stateSize)

323

.setAttribute("duration-ms", endTime - startTime)

324

.setAttribute("success", true)

325

.build();

326

327

reportSpan(completedSpan);

328

329

} catch (Exception e) {

330

Span failedSpan = checkpointSpan

331

.setStartTsMillis(startTime)

332

.setEndTsMillis(System.currentTimeMillis())

333

.setAttribute("success", false)

334

.setAttribute("error", e.getMessage())

335

.build();

336

337

reportSpan(failedSpan);

338

throw e;

339

}

340

}

341

}

342

```

### SimpleSpan Implementation

Default, package-private implementation of the Span interface. Instances are created by `SpanBuilder#build()`, so code should depend only on the `Span` interface.

```java { .api }

349

/**

350

* Default implementation of Span interface.

351

*/

352

class SimpleSpan implements Span {

353

// Internal implementation constructed by SpanBuilder

354

}

355

```

356

357

### TraceReporter Interface

358

359

Interface for exporting spans to external tracing systems, similar to MetricReporter but for trace data.

360

361

```java { .api }

362

/**

363

* Trace reporters are used to export Spans to an external backend.

364

* Reporters are instantiated via a TraceReporterFactory.

365

*/

366

@Experimental

367

public interface TraceReporter {

368

369

/**

370

* Configures this reporter. If the reporter was instantiated generically

371

* and hence parameter-less, this method is the place where the reporter

372

* sets its basic fields based on configuration values.

373

* This method is always called first on a newly instantiated reporter.

374

* @param config A properties object that contains all parameters set for this reporter

375

*/

376

void open(MetricConfig config);

377

378

/**

379

* Closes this reporter. Should be used to close channels, streams and release resources.

380

*/

381

void close();

382

383

/**

384

* Called when a new Span is added.

385

* @param span the span that was added

386

*/

387

void notifyOfAddedSpan(Span span);

388

}

389

```

**Usage Examples:**

```java

394

// Custom trace reporter implementation

395

public class JaegerTraceReporter implements TraceReporter {

396

private JaegerTracer tracer;

397

private String serviceName;

398

private String jaegerEndpoint;

399

400

@Override

401

public void open(MetricConfig config) {

402

this.serviceName = config.getString("service.name", "flink-application");

403

this.jaegerEndpoint = config.getString("jaeger.endpoint", "http://localhost:14268/api/traces");

404

405

// Initialize Jaeger tracer

406

this.tracer = Configuration.fromEnv(serviceName)

407

.withSampling(Configuration.SamplerConfiguration.fromEnv()

408

.withType(ConstSampler.TYPE)

409

.withParam(1))

410

.withReporter(Configuration.ReporterConfiguration.fromEnv()

411

.withSender(Configuration.SenderConfiguration.fromEnv()

412

.withEndpoint(jaegerEndpoint)))

413

.getTracer();

414

}

415

416

@Override

417

public void notifyOfAddedSpan(Span span) {

418

// Convert Flink span to Jaeger span

419

io.opentracing.Span jaegerSpan = tracer.buildSpan(span.getName())

420

.withStartTimestamp(span.getStartTsMillis() * 1000) // Convert to microseconds

421

.start();

422

423

// Add attributes as tags

424

Map<String, Object> attributes = span.getAttributes();

425

for (Map.Entry<String, Object> entry : attributes.entrySet()) {

426

String key = entry.getKey();

427

Object value = entry.getValue();

428

429

if (value instanceof String) {

430

jaegerSpan.setTag(key, (String) value);

431

} else if (value instanceof Number) {

432

jaegerSpan.setTag(key, (Number) value);

433

} else if (value instanceof Boolean) {

434

jaegerSpan.setTag(key, (Boolean) value);

435

}

436

}

437

438

// Set scope as a tag

439

jaegerSpan.setTag("scope", span.getScope());

440

441

// Finish span with end timestamp

442

jaegerSpan.finish(span.getEndTsMillis() * 1000);

443

}

444

445

@Override

446

public void close() {

447

if (tracer != null) {

448

tracer.close();

449

}

450

}

451

}

452

453

// Console trace reporter for debugging

454

public class ConsoleTraceReporter implements TraceReporter {

455

private boolean includeAttributes;

456

private String dateFormat;

457

458

@Override

459

public void open(MetricConfig config) {

460

this.includeAttributes = config.getBoolean("include.attributes", true);

461

this.dateFormat = config.getString("date.format", "yyyy-MM-dd HH:mm:ss.SSS");

462

}

463

464

@Override

465

public void notifyOfAddedSpan(Span span) {

466

SimpleDateFormat formatter = new SimpleDateFormat(dateFormat);

467

468

System.out.println("=== SPAN ===");

469

System.out.println("Name: " + span.getName());

470

System.out.println("Scope: " + span.getScope());

471

System.out.println("Start: " + formatter.format(new Date(span.getStartTsMillis())));

472

System.out.println("End: " + formatter.format(new Date(span.getEndTsMillis())));

473

System.out.println("Duration: " + (span.getEndTsMillis() - span.getStartTsMillis()) + "ms");

474

475

if (includeAttributes && !span.getAttributes().isEmpty()) {

476

System.out.println("Attributes:");

477

span.getAttributes().forEach((key, value) ->

478

System.out.println(" " + key + ": " + value));

479

}

480

481

System.out.println("============");

482

}

483

484

@Override

485

public void close() {

486

System.out.println("Console trace reporter closed");

487

}

488

}

489

490

// Batching trace reporter

491

public class BatchingTraceReporter implements TraceReporter {

492

private final List<Span> spanBuffer = new ArrayList<>();

493

private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();

494

private int batchSize;

495

private long flushInterval;

496

private String endpoint;

497

498

@Override

499

public void open(MetricConfig config) {

500

this.batchSize = config.getInteger("batch.size", 100);

501

this.flushInterval = config.getLong("flush.interval", 5000); // 5 seconds

502

this.endpoint = config.getString("endpoint", "http://localhost:9411/api/v2/spans");

503

504

// Schedule periodic flush

505

scheduler.scheduleAtFixedRate(this::flushSpans, flushInterval, flushInterval, TimeUnit.MILLISECONDS);

506

}

507

508

@Override

509

public synchronized void notifyOfAddedSpan(Span span) {

510

spanBuffer.add(span);

511

512

if (spanBuffer.size() >= batchSize) {

513

flushSpans();

514

}

515

}

516

517

private synchronized void flushSpans() {

518

if (spanBuffer.isEmpty()) {

519

return;

520

}

521

522

List<Span> toFlush = new ArrayList<>(spanBuffer);

523

spanBuffer.clear();

524

525

// Send spans asynchronously

526

CompletableFuture.runAsync(() -> sendSpans(toFlush));

527

}

528

529

private void sendSpans(List<Span> spans) {

530

try {

531

// Convert spans to JSON and send to endpoint

532

String json = convertSpansToJson(spans);

533

sendToEndpoint(json);

534

} catch (Exception e) {

535

System.err.println("Failed to send spans: " + e.getMessage());

536

}

537

}

538

539

@Override

540

public void close() {

541

flushSpans(); // Flush remaining spans

542

scheduler.shutdown();

543

}

544

}

545

```

### TraceReporterFactory Interface

Factory interface for creating trace reporters.

```java { .api }

552

/**

553

* Factory for creating TraceReporter instances.

554

*/

555

@Experimental

556

public interface TraceReporterFactory {

557

/**

558

* Creates a new trace reporter.

559

* @param properties configured properties for the reporter

560

* @return created trace reporter

561

*/

562

TraceReporter createTraceReporter(Properties properties);

563

}

564

```

**Usage Examples:**

```java

569

// Factory implementation

570

public class JaegerTraceReporterFactory implements TraceReporterFactory {

571

@Override

572

public TraceReporter createTraceReporter(Properties properties) {

573

return new JaegerTraceReporter();

574

}

575

}

576

577

// Configurable factory

578

public class ConfigurableTraceReporterFactory implements TraceReporterFactory {

579

@Override

580

public TraceReporter createTraceReporter(Properties properties) {

581

String type = properties.getProperty("type", "console");

582

583

switch (type.toLowerCase()) {

584

case "jaeger":

585

return new JaegerTraceReporter();

586

case "zipkin":

587

return new ZipkinTraceReporter();

588

case "console":

589

return new ConsoleTraceReporter();

590

default:

591

throw new IllegalArgumentException("Unknown trace reporter type: " + type);

592

}

593

}

594

}

595

```