
tessl/maven-org-apache-spark--spark-streaming-kinesis-asl-assembly

Apache Spark Streaming integration library for consuming data from Amazon Kinesis streams with fault-tolerant checkpointing and automatic shard management


Java API

The Java API for the Spark Streaming Kinesis integration consists of KinesisUtils.createStream overloads that take a JavaStreamingContext, accept Spark's Java Function interface for message handling, and return a JavaReceiverInputDStream.

Core Java API Methods

Generic Type Stream Creation

Create a stream of type T by supplying a message handler: a Spark Java Function that converts each Kinesis Record into a T.

public static <T> JavaReceiverInputDStream<T> createStream(
  JavaStreamingContext jssc,
  String kinesisAppName,
  String streamName,
  String endpointUrl,
  String regionName,
  InitialPositionInStream initialPositionInStream,  
  Duration checkpointInterval,
  StorageLevel storageLevel,
  Function<Record, T> messageHandler,
  Class<T> recordClass
);

Parameters:

  • jssc - JavaStreamingContext object
  • kinesisAppName - Kinesis application name for KCL coordination
  • streamName - Kinesis stream name
  • endpointUrl - Kinesis service endpoint URL
  • regionName - AWS region name
  • initialPositionInStream - Starting position (LATEST or TRIM_HORIZON)
  • checkpointInterval - Interval at which the Kinesis position is checkpointed, given as a Spark Duration
  • storageLevel - Spark storage level for received data
  • messageHandler - Function that converts each Kinesis Record into a value of type T
  • recordClass - Class object for type T

Usage Example:

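import java.nio.charset.StandardCharsets;

import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
import com.amazonaws.services.kinesis.model.Record;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.kinesis.KinesisUtils;
import org.json.JSONObject;

// Define message handler for JSON processing
Function<Record, JSONObject> jsonHandler = new Function<Record, JSONObject>() {
    @Override
    public JSONObject call(Record record) throws Exception {
        // Decode with an explicit charset rather than the platform default
        String data = new String(record.getData().array(), StandardCharsets.UTF_8);
        return new JSONObject(data);
    }
};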

// Create stream
JavaReceiverInputDStream<JSONObject> jsonStream = KinesisUtils.createStream(
    jssc,
    "java-json-processor",
    "json-events-stream",
    "https://kinesis.us-east-1.amazonaws.com",
    "us-east-1",
    InitialPositionInStream.LATEST,
    Durations.seconds(30), 
    StorageLevel.MEMORY_AND_DISK_2(),
    jsonHandler,
    JSONObject.class
);

// Process JSON stream
jsonStream.foreachRDD(rdd -> {
    rdd.foreach(json -> {
        System.out.println("Event ID: " + json.getString("eventId"));
        System.out.println("Timestamp: " + json.getLong("timestamp"));
    });
});
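The handlers in this document decode a record with `record.getData().array()`. `ByteBuffer.array()` returns the whole backing array regardless of the buffer's position and limit, and it throws for read-only buffers. Whether a particular Kinesis client version hands out such buffers is not stated here, so the following is only a defensive sketch. `RecordBytes`, `toBytes`, and `toUtf8` are hypothetical helper names, not part of Spark or the AWS SDK.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

final class RecordBytes {
    private RecordBytes() {}

    /** Copies only the readable bytes (position to limit) without moving the caller's buffer. */
    static byte[] toBytes(ByteBuffer data) {
        ByteBuffer view = data.duplicate(); // shares content, has its own position and limit
        byte[] out = new byte[view.remaining()];
        view.get(out);
        return out;
    }

    /** Decodes the readable bytes as UTF-8. */
    static String toUtf8(ByteBuffer data) {
        return new String(toBytes(data), StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // A buffer whose backing array is larger than its readable window.
        byte[] backing = "xx{\"eventId\":\"e1\"}yy".getBytes(StandardCharsets.UTF_8);
        ByteBuffer window = ByteBuffer.wrap(backing, 2, backing.length - 4);

        System.out.println(toUtf8(window));
        // For comparison: array() ignores the window and decodes the padding too.
        System.out.println(new String(window.array(), StandardCharsets.UTF_8));
    }
}
```

Inside a message handler, a call such as `RecordBytes.toUtf8(record.getData())` could replace `new String(record.getData().array())`.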

Generic Type Stream with Credentials

public static <T> JavaReceiverInputDStream<T> createStream(
  JavaStreamingContext jssc,
  String kinesisAppName,
  String streamName,
  String endpointUrl,
  String regionName,
  InitialPositionInStream initialPositionInStream,
  Duration checkpointInterval,
  StorageLevel storageLevel,
  Function<Record, T> messageHandler,
  Class<T> recordClass,
  String awsAccessKeyId,
  String awsSecretKey
);

Usage Example:

// Custom message handler with error handling
Function<Record, String> safeStringHandler = new Function<Record, String>() {
    @Override
    public String call(Record record) throws Exception {
        try {
            return new String(record.getData().array(), "UTF-8");
        } catch (Exception e) {
            return "ERROR: " + e.getMessage();
        }
    }
};

JavaReceiverInputDStream<String> secureStream = KinesisUtils.createStream(
    jssc,
    "secure-java-app",
    "secure-text-stream",
    "https://kinesis.us-west-2.amazonaws.com",
    "us-west-2",
    InitialPositionInStream.TRIM_HORIZON,
    Durations.seconds(45),
    StorageLevel.MEMORY_AND_DISK_2(),
    safeStringHandler,
    String.class,
    System.getenv("AWS_ACCESS_KEY_ID"),
    System.getenv("AWS_SECRET_ACCESS_KEY")
);
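`System.getenv` returns null when a variable is not set, so the call above could pass null credentials to `createStream`. One way to fail early with a clear message is sketched below. `EnvCredentials` and its methods are hypothetical names introduced for illustration; the lookup is passed in as a function so the check can be exercised without touching the real environment.

```java
import java.util.function.Function;

final class EnvCredentials {
    final String accessKeyId;
    final String secretKey;

    private EnvCredentials(String accessKeyId, String secretKey) {
        this.accessKeyId = accessKeyId;
        this.secretKey = secretKey;
    }

    /** Reads both variables through the given lookup and rejects missing or blank values. */
    static EnvCredentials load(Function<String, String> lookup) {
        return new EnvCredentials(
            require(lookup, "AWS_ACCESS_KEY_ID"),
            require(lookup, "AWS_SECRET_ACCESS_KEY"));
    }

    /** Convenience wrapper around the process environment. */
    static EnvCredentials fromEnvironment() {
        return load(System::getenv);
    }

    private static String require(Function<String, String> lookup, String name) {
        String value = lookup.apply(name);
        if (value == null || value.trim().isEmpty()) {
            // Report only the variable name, never the value.
            throw new IllegalStateException("Missing required environment variable: " + name);
        }
        return value;
    }
}
```

With this in place, the last two arguments of the call above could be `creds.accessKeyId` and `creds.secretKey`, where `creds` comes from `EnvCredentials.fromEnvironment()`.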

Default Byte Array Streams

Create streams returning raw byte arrays using default message handling.

public static JavaReceiverInputDStream<byte[]> createStream(
  JavaStreamingContext jssc,
  String kinesisAppName,
  String streamName,
  String endpointUrl,
  String regionName,
  InitialPositionInStream initialPositionInStream,
  Duration checkpointInterval,
  StorageLevel storageLevel
);

Usage Example:

// Create byte array stream
JavaReceiverInputDStream<byte[]> byteStream = KinesisUtils.createStream(
    jssc,
    "java-binary-processor",
    "binary-data-stream",
    "https://kinesis.ap-southeast-1.amazonaws.com",
    "ap-southeast-1", 
    InitialPositionInStream.LATEST,
    Durations.seconds(60),
    StorageLevel.MEMORY_AND_DISK_2()
);

// Convert bytes to strings and process
JavaDStream<String> stringStream = byteStream.map(
    bytes -> new String(bytes, "UTF-8")
);

// Filter and transform
JavaDStream<String> processedStream = stringStream
    .filter(text -> text.length() > 10)
    .map(text -> text.toUpperCase());

processedStream.print();

Default Byte Array Stream with Credentials

public static JavaReceiverInputDStream<byte[]> createStream(
  JavaStreamingContext jssc,
  String kinesisAppName,
  String streamName,
  String endpointUrl,
  String regionName,
  InitialPositionInStream initialPositionInStream,
  Duration checkpointInterval,
  StorageLevel storageLevel,
  String awsAccessKeyId,
  String awsSecretKey
);

Deprecated Method (Legacy Support)

@Deprecated
public static JavaReceiverInputDStream<byte[]> createStream(
  JavaStreamingContext jssc,
  String streamName,
  String endpointUrl,
  Duration checkpointInterval,
  InitialPositionInStream initialPositionInStream,
  StorageLevel storageLevel
);

Java-Specific Patterns

Using Lambda Expressions (Java 8+)

Modern Java applications can use lambda expressions for cleaner message handling:

// Lambda expression for message handling
JavaReceiverInputDStream<String> lambdaStream = KinesisUtils.createStream(
    jssc,
    "lambda-app",
    "text-stream",
    "https://kinesis.eu-west-1.amazonaws.com",
    "eu-west-1",
    InitialPositionInStream.LATEST,
    Durations.seconds(30),
    StorageLevel.MEMORY_AND_DISK_2(),
    record -> new String(record.getData().array(), StandardCharsets.UTF_8), // Lambda expression
    String.class
);

// Process with lambda expressions  
lambdaStream
    .filter(text -> !text.isEmpty())
    .map(String::trim)
    .foreachRDD(rdd -> {
        rdd.collect().forEach(System.out::println);
    });

Method References

// Using method references for common transformations.
// this::parseRecord captures the enclosing instance, so the enclosing class
// must be Serializable for the handler to be shipped to the receiver.
JavaReceiverInputDStream<String> stream = KinesisUtils.createStream(
    jssc, "app", "stream", endpoint, region,
    InitialPositionInStream.LATEST, Durations.seconds(30),
    StorageLevel.MEMORY_AND_DISK_2(),
    this::parseRecord, // Method reference
    String.class
);

private String parseRecord(Record record) {
    return new String(record.getData().array(), StandardCharsets.UTF_8);
}
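Spark's Java `Function` interface extends `Serializable`, and a method reference is only as serializable as whatever it captures. The sketch below uses plain Java serialization to show the difference between a bound reference (`this::method`) and a static one. `SerializableFunction`, `RecordParser`, and `SerializationCheck` are hypothetical stand-ins written for this illustration, and Java serialization is used as a proxy for what Spark does when it ships a handler.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

/** Hypothetical stand-in for a function interface that extends Serializable. */
interface SerializableFunction<T, R> extends Serializable {
    R apply(T value);
}

/** Deliberately not Serializable, like many service or driver classes. */
final class RecordParser {
    String parseInstance(byte[] bytes) {
        return new String(bytes, StandardCharsets.UTF_8);
    }

    static String parseStatic(byte[] bytes) {
        return new String(bytes, StandardCharsets.UTF_8);
    }

    /** Bound reference: captures this RecordParser instance. */
    SerializableFunction<byte[], String> boundReference() {
        return this::parseInstance;
    }

    /** Static reference: captures nothing. */
    static SerializableFunction<byte[], String> staticReference() {
        return RecordParser::parseStatic;
    }
}

final class SerializationCheck {
    private SerializationCheck() {}

    /** Returns true if Java serialization can write the whole object graph. */
    static boolean canSerialize(Object candidate) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(candidate);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(canSerialize(RecordParser.staticReference()));
        System.out.println(canSerialize(new RecordParser().boundReference()));
    }
}
```

A check like `canSerialize` can be run in a unit test against a real handler before it is passed to `createStream`, which surfaces capture problems before the job is submitted.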

Exception Handling in Message Handlers

// Robust error handling in message handlers
Function<Record, String> robustHandler = new Function<Record, String>() {
    @Override
    public String call(Record record) throws Exception {
        try {
            byte[] data = record.getData().array();
            String text = new String(data, "UTF-8");
            
            // Validate data
            if (text.trim().isEmpty()) {
                // Returning null does not by itself drop the record;
                // filter nulls out of the stream downstream
                return null;
            }
            
            return text.trim();
        } catch (Exception e) {
            // Log error and return indicator
            System.err.println("Error processing record: " + e.getMessage());
            return "ERROR";
        }
    }
};
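The handler above signals problems with two in-band values, null and the string "ERROR", which downstream code cannot tell apart from real data without extra conventions. An alternative is to return a small serializable result type and filter on it. The sketch below is one possible shape; `ParseResult` and its methods are hypothetical names, and it works on a plain byte array so it does not depend on the Kinesis `Record` class.

```java
import java.io.Serializable;
import java.nio.charset.StandardCharsets;

final class ParseResult implements Serializable {
    private static final long serialVersionUID = 1L;

    private final String value; // null when parsing failed
    private final String error; // null when parsing succeeded

    private ParseResult(String value, String error) {
        this.value = value;
        this.error = error;
    }

    static ParseResult ok(String value) {
        return new ParseResult(value, null);
    }

    static ParseResult failed(String reason) {
        return new ParseResult(null, reason);
    }

    boolean isOk() {
        return error == null;
    }

    String value() {
        return value;
    }

    String error() {
        return error;
    }

    /** Decodes UTF-8 text, trims it, and reports empty or missing input as a failure. */
    static ParseResult parse(byte[] data) {
        if (data == null) {
            return failed("no data");
        }
        String text = new String(data, StandardCharsets.UTF_8).trim();
        return text.isEmpty() ? failed("empty record") : ok(text);
    }
}
```

A message handler returning `ParseResult` would let the stream be narrowed with `filter(ParseResult::isOk)` followed by `map(ParseResult::value)`, while failed results remain available for logging or a dead-letter path.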

Java Integration Examples

Spring Framework Integration

import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

@Component
public class KinesisStreamProcessor {
    
    @Value("${kinesis.app.name}")
    private String kinesisAppName;
    
    @Value("${kinesis.stream.name}")
    private String streamName;
    
    @Value("${aws.kinesis.endpoint}")
    private String endpointUrl;
    
    @Value("${aws.region}")
    private String region;
    
    public void startProcessing(JavaStreamingContext jssc) {
        JavaReceiverInputDStream<String> stream = KinesisUtils.createStream(
            jssc,
            kinesisAppName,
            streamName,
            endpointUrl,
            region,
            InitialPositionInStream.LATEST,
            Durations.seconds(30),
            StorageLevel.MEMORY_AND_DISK_2(),
            // Static method reference: the handler is serialized with the receiver,
            // and this Spring bean is not Serializable
            KinesisStreamProcessor::processRecord,
            String.class
        );
        
        stream.foreachRDD(this::handleBatch);
    }
    
    private static String processRecord(Record record) {
        return new String(record.getData().array(), StandardCharsets.UTF_8);
    }
    
    private void handleBatch(JavaRDD<String> rdd) {
        // Process batch with Spring services
        rdd.collect().forEach(this::processMessage);
    }
    
    private void processMessage(String message) {
        // Business logic here
        System.out.println("Processing: " + message);
    }
}

Serialization Considerations

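Objects returned by a message handler are stored by Spark according to the storage level (and replicated for levels such as MEMORY_AND_DISK_2), so custom types should be serializable: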

import java.io.Serializable;

public class EventData implements Serializable {
    private static final long serialVersionUID = 1L;
    
    private String eventId;
    private long timestamp;
    private String payload;
    
    // Constructor
    public EventData(String eventId, long timestamp, String payload) {
        this.eventId = eventId;
        this.timestamp = timestamp;
        this.payload = payload;
    }
    
    // Getters and setters...
}

// Message handler creating serializable objects
Function<Record, EventData> eventHandler = record -> {
    String data = new String(record.getData().array(), StandardCharsets.UTF_8);
    JSONObject json = new JSONObject(data);
    return new EventData(
        json.getString("eventId"),
        json.getLong("timestamp"), 
        json.getString("payload")
    );
};
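A quick way to confirm that a value class survives serialization is to round-trip an instance in a unit test. The sketch below does this with plain Java serialization. `SampleEvent` is a hypothetical class shaped like the example above, and `RoundTrip.copy` is a helper name introduced here. Spark can be configured with a different serializer, so treat this as a sanity check under the Java-serialization assumption rather than a guarantee.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

/** Hypothetical value class used only for this illustration. */
final class SampleEvent implements Serializable {
    private static final long serialVersionUID = 1L;

    final String eventId;
    final long timestamp;
    final String payload;

    SampleEvent(String eventId, long timestamp, String payload) {
        this.eventId = eventId;
        this.timestamp = timestamp;
        this.payload = payload;
    }
}

final class RoundTrip {
    private RoundTrip() {}

    /** Serializes the value to bytes and reads it back as a new instance. */
    @SuppressWarnings("unchecked")
    static <T extends Serializable> T copy(T value) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(value);
            }
            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (T) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("Value did not survive Java serialization", e);
        }
    }
}
```

If a field of a non-serializable type is later added to the class, `RoundTrip.copy` fails in the test instead of at runtime on the cluster.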

Java Type System Integration

Working with Generic Types

// Create custom parameterized types
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.List;

public class TypeReference<T> {
    private final Type type;
    
    protected TypeReference() {
        Type superClass = getClass().getGenericSuperclass();
        this.type = ((ParameterizedType) superClass).getActualTypeArguments()[0];
    }
    
    public Type getType() {
        return type;
    }
}

// Usage with complex types
TypeReference<List<String>> typeRef = new TypeReference<List<String>>() {};

Null Safety and Optional Integration

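// java.util.Optional is not Serializable, so this sketch uses Spark's own
// Optional class instead (assumes Spark 2.0 or later)
import org.apache.spark.api.java.Optional;

// Message handler with Optional return
Function<Record, Optional<String>> optionalHandler = record -> {
    try {
        String data = new String(record.getData().array(), StandardCharsets.UTF_8);
        return data.trim().isEmpty() ? Optional.empty() : Optional.of(data);
    } catch (Exception e) {
        return Optional.empty();
    }
};

// Optional.class is a raw Class<Optional>; an unchecked cast supplies Class<Optional<String>>
@SuppressWarnings("unchecked")
Class<Optional<String>> optionalClass = (Class<Optional<String>>) (Class<?>) Optional.class;

// Filter out empty optionals
JavaReceiverInputDStream<Optional<String>> optionalStream = KinesisUtils.createStream(
    jssc, "app", "stream", endpoint, region,
    InitialPositionInStream.LATEST, Durations.seconds(30),
    StorageLevel.MEMORY_AND_DISK_2(),
    optionalHandler,
    optionalClass
);

JavaDStream<String> filteredStream = optionalStream
    .filter(Optional::isPresent)
    .map(Optional::get);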

