Hadoop cloud integration capabilities for Apache Spark, enabling seamless interaction with cloud storage systems
npx @tessl/cli install tessl/maven-org-apache-spark--spark-hadoop-cloud_2-13@4.0.0
# Spark Hadoop Cloud Integration

The `spark-hadoop-cloud_2.13` package provides internal cloud storage integration capabilities for Apache Spark. This package contains the Hadoop JARs and transitive dependencies needed to interact with cloud infrastructures like AWS S3, Google Cloud Storage, and Azure storage systems.

**Important**: This package contains only internal implementation components in the `org.apache.spark.internal.*` package namespace. These are not intended for direct external use and are subject to change without notice.

## Package Information

- **Package Name**: spark-hadoop-cloud_2.13
- **Package Type**: maven
- **Language**: Scala (with Java dependencies)
- **Group ID**: org.apache.spark
- **Artifact ID**: spark-hadoop-cloud_2.13
- **Version**: 4.0.0

## Installation

As a Maven dependency:

```xml
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-hadoop-cloud_2.13</artifactId>
  <version>4.0.0</version>
</dependency>
```

For SBT:

```scala
libraryDependencies += "org.apache.spark" %% "spark-hadoop-cloud" % "4.0.0"
```

## Core Components

This package provides three main internal components that extend Spark's capabilities for cloud storage integration:

### Cloud Checkpoint File Manager

Provides atomic checkpoint file operations for Spark streaming applications using abortable streams.

```scala { .api }
class AbortableStreamBasedCheckpointFileManager(
    path: org.apache.hadoop.fs.Path,
    hadoopConf: org.apache.hadoop.conf.Configuration
) extends AbstractFileContextBasedCheckpointFileManager
```

### Path Output Commit Protocol

Implements commit protocols for cloud storage systems using Hadoop's PathOutputCommitter framework.

```scala { .api }
class PathOutputCommitProtocol(
    jobId: String,
    dest: String,
    dynamicPartitionOverwrite: Boolean = false
) extends HadoopMapReduceCommitProtocol with Serializable
```

### Binding Parquet Output Committer

Dynamically binds Parquet operations to cloud-specific output committers.

```scala { .api }
class BindingParquetOutputCommitter(
    path: org.apache.hadoop.fs.Path,
    context: org.apache.hadoop.mapreduce.TaskAttemptContext
) extends org.apache.parquet.hadoop.ParquetOutputCommitter
```

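In practice these components are selected through Spark configuration rather than instantiated directly. A sketch of a `spark-defaults.conf` fragment, using the committer settings described in Spark's cloud-integration documentation (the `s3a` factory binding assumes the S3A committers from `hadoop-aws` are on the classpath):

```properties
# Route output for s3a:// URIs through the S3A committer factory (from hadoop-aws)
spark.hadoop.mapreduce.outputcommitter.factory.scheme.s3a  org.apache.hadoop.fs.s3a.commit.S3ACommitterFactory
# Use this package's commit protocol for SQL/DataFrame writes
spark.sql.sources.commitProtocolClass  org.apache.spark.internal.io.cloud.PathOutputCommitProtocol
# Let Parquet writes bind to whichever PathOutputCommitter the factory selects
spark.sql.parquet.output.committer.class  org.apache.spark.internal.io.cloud.BindingParquetOutputCommitter
```
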
## Cloud Storage Integration

### Supported Cloud Providers

This package includes dependencies and integration for:

- **AWS S3**: Via `hadoop-aws` and AWS SDK v2
- **Google Cloud Storage**: Via `gcs-connector`

### Key Features

- **Atomic Operations**: Ensures data consistency through abortable stream operations
- **Dynamic Partitioning**: Supports dynamic partition overwrite for compatible storage systems
- **Integration Testing**: Includes capabilities for testing against real cloud infrastructure
- **Factory-Based Committers**: Uses Hadoop's PathOutputCommitterFactory for extensibility

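The dynamic-partitioning feature can be sketched from the user side as follows (bucket path and the `df` DataFrame are placeholders; `spark` is an active `SparkSession`):

```scala
// Sketch: overwrite only the partitions present in the incoming data.
// Requires a committer that advertises dynamic-partitioning support.
spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")

df.write
  .partitionBy("date")
  .mode("overwrite")
  .parquet("s3a://example-bucket/events")  // placeholder path
```
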
## Capabilities

### Abortable Stream-Based Checkpoint Management

Manages checkpoint files with atomic write operations and abort capabilities for cloud storage systems.

```scala { .api }
// Constructor
class AbortableStreamBasedCheckpointFileManager(
    path: org.apache.hadoop.fs.Path,
    hadoopConf: org.apache.hadoop.conf.Configuration
) extends AbstractFileContextBasedCheckpointFileManager with Logging

// Key methods
def createAtomic(
    path: org.apache.hadoop.fs.Path,
    overwriteIfPossible: Boolean
): org.apache.spark.sql.execution.streaming.CheckpointFileManager.CancellableFSDataOutputStream

// Inner class for cancellable stream operations
class AbortableStreamBasedFSDataOutputStream(
    fsDataOutputStream: org.apache.hadoop.fs.FSDataOutputStream,
    fc: org.apache.hadoop.fs.FileContext,
    path: org.apache.hadoop.fs.Path,
    overwriteIfPossible: Boolean
) extends CancellableFSDataOutputStream {
  def cancel(): Unit
  def close(): Unit
}
```

**Requirements**: The filesystem must support `CommonPathCapabilities.ABORTABLE_STREAM`.

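A sketch of probing a filesystem for that capability before pointing a checkpoint at it (the path is a placeholder; `hasPathCapability` is Hadoop's standard capability-probe API):

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{CommonPathCapabilities, Path}

val checkpointPath = new Path("s3a://example-bucket/checkpoints")  // placeholder
val fs = checkpointPath.getFileSystem(new Configuration())

// AbortableStreamBasedCheckpointFileManager requires this capability
val abortable = fs.hasPathCapability(checkpointPath, CommonPathCapabilities.ABORTABLE_STREAM)
```
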

### Path Output Commit Protocol

Provides a Spark commit protocol implementation for cloud storage systems with proper job and task lifecycle management.

```scala { .api }
// Constructor
class PathOutputCommitProtocol(
    jobId: String,
    dest: String,
    dynamicPartitionOverwrite: Boolean = false
) extends HadoopMapReduceCommitProtocol with Serializable

// Key methods
protected def setupCommitter(
    context: org.apache.hadoop.mapreduce.TaskAttemptContext
): org.apache.hadoop.mapreduce.lib.output.PathOutputCommitter

def newTaskTempFile(
    taskContext: org.apache.hadoop.mapreduce.TaskAttemptContext,
    dir: Option[String],
    spec: org.apache.spark.internal.io.FileNameSpec
): String

def newTaskTempFileAbsPath(
    taskContext: org.apache.hadoop.mapreduce.TaskAttemptContext,
    absoluteDir: String,
    spec: org.apache.spark.internal.io.FileNameSpec
): String

// Configuration constants
val REJECT_FILE_OUTPUT: String = "pathoutputcommit.reject.fileoutput"
val REJECT_FILE_OUTPUT_DEFVAL: Boolean = false
val CAPABILITY_DYNAMIC_PARTITIONING: String = "mapreduce.job.committer.dynamic.partitioning"
val OUTPUTCOMMITTER_FACTORY_SCHEME: String = "mapreduce.outputcommitter.factory.scheme"
```

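The configuration constants above map onto plain Hadoop `Configuration` keys. A sketch of setting them directly (when configured through Spark, the same keys take a `spark.hadoop.` prefix; the S3A factory class assumes `hadoop-aws` is present):

```scala
import org.apache.hadoop.conf.Configuration

val conf = new Configuration()
// OUTPUTCOMMITTER_FACTORY_SCHEME + ".s3a": choose a committer factory per URI scheme
conf.set("mapreduce.outputcommitter.factory.scheme.s3a",
  "org.apache.hadoop.fs.s3a.commit.S3ACommitterFactory")
// REJECT_FILE_OUTPUT: fail fast instead of silently falling back to FileOutputCommitter
conf.setBoolean("pathoutputcommit.reject.fileoutput", true)
```
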
### Binding Parquet Output Committer

Enables Parquet writes to work with any PathOutputCommitter implementation, not just ParquetOutputCommitter subclasses.

```scala { .api }
// Constructor
class BindingParquetOutputCommitter(
    path: org.apache.hadoop.fs.Path,
    context: org.apache.hadoop.mapreduce.TaskAttemptContext
) extends ParquetOutputCommitter with Logging with StreamCapabilities

// Key methods
def boundCommitter(): org.apache.hadoop.mapreduce.lib.output.PathOutputCommitter
def getWorkPath(): org.apache.hadoop.fs.Path
def setupTask(taskAttemptContext: org.apache.hadoop.mapreduce.TaskAttemptContext): Unit
def commitTask(taskAttemptContext: org.apache.hadoop.mapreduce.TaskAttemptContext): Unit
def abortTask(taskAttemptContext: org.apache.hadoop.mapreduce.TaskAttemptContext): Unit
def setupJob(jobContext: org.apache.hadoop.mapreduce.JobContext): Unit
def commitJob(jobContext: org.apache.hadoop.mapreduce.JobContext): Unit
def abortJob(jobContext: org.apache.hadoop.mapreduce.JobContext, state: org.apache.hadoop.mapreduce.JobStatus.State): Unit
def needsTaskCommit(taskAttemptContext: org.apache.hadoop.mapreduce.TaskAttemptContext): Boolean
def cleanupJob(jobContext: org.apache.hadoop.mapreduce.JobContext): Unit
def isCommitJobRepeatable(jobContext: org.apache.hadoop.mapreduce.JobContext): Boolean
def recoverTask(taskAttemptContext: org.apache.hadoop.mapreduce.TaskAttemptContext): Unit
def isRecoverySupported: Boolean
def isRecoverySupported(jobContext: org.apache.hadoop.mapreduce.JobContext): Boolean
def hasCapability(capability: String): Boolean
```

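A sketch of selecting the binding committer for Parquet output through a `SparkSession` (the application name is a placeholder; the config key is the one documented in Spark's cloud-integration guide):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("cloud-parquet-write")  // placeholder
  .config("spark.sql.parquet.output.committer.class",
    "org.apache.spark.internal.io.cloud.BindingParquetOutputCommitter")
  .getOrCreate()
```
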
## Types

### Core Hadoop Types

```scala { .api }
// From org.apache.hadoop.fs
type Path = org.apache.hadoop.fs.Path
type FSDataOutputStream = org.apache.hadoop.fs.FSDataOutputStream
type FileContext = org.apache.hadoop.fs.FileContext
type StreamCapabilities = org.apache.hadoop.fs.StreamCapabilities

// From org.apache.hadoop.conf
type Configuration = org.apache.hadoop.conf.Configuration

// From org.apache.hadoop.mapreduce
type TaskAttemptContext = org.apache.hadoop.mapreduce.TaskAttemptContext
type JobContext = org.apache.hadoop.mapreduce.JobContext
type JobStatus = org.apache.hadoop.mapreduce.JobStatus

// From org.apache.hadoop.mapreduce.lib.output
type PathOutputCommitter = org.apache.hadoop.mapreduce.lib.output.PathOutputCommitter

// From org.apache.parquet.hadoop
type ParquetOutputCommitter = org.apache.parquet.hadoop.ParquetOutputCommitter

// Spark internal types
type FileNameSpec = org.apache.spark.internal.io.FileNameSpec
type CancellableFSDataOutputStream = org.apache.spark.sql.execution.streaming.CheckpointFileManager.CancellableFSDataOutputStream
type AbstractFileContextBasedCheckpointFileManager = org.apache.spark.sql.execution.streaming.AbstractFileContextBasedCheckpointFileManager
type HadoopMapReduceCommitProtocol = org.apache.spark.internal.io.HadoopMapReduceCommitProtocol
type Logging = org.apache.spark.internal.Logging
```

## Integration Testing

The package supports integration testing against real cloud infrastructure through the `IntegrationTestSuite` tag. Tests can be configured with environment variables:

### AWS S3 Testing

- `S3A_PATH`: S3 bucket path for testing
- `AWS_ACCESS_KEY_ID`: AWS access key
- `AWS_SECRET_ACCESS_KEY`: AWS secret key
- `AWS_SESSION_TOKEN`: Optional session token
- `AWS_ENDPOINT_URL`: Optional custom endpoint

### Running Integration Tests

```bash
mvn test -Pintegration-test
```

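A sketch of the environment setup before invoking the tests (all values are placeholders; substitute your own bucket and credentials):

```shell
# Hypothetical example values -- replace with a real bucket and credentials
export S3A_PATH="s3a://example-bucket/spark-cloud-tests"
export AWS_ACCESS_KEY_ID="example-access-key"
export AWS_SECRET_ACCESS_KEY="example-secret-key"
# AWS_SESSION_TOKEN and AWS_ENDPOINT_URL are optional and can be left unset.

# Then run the integration profile:
# mvn test -Pintegration-test
```
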
## Architecture Notes

This package serves as a bridge between Spark's internal streaming and batch processing engines and cloud storage systems. It:

1. **Extends Spark's checkpoint capabilities** to support cloud storage with atomic operations
2. **Provides pluggable commit protocols** that work with various cloud storage committers
3. **Enables Parquet integration** with cloud-specific optimizations
4. **Bundles necessary dependencies** for cloud storage access in a single artifact

The package is designed to be used internally by Spark's SQL engine, Structured Streaming, and batch processing components when writing to cloud storage systems. It ensures data consistency and reliability through proven Hadoop cloud storage patterns while maintaining compatibility with Spark's distributed processing model.