Initial Position Configuration

Configure where to start reading from Kinesis streams with support for latest records, earliest available records, or starting from a specific timestamp.

Initial Position Types

package org.apache.spark.streaming.kinesis

trait KinesisInitialPosition {
  def getPosition(): InitialPositionInStream
}

object KinesisInitialPositions {
  class Latest() extends KinesisInitialPosition
  class TrimHorizon() extends KinesisInitialPosition  
  class AtTimestamp(timestamp: Date) extends KinesisInitialPosition
  
  def fromKinesisInitialPosition(
    initialPositionInStream: InitialPositionInStream
  ): KinesisInitialPosition
}

Latest Position

Start reading from the latest (most recent) records in the stream. This is the default behavior.

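import org.apache.spark.streaming.kinesis.KinesisInputDStream
import org.apache.spark.streaming.kinesis.KinesisInitialPositions.Latest

// ssc is an existing StreamingContext
val stream = KinesisInputDStream.builder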
  .streamingContext(ssc)
  .streamName("my-stream")
  .checkpointAppName("my-app")
  .initialPosition(new Latest())
  .build()

Use Case: When you only want to process new data that arrives after the application starts.

Trim Horizon Position

Start reading from the earliest available records in the stream, that is, the oldest records still within the retention period (24 hours by default, configurable up to 365 days).

import org.apache.spark.streaming.kinesis.KinesisInputDStream
import org.apache.spark.streaming.kinesis.KinesisInitialPositions.TrimHorizon

val stream = KinesisInputDStream.builder
  .streamingContext(ssc)  
  .streamName("my-stream")
  .checkpointAppName("my-app")
  .initialPosition(new TrimHorizon())
  .build()

Use Case: When you want to process all available historical data in the stream.

At Timestamp Position

Start reading from records at or after a specific timestamp.

import java.util.Date
import org.apache.spark.streaming.kinesis.KinesisInputDStream
import org.apache.spark.streaming.kinesis.KinesisInitialPositions.AtTimestamp

// Start from a specific time
val startTime = new Date(System.currentTimeMillis() - 3600000) // 1 hour ago

val stream = KinesisInputDStream.builder
  .streamingContext(ssc)
  .streamName("my-stream") 
  .checkpointAppName("my-app")
  .initialPosition(new AtTimestamp(startTime))
  .build()

Use Case: When you want to replay data from a specific point in time.
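The 3600000 literal in the example above is one hour in milliseconds. As a minimal sketch, the same start time can be derived with java.time so the unit is explicit. StartTimes and lookbackFrom are hypothetical names chosen for illustration, not part of the Spark API.

```scala
import java.time.{Duration, Instant}
import java.util.Date

object StartTimes {
  /** Hypothetical helper: the instant `lookback` before `now`, as a java.util.Date. */
  def lookbackFrom(now: Instant, lookback: Duration): Date =
    Date.from(now.minus(lookback))
}

// Usage with the builder shown above (equivalent to the 3600000 ms arithmetic):
//   .initialPosition(new AtTimestamp(StartTimes.lookbackFrom(Instant.now(), Duration.ofHours(1))))
```

Passing `now` in as a parameter keeps the helper deterministic, which makes the computed start time easy to check in a unit test.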

Java API Usage

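import java.util.Date;
import org.apache.spark.streaming.kinesis.KinesisInitialPositions;
import org.apache.spark.streaming.kinesis.KinesisInputDStream;

// jssc is an existing JavaStreamingContext; the three streams below are alternatives

// Latest position
KinesisInputDStream<byte[]> latest = KinesisInputDStream.builder()
    .streamingContext(jssc)
    .streamName("my-stream")
    .checkpointAppName("my-app")
    .initialPosition(new KinesisInitialPositions.Latest())
    .build();

// Trim horizon position
KinesisInputDStream<byte[]> fromStart = KinesisInputDStream.builder()
    .streamingContext(jssc)
    .streamName("my-stream")
    .checkpointAppName("my-app")
    .initialPosition(new KinesisInitialPositions.TrimHorizon())
    .build();

// At timestamp position
Date timestamp = new Date(System.currentTimeMillis() - 3600000L); // 1 hour ago
KinesisInputDStream<byte[]> fromTimestamp = KinesisInputDStream.builder()
    .streamingContext(jssc)
    .streamName("my-stream")
    .checkpointAppName("my-app")
    .initialPosition(new KinesisInitialPositions.AtTimestamp(timestamp))
    .build();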

Legacy API (Deprecated)

The legacy initialPositionInStream method is deprecated but still supported for backward compatibility.

import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
import org.apache.spark.streaming.kinesis.KinesisInputDStream

// Deprecated - use initialPosition instead
val stream = KinesisInputDStream.builder
  .streamingContext(ssc)
  .streamName("my-stream")
  .checkpointAppName("my-app")
  .initialPositionInStream(InitialPositionInStream.LATEST) // Deprecated
  .build()

Conversion Utility

Convert from KCL's InitialPositionInStream enum to KinesisInitialPosition objects.

import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
import org.apache.spark.streaming.kinesis.{KinesisInitialPositions, KinesisInputDStream}

// Convert from KCL enum
val position = KinesisInitialPositions.fromKinesisInitialPosition(
  InitialPositionInStream.TRIM_HORIZON
)

val stream = KinesisInputDStream.builder
  .streamingContext(ssc)
  .streamName("my-stream")
  .checkpointAppName("my-app")
  .initialPosition(position)
  .build()

Note: The conversion utility only supports LATEST and TRIM_HORIZON. For AT_TIMESTAMP, use the AtTimestamp class directly.

Checkpointing Behavior

Initial positions only apply when there are no existing checkpoints for the application:

  • No existing checkpoints: Uses the specified initial position
  • Existing checkpoints: Resumes from the last checkpointed position, ignoring the initial position setting

To force reading from a specific position, do one of the following:

  1. Use a new checkpoint application name
  2. Delete the existing DynamoDB checkpoint table (the table created under the checkpoint application name)
  3. Wait for checkpoints to expire (this applies only if a TTL has been configured on the DynamoDB table)
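The first option can be sketched as a small helper that derives a fresh checkpoint application name for each replay run, so no earlier checkpoint matches it. ReplayNames and freshCheckpointAppName are hypothetical names, and the "-replay-" suffix format is only an assumption for illustration.

```scala
import java.time.{Instant, ZoneOffset}
import java.time.format.DateTimeFormatter

object ReplayNames {
  // UTC run stamp made of digits and a hyphen, e.g. 20240115-093000
  private val stamp: DateTimeFormatter =
    DateTimeFormatter.ofPattern("yyyyMMdd-HHmmss").withZone(ZoneOffset.UTC)

  /** Hypothetical helper: appends a UTC run stamp to the base application name. */
  def freshCheckpointAppName(base: String, runStartedAt: Instant): String =
    s"$base-replay-${stamp.format(runStartedAt)}"
}

// Usage with the builder from the earlier examples:
//   .checkpointAppName(ReplayNames.freshCheckpointAppName("my-app", Instant.now()))
```

Each distinct name leaves its own checkpoint table behind, so tables from finished replay runs need to be cleaned up separately.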

Best Practices

Latest Position

  • Use for real-time processing of new events
  • Minimizes startup time and resource usage
  • Good for monitoring and alerting use cases

Trim Horizon Position

  • Use for batch processing of historical data
  • Reads every record still within the retention period; records that have already expired cannot be recovered
  • May require more time and resources to catch up

At Timestamp Position

  • Use for precise replay scenarios
  • Useful for debugging or reprocessing specific time ranges
  • java.util.Date holds an instant in UTC epoch milliseconds, so convert local wall-clock times using an explicit time zone
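As an illustration of the time zone point, the sketch below converts a local wall-clock time to the java.util.Date that AtTimestamp expects, with the zone stated explicitly instead of relying on the JVM default. ReplayTimestamps and toDate are hypothetical names, not part of the Spark API.

```scala
import java.time.{LocalDateTime, ZoneId}
import java.util.Date

object ReplayTimestamps {
  /** Hypothetical helper: interprets `localTime` in `zone` and returns the matching instant as a Date. */
  def toDate(localTime: LocalDateTime, zone: ZoneId): Date =
    Date.from(localTime.atZone(zone).toInstant)
}

// Usage: replay from 09:00 New York time on 15 January 2024
//   val start = ReplayTimestamps.toDate(
//     LocalDateTime.of(2024, 1, 15, 9, 0), ZoneId.of("America/New_York"))
//   ... .initialPosition(new AtTimestamp(start))
```

The same local time maps to different instants in different zones, which is why the zone is a required parameter here.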

Error Handling

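An unsupported position raises an exception: fromKinesisInitialPosition throws UnsupportedOperationException for AT_TIMESTAMP. A timestamp older than the stream's retention period is not necessarily an error, though. The Kinesis GetShardIterator API documents that an AT_TIMESTAMP request older than the trim horizon starts reading from the oldest untrimmed record, so check timestamps in application code if an out-of-range value should fail fast.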
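It can be useful to reject an out-of-range timestamp before the stream is built. The following is a minimal sketch of such a pre-flight check. StartTimestampCheck and validate are hypothetical names, and the retention period is supplied by the caller as an assumption, since nothing here queries the stream itself.

```scala
import java.time.{Duration, Instant}
import java.util.Date

object StartTimestampCheck {
  /**
   * Hypothetical pre-flight check. `retention` must match the stream's configured
   * retention period (24 hours is the Kinesis default); it is not looked up here.
   * Returns Right(date) when `start` lies within [now - retention, now].
   */
  def validate(start: Instant, now: Instant, retention: Duration): Either[String, Date] = {
    val oldestReadable = now.minus(retention)
    if (start.isAfter(now))
      Left(s"start $start is in the future (now: $now)")
    else if (start.isBefore(oldestReadable))
      Left(s"start $start is older than the retention window (oldest: $oldestReadable)")
    else
      Right(Date.from(start))
  }
}

// Usage: only build the stream when the check passes
//   StartTimestampCheck.validate(requested, Instant.now(), Duration.ofHours(24)) match {
//     case Right(date) => /* ... .initialPosition(new AtTimestamp(date)) ... */
//     case Left(reason) => sys.error(reason)
//   }
```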