Initial Position Configuration

The initial position system controls where Spark Streaming starts reading from a Kinesis stream when no checkpoint data exists. It determines whether your application processes historical data, starts with new data, or begins from a specific point in time.

Core API

KinesisInitialPosition Interface

The base interface for initial position configurations.

interface KinesisInitialPosition {
  InitialPositionInStream getPosition();
}

Position Implementations

public class KinesisInitialPositions {
  // Start reading from the latest records (most recent)
  public static class Latest implements KinesisInitialPosition, Serializable {
    public Latest();
    public InitialPositionInStream getPosition();
  }

  // Start reading from the oldest records still available within the stream's retention period
  public static class TrimHorizon implements KinesisInitialPosition, Serializable {
    public TrimHorizon();
    public InitialPositionInStream getPosition();
  }

  // Start reading from a specific timestamp
  public static class AtTimestamp implements KinesisInitialPosition, Serializable {
    public AtTimestamp(Date timestamp);
    public InitialPositionInStream getPosition();
    public Date getTimestamp();
  }

  // Utility method for backward compatibility (deprecated usage)
  public static KinesisInitialPosition fromKinesisInitialPosition(
    InitialPositionInStream initialPosition
  ) throws UnsupportedOperationException;
}
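
Each implementation wraps a value of the KCL's InitialPositionInStream enum. A quick sketch of the mapping, runnable anywhere the Spark Kinesis connector and the KCL are on the classpath:

import org.apache.spark.streaming.kinesis.KinesisInitialPositions
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
import java.util.Date

// Each wrapper resolves to the matching KCL enum value
assert(new KinesisInitialPositions.Latest().getPosition == InitialPositionInStream.LATEST)
assert(new KinesisInitialPositions.TrimHorizon().getPosition == InitialPositionInStream.TRIM_HORIZON)

// AtTimestamp additionally carries the timestamp it was built with
val ts = new Date()
val atTimestamp = new KinesisInitialPositions.AtTimestamp(ts)
assert(atTimestamp.getPosition == InitialPositionInStream.AT_TIMESTAMP)
assert(atTimestamp.getTimestamp == ts)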

Latest Position

Starts consuming at the tip of the stream, reading only records added after the application starts. This is the default behavior and is ideal for real-time processing where you only care about new data.

import org.apache.spark.streaming.kinesis.{KinesisInputDStream, KinesisInitialPositions}

val stream = KinesisInputDStream.builder
  .streamingContext(ssc)
  .streamName("my-stream")
  .checkpointAppName("my-app")
  .initialPosition(new KinesisInitialPositions.Latest())
  .build()

Use cases:

  • Real-time analytics where historical data is not needed
  • Event-driven applications that only process new events
  • High-throughput streams where catching up on old data would be overwhelming

Trim Horizon Position

Starts consuming from the oldest available records in the stream (within the Kinesis retention period: 24 hours by default, configurable up to 365 days).

import org.apache.spark.streaming.kinesis.{KinesisInputDStream, KinesisInitialPositions}

val stream = KinesisInputDStream.builder
  .streamingContext(ssc)
  .streamName("my-stream")
  .checkpointAppName("my-app")
  .initialPosition(new KinesisInitialPositions.TrimHorizon())
  .build()

Use cases:

  • Batch processing applications that need to process all available data
  • Data migration or backfill scenarios
  • Applications that cannot afford to miss any data

At Timestamp Position

Starts consuming from records at or after a specific timestamp. This provides precise control over the starting point.

import org.apache.spark.streaming.kinesis.{KinesisInputDStream, KinesisInitialPositions}
import java.util.Date

// Start from records after a specific time
val startTime = new Date(System.currentTimeMillis() - (2 * 60 * 60 * 1000)) // 2 hours ago
val timestamp = new KinesisInitialPositions.AtTimestamp(startTime)

val stream = KinesisInputDStream.builder
  .streamingContext(ssc)
  .streamName("my-stream")
  .checkpointAppName("my-app")
  .initialPosition(timestamp)
  .build()

Use cases:

  • Reprocessing data from a specific point in time
  • Recovery scenarios where you know when issues occurred
  • Testing with historical data from a known timeframe

Deprecated API (Backward Compatibility)

For backward compatibility, the deprecated initialPositionInStream method is still available:

import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
import org.apache.spark.streaming.kinesis.KinesisInputDStream

val stream = KinesisInputDStream.builder
  .streamingContext(ssc)
  .streamName("my-stream")
  .checkpointAppName("my-app")
  .initialPositionInStream(InitialPositionInStream.LATEST)  // Deprecated in 2.3.0
  .build()

Note: This method only supports LATEST and TRIM_HORIZON. For timestamp-based positioning, use the new initialPosition method.
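
The bridge method KinesisInitialPositions.fromKinesisInitialPosition shown earlier enforces this at runtime: LATEST and TRIM_HORIZON convert to the new wrappers, while AT_TIMESTAMP throws. A minimal sketch:

import org.apache.spark.streaming.kinesis.KinesisInitialPositions
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream

// Converts cleanly for the two supported positions
val latest = KinesisInitialPositions.fromKinesisInitialPosition(InitialPositionInStream.LATEST)

// AT_TIMESTAMP carries no timestamp in the old enum, so conversion is impossible
try {
  KinesisInitialPositions.fromKinesisInitialPosition(InitialPositionInStream.AT_TIMESTAMP)
} catch {
  case e: UnsupportedOperationException => println(s"Not convertible: ${e.getMessage}")
}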

Important Behavior Notes

Checkpointing Takes Precedence

Initial position is only used when no checkpoint data exists. If your application has previously checkpointed progress, it will resume from the checkpointed position regardless of the initial position setting.

// First run: starts from TRIM_HORIZON
// Subsequent runs: resumes from last checkpoint, ignoring TRIM_HORIZON
val stream = KinesisInputDStream.builder
  .streamingContext(ssc)
  .streamName("my-stream") 
  .checkpointAppName("existing-app")  // Has checkpoint data
  .initialPosition(new KinesisInitialPositions.TrimHorizon())
  .build()

To force a restart from the initial position, you must either:

  1. Delete the DynamoDB checkpoint table (sketched below), or
  2. Use a different checkpointAppName
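
For option 1, a minimal sketch using the AWS SDK v1 DynamoDB client; the table name is assumed to match the checkpointAppName used above:

import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder

// The KCL names its checkpoint table after the application name
val dynamoDb = AmazonDynamoDBClientBuilder.defaultClient()
dynamoDb.deleteTable("existing-app")  // same value passed to checkpointAppName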

Kinesis Retention Limits

Kinesis streams have a retention period (default 24 hours, configurable up to 365 days). Records older than the retention period are not available.

import org.apache.spark.streaming.kinesis.{KinesisInputDStream, KinesisInitialPositions}
import java.util.Date

// Starts from the earliest record still available within the retention window
val stream = KinesisInputDStream.builder
  .streamingContext(ssc)
  .streamName("my-stream")
  .checkpointAppName("my-app")
  .initialPosition(new KinesisInitialPositions.TrimHorizon())
  .build()

// If the timestamp predates the retention window, reading does not fail;
// it effectively starts from the oldest available record (the trim horizon)
val oldTimestamp = new Date(System.currentTimeMillis() - (48 * 60 * 60 * 1000)) // 48 hours ago
val stream2 = KinesisInputDStream.builder
  .streamingContext(ssc)
  .streamName("my-stream")
  .checkpointAppName("my-app-backfill")
  .initialPosition(new KinesisInitialPositions.AtTimestamp(oldTimestamp))
  .build()
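
If you need to start further back than the current retention window allows, the retention period itself can be raised (up to 365 days) before records age out. A hedged sketch using the AWS SDK v1 Kinesis client:

import com.amazonaws.services.kinesis.AmazonKinesisClientBuilder
import com.amazonaws.services.kinesis.model.IncreaseStreamRetentionPeriodRequest

// Raise retention to 7 days (168 hours); longer retention adds storage cost
val kinesis = AmazonKinesisClientBuilder.defaultClient()
kinesis.increaseStreamRetentionPeriod(
  new IncreaseStreamRetentionPeriodRequest()
    .withStreamName("my-stream")
    .withRetentionPeriodHours(168)
)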

Complete Examples

Real-time Processing

import org.apache.spark.streaming.kinesis.{KinesisInputDStream, KinesisInitialPositions}

// Only process new data as it arrives
val realtimeStream = KinesisInputDStream.builder
  .streamingContext(ssc)
  .streamName("real-time-events")
  .checkpointAppName("realtime-processor")
  .initialPosition(new KinesisInitialPositions.Latest())
  .build()

realtimeStream.foreachRDD { rdd =>
  println(s"Processing ${rdd.count()} new records")
  // Process only recent data
}

Historical Data Processing

import org.apache.spark.streaming.kinesis.{KinesisInputDStream, KinesisInitialPositions}

// Process all available historical data
val batchStream = KinesisInputDStream.builder
  .streamingContext(ssc)
  .streamName("historical-data")
  .checkpointAppName("batch-processor")
  .initialPosition(new KinesisInitialPositions.TrimHorizon())
  .build()

batchStream.foreachRDD { rdd =>
  println(s"Processing batch of ${rdd.count()} records")
  // Process all available data
}

Point-in-Time Recovery

import org.apache.spark.streaming.kinesis.{KinesisInputDStream, KinesisInitialPositions}
import java.util.Date
import java.text.SimpleDateFormat

// Start processing from a specific incident time
val incidentTime = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
  .parse("2024-01-15 14:30:00")

val recoveryStream = KinesisInputDStream.builder
  .streamingContext(ssc)
  .streamName("application-logs")
  .checkpointAppName("incident-recovery")  // Use unique name to avoid existing checkpoints
  .initialPosition(new KinesisInitialPositions.AtTimestamp(incidentTime))
  .build()

recoveryStream.foreachRDD { rdd =>
  // Reprocess data from the incident onwards
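  // Note: collect() brings every record to the driver; keep this to small recovery batches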
  rdd.collect().foreach { record =>
    println(s"Reprocessing: ${new String(record)}")
  }
}

Development and Testing

import org.apache.spark.streaming.kinesis.{KinesisInputDStream, KinesisInitialPositions}

// For testing: start from a known point with limited data
val testStartTime = new Date(System.currentTimeMillis() - (30 * 60 * 1000)) // 30 minutes ago

val testStream = KinesisInputDStream.builder
  .streamingContext(ssc)
  .streamName("test-stream")
  .checkpointAppName(s"test-${System.currentTimeMillis()}")  // Unique name for each test
  .initialPosition(new KinesisInitialPositions.AtTimestamp(testStartTime))
  .build()
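
All of the examples above only define the stream; nothing is consumed until the streaming context is started:

// Required after building any of the streams above
ssc.start()
ssc.awaitTermination()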

Best Practices

  1. Use Latest for real-time: Choose Latest for applications that only need new data
  2. Use TrimHorizon for completeness: Choose TrimHorizon when you need all available data
  3. Use AtTimestamp for precision: Choose AtTimestamp for point-in-time recovery or testing
  4. Consider checkpoint behavior: Remember that checkpoints override initial position
  5. Plan for retention limits: Ensure your timestamp is within the stream's retention period
  6. Test with different positions: Verify your application works correctly with various starting points