tessl/maven-org-apache-spark--spark-hadoop-cloud_2-13

tessl install tessl/maven-org-apache-spark--spark-hadoop-cloud_2-13@3.5.0

Spark's cloud storage integration library providing checkpoint file management and output committers for AWS S3, Google Cloud Storage, and Azure Blob Storage.


Spark Hadoop Cloud Integration

Spark's cloud storage integration library providing specialized checkpoint file managers and output committers optimized for cloud storage systems. This library enables reliable data operations on AWS S3, Google Cloud Storage, and Azure Blob Storage through Hadoop cloud connectors with advanced features like abortable streams and path-based commit protocols.

Package Information

  • Package Name: spark-hadoop-cloud_2.13
  • Package Type: Maven
  • Language: Scala
  • Installation: Add to your Maven pom.xml:
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-hadoop-cloud_2.13</artifactId>
      <version>3.5.6</version>
    </dependency>

For SBT:

libraryDependencies += "org.apache.spark" %% "spark-hadoop-cloud" % "3.5.6"
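
The library assumes the matching Hadoop cloud connector is already on the classpath; for S3 that is hadoop-aws. A minimal SBT sketch, assuming Spark 3.5.x is built against Hadoop 3.3.4 (align the version with your actual Hadoop build):

// Assumed additional dependency for S3A access; the version must match your Hadoop build
libraryDependencies += "org.apache.hadoop" % "hadoop-aws" % "3.3.4"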

Core Imports

import org.apache.spark.internal.io.cloud.AbortableStreamBasedCheckpointFileManager
import org.apache.spark.internal.io.cloud.PathOutputCommitProtocol
import org.apache.spark.internal.io.cloud.BindingParquetOutputCommitter

Basic Usage

Checkpoint File Management with Abortable Streams

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.spark.internal.io.cloud.AbortableStreamBasedCheckpointFileManager

// Create checkpoint file manager for cloud storage with abortable stream support
val hadoopConf = new Configuration()
val checkpointPath = new Path("s3a://my-bucket/checkpoints/")

val checkpointManager = new AbortableStreamBasedCheckpointFileManager(
  checkpointPath,
  hadoopConf
)

// Create atomic output stream for writing checkpoint data
val outputStream = checkpointManager.createAtomic(
  path = new Path("s3a://my-bucket/checkpoints/checkpoint-001"),
  overwriteIfPossible = true
)

// Write data and close (or cancel if needed)
outputStream.write("checkpoint data".getBytes)
outputStream.close() // or outputStream.cancel() to abort
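
In practice the checkpoint manager is usually selected through Spark configuration rather than constructed by hand. A hedged sketch, assuming the internal spark.sql.streaming.checkpointFileManagerClass setting present in recent Spark releases (verify it against your Spark version):

import org.apache.spark.sql.SparkSession

// Sketch: ask Structured Streaming to use the abortable-stream manager for its checkpoints
// ("spark.sql.streaming.checkpointFileManagerClass" is an internal Spark setting)
val spark = SparkSession.builder()
  .appName("StreamingCheckpointExample")
  .config("spark.sql.streaming.checkpointFileManagerClass",
          "org.apache.spark.internal.io.cloud.AbortableStreamBasedCheckpointFileManager")
  .getOrCreate()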

Path-Based Output Committing

import org.apache.spark.internal.io.cloud.PathOutputCommitProtocol
import org.apache.spark.sql.SparkSession

// Configure Spark to use PathOutputCommitProtocol for cloud storage
val spark = SparkSession.builder()
  .appName("CloudStorageApp")
  .config("spark.sql.sources.commitProtocolClass", 
          "org.apache.spark.internal.io.cloud.PathOutputCommitProtocol")
  .config("mapreduce.outputcommitter.factory.scheme.s3a",
          "org.apache.hadoop.fs.s3a.commit.S3ACommitterFactory")
  .getOrCreate()

// Write a DataFrame to S3 with reliable committing
val df = spark.range(1000).toDF("id")
df.write
  .mode("overwrite")
  .parquet("s3a://my-bucket/output/")
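
When the output format is Parquet, the Spark cloud-integration documentation pairs this protocol with the BindingParquetOutputCommitter described below. A sketch of the combined configuration:

import org.apache.spark.sql.SparkSession

// Combined configuration for Parquet output through the path-output commit protocol
val spark = SparkSession.builder()
  .appName("CloudParquetApp")
  .config("spark.sql.sources.commitProtocolClass",
          "org.apache.spark.internal.io.cloud.PathOutputCommitProtocol")
  .config("spark.sql.parquet.output.committer.class",
          "org.apache.spark.internal.io.cloud.BindingParquetOutputCommitter")
  .config("mapreduce.outputcommitter.factory.scheme.s3a",
          "org.apache.hadoop.fs.s3a.commit.S3ACommitterFactory")
  .getOrCreate()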

Parquet Output with Dynamic Committer Binding

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.TaskAttemptID
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
import org.apache.spark.internal.io.cloud.BindingParquetOutputCommitter

// Build a task attempt context (in a real job this is supplied by the task runtime)
val taskContext = new TaskAttemptContextImpl(new Configuration(), new TaskAttemptID())

// Create a Parquet committer for the output path; it binds internally to whatever
// committer the configured PathOutputCommitterFactory produces for that filesystem
val outputPath = new Path("s3a://my-bucket/parquet-output/")
val parquetCommitter = new BindingParquetOutputCommitter(outputPath, taskContext)

// The bound committer's task work path is exposed through the usual committer API
val workPath = parquetCommitter.getWorkPath()

Architecture

Spark Hadoop Cloud Integration is built around three key components:

  • AbortableStreamBasedCheckpointFileManager: Provides fault-tolerant checkpoint operations using cloud filesystems' abortable stream capabilities
  • PathOutputCommitProtocol: Implements Spark's commit protocol using Hadoop's PathOutputCommitterFactory for reliable output operations
  • BindingParquetOutputCommitter: Extends Parquet's output committer to work with any PathOutputCommitter through dynamic binding

The library leverages Hadoop's cloud storage connectors and their advanced features like abortable streams and cloud-optimized committers to provide reliability guarantees for Spark workloads on cloud storage.
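
Whether a given store can back the abortable-stream checkpoint manager can be probed through Hadoop's path-capabilities API. A minimal sketch, assuming the standard Hadoop 3.3+ capability string fs.capability.outputstream.abortable:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

// Probe whether the target store exposes abortable output streams
// (capability key assumed from Hadoop's StreamCapabilities/PathCapabilities contract)
val conf = new Configuration()
val target = new Path("s3a://my-bucket/checkpoints/")
val fs = target.getFileSystem(conf)
val abortable = fs.hasPathCapability(target, "fs.capability.outputstream.abortable")
println(s"Abortable streams supported: $abortable")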

Capabilities

Checkpoint File Management

Fault-tolerant checkpoint file management for streaming workloads on cloud storage systems that support abortable streams.

class AbortableStreamBasedCheckpointFileManager(
  path: Path, 
  hadoopConf: Configuration
) extends AbstractFileContextBasedCheckpointFileManager(path, hadoopConf) with Logging

def createAtomic(
  path: Path, 
  overwriteIfPossible: Boolean
): CancellableFSDataOutputStream
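
The CancellableFSDataOutputStream returned by createAtomic is what makes checkpoint writes atomic on object stores: close() publishes the data, while cancel() aborts the in-progress upload without leaving a partial file behind. A hedged usage sketch, reusing the checkpointManager and imports from Basic Usage:

// Write a checkpoint atomically, aborting the upload if anything fails mid-write
val stream = checkpointManager.createAtomic(
  new Path("s3a://my-bucket/checkpoints/checkpoint-002"),
  overwriteIfPossible = true)
try {
  stream.write("checkpoint data".getBytes("UTF-8"))
  stream.close()   // commits the checkpoint file
} catch {
  case e: Exception =>
    stream.cancel() // aborts the write; no partial checkpoint is left behind
    throw e
}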

Path-Based Output Committing

Spark commit protocol using Hadoop's PathOutputCommitterFactory API for reliable output operations with cloud-optimized committers.

class PathOutputCommitProtocol(
  jobId: String,
  dest: String,
  dynamicPartitionOverwrite: Boolean = false
) extends HadoopMapReduceCommitProtocol(jobId, dest, dynamicPartitionOverwrite) with Serializable

protected def setupCommitter(context: TaskAttemptContext): PathOutputCommitter

def newTaskTempFile(
  taskContext: TaskAttemptContext,
  dir: Option[String],
  spec: FileNameSpec
): String

def newTaskTempFileAbsPath(
  taskContext: TaskAttemptContext,
  absoluteDir: String,
  spec: FileNameSpec
): String
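
The dynamicPartitionOverwrite flag interacts with the CAPABILITY_DYNAMIC_PARTITIONING constant listed under Types: in recent Spark releases the protocol accepts dynamic partition overwrite only when the bound committer advertises that capability, and rejects the job otherwise. A hedged sketch of requesting it from the Spark side:

import org.apache.spark.sql.SparkSession

// Sketch: dynamic partition overwrite through the path-output commit protocol;
// assumes the configured committer declares CAPABILITY_DYNAMIC_PARTITIONING
val spark = SparkSession.builder()
  .appName("DynamicOverwriteApp")
  .config("spark.sql.sources.commitProtocolClass",
          "org.apache.spark.internal.io.cloud.PathOutputCommitProtocol")
  .config("spark.sql.sources.partitionOverwriteMode", "dynamic")
  .getOrCreate()

// Hypothetical partitioned dataset written with overwrite mode
val events = spark.range(100).selectExpr("id", "id % 7 AS day")
events.write
  .mode("overwrite")
  .partitionBy("day")
  .parquet("s3a://my-bucket/partitioned-output/")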

Parquet Output Committing

Parquet-specific output committer that dynamically binds to factory-configured PathOutputCommitters, supporting any cloud storage committer.

class BindingParquetOutputCommitter(
  path: Path,
  context: TaskAttemptContext
) extends ParquetOutputCommitter(path, context) with Logging with StreamCapabilities

private[cloud] def boundCommitter(): PathOutputCommitter
def getWorkPath(): Path
def setupTask(taskAttemptContext: TaskAttemptContext): Unit
def commitTask(taskAttemptContext: TaskAttemptContext): Unit
def abortTask(taskAttemptContext: TaskAttemptContext): Unit
def setupJob(jobContext: JobContext): Unit
def needsTaskCommit(taskAttemptContext: TaskAttemptContext): Boolean
def cleanupJob(jobContext: JobContext): Unit
def isCommitJobRepeatable(jobContext: JobContext): Boolean
def commitJob(jobContext: JobContext): Unit
def recoverTask(taskAttemptContext: TaskAttemptContext): Unit
def abortJob(jobContext: JobContext, state: JobStatus.State): Unit
def isRecoverySupported: Boolean
def isRecoverySupported(jobContext: JobContext): Boolean
def hasCapability(capability: String): Boolean

Types

AbortableStreamBasedFSDataOutputStream

class AbortableStreamBasedFSDataOutputStream(
  fsDataOutputStream: FSDataOutputStream,
  fc: FileContext,
  path: Path,
  overwriteIfPossible: Boolean
) extends CancellableFSDataOutputStream(fsDataOutputStream)

def cancel(): Unit
def close(): Unit  
def toString(): String

PathOutputCommitProtocol Constants

object PathOutputCommitProtocol {
  val REJECT_FILE_OUTPUT: String = "pathoutputcommit.reject.fileoutput"
  val REJECT_FILE_OUTPUT_DEFVAL: Boolean = false
  val CAPABILITY_DYNAMIC_PARTITIONING: String = "mapreduce.job.committer.dynamic.partitioning"
  val OUTPUTCOMMITTER_FACTORY_SCHEME: String = "mapreduce.outputcommitter.factory.scheme"
}
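
These constants are plain Hadoop configuration keys and can be set directly on the job configuration. A short sketch (the s3a suffix follows the per-scheme pattern used in the Configuration section):

import org.apache.hadoop.conf.Configuration

val hadoopConf = new Configuration()
// Fail fast if the factory would fall back to the classic FileOutputCommitter
hadoopConf.setBoolean("pathoutputcommit.reject.fileoutput", true)
// Bind the s3a scheme to the S3A committer factory
hadoopConf.set("mapreduce.outputcommitter.factory.scheme.s3a",
               "org.apache.hadoop.fs.s3a.commit.S3ACommitterFactory")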

Key Imported Types

// From Spark
import org.apache.spark.sql.execution.streaming.CheckpointFileManager.CancellableFSDataOutputStream
import org.apache.spark.sql.execution.streaming.AbstractFileContextBasedCheckpointFileManager
import org.apache.spark.internal.io.HadoopMapReduceCommitProtocol
import org.apache.spark.internal.io.FileNameSpec

// From Hadoop
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{Path, FileContext, FSDataOutputStream, StreamCapabilities}
import org.apache.hadoop.mapreduce.{TaskAttemptContext, JobContext, JobStatus}
import org.apache.hadoop.mapreduce.lib.output.{PathOutputCommitter, PathOutputCommitterFactory}

// From Parquet
import org.apache.parquet.hadoop.ParquetOutputCommitter

Configuration

Required Hadoop Configuration

For AWS S3:

conf.set("fs.s3a.access.key", "your-access-key")
conf.set("fs.s3a.secret.key", "your-secret-key")
conf.set("mapreduce.outputcommitter.factory.scheme.s3a", 
         "org.apache.hadoop.fs.s3a.commit.S3ACommitterFactory")

For Google Cloud Storage:

conf.set("fs.gs.project.id", "your-project-id")
conf.set("google.cloud.auth.service.account.json.keyfile", "/path/to/keyfile.json")

For Azure Blob Storage:

conf.set("fs.azure.account.key.youraccount.blob.core.windows.net", "your-account-key")

Integration Testing Configuration

Environment variables for integration tests:

  • S3A_PATH: S3 bucket path for testing
  • AWS_ACCESS_KEY_ID: AWS access key
  • AWS_SECRET_ACCESS_KEY: AWS secret key
  • AWS_SESSION_TOKEN: Optional AWS session token
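
A hedged sketch of turning these environment variables into a test-only Hadoop configuration (integrationTestConf is a hypothetical helper, not part of the library):

import org.apache.hadoop.conf.Configuration

// Hypothetical helper: build an S3A test configuration only when the environment is set
def integrationTestConf(): Option[Configuration] =
  for {
    _         <- sys.env.get("S3A_PATH")
    accessKey <- sys.env.get("AWS_ACCESS_KEY_ID")
    secretKey <- sys.env.get("AWS_SECRET_ACCESS_KEY")
  } yield {
    val conf = new Configuration()
    conf.set("fs.s3a.access.key", accessKey)
    conf.set("fs.s3a.secret.key", secretKey)
    sys.env.get("AWS_SESSION_TOKEN").foreach(conf.set("fs.s3a.session.token", _))
    conf
  }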

Error Handling

The library handles common cloud storage errors:

  • UnsupportedFileSystemException: Thrown when the underlying filesystem does not support abortable streams
  • FileAlreadyExistsException: Thrown when attempting to create a file that already exists without requesting overwrite
  • IOException: Other I/O errors during cloud storage operations are logged and handled gracefully

All committers include error recovery mechanisms and proper cleanup of partial writes during failures.
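
A hedged sketch of handling these failures around an atomic checkpoint write (checkpointManager is the instance from Basic Usage):

import java.io.IOException
import org.apache.hadoop.fs.{FileAlreadyExistsException, Path, UnsupportedFileSystemException}

try {
  val out = checkpointManager.createAtomic(
    new Path("s3a://my-bucket/checkpoints/checkpoint-003"),
    overwriteIfPossible = false)
  out.write("checkpoint data".getBytes("UTF-8"))
  out.close()
} catch {
  case e: UnsupportedFileSystemException =>
    // The store does not expose abortable streams; fall back to a different checkpoint manager
    throw e
  case e: FileAlreadyExistsException =>
    // The checkpoint already exists and overwrite was not requested
    throw e
  case e: IOException =>
    // Other cloud storage I/O failures
    throw e
}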