0
# Stream Processing (Jet)
1
2
Hazelcast Jet is a distributed stream and batch processing engine built into Hazelcast. It delivers low-latency, high-throughput data processing for both real-time streaming and batch workloads.
3
4
## JetService Interface
5
6
The main entry point for Jet operations within a Hazelcast instance.
7
8
```java { .api }
9
import com.hazelcast.jet.JetService;
10
import com.hazelcast.jet.Job;
11
import com.hazelcast.jet.JobStateSnapshot;
12
import com.hazelcast.jet.Observable;
13
import com.hazelcast.jet.config.JetConfig;
14
import com.hazelcast.jet.pipeline.Pipeline;
15
import com.hazelcast.jet.core.DAG;
16
import java.util.List;
17
import java.util.Collection;
18
19
public interface JetService {
20
// Configuration
21
JetConfig getConfig();
22
23
// Job creation
24
Job newJob(Pipeline pipeline);
25
Job newJob(DAG dag);
26
Job newJob(Pipeline pipeline, JobConfig config);
27
Job newJob(DAG dag, JobConfig config);
28
29
// Job creation with conditions
30
Job newJobIfAbsent(Pipeline pipeline, JobConfig config);
31
Job newJobIfAbsent(DAG dag, JobConfig config);
32
33
// Lightweight jobs
34
Job newLightJob(Pipeline pipeline);
35
Job newLightJob(DAG dag);
36
37
// Job management
38
List<Job> getJobs();
39
List<Job> getJobs(String name);
40
Job getJob(long jobId);
41
Job getJob(String name);
42
43
// Snapshots
44
JobStateSnapshot getJobStateSnapshot(String name);
45
Collection<JobStateSnapshot> getJobStateSnapshots();
46
47
// Observables
48
<T> Observable<T> getObservable(String name);
49
<T> Observable<T> newObservable();
50
}
51
```
52
53
### Getting JetService
54
55
```java { .api }
56
HazelcastInstance hz = Hazelcast.newHazelcastInstance();
57
JetService jet = hz.getJet();
58
```
59
60
## Job Management
61
62
### Job Interface
63
64
```java { .api }
65
import com.hazelcast.jet.Job;
66
import com.hazelcast.jet.core.JobStatus;
67
import com.hazelcast.jet.core.JobSuspensionCause;
68
import com.hazelcast.jet.core.metrics.JobMetrics;
69
import java.util.concurrent.CompletableFuture;
70
import java.util.concurrent.CancellationException;
71
72
public interface Job {
73
// Job identification
74
long getId();
75
String getName();
76
77
// Job status and control
78
JobStatus getStatus();
79
CompletableFuture<Void> getFuture();
80
81
// Job lifecycle
82
void cancel() throws CancellationException;
83
Job suspend();
84
Job resume();
85
Job restart();
86
87
// Suspension information
88
JobSuspensionCause getSuspensionCause();
89
90
// Metrics and monitoring
91
JobMetrics getMetrics();
92
93
// Snapshots
94
JobStateSnapshot cancelAndExportSnapshot(String name);
95
JobStateSnapshot exportSnapshot(String name);
96
97
// Configuration
98
JobConfig getConfig();
99
}
100
```
101
102
### JobConfig Class
103
104
```java { .api }
105
import com.hazelcast.jet.config.JobConfig;
106
import com.hazelcast.jet.config.ProcessingGuarantee;
107
import java.io.File;
108
import java.net.URL;
109
import java.util.List;
110
111
public class JobConfig {
112
// Job identification
113
JobConfig setName(String name);
114
String getName();
115
116
// Processing guarantees
117
JobConfig setProcessingGuarantee(ProcessingGuarantee processingGuarantee);
118
ProcessingGuarantee getProcessingGuarantee();
119
120
// Snapshot configuration
121
JobConfig setSnapshotIntervalMillis(long snapshotIntervalMillis);
122
long getSnapshotIntervalMillis();
123
124
JobConfig setAutoScaling(boolean enabled);
125
boolean isAutoScaling();
126
127
// Split-brain protection
128
JobConfig setSplitBrainProtectionName(String splitBrainProtectionName);
129
String getSplitBrainProtectionName();
130
131
// Class loading
132
JobConfig addClass(Class<?>... classes);
133
JobConfig addJar(File jarFile);
134
JobConfig addJar(URL jarUrl);
135
JobConfig addClasspathResource(URL url);
136
JobConfig addClasspathResource(URL url, String id);
137
138
List<String> getJars();
139
List<String> getClasspathResources();
140
141
// Serialization
142
JobConfig setSerializer(Class<?> clazz, Class<?> serializerClass);
143
Map<String, String> getSerializers();
144
145
// Metrics
146
JobConfig setStoreMetricsAfterJobCompletion(boolean storeMetricsAfterJobCompletion);
147
boolean isStoreMetricsAfterJobCompletion();
148
149
JobConfig setMetricsEnabled(boolean enabled);
150
boolean isMetricsEnabled();
151
}
152
```
153
154
### Job Status Handling
155
156
```java { .api }
157
import com.hazelcast.jet.core.JobStatus;
158
159
public enum JobStatus {
160
NOT_RUNNING,
161
STARTING,
162
RUNNING,
163
SUSPENDED,
164
SUSPENDED_EXPORTING_SNAPSHOT,
165
COMPLETING,
166
FAILED,
167
COMPLETED,
168
RESTARTING
169
}
170
171
// Job status monitoring
172
Job job = jet.newJob(pipeline);
173
174
// Check status
175
JobStatus status = job.getStatus();
176
System.out.println("Job status: " + status);
177
178
// Wait for completion
179
try {
180
job.getFuture().get(); // Blocks until job completes
181
System.out.println("Job completed successfully");
182
} catch (Exception e) {
183
System.err.println("Job failed: " + e.getMessage());
184
}
185
186
// Job control
187
job.suspend(); // Suspend job
188
job.resume(); // Resume suspended job
189
job.restart(); // Restart job
190
job.cancel(); // Cancel job
191
```
192
193
## Pipeline API
194
195
### Pipeline Class
196
197
High-level API for building data processing pipelines.
198
199
```java { .api }
200
import com.hazelcast.jet.pipeline.Pipeline;
201
import com.hazelcast.jet.pipeline.BatchSource;
202
import com.hazelcast.jet.pipeline.StreamSource;
203
import com.hazelcast.jet.pipeline.BatchStage;
204
import com.hazelcast.jet.pipeline.StreamStage;
205
import com.hazelcast.jet.pipeline.Sink;
206
207
public class Pipeline {
208
// Pipeline creation
209
public static Pipeline create();
210
211
// Batch sources
212
public <T> BatchStage<T> readFrom(BatchSource<T> source);
213
214
// Stream sources
215
public <T> StreamStage<T> readFrom(StreamSource<T> source);
216
217
// Drawing the DAG
218
public String toDotString();
219
}
220
```
221
222
### Sources
223
224
```java { .api }
225
import com.hazelcast.jet.pipeline.Sources;
226
import com.hazelcast.jet.pipeline.BatchSource;
227
import com.hazelcast.jet.pipeline.StreamSource;
228
import com.hazelcast.function.SupplierEx;
229
import java.util.Map;
230
231
public final class Sources {
232
// Hazelcast data structures
233
public static <K, V> BatchSource<Entry<K, V>> map(String mapName);
234
public static <T, K, V> BatchSource<T> map(String mapName, Predicate<K, V> predicate, Projection<? super Entry<K, V>, ? extends T> projection);
235
236
public static <T> BatchSource<T> list(String listName);
237
public static <T> BatchSource<T> cache(String cacheName);
238
239
// Streaming from data structures
240
public static <K, V> StreamSource<Entry<K, V>> mapJournal(String mapName, JournalInitialPosition initialPos);
241
public static <T> StreamSource<T> cacheJournal(String cacheName, JournalInitialPosition initialPos);
242
243
// Files
244
public static BatchSource<String> files(String directory);
245
public static BatchSource<String> files(String directory, String glob, boolean sharedFileSystem);
246
public static <T> BatchSource<T> filesBuilder(String directory);
247
248
// Streaming files
249
public static StreamSource<String> fileWatcher(String watchedDirectory);
250
251
// Custom processor-based sources
252
public static <T> BatchSource<T> fromProcessor(String name, SupplierEx<Processor> supplier);
253
254
// Socket
255
public static StreamSource<String> socket(String host, int port, Charset charset);
256
257
// Test sources
258
public static <T> BatchSource<T> empty();
259
public static StreamSource<Long> streamFromProcessor(String name, SupplierEx<Processor> supplier);
260
}
261
```
262
263
### Sinks
264
265
```java { .api }
266
import com.hazelcast.jet.pipeline.Sinks;
267
import com.hazelcast.jet.pipeline.Sink;
268
import com.hazelcast.function.FunctionEx;
269
270
public final class Sinks {
271
// Hazelcast data structures
272
public static <T, K, V> Sink<T> map(String mapName);
273
public static <T, K, V> Sink<T> map(String mapName, FunctionEx<? super T, ? extends K> toKeyFn, FunctionEx<? super T, ? extends V> toValueFn);
274
275
public static <T> Sink<T> list(String listName);
276
public static <T> Sink<T> cache(String cacheName);
277
278
// Remote maps and caches
279
public static <T, K, V> Sink<T> remoteMap(String mapName, ClientConfig clientConfig);
280
public static <T, K, V> Sink<T> remoteCache(String cacheName, ClientConfig clientConfig);
281
282
// Files
283
public static <T> Sink<T> files(String directoryName);
284
public static <T> Sink<T> files(String directoryName, FunctionEx<? super T, String> toStringFn);
285
286
// Socket
287
public static Sink<String> socket(String host, int port);
288
public static <T> Sink<T> socket(String host, int port, FunctionEx<? super T, String> toStringFn);
289
290
// Logging and testing
291
public static <T> Sink<T> logger();
292
public static <T> Sink<T> logger(FunctionEx<? super T, String> toStringFn);
293
public static <T> Sink<T> noop();
294
}
295
```
296
297
## Pipeline Stages
298
299
### BatchStage Interface
300
301
```java { .api }
302
import com.hazelcast.jet.pipeline.BatchStage;
303
import com.hazelcast.jet.pipeline.BatchStageWithKey;
304
import com.hazelcast.jet.aggregate.AggregateOperation;
305
import com.hazelcast.function.FunctionEx;
306
import com.hazelcast.function.PredicateEx;
307
import com.hazelcast.function.ConsumerEx;
308
309
public interface BatchStage<T> extends GeneralStage<T> {
310
// Transformation operations
311
<R> BatchStage<R> map(FunctionEx<? super T, ? extends R> mapFn);
312
<R> BatchStage<R> flatMap(FunctionEx<? super T, ? extends Traverser<R>> flatMapFn);
313
BatchStage<T> filter(PredicateEx<? super T> filterFn);
314
315
// Grouping
316
<K> BatchStageWithKey<T, K> groupingKey(FunctionEx<? super T, ? extends K> keyFn);
317
318
// Aggregation
319
<A, R> BatchStage<R> aggregate(AggregateOperation<? super T, A, R> aggrOp);
320
321
// Joining
322
<K, T1_IN, T1, R> BatchStage<R> hashJoin(BatchStage<T1_IN> stage1, JoinClause<K, ? super T, ? super T1_IN, ? extends T1> joinClause1, BiFunctionEx<T, T1, R> mapToOutputFn);
323
324
// Sorting
325
BatchStage<T> sort();
326
BatchStage<T> sort(ComparatorEx<? super T> comparatorFn);
327
328
// Distinct
329
BatchStage<T> distinct();
330
BatchStage<T> distinct(FunctionEx<? super T, ?> keyFn);
331
332
// Peek (for debugging)
333
BatchStage<T> peek(ConsumerEx<? super T> peekFn);
334
335
// Terminal operations
336
void writeTo(Sink<? super T> sink);
337
338
// Custom transformations
339
<R> BatchStage<R> customTransform(String stageName, SupplierEx<Processor> procSupplier);
340
}
341
```
342
343
### StreamStage Interface
344
345
```java { .api }
346
import com.hazelcast.jet.pipeline.StreamStage;
347
import com.hazelcast.jet.pipeline.StreamStageWithKey;
348
import com.hazelcast.jet.pipeline.WindowDefinition;
349
350
public interface StreamStage<T> extends GeneralStage<T> {
351
// Transformation operations
352
<R> StreamStage<R> map(FunctionEx<? super T, ? extends R> mapFn);
353
<R> StreamStage<R> flatMap(FunctionEx<? super T, ? extends Traverser<R>> flatMapFn);
354
StreamStage<T> filter(PredicateEx<? super T> filterFn);
355
356
// Grouping and keying
357
<K> StreamStageWithKey<T, K> groupingKey(FunctionEx<? super T, ? extends K> keyFn);
358
359
// Windowing
360
StageWithWindow<T> window(WindowDefinition wDef);
361
362
// Stateful mapping
363
<S, R> StreamStage<R> mapStateful(SupplierEx<? extends S> createFn, BiFunctionEx<? super S, ? super T, ? extends R> statefulMapFn);
364
365
// Rebalancing
366
StreamStage<T> rebalance();
367
StreamStage<T> rebalance(FunctionEx<? super T, ?> keyFn);
368
369
// Terminal operations
370
void writeTo(Sink<? super T> sink);
371
372
// Custom transformations
373
<R> StreamStage<R> customTransform(String stageName, SupplierEx<Processor> procSupplier);
374
}
375
```
376
377
## Pipeline Examples
378
379
### Basic Batch Processing
380
381
```java { .api }
382
import com.hazelcast.jet.pipeline.Pipeline;
383
import com.hazelcast.jet.pipeline.Sources;
384
import com.hazelcast.jet.pipeline.Sinks;
385
import com.hazelcast.query.Predicates;
386
387
// Word count example
388
Pipeline pipeline = Pipeline.create();
389
pipeline.readFrom(Sources.files("/input/directory"))
390
.flatMap(line -> Traversers.traverseArray(line.toLowerCase().split("\\W+")))
391
.filter(word -> !word.isEmpty())
392
.groupingKey(word -> word)
393
.aggregate(AggregateOperations.counting())
394
.writeTo(Sinks.map("word-counts"));
395
396
Job job = jet.newJob(pipeline);
397
job.getFuture().get(); // Wait for completion
398
```
399
400
### Stream Processing with Windows
401
402
```java { .api }
403
import com.hazelcast.jet.pipeline.WindowDefinition;
404
import static com.hazelcast.jet.aggregate.AggregateOperations.*;
405
406
// Real-time analytics pipeline
407
Pipeline pipeline = Pipeline.create();
408
pipeline.readFrom(Sources.mapJournal("events", JournalInitialPosition.START_FROM_OLDEST))
409
.withIngestionTimestamps()
410
.<String, Long>map(entry -> {
411
String event = entry.getValue().toString();
412
return Util.entry(extractUserId(event), extractAmount(event));
413
})
414
.groupingKey(Entry::getKey)
415
.window(WindowDefinition.sliding(Duration.ofMinutes(5), Duration.ofMinutes(1)))
416
.aggregate(summingLong(Entry::getValue))
417
.writeTo(Sinks.map("user-totals"));
418
419
Job streamJob = jet.newJob(pipeline);
420
```
421
422
### Complex Data Processing
423
424
```java { .api }
425
// ETL Pipeline with enrichment
426
Pipeline pipeline = Pipeline.create();
427
428
// Main data stream
429
StreamStage<Order> orders = pipeline
430
.readFrom(Sources.mapJournal("orders", JournalInitialPosition.START_FROM_CURRENT))
431
.map(entry -> parseOrder(entry.getValue()));
432
433
// Reference data
434
BatchStage<Entry<String, Customer>> customers = pipeline
435
.readFrom(Sources.map("customers"));
436
437
// Join and enrich
438
orders.groupingKey(Order::getCustomerId)
439
.hashJoin(customers, JoinClause.joinMapEntries(Customer::getId))
440
.map(item -> enrichOrder(item.get1(), item.get2()))
441
.filter(enrichedOrder -> enrichedOrder.getAmount() > 1000)
442
.writeTo(Sinks.map("high-value-orders"));
443
444
Job enrichmentJob = jet.newJob(pipeline);
445
```
446
447
## Aggregations
448
449
### Standard Aggregations
450
451
```java { .api }
452
import com.hazelcast.jet.aggregate.AggregateOperations;
453
import com.hazelcast.jet.aggregate.AggregateOperation;
454
455
// Built-in aggregation operations
456
AggregateOperation<Object, ?, Long> counting = AggregateOperations.counting();
457
AggregateOperation<Long, ?, Long> summingLong = AggregateOperations.summingLong(Long::longValue);
458
AggregateOperation<Double, ?, Double> averagingDouble = AggregateOperations.averagingDouble(Double::doubleValue);
459
AggregateOperation<Comparable, ?, Comparable> maxBy = AggregateOperations.maxBy(Comparable::compareTo);
460
AggregateOperation<Comparable, ?, Comparable> minBy = AggregateOperations.minBy(Comparable::compareTo);
461
462
// Usage in pipeline
463
pipeline.readFrom(Sources.list("numbers"))
464
.aggregate(summingLong(Number::longValue))
465
.writeTo(Sinks.logger());
466
```
467
468
### Custom Aggregations
469
470
```java { .api }
471
import com.hazelcast.jet.aggregate.AggregateOperation;
472
473
// Custom aggregation for calculating standard deviation
474
AggregateOperation<Double, MutableReference<Stats>, Double> stdDev =
475
AggregateOperation
476
.withCreate(() -> new MutableReference<>(new Stats()))
477
.andAccumulate((MutableReference<Stats> acc, Double value) -> {
478
Stats stats = acc.get();
479
stats.count++;
480
stats.sum += value;
481
stats.sumSquares += value * value;
482
})
483
.andCombine((acc1, acc2) -> {
484
Stats stats1 = acc1.get();
485
Stats stats2 = acc2.get();
486
stats1.count += stats2.count;
487
stats1.sum += stats2.sum;
488
stats1.sumSquares += stats2.sumSquares;
489
})
490
.andExportFinish(acc -> {
491
Stats stats = acc.get();
492
double mean = stats.sum / stats.count;
493
double variance = (stats.sumSquares / stats.count) - (mean * mean);
494
return Math.sqrt(variance);
495
});
496
497
// Usage
498
pipeline.readFrom(Sources.list("measurements"))
499
.aggregate(stdDev)
500
.writeTo(Sinks.logger());
501
```
502
503
## Advanced Features
504
505
### Job State Snapshots
506
507
```java { .api }
508
// Create job with snapshot configuration
509
JobConfig config = new JobConfig();
510
config.setName("streaming-analytics");
511
config.setProcessingGuarantee(ProcessingGuarantee.EXACTLY_ONCE);
512
config.setSnapshotIntervalMillis(5000); // 5 seconds
513
514
Job job = jet.newJob(pipeline, config);
515
516
// Export snapshot
517
JobStateSnapshot snapshot = job.exportSnapshot("backup-snapshot");
518
519
// Start new job from snapshot
520
JobConfig restoreConfig = new JobConfig();
521
restoreConfig.setInitialSnapshotName("backup-snapshot");
522
Job restoredJob = jet.newJob(newPipeline, restoreConfig);
523
```
524
525
### Processing Guarantees
526
527
```java { .api }
528
import com.hazelcast.jet.config.ProcessingGuarantee;
529
530
JobConfig config = new JobConfig();
531
532
// At-least-once processing (must be set explicitly; the default guarantee is NONE)
533
config.setProcessingGuarantee(ProcessingGuarantee.AT_LEAST_ONCE);
534
535
// Exactly-once processing (with snapshots)
536
config.setProcessingGuarantee(ProcessingGuarantee.EXACTLY_ONCE);
537
538
// No guarantee (best performance)
539
config.setProcessingGuarantee(ProcessingGuarantee.NONE);
540
```
541
542
### Observables
543
544
```java { .api }
545
import com.hazelcast.jet.Observable;
546
547
// Create observable
548
Observable<String> observable = jet.newObservable();
549
550
// Configure pipeline to write to observable
551
pipeline.readFrom(Sources.list("input"))
552
.map(String::toUpperCase)
553
.writeTo(Sinks.observable("results"));
554
555
Job job = jet.newJob(pipeline);
556
557
// Observe results
558
Observable<String> results = jet.getObservable("results");
559
results.addObserver(result -> {
560
System.out.println("Result: " + result);
561
});
562
```
563
564
## Monitoring and Metrics
565
566
### Job Metrics
567
568
```java { .api }
569
import com.hazelcast.jet.core.metrics.JobMetrics;
570
import com.hazelcast.jet.core.metrics.Measurement;
571
572
Job job = jet.getJob("analytics-job");
573
JobMetrics metrics = job.getMetrics();
574
575
// Iterate through all measurements
576
for (Measurement measurement : metrics) {
577
System.out.println("Metric: " + measurement.metric() +
578
", Value: " + measurement.value() +
579
", Unit: " + measurement.unit());
580
}
581
582
// Get specific metrics
583
long itemsProcessed = metrics.get("[vertex=map-stage]/itemsOut");
584
long throughput = metrics.get("[vertex=map-stage]/throughput");
585
```
586
587
### Job Configuration for Monitoring
588
589
```java { .api }
590
JobConfig config = new JobConfig();
591
config.setMetricsEnabled(true);
592
config.setStoreMetricsAfterJobCompletion(true);
593
594
Job job = jet.newJob(pipeline, config);
595
596
// Monitor job progress
597
while (!job.getStatus().isTerminal()) {
598
JobMetrics currentMetrics = job.getMetrics();
599
// Process metrics...
600
Thread.sleep(1000);
601
}
602
```