
tessl/maven-org-apache-spark--spark-streaming-kinesis-asl-2-10

Spark Streaming integration with Amazon Kinesis for real-time data processing using the Kinesis Client Library


docs/java-api.md

Java API

Complete Java API support with type-safe bindings, function interfaces, and seamless integration with Java Streaming contexts and data processing pipelines.

Capabilities

Basic Stream Creation (Java)

Creates a Kinesis input stream using default AWS credential discovery and byte array message handler.

/**
 * Create an input stream that pulls messages from a Kinesis stream using the KCL.
 * Uses DefaultAWSCredentialsProviderChain for AWS authentication.
 *
 * @param jssc Java StreamingContext object
 * @param kinesisAppName Kinesis application name used by the KCL to update DynamoDB
 * @param streamName Kinesis stream name
 * @param endpointUrl Url of Kinesis service (e.g., https://kinesis.us-east-1.amazonaws.com)
 * @param regionName Name of region used by the KCL for DynamoDB and CloudWatch
 * @param initialPositionInStream Starting position in stream (TRIM_HORIZON or LATEST)
 * @param checkpointInterval Checkpoint interval for Kinesis checkpointing
 * @param storageLevel Storage level for received objects (MEMORY_AND_DISK_2 recommended)
 * @return JavaReceiverInputDStream<byte[]> containing raw message data
 */
public static JavaReceiverInputDStream<byte[]> createStream(
  JavaStreamingContext jssc,
  String kinesisAppName,
  String streamName,
  String endpointUrl,
  String regionName,
  InitialPositionInStream initialPositionInStream,
  Duration checkpointInterval,
  StorageLevel storageLevel
);

Usage Example:

import org.apache.spark.streaming.kinesis.KinesisUtils;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;

JavaReceiverInputDStream<byte[]> stream = KinesisUtils.createStream(
  jssc,
  "MySparkKinesisApp",
  "my-kinesis-stream", 
  "https://kinesis.us-east-1.amazonaws.com",
  "us-east-1",
  InitialPositionInStream.LATEST,
  new Duration(2000),
  StorageLevel.MEMORY_AND_DISK_2()
);

Stream Creation with Explicit Credentials (Java)

Creates a Kinesis input stream with explicitly provided AWS credentials.

/**
 * Create an input stream with explicit AWS credentials.
 * Note: Credentials will be saved in DStream checkpoints if checkpointing is enabled.
 *
 * @param jssc Java StreamingContext object
 * @param kinesisAppName Kinesis application name used by the KCL to update DynamoDB
 * @param streamName Kinesis stream name
 * @param endpointUrl Url of Kinesis service
 * @param regionName Name of region used by the KCL for DynamoDB and CloudWatch
 * @param initialPositionInStream Starting position in stream
 * @param checkpointInterval Checkpoint interval for Kinesis checkpointing
 * @param storageLevel Storage level for received objects
 * @param awsAccessKeyId AWS AccessKeyId (if null, uses DefaultAWSCredentialsProviderChain)
 * @param awsSecretKey AWS SecretKey (if null, uses DefaultAWSCredentialsProviderChain)
 * @return JavaReceiverInputDStream<byte[]> containing raw message data
 */
public static JavaReceiverInputDStream<byte[]> createStream(
  JavaStreamingContext jssc,
  String kinesisAppName,
  String streamName,
  String endpointUrl,
  String regionName,
  InitialPositionInStream initialPositionInStream,
  Duration checkpointInterval,
  StorageLevel storageLevel,
  String awsAccessKeyId,
  String awsSecretKey
);

Usage Example:

JavaReceiverInputDStream<byte[]> stream = KinesisUtils.createStream(
  jssc,
  "MySparkKinesisApp",
  "my-kinesis-stream",
  "https://kinesis.us-east-1.amazonaws.com", 
  "us-east-1",
  InitialPositionInStream.LATEST,
  new Duration(2000),
  StorageLevel.MEMORY_AND_DISK_2(),
  "AKIAIOSFODNN7EXAMPLE",
  "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY"
);

Stream Creation with Custom Message Handler (Java)

Creates a typed Kinesis input stream with a custom message handler function.

/**
 * Create an input stream with a custom message handler for type-safe data processing.
 *
 * @param jssc Java StreamingContext object
 * @param kinesisAppName Kinesis application name used by the KCL to update DynamoDB
 * @param streamName Kinesis stream name
 * @param endpointUrl Url of Kinesis service
 * @param regionName Name of region used by the KCL for DynamoDB and CloudWatch
 * @param initialPositionInStream Starting position in stream
 * @param checkpointInterval Checkpoint interval for Kinesis checkpointing
 * @param storageLevel Storage level for received objects
 * @param messageHandler Custom function to process Kinesis Records into type T
 * @param recordClass Class object for type T (required for Java type erasure)
 * @return JavaReceiverInputDStream<T> containing processed data
 */
public static <T> JavaReceiverInputDStream<T> createStream(
  JavaStreamingContext jssc,
  String kinesisAppName,
  String streamName,
  String endpointUrl,
  String regionName,
  InitialPositionInStream initialPositionInStream,
  Duration checkpointInterval,
  StorageLevel storageLevel,
  Function<Record, T> messageHandler,
  Class<T> recordClass
);

Usage Example:

import org.apache.spark.api.java.function.Function;
import com.amazonaws.services.kinesis.model.Record;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.Serializable;

// Define data class
public class MyEvent implements Serializable {
  private String id;
  private long timestamp;
  private String data;
  
  // Constructors, getters, setters...
  public MyEvent() {}
  public MyEvent(String id, long timestamp, String data) {
    this.id = id;
    this.timestamp = timestamp;
    this.data = data;
  }
  
  // Getters and setters
  public String getId() { return id; }
  public void setId(String id) { this.id = id; }
  public long getTimestamp() { return timestamp; }
  public void setTimestamp(long timestamp) { this.timestamp = timestamp; }
  public String getData() { return data; }
  public void setData(String data) { this.data = data; }
}

// Custom message handler
Function<Record, MyEvent> parseMyEvent = new Function<Record, MyEvent>() {
  private final ObjectMapper mapper = new ObjectMapper();
  
  @Override
  public MyEvent call(Record record) throws Exception {
    byte[] data = new byte[record.getData().remaining()];
    record.getData().get(data);
    String json = new String(data, "UTF-8");
    return mapper.readValue(json, MyEvent.class);
  }
};

JavaReceiverInputDStream<MyEvent> stream = KinesisUtils.createStream(
  jssc,
  "MySparkKinesisApp",
  "my-events-stream",
  "https://kinesis.us-east-1.amazonaws.com",
  "us-east-1", 
  InitialPositionInStream.LATEST,
  new Duration(2000),
  StorageLevel.MEMORY_AND_DISK_2(),
  parseMyEvent,
  MyEvent.class
);

Stream Creation with Custom Handler and Credentials (Java)

Creates a typed Kinesis input stream with both custom message handler and explicit AWS credentials.

/**
 * Create an input stream with custom message handler and explicit AWS credentials.
 *
 * @param jssc Java StreamingContext object
 * @param kinesisAppName Kinesis application name used by the KCL to update DynamoDB
 * @param streamName Kinesis stream name
 * @param endpointUrl Url of Kinesis service
 * @param regionName Name of region used by the KCL for DynamoDB and CloudWatch
 * @param initialPositionInStream Starting position in stream
 * @param checkpointInterval Checkpoint interval for Kinesis checkpointing
 * @param storageLevel Storage level for received objects
 * @param messageHandler Custom function to process Kinesis Records into type T
 * @param recordClass Class object for type T
 * @param awsAccessKeyId AWS AccessKeyId
 * @param awsSecretKey AWS SecretKey
 * @return JavaReceiverInputDStream<T> containing processed data
 */
public static <T> JavaReceiverInputDStream<T> createStream(
  JavaStreamingContext jssc,
  String kinesisAppName,
  String streamName,
  String endpointUrl,
  String regionName,
  InitialPositionInStream initialPositionInStream,
  Duration checkpointInterval,
  StorageLevel storageLevel,
  Function<Record, T> messageHandler,
  Class<T> recordClass,
  String awsAccessKeyId,
  String awsSecretKey
);
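
Usage Example (reusing the `parseMyEvent` handler and the placeholder credentials from the earlier examples):

```java
JavaReceiverInputDStream<MyEvent> stream = KinesisUtils.createStream(
  jssc,
  "MySparkKinesisApp",
  "my-events-stream",
  "https://kinesis.us-east-1.amazonaws.com",
  "us-east-1",
  InitialPositionInStream.LATEST,
  new Duration(2000),
  StorageLevel.MEMORY_AND_DISK_2(),
  parseMyEvent,      // handler from the custom message handler example
  MyEvent.class,
  "AKIAIOSFODNN7EXAMPLE",
  "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY"
);
```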

Deprecated Stream Creation (Java)

Simplified stream creation method (deprecated since version 1.4.0).

/**
 * Create an input stream using app name from SparkConf and region from endpoint.
 * @deprecated use other forms of createStream
 *
 * @param jssc Java StreamingContext object
 * @param streamName Kinesis stream name
 * @param endpointUrl Endpoint url of Kinesis service
 * @param checkpointInterval Checkpoint interval for Kinesis checkpointing
 * @param initialPositionInStream Starting position in stream
 * @param storageLevel Storage level for received objects
 * @return JavaReceiverInputDStream<byte[]> containing raw message data
 */
@Deprecated
public static JavaReceiverInputDStream<byte[]> createStream(
  JavaStreamingContext jssc,
  String streamName,
  String endpointUrl,
  Duration checkpointInterval,
  InitialPositionInStream initialPositionInStream,
  StorageLevel storageLevel
);
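
Usage Example (placeholder names as above; the application name is taken from `spark.app.name` and the region is derived from the endpoint):

```java
@SuppressWarnings("deprecation")
JavaReceiverInputDStream<byte[]> legacyStream = KinesisUtils.createStream(
  jssc,
  "my-kinesis-stream",
  "https://kinesis.us-east-1.amazonaws.com",
  new Duration(2000),
  InitialPositionInStream.LATEST,
  StorageLevel.MEMORY_AND_DISK_2()
);
```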

Java Usage Examples

Simple Text Processing

import org.apache.spark.streaming.kinesis.KinesisUtils;
import org.apache.spark.api.java.function.Function;
import com.amazonaws.services.kinesis.model.Record;

// Convert bytes to string
Function<Record, String> textConverter = new Function<Record, String>() {
  @Override
  public String call(Record record) throws Exception {
    byte[] data = new byte[record.getData().remaining()];
    record.getData().get(data);
    return new String(data, "UTF-8");
  }
};

JavaReceiverInputDStream<String> textStream = KinesisUtils.createStream(
  jssc,
  "TextProcessor",
  "text-stream",
  "https://kinesis.us-east-1.amazonaws.com",
  "us-east-1",
  InitialPositionInStream.LATEST,
  new Duration(2000),
  StorageLevel.MEMORY_AND_DISK_2(),
  textConverter,
  String.class
);

// Process text messages (VoidFunction overload, Spark 1.6+;
// on older releases use Function<JavaRDD<String>, Void> and return null)
textStream.foreachRDD(rdd -> {
  rdd.foreach(text -> System.out.println("Received: " + text));
});

JSON Processing with Error Handling

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.JsonNode;
import java.util.Collections;
import java.util.Optional;

// JSON parser with error handling
Function<Record, Optional<JsonNode>> jsonParser = new Function<Record, Optional<JsonNode>>() {
  private final ObjectMapper mapper = new ObjectMapper();
  
  @Override
  public Optional<JsonNode> call(Record record) throws Exception {
    try {
      byte[] data = new byte[record.getData().remaining()];
      record.getData().get(data);
      String json = new String(data, "UTF-8");
      JsonNode node = mapper.readTree(json);
      return Optional.of(node);
    } catch (Exception e) {
      System.err.println("Failed to parse JSON: " + e.getMessage());
      return Optional.empty();
    }
  }
};

JavaReceiverInputDStream<Optional<JsonNode>> jsonStream = KinesisUtils.createStream(
  jssc,
  "JsonProcessor", 
  "json-stream",
  "https://kinesis.us-east-1.amazonaws.com",
  "us-east-1",
  InitialPositionInStream.LATEST,
  new Duration(2000),
  StorageLevel.MEMORY_AND_DISK_2(),
  jsonParser,
  // Optional.class is raw; an unchecked double cast yields the parameterized token
  (Class<Optional<JsonNode>>) (Class<?>) Optional.class
);

// Filter and process valid JSON
JavaDStream<JsonNode> validJson = jsonStream.flatMap(opt ->
  opt.isPresent()
    ? Collections.singletonList(opt.get()).iterator()
    : Collections.<JsonNode>emptyList().iterator()
);

Message Handler with Metadata Access

// Data class that includes metadata
public class EnrichedMessage implements Serializable {
  private String data;
  private String partitionKey;
  private String sequenceNumber;
  private long arrivalTime;
  
  public EnrichedMessage(String data, String partitionKey, 
                        String sequenceNumber, long arrivalTime) {
    this.data = data;
    this.partitionKey = partitionKey;
    this.sequenceNumber = sequenceNumber;
    this.arrivalTime = arrivalTime;
  }
  
  // Getters...
  public String getData() { return data; }
  public String getPartitionKey() { return partitionKey; }
  public String getSequenceNumber() { return sequenceNumber; }
  public long getArrivalTime() { return arrivalTime; }
}

// Message handler that captures metadata
Function<Record, EnrichedMessage> enrichedHandler = new Function<Record, EnrichedMessage>() {
  @Override
  public EnrichedMessage call(Record record) throws Exception {
    byte[] bytes = new byte[record.getData().remaining()];
    record.getData().get(bytes);
    String data = new String(bytes, "UTF-8");
    
    return new EnrichedMessage(
      data,
      record.getPartitionKey(),
      record.getSequenceNumber(),
      record.getApproximateArrivalTimestamp().getTime()
    );
  }
};

JavaReceiverInputDStream<EnrichedMessage> enrichedStream = 
  KinesisUtils.createStream(
    jssc,
    "EnrichedProcessor",
    "enriched-stream", 
    "https://kinesis.us-east-1.amazonaws.com",
    "us-east-1",
    InitialPositionInStream.LATEST,
    new Duration(2000),
    StorageLevel.MEMORY_AND_DISK_2(),
    enrichedHandler,
    EnrichedMessage.class
  );

Word Count Example

Complete example demonstrating Java API usage with word counting:

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kinesis.KinesisUtils;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
import scala.Tuple2;
import java.util.Arrays;
import java.util.Iterator;
import java.util.regex.Pattern;

public class JavaKinesisWordCount {
  private static final Pattern WORD_SEPARATOR = Pattern.compile(" ");
  
  public static void main(String[] args) {
    SparkConf sparkConf = new SparkConf().setAppName("JavaKinesisWordCount");
    JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(2000));
    
    // Create Kinesis stream
    JavaReceiverInputDStream<byte[]> kinesisStream = KinesisUtils.createStream(
      jssc,
      "JavaWordCount",
      "word-stream",
      "https://kinesis.us-east-1.amazonaws.com",
      "us-east-1",
      InitialPositionInStream.LATEST,
      new Duration(2000),
      StorageLevel.MEMORY_AND_DISK_2()
    );
    
    // Convert bytes to strings and split into words
    JavaDStream<String> words = kinesisStream.flatMap(
      new FlatMapFunction<byte[], String>() {
        @Override
        public Iterator<String> call(byte[] line) {
          String text = new String(line);
          return Arrays.asList(WORD_SEPARATOR.split(text)).iterator();
        }
      }
    );
    
    // Count words
    JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
      new PairFunction<String, String, Integer>() {
        @Override
        public Tuple2<String, Integer> call(String s) {
          return new Tuple2<>(s, 1);
        }
      }
    ).reduceByKey(
      new Function2<Integer, Integer, Integer>() {
        @Override
        public Integer call(Integer i1, Integer i2) {
          return i1 + i2;
        }
      }
    );
    
    // Print results
    wordCounts.print();
    
    jssc.start();
    jssc.awaitTermination();
  }
}

Java Function Interfaces

The Java API uses Spark's Function interfaces for message handlers:

// Message handlers implement Spark's general-purpose Function interface;
// its single call method may throw checked exceptions
org.apache.spark.api.java.function.Function<Record, T> {
  T call(Record record) throws Exception;
}

Best Practices for Java API

Type Safety

  • Always specify the record class parameter for type safety
  • Use Optional<T> for operations that may fail
  • Leverage Java generics for compile-time type checking
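
Because Java erases generic type parameters, `SomeType.class` is always a raw `Class` object; passing it where a parameterized token is expected (as with `Optional.class` in the JSON example above) needs an unchecked double cast. A JDK-only sketch of the pattern (the `ClassTokenDemo`/`token` names are illustrative, not part of the library):

```java
import java.util.List;

public class ClassTokenDemo {
  // Generics are erased at runtime, so only one Class object exists per raw
  // type; the unchecked cast merely relabels it with the parameterized type.
  @SuppressWarnings("unchecked")
  static <T> Class<T> token(Class<?> raw) {
    return (Class<T>) raw;
  }

  public static void main(String[] args) {
    Class<List<String>> cls = ClassTokenDemo.<List<String>>token(List.class);
    System.out.println(cls == List.class); // prints true
  }
}
```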

Error Handling

  • Wrap parsing operations in try-catch blocks
  • Return Optional or use custom error types
  • Log errors appropriately for monitoring

Performance

  • Reuse expensive objects like ObjectMapper in message handlers
  • Avoid creating new objects in tight loops
  • Consider using static methods for stateless operations
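
The ObjectMapper reuse shown in the JSON example generalizes: make the expensive object a `static final` field of the handler class so it is constructed once per JVM and excluded from closure serialization. A JDK-only sketch using a compiled `Pattern` in place of an ObjectMapper (the `ReusableHandler` class is illustrative):

```java
import java.io.Serializable;
import java.util.regex.Pattern;

public class ReusableHandler implements Serializable {
  // Compiled once per JVM rather than once per record; static fields are
  // not captured when Spark serializes the enclosing closure.
  private static final Pattern COMMA = Pattern.compile("\\s*,\\s*");

  public String[] call(String line) {
    return COMMA.split(line);
  }

  public static void main(String[] args) {
    String[] parts = new ReusableHandler().call("a , b,c");
    System.out.println(parts.length);            // 3
    System.out.println(String.join("|", parts)); // a|b|c
  }
}
```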

Memory Management

  • Be careful with large message payloads
  • Don't hold references to ByteBuffer objects
  • Use streaming parsers for large JSON/XML documents
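
On the ByteBuffer point: `Record.getData()` returns a buffer the KCL may reuse, so handlers should copy the payload into a fresh array instead of retaining the buffer, as the examples above do. A JDK-only sketch of that copy, using `duplicate()` so the source buffer's position is left untouched (`BufferCopyDemo` is an illustrative name):

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class BufferCopyDemo {
  static byte[] copyPayload(ByteBuffer data) {
    byte[] bytes = new byte[data.remaining()];
    // duplicate() shares the content but has its own position/limit,
    // so reading it does not advance the original buffer.
    data.duplicate().get(bytes);
    return bytes;
  }

  public static void main(String[] args) {
    ByteBuffer buf = ByteBuffer.wrap("hello".getBytes(StandardCharsets.UTF_8));
    byte[] copy = copyPayload(buf);
    System.out.println(new String(copy, StandardCharsets.UTF_8)); // hello
    System.out.println(buf.remaining()); // 5 (original buffer unchanged)
  }
}
```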

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-spark--spark-streaming-kinesis-asl-2-10
