or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

cluster-management.md, configuration.md, core-api.md, data-structures.md, index.md, sql-service.md, stream-processing.md

docs/stream-processing.md

0

# Stream Processing (Jet)

1

2

Hazelcast Jet is a distributed stream and batch processing engine built into Hazelcast. It provides high-performance data processing, with low latency and high throughput, for both real-time streaming and batch processing workloads.

3

4

## JetService Interface

5

6

The main entry point for Jet operations within a Hazelcast instance.

7

8

```java { .api }

9

import com.hazelcast.jet.JetService;

10

import com.hazelcast.jet.Job;

11

import com.hazelcast.jet.JobStateSnapshot;

12

import com.hazelcast.jet.Observable;

13

import com.hazelcast.jet.config.JetConfig;

14

import com.hazelcast.jet.pipeline.Pipeline;

15

import com.hazelcast.jet.core.DAG;

16

import java.util.List;

17

import java.util.Collection;

18

19

public interface JetService {

20

// Configuration

21

JetConfig getConfig();

22

23

// Job creation

24

Job newJob(Pipeline pipeline);

25

Job newJob(DAG dag);

26

Job newJob(Pipeline pipeline, JobConfig config);

27

Job newJob(DAG dag, JobConfig config);

28

29

// Job creation with conditions

30

Job newJobIfAbsent(Pipeline pipeline, JobConfig config);

31

Job newJobIfAbsent(DAG dag, JobConfig config);

32

33

// Lightweight jobs

34

Job newLightJob(Pipeline pipeline);

35

Job newLightJob(DAG dag);

36

37

// Job management

38

List<Job> getJobs();

39

List<Job> getJobs(String name);

40

Job getJob(long jobId);

41

Job getJob(String name);

42

43

// Snapshots

44

JobStateSnapshot getJobStateSnapshot(String name);

45

Collection<JobStateSnapshot> getJobStateSnapshots();

46

47

// Observables

48

<T> Observable<T> getObservable(String name);

49

<T> Observable<T> newObservable();

50

}

51

```

52

53

### Getting JetService

54

55

```java { .api }

56

HazelcastInstance hz = Hazelcast.newHazelcastInstance();

57

JetService jet = hz.getJet();

58

```

59

60

## Job Management

61

62

### Job Interface

63

64

```java { .api }

65

import com.hazelcast.jet.Job;

66

import com.hazelcast.jet.core.JobStatus;

67

import com.hazelcast.jet.core.JobSuspensionCause;

68

import com.hazelcast.jet.core.metrics.JobMetrics;

69

import java.util.concurrent.CompletableFuture;

70

import java.util.concurrent.CancellationException;

71

72

public interface Job {

73

// Job identification

74

long getId();

75

String getName();

76

77

// Job status and control

78

JobStatus getStatus();

79

CompletableFuture<Void> getFuture();

80

81

// Job lifecycle

82

void cancel() throws CancellationException;

83

Job suspend();

84

Job resume();

85

Job restart();

86

87

// Suspension information

88

JobSuspensionCause getSuspensionCause();

89

90

// Metrics and monitoring

91

JobMetrics getMetrics();

92

93

// Snapshots

94

JobStateSnapshot cancelAndExportSnapshot(String name);

95

JobStateSnapshot exportSnapshot(String name);

96

97

// Configuration

98

JobConfig getConfig();

99

}

100

```

101

102

### JobConfig Class

103

104

```java { .api }

105

import com.hazelcast.jet.config.JobConfig;

106

import com.hazelcast.jet.config.ProcessingGuarantee;

107

import java.io.File;

108

import java.net.URL;

109

import java.util.List;

110

111

public class JobConfig {

112

// Job identification

113

JobConfig setName(String name);

114

String getName();

115

116

// Processing guarantees

117

JobConfig setProcessingGuarantee(ProcessingGuarantee processingGuarantee);

118

ProcessingGuarantee getProcessingGuarantee();

119

120

// Snapshot configuration

121

JobConfig setSnapshotIntervalMillis(long snapshotIntervalMillis);

122

long getSnapshotIntervalMillis();

123

124

JobConfig setAutoScaling(boolean enabled);

125

boolean isAutoScaling();

126

127

// Resource management

128

JobConfig setSplitBrainProtectionName(String splitBrainProtectionName);

129

String getSplitBrainProtectionName();

130

131

// Class loading

132

JobConfig addClass(Class<?>... classes);

133

JobConfig addJar(File jarFile);

134

JobConfig addJar(URL jarUrl);

135

JobConfig addClasspathResource(URL url);

136

JobConfig addClasspathResource(URL url, String id);

137

138

List<String> getJars();

139

List<String> getClasspathResources();

140

141

// Serialization

142

JobConfig setSerializer(Class<?> clazz, Class<?> serializerClass);

143

Map<String, String> getSerializers();

144

145

// Metrics

146

JobConfig setStoreMetricsAfterJobCompletion(boolean storeMetricsAfterJobCompletion);

147

boolean isStoreMetricsAfterJobCompletion();

148

149

JobConfig setMetricsEnabled(boolean enabled);

150

boolean isMetricsEnabled();

151

}

152

```

153

154

### Job Status Handling

155

156

```java { .api }

157

import com.hazelcast.jet.core.JobStatus;

158

159

public enum JobStatus {

160

NOT_RUNNING,

161

STARTING,

162

RUNNING,

163

SUSPENDED,

164

SUSPENDED_EXPORTING_SNAPSHOT,

165

COMPLETING,

166

FAILED,

167

COMPLETED,

168

RESTARTING

169

}

170

171

// Job status monitoring

172

Job job = jet.newJob(pipeline);

173

174

// Check status

175

JobStatus status = job.getStatus();

176

System.out.println("Job status: " + status);

177

178

// Wait for completion

179

try {

180

job.getFuture().get(); // Blocks until job completes

181

System.out.println("Job completed successfully");

182

} catch (Exception e) {

183

System.err.println("Job failed: " + e.getMessage());

184

}

185

186

// Job control

187

job.suspend(); // Suspend job

188

job.resume(); // Resume suspended job

189

job.restart(); // Restart job

190

job.cancel(); // Cancel job

191

```

192

193

## Pipeline API

194

195

### Pipeline Class

196

197

High-level API for building data processing pipelines.

198

199

```java { .api }

200

import com.hazelcast.jet.pipeline.Pipeline;

201

import com.hazelcast.jet.pipeline.BatchSource;

202

import com.hazelcast.jet.pipeline.StreamSource;

203

import com.hazelcast.jet.pipeline.BatchStage;

204

import com.hazelcast.jet.pipeline.StreamStage;

205

import com.hazelcast.jet.pipeline.Sink;

206

207

public class Pipeline {

208

// Pipeline creation

209

public static Pipeline create();

210

211

// Batch sources

212

public <T> BatchStage<T> readFrom(BatchSource<T> source);

213

214

// Stream sources

215

public <T> StreamStage<T> readFrom(StreamSource<T> source);

216

217

// Drawing the DAG

218

public String toDotString();

219

}

220

```

221

222

### Sources

223

224

```java { .api }

225

import com.hazelcast.jet.pipeline.Sources;

226

import com.hazelcast.jet.pipeline.BatchSource;

227

import com.hazelcast.jet.pipeline.StreamSource;

228

import com.hazelcast.function.SupplierEx;

229

import java.util.Map;

230

231

public final class Sources {

232

// Hazelcast data structures

233

public static <K, V> BatchSource<Entry<K, V>> map(String mapName);

234

public static <K, V> BatchSource<Entry<K, V>> map(String mapName, Predicate<K, V> predicate, Projection<? super Entry<K, V>, T> projection);

235

236

public static <T> BatchSource<T> list(String listName);

237

public static <T> BatchSource<T> cache(String cacheName);

238

239

// Streaming from data structures

240

public static <K, V> StreamSource<Entry<K, V>> mapJournal(String mapName, JournalInitialPosition initialPos);

241

public static <T> StreamSource<T> cacheJournal(String cacheName, JournalInitialPosition initialPos);

242

243

// Files

244

public static BatchSource<String> files(String directory);

245

public static BatchSource<String> files(String directory, String glob, boolean sharedFileSystem);

246

public static <T> BatchSource<T> filesBuilder(String directory);

247

248

// Streaming files

249

public static StreamSource<String> fileWatcher(String watchedDirectory);

250

251

// Custom sources backed by a processor supplier

252

public static <T> BatchSource<T> fromProcessor(String name, SupplierEx<Processor> supplier);

253

254

// Socket

255

public static StreamSource<String> socket(String host, int port, Charset charset);

256

257

// Test sources

258

public static <T> BatchSource<T> empty();

259

public static StreamSource<Long> streamFromProcessor(String name, SupplierEx<Processor> supplier);

260

}

261

```

262

263

### Sinks

264

265

```java { .api }

266

import com.hazelcast.jet.pipeline.Sinks;

267

import com.hazelcast.jet.pipeline.Sink;

268

import com.hazelcast.function.FunctionEx;

269

270

public final class Sinks {

271

// Hazelcast data structures

272

public static <T, K, V> Sink<T> map(String mapName);

273

public static <T, K, V> Sink<T> map(String mapName, FunctionEx<? super T, ? extends K> toKeyFn, FunctionEx<? super T, ? extends V> toValueFn);

274

275

public static <T> Sink<T> list(String listName);

276

public static <T> Sink<T> cache(String cacheName);

277

278

// Remote maps and caches

279

public static <T, K, V> Sink<T> remoteMap(String mapName, ClientConfig clientConfig);

280

public static <T, K, V> Sink<T> remoteCache(String cacheName, ClientConfig clientConfig);

281

282

// Files

283

public static <T> Sink<T> files(String directoryName);

284

public static <T> Sink<T> files(String directoryName, FunctionEx<? super T, String> toStringFn);

285

286

// Socket

287

public static Sink<String> socket(String host, int port);

288

public static <T> Sink<T> socket(String host, int port, FunctionEx<? super T, String> toStringFn);

289

290

// Logging and testing

291

public static <T> Sink<T> logger();

292

public static <T> Sink<T> logger(FunctionEx<? super T, String> toStringFn);

293

public static <T> Sink<T> noop();

294

}

295

```

296

297

## Pipeline Stages

298

299

### BatchStage Interface

300

301

```java { .api }

302

import com.hazelcast.jet.pipeline.BatchStage;

303

import com.hazelcast.jet.pipeline.BatchStageWithKey;

304

import com.hazelcast.jet.aggregate.AggregateOperation;

305

import com.hazelcast.function.FunctionEx;

306

import com.hazelcast.function.PredicateEx;

307

import com.hazelcast.function.ConsumerEx;

308

309

public interface BatchStage<T> extends GeneralStage<T> {

310

// Transformation operations

311

<R> BatchStage<R> map(FunctionEx<? super T, ? extends R> mapFn);

312

<R> BatchStage<R> flatMap(FunctionEx<? super T, ? extends Traverser<R>> flatMapFn);

313

BatchStage<T> filter(PredicateEx<? super T> filterFn);

314

315

// Grouping

316

<K> BatchStageWithKey<T, K> groupingKey(FunctionEx<? super T, ? extends K> keyFn);

317

318

// Aggregation

319

<A, R> BatchStage<R> aggregate(AggregateOperation<? super T, A, R> aggrOp);

320

321

// Joining

322

<T1, R> BatchStage<R> hashJoin(BatchStage<T1> stage1, JoinClause<K, ? super T, ? super T1, ? extends R> joinClause);

323

324

// Sorting

325

BatchStage<T> sort();

326

BatchStage<T> sort(ComparatorEx<? super T> comparatorFn);

327

328

// Distinct

329

BatchStage<T> distinct();

330

BatchStage<T> distinct(FunctionEx<? super T, ?> keyFn);

331

332

// Peek (for debugging)

333

BatchStage<T> peek(ConsumerEx<? super T> peekFn);

334

335

// Terminal operations

336

void writeTo(Sink<? super T> sink);

337

338

// Custom transformations

339

<R> BatchStage<R> customTransform(String stageName, SupplierEx<Processor> procSupplier);

340

}

341

```

342

343

### StreamStage Interface

344

345

```java { .api }

346

import com.hazelcast.jet.pipeline.StreamStage;

347

import com.hazelcast.jet.pipeline.StreamStageWithKey;

348

import com.hazelcast.jet.pipeline.WindowDefinition;

349

350

public interface StreamStage<T> extends GeneralStage<T> {

351

// Transformation operations

352

<R> StreamStage<R> map(FunctionEx<? super T, ? extends R> mapFn);

353

<R> StreamStage<R> flatMap(FunctionEx<? super T, ? extends Traverser<R>> flatMapFn);

354

StreamStage<T> filter(PredicateEx<? super T> filterFn);

355

356

// Grouping and keying

357

<K> StreamStageWithKey<T, K> groupingKey(FunctionEx<? super T, ? extends K> keyFn);

358

359

// Windowing

360

<R> StreamStage<R> window(WindowDefinition wDef);

361

362

// Stateful mapping

363

<S, R> StreamStage<R> mapStateful(SupplierEx<? extends S> createFn, BiFunctionEx<? super S, ? super T, ? extends R> statefulMapFn);

364

365

// Rebalancing

366

StreamStage<T> rebalance();

367

StreamStage<T> rebalance(FunctionEx<? super T, ?> keyFn);

368

369

// Terminal operations

370

void writeTo(Sink<? super T> sink);

371

372

// Custom transformations

373

<R> StreamStage<R> customTransform(String stageName, SupplierEx<Processor> procSupplier);

374

}

375

```

376

377

## Pipeline Examples

378

379

### Basic Batch Processing

380

381

```java { .api }

382

import com.hazelcast.jet.pipeline.Pipeline;

383

import com.hazelcast.jet.pipeline.Sources;

384

import com.hazelcast.jet.pipeline.Sinks;

385

import com.hazelcast.query.Predicates;

386

387

// Word count example

388

Pipeline pipeline = Pipeline.create();

389

pipeline.readFrom(Sources.files("/input/directory"))

390

.flatMap(line -> Traversers.traverseArray(line.toLowerCase().split("\\W+")))

391

.filter(word -> !word.isEmpty())

392

.groupingKey(word -> word)

393

.aggregate(AggregateOperations.counting())

394

.writeTo(Sinks.map("word-counts"));

395

396

Job job = jet.newJob(pipeline);

397

job.getFuture().get(); // Wait for completion

398

```

399

400

### Stream Processing with Windows

401

402

```java { .api }

403

import com.hazelcast.jet.pipeline.WindowDefinition;

404

import static com.hazelcast.jet.aggregate.AggregateOperations.*;

405

406

// Real-time analytics pipeline

407

Pipeline pipeline = Pipeline.create();

408

pipeline.readFrom(Sources.mapJournal("events", JournalInitialPosition.START_FROM_OLDEST))

409

.withIngestionTimestamps()

410

.<String, Long>map(entry -> {

411

String event = entry.getValue().toString();

412

return Util.entry(extractUserId(event), extractAmount(event));

413

})

414

.groupingKey(Entry::getKey)

415

.window(WindowDefinition.sliding(Duration.ofMinutes(5), Duration.ofMinutes(1)))

416

.aggregate(summingLong(Entry::getValue))

417

.writeTo(Sinks.map("user-totals"));

418

419

Job streamJob = jet.newJob(pipeline);

420

```

421

422

### Complex Data Processing

423

424

```java { .api }

425

// ETL Pipeline with enrichment

426

Pipeline pipeline = Pipeline.create();

427

428

// Main data stream

429

StreamStage<Order> orders = pipeline

430

.readFrom(Sources.mapJournal("orders", JournalInitialPosition.START_FROM_CURRENT))

431

.map(entry -> parseOrder(entry.getValue()));

432

433

// Reference data

434

BatchStage<Entry<String, Customer>> customers = pipeline

435

.readFrom(Sources.map("customers"));

436

437

// Join and enrich

438

orders.groupingKey(Order::getCustomerId)

439

.hashJoin(customers, JoinClause.joinMapEntries(Customer::getId))

440

.map(item -> enrichOrder(item.get1(), item.get2()))

441

.filter(enrichedOrder -> enrichedOrder.getAmount() > 1000)

442

.writeTo(Sinks.map("high-value-orders"));

443

444

Job enrichmentJob = jet.newJob(pipeline);

445

```

446

447

## Aggregations

448

449

### Standard Aggregations

450

451

```java { .api }

452

import com.hazelcast.jet.aggregate.AggregateOperations;

453

import com.hazelcast.jet.aggregate.AggregateOperation;

454

455

// Built-in aggregation operations

456

AggregateOperation<Object, ?, Long> counting = AggregateOperations.counting();

457

AggregateOperation<Long, ?, Long> summingLong = AggregateOperations.summingLong(Long::longValue);

458

AggregateOperation<Double, ?, Double> averagingDouble = AggregateOperations.averagingDouble(Double::doubleValue);

459

AggregateOperation<Comparable, ?, Comparable> maxBy = AggregateOperations.maxBy(Comparable::compareTo);

460

AggregateOperation<Comparable, ?, Comparable> minBy = AggregateOperations.minBy(Comparable::compareTo);

461

462

// Usage in pipeline

463

pipeline.readFrom(Sources.list("numbers"))

464

.aggregate(summingLong(Number::longValue))

465

.writeTo(Sinks.logger());

466

```

467

468

### Custom Aggregations

469

470

```java { .api }

471

import com.hazelcast.jet.aggregate.AggregateOperation;

472

473

// Custom aggregation for calculating standard deviation

474

AggregateOperation<Double, MutableReference<Stats>, Double> stdDev =

475

AggregateOperation

476

.withCreate(() -> new MutableReference<>(new Stats()))

477

.andAccumulate((MutableReference<Stats> acc, Double value) -> {

478

Stats stats = acc.get();

479

stats.count++;

480

stats.sum += value;

481

stats.sumSquares += value * value;

482

})

483

.andCombine((acc1, acc2) -> {

484

Stats stats1 = acc1.get();

485

Stats stats2 = acc2.get();

486

stats1.count += stats2.count;

487

stats1.sum += stats2.sum;

488

stats1.sumSquares += stats2.sumSquares;

489

})

490

.andExportFinish(acc -> {

491

Stats stats = acc.get();

492

double mean = stats.sum / stats.count;

493

double variance = (stats.sumSquares / stats.count) - (mean * mean);

494

return Math.sqrt(variance);

495

});

496

497

// Usage

498

pipeline.readFrom(Sources.list("measurements"))

499

.aggregate(stdDev)

500

.writeTo(Sinks.logger());

501

```

502

503

## Advanced Features

504

505

### Job State Snapshots

506

507

```java { .api }

508

// Create job with snapshot configuration

509

JobConfig config = new JobConfig();

510

config.setName("streaming-analytics");

511

config.setProcessingGuarantee(ProcessingGuarantee.EXACTLY_ONCE);

512

config.setSnapshotIntervalMillis(5000); // 5 seconds

513

514

Job job = jet.newJob(pipeline, config);

515

516

// Export snapshot

517

JobStateSnapshot snapshot = job.exportSnapshot("backup-snapshot");

518

519

// Start new job from snapshot

520

JobConfig restoreConfig = new JobConfig();

521

restoreConfig.setInitialSnapshotName("backup-snapshot");

522

Job restoredJob = jet.newJob(newPipeline, restoreConfig);

523

```

524

525

### Processing Guarantees

526

527

```java { .api }

528

import com.hazelcast.jet.config.ProcessingGuarantee;

529

530

JobConfig config = new JobConfig();

531

532

// At-least-once processing (items may be processed more than once after a failure)

533

config.setProcessingGuarantee(ProcessingGuarantee.AT_LEAST_ONCE);

534

535

// Exactly-once processing (with snapshots)

536

config.setProcessingGuarantee(ProcessingGuarantee.EXACTLY_ONCE);

537

538

// No guarantee (the default; best performance)

539

config.setProcessingGuarantee(ProcessingGuarantee.NONE);

540

```

541

542

### Observables

543

544

```java { .api }

545

import com.hazelcast.jet.Observable;

546

547

// Create observable

548

Observable<String> observable = jet.newObservable();

549

550

// Configure pipeline to write to observable

551

pipeline.readFrom(Sources.list("input"))

552

.map(String::toUpperCase)

553

.writeTo(Sinks.observable("results"));

554

555

Job job = jet.newJob(pipeline);

556

557

// Observe results

558

Observable<String> results = jet.getObservable("results");

559

results.addObserver(result -> {

560

System.out.println("Result: " + result);

561

});

562

```

563

564

## Monitoring and Metrics

565

566

### Job Metrics

567

568

```java { .api }

569

import com.hazelcast.jet.core.metrics.JobMetrics;

570

import com.hazelcast.jet.core.metrics.Measurement;

571

572

Job job = jet.getJob("analytics-job");

573

JobMetrics metrics = job.getMetrics();

574

575

// Iterate through all measurements

576

for (Measurement measurement : metrics) {

577

System.out.println("Metric: " + measurement.metric() +

578

", Value: " + measurement.value() +

579

", Unit: " + measurement.unit());

580

}

581

582

// Get specific metrics

583

long itemsProcessed = metrics.get("[vertex=map-stage]/itemsOut");

584

long throughput = metrics.get("[vertex=map-stage]/throughput");

585

```

586

587

### Job Configuration for Monitoring

588

589

```java { .api }

590

JobConfig config = new JobConfig();

591

config.setMetricsEnabled(true);

592

config.setStoreMetricsAfterJobCompletion(true);

593

594

Job job = jet.newJob(pipeline, config);

595

596

// Monitor job progress

597

while (!job.getStatus().isTerminal()) {

598

JobMetrics currentMetrics = job.getMetrics();

599

// Process metrics...

600

Thread.sleep(1000);

601

}

602

```