tessl/maven-org-apache-spark--spark-hadoop-cloud_2-13

Hadoop cloud integration capabilities for Apache Spark, enabling seamless interaction with cloud storage systems

  • Workspace: tessl
  • Visibility: Public
  • Describes: mavenpkg:maven/org.apache.spark/spark-hadoop-cloud_2.13@4.0.x

To install, run

npx @tessl/cli install tessl/maven-org-apache-spark--spark-hadoop-cloud_2-13@4.0.0


Spark Hadoop Cloud Integration

The spark-hadoop-cloud_2.13 package provides Spark's internal cloud storage integration layer. It bundles the Hadoop cloud connector JARs and the transitive dependencies needed to interact with cloud object stores such as AWS S3, Google Cloud Storage, and Azure storage.

Important: This package contains only internal implementation components in the org.apache.spark.internal.* package namespace. These are not intended for direct external use and are subject to change without notice.

Package Information

  • Package Name: spark-hadoop-cloud_2.13
  • Package Type: maven
  • Language: Scala (with Java dependencies)
  • Group ID: org.apache.spark
  • Artifact ID: spark-hadoop-cloud_2.13
  • Version: 4.0.0

Installation

As a Maven dependency:

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-hadoop-cloud_2.13</artifactId>
    <version>4.0.0</version>
</dependency>

For SBT:

libraryDependencies += "org.apache.spark" %% "spark-hadoop-cloud" % "4.0.0"

Core Components

This package provides three main internal components that extend Spark's capabilities for cloud storage integration:

Cloud Checkpoint File Manager

Provides atomic checkpoint file operations for Spark streaming applications using abortable streams.

class AbortableStreamBasedCheckpointFileManager(
    path: org.apache.hadoop.fs.Path, 
    hadoopConf: org.apache.hadoop.conf.Configuration
) extends AbstractFileContextBasedCheckpointFileManager

Path Output Commit Protocol

Implements commit protocols for cloud storage systems using Hadoop's PathOutputCommitter framework.

class PathOutputCommitProtocol(
    jobId: String,
    dest: String,
    dynamicPartitionOverwrite: Boolean = false
) extends HadoopMapReduceCommitProtocol with Serializable

Binding Parquet Output Committer

Dynamically binds Parquet operations to cloud-specific output committers.

class BindingParquetOutputCommitter(
    path: org.apache.hadoop.fs.Path,
    context: org.apache.hadoop.mapreduce.TaskAttemptContext
) extends org.apache.parquet.hadoop.ParquetOutputCommitter
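
These classes are normally selected through Spark SQL configuration rather than instantiated directly. The sketch below shows one way to wire all three together; the configuration keys are standard Spark SQL options, and the fully qualified class names are assumed to live under org.apache.spark.internal.io.cloud (verify against your Spark distribution).

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("cloud-writes")
  // Commit protocol used by file-based data sources when writing to object stores
  .config("spark.sql.sources.commitProtocolClass",
          "org.apache.spark.internal.io.cloud.PathOutputCommitProtocol")
  // Parquet committer that delegates to whatever PathOutputCommitter is configured
  .config("spark.sql.parquet.output.committer.class",
          "org.apache.spark.internal.io.cloud.BindingParquetOutputCommitter")
  // Checkpoint file manager for Structured Streaming on abortable-stream filesystems
  .config("spark.sql.streaming.checkpointFileManagerClass",
          "org.apache.spark.internal.io.cloud.AbortableStreamBasedCheckpointFileManager")
  .getOrCreate()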

Cloud Storage Integration

Supported Cloud Providers

This package bundles dependencies and integration for the following providers; a configuration sketch for S3A follows the list:

  • AWS S3: Via hadoop-aws and AWS SDK v2
  • Google Cloud Storage: Via gcs-connector
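
As an illustration of the AWS S3 side, the sketch below sets the usual S3A options on the Hadoop configuration. The fs.s3a.* keys come from hadoop-aws rather than from this package, and the bucket name is hypothetical; in practice hadoop-aws can also resolve credentials through its default provider chain.

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("s3a-example").getOrCreate()
val hadoopConf = spark.sparkContext.hadoopConfiguration

// Credentials for the S3A connector (keys defined by hadoop-aws)
hadoopConf.set("fs.s3a.access.key", sys.env("AWS_ACCESS_KEY_ID"))
hadoopConf.set("fs.s3a.secret.key", sys.env("AWS_SECRET_ACCESS_KEY"))
// Optional custom endpoint, e.g. for S3-compatible stores
sys.env.get("AWS_ENDPOINT_URL").foreach(url => hadoopConf.set("fs.s3a.endpoint", url))

// "my-bucket" is a hypothetical bucket name
spark.read.text("s3a://my-bucket/input/").show()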

Key Features

  • Atomic Operations: Ensures data consistency through abortable stream operations
  • Dynamic Partitioning: Supports dynamic partition overwrite for compatible storage systems
  • Integration Testing: Includes capabilities for testing against real cloud infrastructure
  • Factory-Based Committers: Uses Hadoop's PathOutputCommitterFactory for extensibility

Capabilities

Abortable Stream-Based Checkpoint Management

Manages checkpoint files with atomic write operations and abort capabilities for cloud storage systems.

// Constructor
class AbortableStreamBasedCheckpointFileManager(
    path: org.apache.hadoop.fs.Path,
    hadoopConf: org.apache.hadoop.conf.Configuration
) extends AbstractFileContextBasedCheckpointFileManager with Logging

// Key methods
def createAtomic(
    path: org.apache.hadoop.fs.Path, 
    overwriteIfPossible: Boolean
): org.apache.spark.sql.execution.streaming.CheckpointFileManager.CancellableFSDataOutputStream

// Inner class for cancellable stream operations
class AbortableStreamBasedFSDataOutputStream(
    fsDataOutputStream: org.apache.hadoop.fs.FSDataOutputStream,
    fc: org.apache.hadoop.fs.FileContext,
    path: org.apache.hadoop.fs.Path,
    overwriteIfPossible: Boolean
) extends CancellableFSDataOutputStream {
  def cancel(): Unit
  def close(): Unit
}

Requirements: The filesystem must support CommonPathCapabilities.ABORTABLE_STREAM.
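
For illustration, here is a hedged sketch of using the manager directly; Spark normally instantiates it itself when spark.sql.streaming.checkpointFileManagerClass points at this class. The import path assumes the class lives under org.apache.spark.internal.io.cloud, and the s3a path is hypothetical.

import java.nio.charset.StandardCharsets
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.spark.internal.io.cloud.AbortableStreamBasedCheckpointFileManager

// Hypothetical checkpoint location; the filesystem must support abortable streams (S3A does)
val checkpointDir = new Path("s3a://my-bucket/checkpoints")
val manager = new AbortableStreamBasedCheckpointFileManager(checkpointDir, new Configuration())

val out = manager.createAtomic(new Path(checkpointDir, "offsets/0"), overwriteIfPossible = false)
try {
  out.write("0".getBytes(StandardCharsets.UTF_8))
  out.close()    // completes the write; the file becomes visible atomically
} catch {
  case e: Throwable =>
    out.cancel() // aborts the in-progress write; no partial file is left behind
    throw e
}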

Path Output Commit Protocol

Provides Spark commit protocol implementation for cloud storage systems with proper job and task lifecycle management.

// Constructor
class PathOutputCommitProtocol(
    jobId: String,
    dest: String,
    dynamicPartitionOverwrite: Boolean = false
) extends HadoopMapReduceCommitProtocol with Serializable

// Key methods
protected def setupCommitter(
    context: org.apache.hadoop.mapreduce.TaskAttemptContext
): org.apache.hadoop.mapreduce.lib.output.PathOutputCommitter

def newTaskTempFile(
    taskContext: org.apache.hadoop.mapreduce.TaskAttemptContext,
    dir: Option[String],
    spec: org.apache.spark.internal.io.FileNameSpec
): String

def newTaskTempFileAbsPath(
    taskContext: org.apache.hadoop.mapreduce.TaskAttemptContext,
    absoluteDir: String,
    spec: org.apache.spark.internal.io.FileNameSpec
): String

// Configuration constants
val REJECT_FILE_OUTPUT: String = "pathoutputcommit.reject.fileoutput"
val REJECT_FILE_OUTPUT_DEFVAL: Boolean = false
val CAPABILITY_DYNAMIC_PARTITIONING: String = "mapreduce.job.committer.dynamic.partitioning"
val OUTPUTCOMMITTER_FACTORY_SCHEME: String = "mapreduce.outputcommitter.factory.scheme"
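
PathOutputCommitProtocol delegates to whatever committer Hadoop's PathOutputCommitterFactory produces for the destination scheme, so the Hadoop side needs a factory binding as well. The sketch below wires the s3a:// scheme using keys and class names from hadoop-aws; treat the exact values as assumptions to verify against your Hadoop version.

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("s3a-committer-setup").getOrCreate()
val hadoopConf = spark.sparkContext.hadoopConfiguration

// Bind the s3a:// scheme to the S3A committer factory shipped with hadoop-aws
hadoopConf.set("mapreduce.outputcommitter.factory.scheme.s3a",
  "org.apache.hadoop.fs.s3a.commit.S3ACommitterFactory")
// Pick one of the S3A committers: "directory", "partitioned", or "magic"
hadoopConf.set("fs.s3a.committer.name", "directory")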

Binding Parquet Output Committer

Enables Parquet files to work with any PathOutputCommitter implementation, not just ParquetOutputCommitter subclasses.

// Constructor
class BindingParquetOutputCommitter(
    path: org.apache.hadoop.fs.Path,
    context: org.apache.hadoop.mapreduce.TaskAttemptContext
) extends ParquetOutputCommitter with Logging with StreamCapabilities

// Key methods
def boundCommitter(): org.apache.hadoop.mapreduce.lib.output.PathOutputCommitter
def getWorkPath(): org.apache.hadoop.fs.Path
def setupTask(taskAttemptContext: org.apache.hadoop.mapreduce.TaskAttemptContext): Unit
def commitTask(taskAttemptContext: org.apache.hadoop.mapreduce.TaskAttemptContext): Unit
def abortTask(taskAttemptContext: org.apache.hadoop.mapreduce.TaskAttemptContext): Unit
def setupJob(jobContext: org.apache.hadoop.mapreduce.JobContext): Unit
def commitJob(jobContext: org.apache.hadoop.mapreduce.JobContext): Unit
def abortJob(jobContext: org.apache.hadoop.mapreduce.JobContext, state: org.apache.hadoop.mapreduce.JobStatus.State): Unit
def needsTaskCommit(taskAttemptContext: org.apache.hadoop.mapreduce.TaskAttemptContext): Boolean
def cleanupJob(jobContext: org.apache.hadoop.mapreduce.JobContext): Unit
def isCommitJobRepeatable(jobContext: org.apache.hadoop.mapreduce.JobContext): Boolean
def recoverTask(taskAttemptContext: org.apache.hadoop.mapreduce.TaskAttemptContext): Unit
def isRecoverySupported: Boolean
def isRecoverySupported(jobContext: org.apache.hadoop.mapreduce.JobContext): Boolean
def hasCapability(capability: String): Boolean
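
Once the binding committer and the commit protocol are configured (see the configuration sketch under Core Components), Parquet writes to object store paths go through the bound PathOutputCommitter transparently. A hedged example with a hypothetical bucket:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("parquet-cloud-write")
  .config("spark.sql.sources.commitProtocolClass",
          "org.apache.spark.internal.io.cloud.PathOutputCommitProtocol")
  .config("spark.sql.parquet.output.committer.class",
          "org.apache.spark.internal.io.cloud.BindingParquetOutputCommitter")
  .getOrCreate()
import spark.implicits._

// Ordinary Parquet write; the binding committer delegates task and job commit
// to the PathOutputCommitter chosen for the destination filesystem.
val df = Seq((1, "a"), (2, "b")).toDF("id", "value")
df.write.mode("overwrite").parquet("s3a://my-bucket/tables/example")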

Types

Core Hadoop Types

// From org.apache.hadoop.fs
type Path = org.apache.hadoop.fs.Path
type Configuration = org.apache.hadoop.conf.Configuration
type FSDataOutputStream = org.apache.hadoop.fs.FSDataOutputStream
type FileContext = org.apache.hadoop.fs.FileContext

// From org.apache.hadoop.mapreduce  
type TaskAttemptContext = org.apache.hadoop.mapreduce.TaskAttemptContext
type JobContext = org.apache.hadoop.mapreduce.JobContext
type JobStatus = org.apache.hadoop.mapreduce.JobStatus

// From org.apache.hadoop.mapreduce.lib.output
type PathOutputCommitter = org.apache.hadoop.mapreduce.lib.output.PathOutputCommitter

// From org.apache.parquet.hadoop
type ParquetOutputCommitter = org.apache.parquet.hadoop.ParquetOutputCommitter

// Spark internal types
type FileNameSpec = org.apache.spark.internal.io.FileNameSpec
type CancellableFSDataOutputStream = org.apache.spark.sql.execution.streaming.CheckpointFileManager.CancellableFSDataOutputStream
type AbstractFileContextBasedCheckpointFileManager = org.apache.spark.sql.execution.streaming.AbstractFileContextBasedCheckpointFileManager
type HadoopMapReduceCommitProtocol = org.apache.spark.internal.io.HadoopMapReduceCommitProtocol

// From org.apache.hadoop.fs
type StreamCapabilities = org.apache.hadoop.fs.StreamCapabilities

// From org.apache.spark.internal
type Logging = org.apache.spark.internal.Logging

Integration Testing

The package supports integration testing against real cloud infrastructure through the IntegrationTestSuite tag. Tests can be configured with environment variables:

AWS S3 Testing

  • S3A_PATH: S3 bucket path for testing
  • AWS_ACCESS_KEY_ID: AWS access key
  • AWS_SECRET_ACCESS_KEY: AWS secret key
  • AWS_SESSION_TOKEN: Optional session token
  • AWS_ENDPOINT_URL: Optional custom endpoint

Running Integration Tests

mvn test -Pintegration-test

Architecture Notes

This package serves as a bridge between Spark's internal streaming and batch processing engines and cloud storage systems. It:

  1. Extends Spark's checkpoint capabilities to support cloud storage with atomic operations
  2. Provides pluggable commit protocols that work with various cloud storage committers
  3. Enables Parquet integration with cloud-specific optimizations
  4. Bundles necessary dependencies for cloud storage access in a single artifact

The package is designed to be used internally by Spark's SQL engine, Structured Streaming, and batch processing components when writing to cloud storage systems. It ensures data consistency and reliability through proven Hadoop cloud storage patterns while maintaining compatibility with Spark's distributed processing model.