tessl install tessl/maven-org-apache-spark--spark-hadoop-cloud_2-13@3.5.0

Spark's cloud storage integration library providing checkpoint file management and output committers for AWS S3, Google Cloud Storage, and Azure Blob Storage.
Spark's cloud storage integration library provides specialized checkpoint file managers and output committers for cloud object stores. It enables reliable data operations on AWS S3, Google Cloud Storage, and Azure Blob Storage through Hadoop's cloud connectors, using features such as abortable streams and path-based commit protocols.
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hadoop-cloud_2.13</artifactId>
<version>3.5.6</version>
</dependency>

For SBT:
libraryDependencies += "org.apache.spark" %% "spark-hadoop-cloud" % "3.5.6"

import org.apache.spark.internal.io.cloud.AbortableStreamBasedCheckpointFileManager
import org.apache.spark.internal.io.cloud.PathOutputCommitProtocol
import org.apache.spark.internal.io.cloud.BindingParquetOutputCommitter

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.spark.internal.io.cloud.AbortableStreamBasedCheckpointFileManager
// Create checkpoint file manager for cloud storage with abortable stream support
val hadoopConf = new Configuration()
val checkpointPath = new Path("s3a://my-bucket/checkpoints/")
val checkpointManager = new AbortableStreamBasedCheckpointFileManager(
  checkpointPath,
  hadoopConf
)
// Create atomic output stream for writing checkpoint data
val outputStream = checkpointManager.createAtomic(
  path = new Path("s3a://my-bucket/checkpoints/checkpoint-001"),
  overwriteIfPossible = true
)
// Write data and close (or cancel if needed)
outputStream.write("checkpoint data".getBytes)
outputStream.close() // or outputStream.cancel() to abort

import org.apache.spark.internal.io.cloud.PathOutputCommitProtocol
import org.apache.spark.sql.SparkSession
// Configure Spark to use PathOutputCommitProtocol for cloud storage
val spark = SparkSession.builder()
  .appName("CloudStorageApp")
  .config("spark.sql.sources.commitProtocolClass",
    "org.apache.spark.internal.io.cloud.PathOutputCommitProtocol")
  .config("mapreduce.outputcommitter.factory.scheme.s3a",
    "org.apache.hadoop.fs.s3a.commit.S3ACommitterFactory")
  .getOrCreate()
// Write DataFrame to S3 with reliable committing
df.write
  .mode("overwrite")
  .parquet("s3a://my-bucket/output/")

import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.TaskAttemptContext
import org.apache.spark.internal.io.cloud.BindingParquetOutputCommitter
// Create Parquet committer that binds to configured PathOutputCommitter
val outputPath = new Path("s3a://my-bucket/parquet-output/")
val parquetCommitter = new BindingParquetOutputCommitter(outputPath, taskContext)
// The committer automatically binds to the factory-configured committer;
// boundCommitter() is private[cloud], so the binding happens inside the class.

Spark Hadoop Cloud Integration is built around three key components: AbortableStreamBasedCheckpointFileManager, PathOutputCommitProtocol, and BindingParquetOutputCommitter.
The library leverages Hadoop's cloud storage connectors and their advanced features like abortable streams and cloud-optimized committers to provide reliability guarantees for Spark workloads on cloud storage.
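In practice the three pieces are wired together through Spark configuration. The sketch below shows one common S3A setup, modeled on Spark's cloud-integration documentation; the committer factory and the fs.s3a.committer.name value are deployment choices (the "magic" committer is only an example), so adapt them to your storage connector:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("CloudCommitterWiring")
  // Route DataFrame/SQL writes through the cloud-aware commit protocol
  .config("spark.sql.sources.commitProtocolClass",
    "org.apache.spark.internal.io.cloud.PathOutputCommitProtocol")
  // Bind Parquet writes to whatever PathOutputCommitter the factory produces
  .config("spark.sql.parquet.output.committer.class",
    "org.apache.spark.internal.io.cloud.BindingParquetOutputCommitter")
  // Select the committer factory for the s3a:// scheme
  .config("spark.hadoop.mapreduce.outputcommitter.factory.scheme.s3a",
    "org.apache.hadoop.fs.s3a.commit.S3ACommitterFactory")
  // Pick a concrete S3A committer ("magic" here, as an example)
  .config("spark.hadoop.fs.s3a.committer.name", "magic")
  .getOrCreate()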
Fault-tolerant checkpoint file management for streaming workloads on cloud storage systems that support abortable streams.
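Structured Streaming routes its checkpoint writes through this manager once it is named in the checkpoint file manager setting. A minimal sketch, assuming the spark.sql.streaming.checkpointFileManagerClass option and an s3a:// checkpoint location; the rate source and console sink are placeholders:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("StreamingCheckpoints")
  // Use the abortable-stream based manager for streaming checkpoint files
  .config("spark.sql.streaming.checkpointFileManagerClass",
    "org.apache.spark.internal.io.cloud.AbortableStreamBasedCheckpointFileManager")
  .getOrCreate()

// Any streaming query then writes its checkpoint metadata through this manager
val query = spark.readStream.format("rate").load()
  .writeStream
  .format("console")
  .option("checkpointLocation", "s3a://my-bucket/checkpoints/rate-demo/")
  .start()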
class AbortableStreamBasedCheckpointFileManager(
  path: Path,
  hadoopConf: Configuration
) extends AbstractFileContextBasedCheckpointFileManager(path, hadoopConf) with Logging

def createAtomic(
  path: Path,
  overwriteIfPossible: Boolean
): CancellableFSDataOutputStream

Spark commit protocol using Hadoop's PathOutputCommitterFactory API for reliable output operations with cloud-optimized committers.
class PathOutputCommitProtocol(
  jobId: String,
  dest: String,
  dynamicPartitionOverwrite: Boolean = false
) extends HadoopMapReduceCommitProtocol(jobId, dest, dynamicPartitionOverwrite) with Serializable
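The dynamicPartitionOverwrite flag is honored only when the bound committer reports the dynamic-partitioning capability (CAPABILITY_DYNAMIC_PARTITIONING in the constants below); committers without it reject the write rather than risk incomplete output. A hedged sketch of requesting dynamic overwrite from the SQL side, where df and the event_date partition column are placeholders:

// Dynamic partition overwrite replaces only the partitions present in the new data
spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")

df.write
  .mode("overwrite")
  .partitionBy("event_date")
  .parquet("s3a://my-bucket/partitioned-output/")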
protected def setupCommitter(context: TaskAttemptContext): PathOutputCommitter
def newTaskTempFile(
  taskContext: TaskAttemptContext,
  dir: Option[String],
  spec: FileNameSpec
): String

def newTaskTempFileAbsPath(
  taskContext: TaskAttemptContext,
  absoluteDir: String,
  spec: FileNameSpec
): String

Parquet-specific output committer that dynamically binds to factory-configured PathOutputCommitters, supporting any cloud storage committer.
class BindingParquetOutputCommitter(
  path: Path,
  context: TaskAttemptContext
) extends ParquetOutputCommitter(path, context) with Logging with StreamCapabilities
private[cloud] def boundCommitter(): PathOutputCommitter
def getWorkPath(): Path
def setupTask(taskAttemptContext: TaskAttemptContext): Unit
def commitTask(taskAttemptContext: TaskAttemptContext): Unit
def abortTask(taskAttemptContext: TaskAttemptContext): Unit
def setupJob(jobContext: JobContext): Unit
def needsTaskCommit(taskAttemptContext: TaskAttemptContext): Boolean
def cleanupJob(jobContext: JobContext): Unit
def isCommitJobRepeatable(jobContext: JobContext): Boolean
def commitJob(jobContext: JobContext): Unit
def recoverTask(taskAttemptContext: TaskAttemptContext): Unit
def abortJob(jobContext: JobContext, state: JobStatus.State): Unit
def isRecoverySupported: Boolean
def isRecoverySupported(jobContext: JobContext): Boolean
def hasCapability(capability: String): Boolean

class AbortableStreamBasedFSDataOutputStream(
  fsDataOutputStream: FSDataOutputStream,
  fc: FileContext,
  path: Path,
  overwriteIfPossible: Boolean
) extends CancellableFSDataOutputStream(fsDataOutputStream)

def cancel(): Unit
def close(): Unit
def toString(): String

object PathOutputCommitProtocol {
  val REJECT_FILE_OUTPUT: String = "pathoutputcommit.reject.fileoutput"
  val REJECT_FILE_OUTPUT_DEFVAL: Boolean = false
  val CAPABILITY_DYNAMIC_PARTITIONING: String = "mapreduce.job.committer.dynamic.partitioning"
  val OUTPUTCOMMITTER_FACTORY_SCHEME: String = "mapreduce.outputcommitter.factory.scheme"
}

// From Spark
import org.apache.spark.sql.execution.streaming.CheckpointFileManager.CancellableFSDataOutputStream
import org.apache.spark.sql.execution.streaming.AbstractFileContextBasedCheckpointFileManager
import org.apache.spark.internal.io.HadoopMapReduceCommitProtocol
import org.apache.spark.internal.io.FileNameSpec
// From Hadoop
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{Path, FileContext, FSDataOutputStream, StreamCapabilities}
import org.apache.hadoop.mapreduce.{TaskAttemptContext, JobContext, JobStatus}
import org.apache.hadoop.mapreduce.lib.output.{PathOutputCommitter, PathOutputCommitterFactory}
// From Parquet
import org.apache.parquet.hadoop.ParquetOutputCommitter

For AWS S3:
conf.set("fs.s3a.access.key", "your-access-key")
conf.set("fs.s3a.secret.key", "your-secret-key")
conf.set("mapreduce.outputcommitter.factory.scheme.s3a",
"org.apache.hadoop.fs.s3a.commit.S3ACommitterFactory")For Google Cloud Storage:
conf.set("fs.gs.project.id", "your-project-id")
conf.set("google.cloud.auth.service.account.json.keyfile", "/path/to/keyfile.json")For Azure Blob Storage:
conf.set("fs.azure.account.key.youraccount.blob.core.windows.net", "your-account-key")Environment variables for integration tests:
S3A_PATH: S3 bucket path for testing
AWS_ACCESS_KEY_ID: AWS access key
AWS_SECRET_ACCESS_KEY: AWS secret key
AWS_SESSION_TOKEN: optional AWS session token

The library handles common cloud storage errors: all committers include error recovery mechanisms and clean up partial writes when a task or job fails.
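For checkpoint writes, that cleanup is what cancel() on CancellableFSDataOutputStream provides: the in-progress upload is aborted so no partial object becomes visible. A minimal sketch reusing checkpointManager from the quick-start example above; the checkpoint file name is a placeholder:

val out = checkpointManager.createAtomic(
  new Path("s3a://my-bucket/checkpoints/checkpoint-002"),
  overwriteIfPossible = true
)
try {
  out.write("checkpoint data".getBytes)
  out.close()  // commit: the checkpoint file becomes visible
} catch {
  case e: Exception =>
    out.cancel() // abort: discard the partial write, nothing is published
    throw e
}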