CtrlK
Blog | Docs | Log in | Get started
Tessl Logo

tessl/maven-com-hazelcast--hazelcast

In-memory distributed computing platform for real-time stream processing and data storage with SQL capabilities

Pending
Overview
Eval results
Files

docs/stream-processing.md

Stream Processing (Jet)

Hazelcast Jet is a distributed stream and batch processing engine built into Hazelcast. It provides high-performance data processing with low-latency and high-throughput capabilities for both real-time streaming and batch processing workloads.

JetService Interface

The main entry point for Jet operations within a Hazelcast instance.

import com.hazelcast.jet.JetService;
import com.hazelcast.jet.Job;
import com.hazelcast.jet.JobStateSnapshot;
import com.hazelcast.jet.Observable;
import com.hazelcast.jet.config.JetConfig;
import com.hazelcast.jet.config.JobConfig;
import com.hazelcast.jet.pipeline.Pipeline;
import com.hazelcast.jet.core.DAG;
import java.util.List;
import java.util.Collection;

/**
 * Entry point for submitting and looking up Jet jobs on a Hazelcast instance.
 * (API summary: signatures only, bodies and annotations omitted.)
 */
public interface JetService {
    // Configuration
    JetConfig getConfig();

    // Job creation (fault-tolerant jobs coordinated by the cluster)
    Job newJob(Pipeline pipeline);
    Job newJob(DAG dag);
    Job newJob(Pipeline pipeline, JobConfig config);
    Job newJob(DAG dag, JobConfig config);

    // Job creation with conditions: if an active job with the name set in
    // the config already exists, a handle to that job is returned instead
    Job newJobIfAbsent(Pipeline pipeline, JobConfig config);
    Job newJobIfAbsent(DAG dag, JobConfig config);

    // Lightweight jobs
    Job newLightJob(Pipeline pipeline);
    Job newLightJob(DAG dag);
    Job newLightJob(Pipeline pipeline, JobConfig config);
    Job newLightJob(DAG dag, JobConfig config);

    // Job management
    List<Job> getJobs();
    List<Job> getJobs(String name);
    Job getJob(long jobId);      // null when no job has this id
    Job getJob(String name);     // null when no job has this name

    // Snapshots
    JobStateSnapshot getJobStateSnapshot(String name);   // null when not found
    Collection<JobStateSnapshot> getJobStateSnapshots();

    // Observables
    <T> Observable<T> getObservable(String name);
    <T> Observable<T> newObservable();
}

Getting JetService

// Create a Hazelcast instance, then obtain its JetService via getJet().
HazelcastInstance hz = Hazelcast.newHazelcastInstance();
JetService jet = hz.getJet();

Job Management

Job Interface

import com.hazelcast.jet.Job;
import com.hazelcast.jet.core.JobStatus;
import com.hazelcast.jet.core.JobSuspensionCause;
import com.hazelcast.jet.core.metrics.JobMetrics;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CancellationException;

/**
 * Handle to a submitted Jet job: identification, status, lifecycle control,
 * metrics and snapshot export.
 * (API summary: signatures only, bodies and annotations omitted.)
 */
public interface Job {
    // Job identification
    long getId();
    String getName();

    // Job status and control
    JobStatus getStatus();
    CompletableFuture<Void> getFuture();
    // Shorthand for getFuture().join(); a cancelled job surfaces here as a
    // CancellationException
    void join();

    // Job lifecycle: these are commands and return nothing, so they cannot be chained
    void cancel();
    void suspend();
    void resume();
    void restart();

    // Suspension information
    JobSuspensionCause getSuspensionCause();

    // Metrics and monitoring
    JobMetrics getMetrics();

    // Snapshots
    JobStateSnapshot cancelAndExportSnapshot(String name);
    JobStateSnapshot exportSnapshot(String name);

    // Configuration
    JobConfig getConfig();
}

JobConfig Class

import com.hazelcast.jet.config.JobConfig;
import com.hazelcast.jet.config.ProcessingGuarantee;
import com.hazelcast.jet.config.ResourceConfig;
import java.io.File;
import java.net.URL;
import java.util.List;
import java.util.Map;

/**
 * Per-job configuration. Setters return {@code this} so calls can be chained.
 * (API summary: signatures only, bodies and annotations omitted.)
 */
public class JobConfig {
    // Job identification
    JobConfig setName(String name);
    String getName();

    // Processing guarantees
    JobConfig setProcessingGuarantee(ProcessingGuarantee processingGuarantee);
    ProcessingGuarantee getProcessingGuarantee();

    // Snapshot configuration
    JobConfig setSnapshotIntervalMillis(long snapshotIntervalMillis);
    long getSnapshotIntervalMillis();
    JobConfig setInitialSnapshotName(String initialSnapshotName);
    String getInitialSnapshotName();

    JobConfig setAutoScaling(boolean enabled);
    boolean isAutoScaling();

    // Split-brain protection is a boolean switch on the job, not a named reference
    JobConfig setSplitBrainProtection(boolean isEnabled);
    boolean isSplitBrainProtectionEnabled();

    // Class loading
    JobConfig addClass(Class<?>... classes);
    JobConfig addJar(File jarFile);
    JobConfig addJar(URL jarUrl);
    JobConfig addClasspathResource(URL url);
    JobConfig addClasspathResource(URL url, String id);

    // All attached classes, JARs and resources, keyed by resource id
    Map<String, ResourceConfig> getResourceConfigs();

    // Serialization
    <T, S extends StreamSerializer<T>> JobConfig registerSerializer(Class<T> clazz, Class<S> serializerClass);
    Map<String, String> getSerializerConfigs();

    // Metrics
    JobConfig setStoreMetricsAfterJobCompletion(boolean storeMetricsAfterJobCompletion);
    boolean isStoreMetricsAfterJobCompletion();

    JobConfig setMetricsEnabled(boolean enabled);
    boolean isMetricsEnabled();
}

Job Status Handling

import com.hazelcast.jet.core.JobStatus;

/**
 * Lifecycle states of a Jet job. A restart is not a separate state: a
 * restarting job is reported through the existing NOT_RUNNING / STARTING /
 * RUNNING values. COMPLETED and FAILED are the terminal states, which is
 * what {@code isTerminal()} reports.
 */
public enum JobStatus {
    NOT_RUNNING,
    STARTING,
    RUNNING,
    SUSPENDED,
    SUSPENDED_EXPORTING_SNAPSHOT,
    COMPLETING,
    FAILED,
    COMPLETED
}

// Job status monitoring
Job job = jet.newJob(pipeline);

// Check status
JobStatus status = job.getStatus();
System.out.println("Job status: " + status);

// Wait for completion. join() is shorthand for getFuture().join(): it blocks
// until the job completes and throws only unchecked exceptions, so there is
// no InterruptedException to swallow by accident.
try {
    job.join();
    System.out.println("Job completed successfully");
} catch (java.util.concurrent.CancellationException e) {
    System.err.println("Job was cancelled");
} catch (java.util.concurrent.CompletionException e) {
    // The real failure is the cause; keep the whole throwable, not just the message
    System.err.println("Job failed: " + e.getCause());
}

// Job control (independent operations, shown together for reference only;
// each one is valid only in a matching job state)
job.suspend();  // Suspend job
job.resume();   // Resume suspended job
job.restart();  // Restart job
job.cancel();   // Cancel job

Pipeline API

Pipeline Class

High-level API for building data processing pipelines.

import com.hazelcast.jet.pipeline.Pipeline;
import com.hazelcast.jet.pipeline.BatchSource;
import com.hazelcast.jet.pipeline.StreamSource;
import com.hazelcast.jet.pipeline.StreamSourceStage;
import com.hazelcast.jet.pipeline.BatchStage;
import com.hazelcast.jet.pipeline.StreamStage;
import com.hazelcast.jet.pipeline.Sink;

/**
 * Builder for a data-processing pipeline.
 * (API summary: signatures only, bodies omitted.)
 */
public interface Pipeline {
    // Pipeline creation
    static Pipeline create();

    // Batch sources
    <T> BatchStage<T> readFrom(BatchSource<? extends T> source);

    // Stream sources: the result is a StreamSourceStage, on which a timestamp
    // policy must be chosen (withIngestionTimestamps(), withNativeTimestamps(...),
    // withTimestamps(...) or withoutTimestamps()) to obtain a StreamStage<T>
    <T> StreamSourceStage<T> readFrom(StreamSource<? extends T> source);

    // Drawing the DAG
    String toDotString();
}

Sources

import com.hazelcast.jet.pipeline.Sources;
import com.hazelcast.jet.pipeline.BatchSource;
import com.hazelcast.jet.pipeline.StreamSource;
import com.hazelcast.function.SupplierEx;
import java.util.Map;

public final class Sources {
    // Hazelcast data structures
    public static <K, V> BatchSource<Entry<K, V>> map(String mapName);
    public static <K, V> BatchSource<Entry<K, V>> map(String mapName, Predicate<K, V> predicate, Projection<? super Entry<K, V>, T> projection);
    
    public static <T> BatchSource<T> list(String listName);
    public static <T> BatchSource<T> cache(String cacheName);
    
    // Streaming from data structures
    public static <K, V> StreamSource<Entry<K, V>> mapJournal(String mapName, JournalInitialPosition initialPos);
    public static <T> StreamSource<T> cacheJournal(String cacheName, JournalInitialPosition initialPos);
    
    // Files
    public static BatchSource<String> files(String directory);
    public static BatchSource<String> files(String directory, String glob, boolean sharedFileSystem);
    public static <T> BatchSource<T> filesBuilder(String directory);
    
    // Streaming files
    public static StreamSource<String> fileWatcher(String watchedDirectory);
    
    // Collections and arrays
    public static <T> BatchSource<T> fromProcessor(String name, SupplierEx<Processor> supplier);
    
    // Socket
    public static StreamSource<String> socket(String host, int port, Charset charset);
    
    // Test sources
    public static <T> BatchSource<T> empty();
    public static StreamSource<Long> streamFromProcessor(String name, SupplierEx<Processor> supplier);
}

Sinks

import com.hazelcast.jet.pipeline.Sinks;
import com.hazelcast.jet.pipeline.Sink;
import com.hazelcast.function.FunctionEx;

public final class Sinks {
    // Hazelcast data structures
    public static <T, K, V> Sink<T> map(String mapName);
    public static <T, K, V> Sink<T> map(String mapName, FunctionEx<? super T, ? extends K> toKeyFn, FunctionEx<? super T, ? extends V> toValueFn);
    
    public static <T> Sink<T> list(String listName);
    public static <T> Sink<T> cache(String cacheName);
    
    // Remote maps and caches  
    public static <T, K, V> Sink<T> remoteMap(String mapName, ClientConfig clientConfig);
    public static <T, K, V> Sink<T> remoteCache(String cacheName, ClientConfig clientConfig);
    
    // Files
    public static <T> Sink<T> files(String directoryName);
    public static <T> Sink<T> files(String directoryName, FunctionEx<? super T, String> toStringFn);
    
    // Socket
    public static Sink<String> socket(String host, int port);
    public static <T> Sink<T> socket(String host, int port, FunctionEx<? super T, String> toStringFn);
    
    // Logging and testing
    public static <T> Sink<T> logger();
    public static <T> Sink<T> logger(FunctionEx<? super T, String> toStringFn);
    public static <T> Sink<T> noop();
}

Pipeline Stages

BatchStage Interface

import com.hazelcast.jet.pipeline.BatchStage;
import com.hazelcast.jet.pipeline.BatchStageWithKey;
import com.hazelcast.jet.aggregate.AggregateOperation;
import com.hazelcast.function.FunctionEx;
import com.hazelcast.function.PredicateEx;
import com.hazelcast.function.ConsumerEx;

public interface BatchStage<T> extends GeneralStage<T> {
    // Transformation operations
    <R> BatchStage<R> map(FunctionEx<? super T, ? extends R> mapFn);
    <R> BatchStage<R> flatMap(FunctionEx<? super T, ? extends Traverser<R>> flatMapFn);
    BatchStage<T> filter(PredicateEx<? super T> filterFn);
    
    // Grouping
    <K> BatchStageWithKey<T, K> groupingKey(FunctionEx<? super T, ? extends K> keyFn);
    
    // Aggregation
    <A, R> BatchStage<R> aggregate(AggregateOperation<? super T, A, R> aggrOp);
    
    // Joining  
    <T1, R> BatchStage<R> hashJoin(BatchStage<T1> stage1, JoinClause<K, ? super T, ? super T1, ? extends R> joinClause);
    
    // Sorting
    BatchStage<T> sort();
    BatchStage<T> sort(ComparatorEx<? super T> comparatorFn);
    
    // Distinct
    BatchStage<T> distinct();
    BatchStage<T> distinct(FunctionEx<? super T, ?> keyFn);
    
    // Peek (for debugging)
    BatchStage<T> peek(ConsumerEx<? super T> peekFn);
    
    // Terminal operations
    void writeTo(Sink<? super T> sink);
    
    // Custom transformations
    <R> BatchStage<R> customTransform(String stageName, SupplierEx<Processor> procSupplier);
}

StreamStage Interface

import com.hazelcast.jet.pipeline.StreamStage;
import com.hazelcast.jet.pipeline.StreamStageWithKey;
import com.hazelcast.jet.pipeline.WindowDefinition;

public interface StreamStage<T> extends GeneralStage<T> {
    // Transformation operations
    <R> StreamStage<R> map(FunctionEx<? super T, ? extends R> mapFn);
    <R> StreamStage<R> flatMap(FunctionEx<? super T, ? extends Traverser<R>> flatMapFn);
    StreamStage<T> filter(PredicateEx<? super T> filterFn);
    
    // Grouping and keying
    <K> StreamStageWithKey<T, K> groupingKey(FunctionEx<? super T, ? extends K> keyFn);
    
    // Windowing
    <R> StreamStage<R> window(WindowDefinition wDef);
    
    // Stateful mapping
    <S, R> StreamStage<R> mapStateful(SupplierEx<? extends S> createFn, BiFunctionEx<? super S, ? super T, ? extends R> statefulMapFn);
    
    // Rebalancing
    StreamStage<T> rebalance();
    StreamStage<T> rebalance(FunctionEx<? super T, ?> keyFn);
    
    // Terminal operations
    void writeTo(Sink<? super T> sink);
    
    // Custom transformations
    <R> StreamStage<R> customTransform(String stageName, SupplierEx<Processor> procSupplier);
}

Pipeline Examples

Basic Batch Processing

import com.hazelcast.jet.Traversers;
import com.hazelcast.jet.aggregate.AggregateOperations;
import com.hazelcast.jet.pipeline.Pipeline;
import com.hazelcast.jet.pipeline.Sources;
import com.hazelcast.jet.pipeline.Sinks;
import com.hazelcast.query.Predicates;
import static com.hazelcast.function.Functions.wholeItem;

// Word count example
Pipeline pipeline = Pipeline.create();
pipeline.readFrom(Sources.files("/input/directory"))
        // split each line into lower-cased words
        .flatMap(line -> Traversers.traverseArray(line.toLowerCase().split("\\W+")))
        // split() yields an empty first token when a line starts with a separator
        .filter(word -> !word.isEmpty())
        // the word itself is the grouping key
        .groupingKey(wholeItem())
        .aggregate(AggregateOperations.counting())
        // emits Entry<word, count>, which the map sink stores as key/value
        .writeTo(Sinks.map("word-counts"));

Job job = jet.newJob(pipeline);
job.join(); // Wait for completion; unlike getFuture().get() this throws no checked exceptions

Stream Processing with Windows

import com.hazelcast.jet.Util;
import com.hazelcast.jet.pipeline.JournalInitialPosition;
import com.hazelcast.jet.pipeline.WindowDefinition;
import java.util.Map.Entry;
import static com.hazelcast.jet.aggregate.AggregateOperations.*;
import static java.util.concurrent.TimeUnit.MINUTES;

// Real-time analytics pipeline
Pipeline pipeline = Pipeline.create();
pipeline.readFrom(Sources.mapJournal("events", JournalInitialPosition.START_FROM_OLDEST))
        .withIngestionTimestamps()
        // map() has a single type parameter, so the result type is left to inference
        .map(entry -> {
            String event = entry.getValue().toString();
            return Util.entry(extractUserId(event), extractAmount(event));
        })
        .groupingKey(Entry::getKey)
        // sliding(windowSize, slideBy) takes plain longs in the same unit as the
        // timestamps (milliseconds for ingestion timestamps)
        .window(WindowDefinition.sliding(MINUTES.toMillis(5), MINUTES.toMillis(1)))
        .aggregate(summingLong(Entry::getValue))
        // the keyed window result is a Map.Entry (key -> sum), so it fits the map sink
        .writeTo(Sinks.map("user-totals"));

Job streamJob = jet.newJob(pipeline);

Complex Data Processing

// ETL Pipeline with enrichment
Pipeline pipeline = Pipeline.create();

// Main data stream
StreamStage<Order> orders = pipeline
    .readFrom(Sources.mapJournal("orders", JournalInitialPosition.START_FROM_CURRENT))
    // a streaming source must declare its timestamp policy before any
    // transformation; no windowing happens here, so timestamps are not needed
    .withoutTimestamps()
    .map(entry -> parseOrder(entry.getValue()));

// Reference data
BatchStage<Entry<String, Customer>> customers = pipeline
    .readFrom(Sources.map("customers"));

// Join and enrich.
// hashJoin is called on the stream stage itself (not on a keyed stage):
//  - joinMapEntries takes the function that extracts the join key from the
//    LEFT item (the order); the right side is matched by its map-entry key
//  - the third argument combines the order with the matched customer
//    (customer is null when there is no match)
orders.hashJoin(
            customers,
            JoinClause.joinMapEntries(Order::getCustomerId),
            (order, customer) -> enrichOrder(order, customer))
      .filter(enrichedOrder -> enrichedOrder.getAmount() > 1000)
      .writeTo(Sinks.map("high-value-orders"));

Job enrichmentJob = jet.newJob(pipeline);

Aggregations

Standard Aggregations

import com.hazelcast.function.ComparatorEx;
import com.hazelcast.jet.aggregate.AggregateOperations;
import com.hazelcast.jet.aggregate.AggregateOperation;
import com.hazelcast.jet.aggregate.AggregateOperation1;

// Built-in aggregation operations.
// Single-input operations are AggregateOperation1<T, A, R>; the base type
// AggregateOperation<A, R> has no item type parameter.
AggregateOperation1<Object, ?, Long> counting = AggregateOperations.counting();
AggregateOperation1<Long, ?, Long> summingLong = AggregateOperations.summingLong(Long::longValue);
AggregateOperation1<Double, ?, Double> averagingDouble = AggregateOperations.averagingDouble(Double::doubleValue);
AggregateOperation1<Long, ?, Long> maxBy = AggregateOperations.maxBy(ComparatorEx.naturalOrder());
AggregateOperation1<Long, ?, Long> minBy = AggregateOperations.minBy(ComparatorEx.naturalOrder());

// Usage in pipeline (the type witness tells the compiler what the list holds)
pipeline.readFrom(Sources.<Long>list("numbers"))
        .aggregate(summingLong)
        .writeTo(Sinks.logger());

Custom Aggregations

import com.hazelcast.jet.accumulator.MutableReference;
import com.hazelcast.jet.aggregate.AggregateOperation;
import com.hazelcast.jet.aggregate.AggregateOperation1;

// Custom aggregation for calculating the (population) standard deviation.
// Stats is assumed to be a user-defined holder with a numeric `count` and
// double `sum` / `sumSquares` fields — TODO confirm it is serializable, since
// accumulators may be sent between members.
AggregateOperation1<Double, MutableReference<Stats>, Double> stdDev =
    AggregateOperation
        .withCreate(() -> new MutableReference<>(new Stats()))
        .andAccumulate((MutableReference<Stats> acc, Double value) -> {
            Stats stats = acc.get();
            stats.count++;
            stats.sum += value;
            stats.sumSquares += value * value;
        })
        .andCombine((acc1, acc2) -> {
            // merge the partial result acc2 into acc1
            Stats stats1 = acc1.get();
            Stats stats2 = acc2.get();
            stats1.count += stats2.count;
            stats1.sum += stats2.sum;
            stats1.sumSquares += stats2.sumSquares;
        })
        .andExportFinish(acc -> {
            Stats stats = acc.get();
            double mean = stats.sum / stats.count;
            // E[x^2] - E[x]^2 can come out slightly negative through rounding
            // when all values are (nearly) equal; clamp so sqrt() does not return NaN
            double variance = Math.max(0.0, (stats.sumSquares / stats.count) - (mean * mean));
            return Math.sqrt(variance);
        });

// Usage
pipeline.readFrom(Sources.<Double>list("measurements"))
        .aggregate(stdDev)
        .writeTo(Sinks.logger());

Advanced Features

Job State Snapshots

// Create job with snapshot configuration
JobConfig config = new JobConfig();
config.setName("streaming-analytics");
config.setProcessingGuarantee(ProcessingGuarantee.EXACTLY_ONCE);
config.setSnapshotIntervalMillis(5000); // 5 seconds

Job job = jet.newJob(pipeline, config);

// Export snapshot
JobStateSnapshot snapshot = job.exportSnapshot("backup-snapshot");

// Start new job from snapshot  
JobConfig restoreConfig = new JobConfig();
restoreConfig.setInitialSnapshotName("backup-snapshot");
Job restoredJob = jet.newJob(newPipeline, restoreConfig);

Processing Guarantees

import com.hazelcast.jet.config.ProcessingGuarantee;

JobConfig config = new JobConfig();

// No guarantee (default; best performance, no snapshots are taken)
config.setProcessingGuarantee(ProcessingGuarantee.NONE);

// At-least-once processing (with snapshots; items may be processed again after a restart)
config.setProcessingGuarantee(ProcessingGuarantee.AT_LEAST_ONCE);

// Exactly-once processing (with snapshots)
config.setProcessingGuarantee(ProcessingGuarantee.EXACTLY_ONCE);

Observables

import com.hazelcast.jet.Observable;

// Look up the named observable and register the observer BEFORE submitting
// the job, so no results are published while nobody is listening.
// (jet.newObservable() would create a different observable with a generated
// name, unrelated to "results".)
Observable<String> results = jet.getObservable("results");
results.addObserver(result -> {
    System.out.println("Result: " + result);
});

// Configure pipeline to write to the same observable
pipeline.readFrom(Sources.<String>list("input"))
        .map(String::toUpperCase)
        .writeTo(Sinks.observable(results));

Job job = jet.newJob(pipeline);

Monitoring and Metrics

Job Metrics

import com.hazelcast.jet.core.metrics.JobMetrics;
import com.hazelcast.jet.core.metrics.Measurement;
import com.hazelcast.jet.core.metrics.MetricNames;
import com.hazelcast.jet.core.metrics.MetricTags;

Job job = jet.getJob("analytics-job");
if (job == null) {
    // getJob(name) returns null when no job with that name exists
    throw new IllegalStateException("No job named 'analytics-job'");
}
JobMetrics metrics = job.getMetrics();

// Iterate through all measurements: JobMetrics is not Iterable; it maps each
// metric name to the list of measurements recorded for it
for (String metricName : metrics.metrics()) {
    for (Measurement measurement : metrics.get(metricName)) {
        System.out.println("Metric: " + measurement.metric() +
                          ", Value: " + measurement.value() +
                          ", Unit: " + measurement.tag(MetricTags.UNIT));
    }
}

// Get specific metrics: narrow down by tag, then sum the per-processor values
JobMetrics stageMetrics = metrics.filter(MetricTags.VERTEX, "map-stage");
long itemsEmitted = stageMetrics.get(MetricNames.EMITTED_COUNT).stream()
        .mapToLong(Measurement::value)
        .sum();
long itemsReceived = stageMetrics.get(MetricNames.RECEIVED_COUNT).stream()
        .mapToLong(Measurement::value)
        .sum();

Job Configuration for Monitoring

// Enable metrics collection and keep the metrics available after the job ends
JobConfig config = new JobConfig();
config.setMetricsEnabled(true);
config.setStoreMetricsAfterJobCompletion(true);

Job job = jet.newJob(pipeline, config);

// Monitor job progress until the job reaches a terminal state
try {
    while (!job.getStatus().isTerminal()) {
        JobMetrics currentMetrics = job.getMetrics();
        // Process metrics...
        Thread.sleep(1000);
    }
} catch (InterruptedException e) {
    // Stop polling and restore the interrupt flag for the caller
    Thread.currentThread().interrupt();
}

Install with Tessl CLI

npx tessl i tessl/maven-com-hazelcast--hazelcast

docs

cluster-management.md

configuration.md

core-api.md

data-structures.md

index.md

sql-service.md

stream-processing.md

tile.json