In-memory distributed computing platform for real-time stream processing and data storage with SQL capabilities
—
Hazelcast Jet is a distributed stream and batch processing engine built into Hazelcast. It provides high-performance data processing with low-latency and high-throughput capabilities for both real-time streaming and batch processing workloads.
The main entry point for Jet operations within a Hazelcast instance.
import com.hazelcast.jet.JetService;
import com.hazelcast.jet.Job;
import com.hazelcast.jet.JobStateSnapshot;
import com.hazelcast.jet.Observable;
import com.hazelcast.jet.config.JetConfig;
import com.hazelcast.jet.pipeline.Pipeline;
import com.hazelcast.jet.core.DAG;
import java.util.List;
import java.util.Collection;
/**
 * Main entry point for Jet operations within a Hazelcast instance.
 * NOTE(review): JobConfig is referenced below but missing from this snippet's
 * import list — verify.
 */
public interface JetService {
    // Configuration
    JetConfig getConfig();

    // Job creation, from the high-level Pipeline API or the low-level Core DAG API
    Job newJob(Pipeline pipeline);
    Job newJob(DAG dag);
    Job newJob(Pipeline pipeline, JobConfig config);
    Job newJob(DAG dag, JobConfig config);

    // Job creation with conditions
    Job newJobIfAbsent(Pipeline pipeline, JobConfig config);
    Job newJobIfAbsent(DAG dag, JobConfig config);

    // Lightweight jobs
    Job newLightJob(Pipeline pipeline);
    Job newLightJob(DAG dag);

    // Job management
    List<Job> getJobs();
    List<Job> getJobs(String name);
    Job getJob(long jobId);
    Job getJob(String name);

    // Snapshots
    JobStateSnapshot getJobStateSnapshot(String name);
    Collection<JobStateSnapshot> getJobStateSnapshots();

    // Observables
    <T> Observable<T> getObservable(String name);
    <T> Observable<T> newObservable();
}

// Obtaining the JetService from an embedded Hazelcast member:
HazelcastInstance hz = Hazelcast.newHazelcastInstance();
JetService jet = hz.getJet();

import com.hazelcast.jet.Job;
import com.hazelcast.jet.core.JobStatus;
import com.hazelcast.jet.core.JobSuspensionCause;
import com.hazelcast.jet.core.metrics.JobMetrics;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CancellationException;
/**
 * Handle to a submitted Jet job: status, lifecycle control, metrics and snapshots.
 * NOTE(review): JobStateSnapshot and JobConfig are referenced below but missing
 * from this snippet's import list — verify.
 */
public interface Job {
    // Job identification
    long getId();
    String getName();

    // Job status and control
    JobStatus getStatus();
    CompletableFuture<Void> getFuture();

    // Job lifecycle
    // (CancellationException is unchecked; the throws clause is documentation only)
    void cancel() throws CancellationException;
    Job suspend();
    Job resume();
    Job restart();

    // Suspension information
    JobSuspensionCause getSuspensionCause();

    // Metrics and monitoring
    JobMetrics getMetrics();

    // Snapshots
    JobStateSnapshot cancelAndExportSnapshot(String name);
    JobStateSnapshot exportSnapshot(String name);

    // Configuration
    JobConfig getConfig();
}

import com.hazelcast.jet.config.JobConfig;
import com.hazelcast.jet.config.ProcessingGuarantee;
import java.io.File;
import java.net.URL;
import java.util.List;
/**
 * Configuration for a Jet job: naming, processing guarantees, snapshots,
 * resource deployment, serialization and metrics.
 * NOTE(review): getSerializers() returns Map, but java.util.Map is missing from
 * this snippet's import list — verify.
 */
public class JobConfig {
    // Job identification
    JobConfig setName(String name);
    String getName();

    // Processing guarantees (see ProcessingGuarantee: NONE, AT_LEAST_ONCE, EXACTLY_ONCE)
    JobConfig setProcessingGuarantee(ProcessingGuarantee processingGuarantee);
    ProcessingGuarantee getProcessingGuarantee();

    // Snapshot configuration
    JobConfig setSnapshotIntervalMillis(long snapshotIntervalMillis);
    long getSnapshotIntervalMillis();
    JobConfig setAutoScaling(boolean enabled);
    boolean isAutoScaling();

    // Resource management
    JobConfig setSplitBrainProtectionName(String splitBrainProtectionName);
    String getSplitBrainProtectionName();

    // Class loading: resources shipped to the cluster with the job
    JobConfig addClass(Class<?>... classes);
    JobConfig addJar(File jarFile);
    JobConfig addJar(URL jarUrl);
    JobConfig addClasspathResource(URL url);
    JobConfig addClasspathResource(URL url, String id);
    List<String> getJars();
    List<String> getClasspathResources();

    // Serialization
    JobConfig setSerializer(Class<?> clazz, Class<?> serializerClass);
    Map<String, String> getSerializers();

    // Metrics
    JobConfig setStoreMetricsAfterJobCompletion(boolean storeMetricsAfterJobCompletion);
    boolean isStoreMetricsAfterJobCompletion();
    JobConfig setMetricsEnabled(boolean enabled);
    boolean isMetricsEnabled();
}

import com.hazelcast.jet.core.JobStatus;
// Lifecycle states of a Jet job, as returned by Job.getStatus().
public enum JobStatus {
NOT_RUNNING, // not currently executing (e.g. submitted but not yet started)
STARTING, // start-up in progress
RUNNING, // actively processing data
SUSPENDED, // paused; may be resumed
SUSPENDED_EXPORTING_SNAPSHOT, // suspended while a state snapshot is being exported
COMPLETING, // finishing up before reaching a terminal state
FAILED, // terminal: ended with an error
COMPLETED, // terminal: finished successfully
RESTARTING // restart in progress
}
// Job status monitoring
Job job = jet.newJob(pipeline);

// Check status
JobStatus status = job.getStatus();
System.out.println("Job status: " + status);

// Wait for completion
try {
    job.getFuture().get(); // Blocks until job completes
    System.out.println("Job completed successfully");
} catch (Exception e) {
    System.err.println("Job failed: " + e.getMessage());
}

// Job control
job.suspend(); // Suspend job
job.resume(); // Resume suspended job
job.restart(); // Restart job
job.cancel(); // Cancel job

High-level API for building data processing pipelines.
import com.hazelcast.jet.pipeline.Pipeline;
import com.hazelcast.jet.pipeline.BatchSource;
import com.hazelcast.jet.pipeline.StreamSource;
import com.hazelcast.jet.pipeline.BatchStage;
import com.hazelcast.jet.pipeline.StreamStage;
import com.hazelcast.jet.pipeline.Sink;
/**
 * Top-level container of a Jet data pipeline: create it with create(), attach
 * sources with readFrom(...) and terminate stages with writeTo(...).
 */
public class Pipeline {
    // Pipeline creation
    public static Pipeline create();

    // Batch sources (finite data)
    public <T> BatchStage<T> readFrom(BatchSource<T> source);

    // Stream sources (unbounded data)
    public <T> StreamStage<T> readFrom(StreamSource<T> source);

    // Renders the pipeline's DAG in GraphViz DOT format
    public String toDotString();
}

import com.hazelcast.jet.pipeline.Sources;
import com.hazelcast.jet.pipeline.BatchSource;
import com.hazelcast.jet.pipeline.StreamSource;
import com.hazelcast.function.SupplierEx;
import java.util.Map;
/**
 * Factory methods for the batch and stream sources shipped with Jet.
 */
public final class Sources {
    // Hazelcast data structures
    public static <K, V> BatchSource<Entry<K, V>> map(String mapName);
    // FIX(review): T was used but not declared in the original signature; the
    // projected variant returns BatchSource<T> with T as a method type parameter.
    public static <T, K, V> BatchSource<T> map(String mapName, Predicate<K, V> predicate, Projection<? super Entry<K, V>, T> projection);
    public static <T> BatchSource<T> list(String listName);
    public static <T> BatchSource<T> cache(String cacheName);

    // Streaming from data structures (requires the event journal to be enabled)
    public static <K, V> StreamSource<Entry<K, V>> mapJournal(String mapName, JournalInitialPosition initialPos);
    public static <T> StreamSource<T> cacheJournal(String cacheName, JournalInitialPosition initialPos);

    // Files
    public static BatchSource<String> files(String directory);
    public static BatchSource<String> files(String directory, String glob, boolean sharedFileSystem);
    // NOTE(review): in the Jet API filesBuilder returns a FileSourceBuilder,
    // not a BatchSource — confirm against the targeted Hazelcast version.
    public static <T> BatchSource<T> filesBuilder(String directory);

    // Streaming files
    public static StreamSource<String> fileWatcher(String watchedDirectory);

    // Custom processors
    public static <T> BatchSource<T> fromProcessor(String name, SupplierEx<Processor> supplier);

    // Socket
    public static StreamSource<String> socket(String host, int port, Charset charset);

    // Test sources
    public static <T> BatchSource<T> empty();
    public static StreamSource<Long> streamFromProcessor(String name, SupplierEx<Processor> supplier);
}

import com.hazelcast.jet.pipeline.Sinks;
import com.hazelcast.jet.pipeline.Sink;
import com.hazelcast.function.FunctionEx;
/**
 * Factory methods for the sinks shipped with Jet.
 */
public final class Sinks {
    // Hazelcast data structures
    public static <T, K, V> Sink<T> map(String mapName);
    public static <T, K, V> Sink<T> map(String mapName, FunctionEx<? super T, ? extends K> toKeyFn, FunctionEx<? super T, ? extends V> toValueFn);
    public static <T> Sink<T> list(String listName);
    public static <T> Sink<T> cache(String cacheName);

    // Remote maps and caches (in another cluster, addressed via ClientConfig)
    public static <T, K, V> Sink<T> remoteMap(String mapName, ClientConfig clientConfig);
    public static <T, K, V> Sink<T> remoteCache(String cacheName, ClientConfig clientConfig);

    // Files
    public static <T> Sink<T> files(String directoryName);
    public static <T> Sink<T> files(String directoryName, FunctionEx<? super T, String> toStringFn);

    // Socket
    public static Sink<String> socket(String host, int port);
    public static <T> Sink<T> socket(String host, int port, FunctionEx<? super T, String> toStringFn);

    // Logging and testing
    public static <T> Sink<T> logger();
    public static <T> Sink<T> logger(FunctionEx<? super T, String> toStringFn);
    public static <T> Sink<T> noop(); // discards all items
}

import com.hazelcast.jet.pipeline.BatchStage;
import com.hazelcast.jet.pipeline.BatchStageWithKey;
import com.hazelcast.jet.aggregate.AggregateOperation;
import com.hazelcast.function.FunctionEx;
import com.hazelcast.function.PredicateEx;
import com.hazelcast.function.ConsumerEx;
/**
 * A pipeline stage over a finite (batch) data set.
 */
public interface BatchStage<T> extends GeneralStage<T> {
    // Transformation operations
    <R> BatchStage<R> map(FunctionEx<? super T, ? extends R> mapFn);
    <R> BatchStage<R> flatMap(FunctionEx<? super T, ? extends Traverser<R>> flatMapFn);
    BatchStage<T> filter(PredicateEx<? super T> filterFn);

    // Grouping
    <K> BatchStageWithKey<T, K> groupingKey(FunctionEx<? super T, ? extends K> keyFn);

    // Aggregation
    <A, R> BatchStage<R> aggregate(AggregateOperation<? super T, A, R> aggrOp);

    // Joining
    // FIX(review): K was used but not declared in the original signature; it is
    // the join key type and must be a method type parameter.
    <K, T1, R> BatchStage<R> hashJoin(BatchStage<T1> stage1, JoinClause<K, ? super T, ? super T1, ? extends R> joinClause);

    // Sorting
    BatchStage<T> sort();
    BatchStage<T> sort(ComparatorEx<? super T> comparatorFn);

    // Distinct
    BatchStage<T> distinct();
    BatchStage<T> distinct(FunctionEx<? super T, ?> keyFn);

    // Peek (for debugging)
    BatchStage<T> peek(ConsumerEx<? super T> peekFn);

    // Terminal operations
    void writeTo(Sink<? super T> sink);

    // Custom transformations
    <R> BatchStage<R> customTransform(String stageName, SupplierEx<Processor> procSupplier);
}

import com.hazelcast.jet.pipeline.StreamStage;
import com.hazelcast.jet.pipeline.StreamStageWithKey;
import com.hazelcast.jet.pipeline.WindowDefinition;
/**
 * A pipeline stage over an unbounded (streaming) data source.
 */
public interface StreamStage<T> extends GeneralStage<T> {
    // Transformation operations
    <R> StreamStage<R> map(FunctionEx<? super T, ? extends R> mapFn);
    <R> StreamStage<R> flatMap(FunctionEx<? super T, ? extends Traverser<R>> flatMapFn);
    StreamStage<T> filter(PredicateEx<? super T> filterFn);

    // Grouping and keying
    <K> StreamStageWithKey<T, K> groupingKey(FunctionEx<? super T, ? extends K> keyFn);

    // Windowing
    <R> StreamStage<R> window(WindowDefinition wDef);

    // Stateful mapping: createFn builds the per-key state, statefulMapFn folds items into it
    <S, R> StreamStage<R> mapStateful(SupplierEx<? extends S> createFn, BiFunctionEx<? super S, ? super T, ? extends R> statefulMapFn);

    // Rebalancing: redistribute items across parallel processors
    StreamStage<T> rebalance();
    StreamStage<T> rebalance(FunctionEx<? super T, ?> keyFn);

    // Terminal operations
    void writeTo(Sink<? super T> sink);

    // Custom transformations
    <R> StreamStage<R> customTransform(String stageName, SupplierEx<Processor> procSupplier);
}

import com.hazelcast.jet.pipeline.Pipeline;
import com.hazelcast.jet.pipeline.Sources;
import com.hazelcast.jet.pipeline.Sinks;
import com.hazelcast.query.Predicates;
// Word count example
Pipeline pipeline = Pipeline.create();
pipeline.readFrom(Sources.files("/input/directory"))
    .flatMap(line -> Traversers.traverseArray(line.toLowerCase().split("\\W+")))
    .filter(word -> !word.isEmpty())
    .groupingKey(word -> word)
    .aggregate(AggregateOperations.counting())
    .writeTo(Sinks.map("word-counts"));
Job job = jet.newJob(pipeline);
job.getFuture().get(); // Wait for completion

import com.hazelcast.jet.pipeline.WindowDefinition;
import static com.hazelcast.jet.aggregate.AggregateOperations.*;
// Real-time analytics pipeline
Pipeline pipeline = Pipeline.create();
pipeline.readFrom(Sources.mapJournal("events", JournalInitialPosition.START_FROM_OLDEST))
    .withIngestionTimestamps()
    // FIX(review): map declares a single type parameter (the output type), so the
    // original two-argument witness `.<String, Long>map(...)` was invalid.
    .map(entry -> {
        String event = entry.getValue().toString();
        return Util.entry(extractUserId(event), extractAmount(event));
    })
    .groupingKey(Entry::getKey)
    // NOTE(review): in the Jet API WindowDefinition.sliding takes window size and
    // slide step as long milliseconds — confirm a Duration overload exists.
    .window(WindowDefinition.sliding(Duration.ofMinutes(5), Duration.ofMinutes(1)))
    .aggregate(summingLong(Entry::getValue))
    .writeTo(Sinks.map("user-totals"));
Job streamJob = jet.newJob(pipeline);

// ETL Pipeline with enrichment
Pipeline pipeline = Pipeline.create();

// Main data stream
StreamStage<Order> orders = pipeline
    .readFrom(Sources.mapJournal("orders", JournalInitialPosition.START_FROM_CURRENT))
    .map(entry -> parseOrder(entry.getValue()));

// Reference data
BatchStage<Entry<String, Customer>> customers = pipeline
    .readFrom(Sources.map("customers"));

// Join and enrich
// NOTE(review): hashJoin is invoked on the keyed stage here; in the Jet API
// hashJoin is defined on the stage itself — confirm this compiles as written.
orders.groupingKey(Order::getCustomerId)
    .hashJoin(customers, JoinClause.joinMapEntries(Customer::getId))
    .map(item -> enrichOrder(item.get1(), item.get2()))
    .filter(enrichedOrder -> enrichedOrder.getAmount() > 1000)
    .writeTo(Sinks.map("high-value-orders"));
Job enrichmentJob = jet.newJob(pipeline);

import com.hazelcast.jet.aggregate.AggregateOperations;
import com.hazelcast.jet.aggregate.AggregateOperation;
// Built-in aggregation operations
AggregateOperation<Object, ?, Long> counting = AggregateOperations.counting();
AggregateOperation<Long, ?, Long> summingLong = AggregateOperations.summingLong(Long::longValue);
AggregateOperation<Double, ?, Double> averagingDouble = AggregateOperations.averagingDouble(Double::doubleValue);
AggregateOperation<Comparable, ?, Comparable> maxBy = AggregateOperations.maxBy(Comparable::compareTo);
AggregateOperation<Comparable, ?, Comparable> minBy = AggregateOperations.minBy(Comparable::compareTo);

// Usage in pipeline
// NOTE(review): summingLong(...) is invoked here as a static method (requires
// `import static ...AggregateOperations.*`), not the local variable above — verify.
pipeline.readFrom(Sources.list("numbers"))
    .aggregate(summingLong(Number::longValue))
    .writeTo(Sinks.logger());

import com.hazelcast.jet.aggregate.AggregateOperation;
// Custom aggregation for calculating (population) standard deviation via
// E[x^2] - mean^2. NOTE(review): with zero accumulated items the finish step
// divides by stats.count == 0 and yields NaN — confirm that is acceptable.
AggregateOperation<Double, MutableReference<Stats>, Double> stdDev =
    AggregateOperation
        .withCreate(() -> new MutableReference<>(new Stats()))
        // Fold one input value into the running count / sum / sum of squares
        .andAccumulate((MutableReference<Stats> acc, Double value) -> {
            Stats stats = acc.get();
            stats.count++;
            stats.sum += value;
            stats.sumSquares += value * value;
        })
        // Merge partial results coming from parallel processors
        .andCombine((acc1, acc2) -> {
            Stats stats1 = acc1.get();
            Stats stats2 = acc2.get();
            stats1.count += stats2.count;
            stats1.sum += stats2.sum;
            stats1.sumSquares += stats2.sumSquares;
        })
        // Produce the final result from the accumulated state
        .andExportFinish(acc -> {
            Stats stats = acc.get();
            double mean = stats.sum / stats.count;
            double variance = (stats.sumSquares / stats.count) - (mean * mean);
            return Math.sqrt(variance);
        });

// Usage
pipeline.readFrom(Sources.list("measurements"))
    .aggregate(stdDev)
    .writeTo(Sinks.logger());

// Create job with snapshot configuration
JobConfig config = new JobConfig();
config.setName("streaming-analytics");
config.setProcessingGuarantee(ProcessingGuarantee.EXACTLY_ONCE);
config.setSnapshotIntervalMillis(5000); // 5 seconds

Job job = jet.newJob(pipeline, config);

// Export snapshot
JobStateSnapshot snapshot = job.exportSnapshot("backup-snapshot");

// Start new job from snapshot
JobConfig restoreConfig = new JobConfig();
restoreConfig.setInitialSnapshotName("backup-snapshot");
Job restoredJob = jet.newJob(newPipeline, restoreConfig);

import com.hazelcast.jet.config.ProcessingGuarantee;
JobConfig config = new JobConfig();

// At-least-once processing
// NOTE(review): the original labeled this "(default)"; in Jet the default
// processing guarantee is NONE — confirm before relying on it.
config.setProcessingGuarantee(ProcessingGuarantee.AT_LEAST_ONCE);

// Exactly-once processing (with snapshots)
config.setProcessingGuarantee(ProcessingGuarantee.EXACTLY_ONCE);

// No guarantee (best performance)
config.setProcessingGuarantee(ProcessingGuarantee.NONE);

import com.hazelcast.jet.Observable;
// Create observable
// NOTE(review): `observable` below is never used; the pipeline writes to the
// named observable "results", obtained later via getObservable — verify intent.
Observable<String> observable = jet.newObservable();

// Configure pipeline to write to observable
pipeline.readFrom(Sources.list("input"))
    .map(String::toUpperCase)
    .writeTo(Sinks.observable("results"));
Job job = jet.newJob(pipeline);

// Observe results
Observable<String> results = jet.getObservable("results");
results.addObserver(result -> {
    System.out.println("Result: " + result);
});

import com.hazelcast.jet.core.metrics.JobMetrics;
import com.hazelcast.jet.core.metrics.Measurement;
Job job = jet.getJob("analytics-job");
JobMetrics metrics = job.getMetrics();

// Iterate through all measurements
// NOTE(review): confirm JobMetrics is Iterable and Measurement exposes
// metric()/value()/unit() in the targeted Hazelcast version.
for (Measurement measurement : metrics) {
    System.out.println("Metric: " + measurement.metric() +
        ", Value: " + measurement.value() +
        ", Unit: " + measurement.unit());
}

// Get specific metrics
// NOTE(review): in the Jet API JobMetrics.get(String) returns a
// List<Measurement>, not a long — confirm these lines compile as written.
long itemsProcessed = metrics.get("[vertex=map-stage]/itemsOut");
long throughput = metrics.get("[vertex=map-stage]/throughput");

JobConfig config = new JobConfig();
config.setMetricsEnabled(true);
config.setStoreMetricsAfterJobCompletion(true);
Job job = jet.newJob(pipeline, config);

// Monitor job progress: poll until the job reaches a terminal status
// NOTE(review): Thread.sleep throws the checked InterruptedException; the
// enclosing method must declare or handle it for this snippet to compile.
while (!job.getStatus().isTerminal()) {
    JobMetrics currentMetrics = job.getMetrics();
    // Process metrics...
    Thread.sleep(1000);
}

Install with Tessl CLI
npx tessl i tessl/maven-com-hazelcast--hazelcast