Hadoop cloud integration capabilities for Apache Spark, enabling seamless interaction with cloud storage systems
npx @tessl/cli install tessl/maven-org-apache-spark--spark-hadoop-cloud_2-13@4.0.0

The spark-hadoop-cloud_2.13 package provides internal cloud storage integration capabilities for Apache Spark. It bundles the Hadoop cloud-storage JARs and their transitive dependencies needed to interact with cloud infrastructures such as AWS S3, Google Cloud Storage, and Azure storage.
Important: This package contains only internal implementation components in the org.apache.spark.internal.* package namespace. These are not intended for direct external use and are subject to change without notice.
As a Maven dependency:
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-hadoop-cloud_2.13</artifactId>
  <version>4.0.0</version>
</dependency>

For SBT:
libraryDependencies += "org.apache.spark" %% "spark-hadoop-cloud" % "4.0.0"

This package provides three main internal components that extend Spark's capabilities for cloud storage integration:
AbortableStreamBasedCheckpointFileManager provides atomic checkpoint file operations for Spark streaming applications using abortable streams.
class AbortableStreamBasedCheckpointFileManager(
path: org.apache.hadoop.fs.Path,
hadoopConf: org.apache.hadoop.conf.Configuration
) extends AbstractFileContextBasedCheckpointFileManager

PathOutputCommitProtocol implements commit protocols for cloud storage systems using Hadoop's PathOutputCommitter framework.
class PathOutputCommitProtocol(
jobId: String,
dest: String,
dynamicPartitionOverwrite: Boolean = false
) extends HadoopMapReduceCommitProtocol with Serializable

BindingParquetOutputCommitter dynamically binds Parquet operations to cloud-specific output committers.
class BindingParquetOutputCommitter(
path: org.apache.hadoop.fs.Path,
context: org.apache.hadoop.mapreduce.TaskAttemptContext
) extends org.apache.parquet.hadoop.ParquetOutputCommitter

This package includes dependencies and integration for:
- hadoop-aws and AWS SDK v2
- gcs-connector

AbortableStreamBasedCheckpointFileManager manages checkpoint files with atomic write operations and abort capabilities for cloud storage systems.
// Constructor
class AbortableStreamBasedCheckpointFileManager(
path: org.apache.hadoop.fs.Path,
hadoopConf: org.apache.hadoop.conf.Configuration
) extends AbstractFileContextBasedCheckpointFileManager with Logging
// Key methods
def createAtomic(
path: org.apache.hadoop.fs.Path,
overwriteIfPossible: Boolean
): org.apache.spark.sql.execution.streaming.CheckpointFileManager.CancellableFSDataOutputStream
// Inner class for cancellable stream operations
class AbortableStreamBasedFSDataOutputStream(
fsDataOutputStream: org.apache.hadoop.fs.FSDataOutputStream,
fc: org.apache.hadoop.fs.FileContext,
path: org.apache.hadoop.fs.Path,
overwriteIfPossible: Boolean
) extends CancellableFSDataOutputStream {
def cancel(): Unit
def close(): Unit
}

Requirements: The filesystem must support CommonPathCapabilities.ABORTABLE_STREAM.
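For context, here is a minimal sketch of how a Structured Streaming job could opt into this manager. It assumes the internal spark.sql.streaming.checkpointFileManagerClass setting and the org.apache.spark.internal.io.cloud package location; verify both against your Spark build before relying on them.

```scala
import org.apache.spark.sql.SparkSession

// Hedged sketch: point streaming checkpoints at an S3A path and ask Spark to
// use the abortable-stream based manager. Both the config key and the fully
// qualified class name are assumptions to check against your Spark version.
val spark = SparkSession.builder()
  .appName("abortable-checkpoint-sketch")
  .config(
    "spark.sql.streaming.checkpointFileManagerClass",
    "org.apache.spark.internal.io.cloud.AbortableStreamBasedCheckpointFileManager")
  .getOrCreate()

spark.readStream
  .format("rate")                      // built-in test source
  .load()
  .writeStream
  .format("console")
  .option("checkpointLocation", "s3a://my-bucket/checkpoints/rate-demo") // hypothetical bucket
  .start()
```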
Provides a Spark commit protocol implementation for cloud storage systems with proper job and task lifecycle management.
// Constructor
class PathOutputCommitProtocol(
jobId: String,
dest: String,
dynamicPartitionOverwrite: Boolean = false
) extends HadoopMapReduceCommitProtocol with Serializable
// Key methods
protected def setupCommitter(
context: org.apache.hadoop.mapreduce.TaskAttemptContext
): org.apache.hadoop.mapreduce.lib.output.PathOutputCommitter
def newTaskTempFile(
taskContext: org.apache.hadoop.mapreduce.TaskAttemptContext,
dir: Option[String],
spec: org.apache.spark.internal.io.FileNameSpec
): String
def newTaskTempFileAbsPath(
taskContext: org.apache.hadoop.mapreduce.TaskAttemptContext,
absoluteDir: String,
spec: org.apache.spark.internal.io.FileNameSpec
): String
// Configuration constants
val REJECT_FILE_OUTPUT: String = "pathoutputcommit.reject.fileoutput"
val REJECT_FILE_OUTPUT_DEFVAL: Boolean = false
val CAPABILITY_DYNAMIC_PARTITIONING: String = "mapreduce.job.committer.dynamic.partitioning"
val OUTPUTCOMMITTER_FACTORY_SCHEME: String = "mapreduce.outputcommitter.factory.scheme"

BindingParquetOutputCommitter enables Parquet files to work with any PathOutputCommitter implementation, not just ParquetOutputCommitter subclasses.
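In practice the two committer classes in this module are enabled together through Spark SQL configuration. The sketch below follows the property names in Spark's cloud-integration documentation; the S3ACommitterFactory class and the "magic" committer come from hadoop-aws, so treat the exact values as assumptions to check against your Hadoop version.

```scala
import org.apache.spark.sql.SparkSession

// Hedged sketch: route Spark SQL writes through PathOutputCommitProtocol and
// let Parquet bind to whatever committer the s3a committer factory provides.
// The S3ACommitterFactory class and "magic" committer are assumptions about
// the hadoop-aws version on your classpath.
val spark = SparkSession.builder()
  .appName("cloud-committer-sketch")
  .config("spark.hadoop.mapreduce.outputcommitter.factory.scheme.s3a",
          "org.apache.hadoop.fs.s3a.commit.S3ACommitterFactory")
  .config("spark.hadoop.fs.s3a.committer.name", "magic")
  .config("spark.sql.sources.commitProtocolClass",
          "org.apache.spark.internal.io.cloud.PathOutputCommitProtocol")
  .config("spark.sql.parquet.output.committer.class",
          "org.apache.spark.internal.io.cloud.BindingParquetOutputCommitter")
  .getOrCreate()
```

Once these settings are in place, ordinary DataFrame writes to s3a:// destinations commit through PathOutputCommitProtocol instead of the rename-based FileOutputCommitter, and Parquet writes bind to the factory-selected committer via BindingParquetOutputCommitter.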
// Constructor
class BindingParquetOutputCommitter(
path: org.apache.hadoop.fs.Path,
context: org.apache.hadoop.mapreduce.TaskAttemptContext
) extends ParquetOutputCommitter with Logging with StreamCapabilities
// Key methods
def boundCommitter(): org.apache.hadoop.mapreduce.lib.output.PathOutputCommitter
def getWorkPath(): org.apache.hadoop.fs.Path
def setupTask(taskAttemptContext: org.apache.hadoop.mapreduce.TaskAttemptContext): Unit
def commitTask(taskAttemptContext: org.apache.hadoop.mapreduce.TaskAttemptContext): Unit
def abortTask(taskAttemptContext: org.apache.hadoop.mapreduce.TaskAttemptContext): Unit
def setupJob(jobContext: org.apache.hadoop.mapreduce.JobContext): Unit
def commitJob(jobContext: org.apache.hadoop.mapreduce.JobContext): Unit
def abortJob(jobContext: org.apache.hadoop.mapreduce.JobContext, state: org.apache.hadoop.mapreduce.JobStatus.State): Unit
def needsTaskCommit(taskAttemptContext: org.apache.hadoop.mapreduce.TaskAttemptContext): Boolean
def cleanupJob(jobContext: org.apache.hadoop.mapreduce.JobContext): Unit
def isCommitJobRepeatable(jobContext: org.apache.hadoop.mapreduce.JobContext): Boolean
def recoverTask(taskAttemptContext: org.apache.hadoop.mapreduce.TaskAttemptContext): Unit
def isRecoverySupported: Boolean
def isRecoverySupported(jobContext: org.apache.hadoop.mapreduce.JobContext): Boolean
def hasCapability(capability: String): Boolean

// From org.apache.hadoop.fs
type Path = org.apache.hadoop.fs.Path
type Configuration = org.apache.hadoop.conf.Configuration
type FSDataOutputStream = org.apache.hadoop.fs.FSDataOutputStream
type FileContext = org.apache.hadoop.fs.FileContext
// From org.apache.hadoop.mapreduce
type TaskAttemptContext = org.apache.hadoop.mapreduce.TaskAttemptContext
type JobContext = org.apache.hadoop.mapreduce.JobContext
type JobStatus = org.apache.hadoop.mapreduce.JobStatus
// From org.apache.hadoop.mapreduce.lib.output
type PathOutputCommitter = org.apache.hadoop.mapreduce.lib.output.PathOutputCommitter
// From org.apache.parquet.hadoop
type ParquetOutputCommitter = org.apache.parquet.hadoop.ParquetOutputCommitter
// Spark internal types
type FileNameSpec = org.apache.spark.internal.io.FileNameSpec
type CancellableFSDataOutputStream = org.apache.spark.sql.execution.streaming.CheckpointFileManager.CancellableFSDataOutputStream
type AbstractFileContextBasedCheckpointFileManager = org.apache.spark.sql.execution.streaming.AbstractFileContextBasedCheckpointFileManager
type HadoopMapReduceCommitProtocol = org.apache.spark.internal.io.HadoopMapReduceCommitProtocol
// From org.apache.hadoop.fs
type StreamCapabilities = org.apache.hadoop.fs.StreamCapabilities
// From org.apache.spark.internal
type Logging = org.apache.spark.internal.Logging

The package supports integration testing against real cloud infrastructure through the IntegrationTestSuite tag. Tests can be configured with environment variables:
- S3A_PATH: S3 bucket path for testing
- AWS_ACCESS_KEY_ID: AWS access key
- AWS_SECRET_ACCESS_KEY: AWS secret key
- AWS_SESSION_TOKEN: Optional session token
- AWS_ENDPOINT_URL: Optional custom endpoint

Run the integration tests with:

mvn test -Pintegration-test

This package serves as a bridge between Spark's internal streaming and batch processing engines and cloud storage systems.
The package is designed to be used internally by Spark's SQL engine, Structured Streaming, and batch processing components when writing to cloud storage systems. It ensures data consistency and reliability through proven Hadoop cloud storage patterns while maintaining compatibility with Spark's distributed processing model.
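To illustrate that internal role: with the committer configuration from the earlier sketch applied to spark, no user-facing API changes are needed; a routine batch write is enough to exercise these classes (the destination bucket below is hypothetical).

```scala
// Hedged sketch: a plain Parquet write, reusing the `spark` session configured
// above. Spark commits this job through PathOutputCommitProtocol, and the
// Parquet writer binds to the cloud committer via BindingParquetOutputCommitter.
val data = spark.range(0, 1000).withColumnRenamed("id", "value")

data.write
  .mode("overwrite")
  .parquet("s3a://my-bucket/tables/values")   // hypothetical destination
```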