To install with the Tessl CLI, run:

```bash
npx @tessl/cli install tessl/maven-org-apache-spark--spark-hadoop-cloud_2-13@4.0.0
```

# Spark Hadoop Cloud Integration

The `spark-hadoop-cloud_2.13` package provides internal cloud storage integration capabilities for Apache Spark. It pulls in the Hadoop cloud connector JARs and the transitive dependencies needed to interact with cloud storage services such as AWS S3, Google Cloud Storage, and Azure storage.

**Important**: This package contains only internal implementation components in the `org.apache.spark.internal.*` package namespace. These are not intended for direct external use and are subject to change without notice.

## Package Information

- **Package Name**: spark-hadoop-cloud_2.13
- **Package Type**: maven
- **Language**: Scala (with Java dependencies)
- **Group ID**: org.apache.spark
- **Artifact ID**: spark-hadoop-cloud_2.13
- **Version**: 4.0.0

## Installation

As a Maven dependency:

```xml
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-hadoop-cloud_2.13</artifactId>
  <version>4.0.0</version>
</dependency>
```

For sbt:

```scala
libraryDependencies += "org.apache.spark" %% "spark-hadoop-cloud" % "4.0.0"
```
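If you launch applications with `spark-submit`, the module can alternatively be resolved at submit time instead of being bundled into the build. A sketch (the job class and JAR name are placeholders):

```bash
spark-submit \
  --packages org.apache.spark:spark-hadoop-cloud_2.13:4.0.0 \
  --class com.example.MyCloudJob \
  my-cloud-job.jar
```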

## Core Components

This package provides three main internal components that extend Spark's capabilities for cloud storage integration:

### Cloud Checkpoint File Manager

Provides atomic checkpoint file operations for Spark streaming applications using abortable streams.

```scala { .api }
class AbortableStreamBasedCheckpointFileManager(
  path: org.apache.hadoop.fs.Path,
  hadoopConf: org.apache.hadoop.conf.Configuration
) extends AbstractFileContextBasedCheckpointFileManager
```

### Path Output Commit Protocol

Implements commit protocols for cloud storage systems using Hadoop's PathOutputCommitter framework.

```scala { .api }
class PathOutputCommitProtocol(
  jobId: String,
  dest: String,
  dynamicPartitionOverwrite: Boolean = false
) extends HadoopMapReduceCommitProtocol with Serializable
```

### Binding Parquet Output Committer

Dynamically binds Parquet operations to cloud-specific output committers.

```scala { .api }
class BindingParquetOutputCommitter(
  path: org.apache.hadoop.fs.Path,
  context: org.apache.hadoop.mapreduce.TaskAttemptContext
) extends org.apache.parquet.hadoop.ParquetOutputCommitter
```

## Cloud Storage Integration

### Supported Cloud Providers

This package includes dependencies and integration for:

- **AWS S3**: Via `hadoop-aws` and AWS SDK v2
- **Google Cloud Storage**: Via `gcs-connector`

### Key Features

- **Atomic Operations**: Ensures data consistency through abortable stream operations
- **Dynamic Partitioning**: Supports dynamic partition overwrite for compatible storage systems
- **Integration Testing**: Includes capabilities for testing against real cloud infrastructure
- **Factory-Based Committers**: Uses Hadoop's `PathOutputCommitterFactory` for extensibility

## Capabilities

### Abortable Stream-Based Checkpoint Management

Manages checkpoint files with atomic write operations and abort capabilities for cloud storage systems.

```scala { .api }
// Constructor
class AbortableStreamBasedCheckpointFileManager(
  path: org.apache.hadoop.fs.Path,
  hadoopConf: org.apache.hadoop.conf.Configuration
) extends AbstractFileContextBasedCheckpointFileManager with Logging

// Key methods
def createAtomic(
  path: org.apache.hadoop.fs.Path,
  overwriteIfPossible: Boolean
): org.apache.spark.sql.execution.streaming.CheckpointFileManager.CancellableFSDataOutputStream

// Inner class for cancellable stream operations
class AbortableStreamBasedFSDataOutputStream(
  fsDataOutputStream: org.apache.hadoop.fs.FSDataOutputStream,
  fc: org.apache.hadoop.fs.FileContext,
  path: org.apache.hadoop.fs.Path,
  overwriteIfPossible: Boolean
) extends CancellableFSDataOutputStream {
  def cancel(): Unit
  def close(): Unit
}
```

**Requirements**: The filesystem must support `CommonPathCapabilities.ABORTABLE_STREAM`.
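As a usage sketch, Structured Streaming can be pointed at this checkpoint manager through the `spark.sql.streaming.checkpointFileManagerClass` setting described in Spark's cloud-integration documentation; the checkpoint location itself is set per query and must live on a filesystem with abortable-stream support, such as S3A:

```properties
# Sketch: opt in to the abortable-stream checkpoint manager for streaming checkpoints
spark.sql.streaming.checkpointFileManagerClass=org.apache.spark.internal.io.cloud.AbortableStreamBasedCheckpointFileManager
```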

### Path Output Commit Protocol

Provides Spark commit protocol implementation for cloud storage systems with proper job and task lifecycle management.

```scala { .api }
// Constructor
class PathOutputCommitProtocol(
  jobId: String,
  dest: String,
  dynamicPartitionOverwrite: Boolean = false
) extends HadoopMapReduceCommitProtocol with Serializable

// Key methods
protected def setupCommitter(
  context: org.apache.hadoop.mapreduce.TaskAttemptContext
): org.apache.hadoop.mapreduce.lib.output.PathOutputCommitter

def newTaskTempFile(
  taskContext: org.apache.hadoop.mapreduce.TaskAttemptContext,
  dir: Option[String],
  spec: org.apache.spark.internal.io.FileNameSpec
): String

def newTaskTempFileAbsPath(
  taskContext: org.apache.hadoop.mapreduce.TaskAttemptContext,
  absoluteDir: String,
  spec: org.apache.spark.internal.io.FileNameSpec
): String

// Configuration constants
val REJECT_FILE_OUTPUT: String = "pathoutputcommit.reject.fileoutput"
val REJECT_FILE_OUTPUT_DEFVAL: Boolean = false
val CAPABILITY_DYNAMIC_PARTITIONING: String = "mapreduce.job.committer.dynamic.partitioning"
val OUTPUTCOMMITTER_FACTORY_SCHEME: String = "mapreduce.outputcommitter.factory.scheme"
```
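As a configuration sketch following the pattern in Spark's cloud committer documentation, the settings below route S3A writes through this protocol; the choice of the `magic` committer is an assumption, and the `directory` or `partitioned` committers can be substituted:

```properties
# Bind the s3a:// scheme to Hadoop's S3A committer factory
spark.hadoop.mapreduce.outputcommitter.factory.scheme.s3a=org.apache.hadoop.fs.s3a.commit.S3ACommitterFactory
# Select an S3A committer implementation (assumption: magic)
spark.hadoop.fs.s3a.committer.name=magic
# Use the cloud-aware commit protocol for Spark SQL writes
spark.sql.sources.commitProtocolClass=org.apache.spark.internal.io.cloud.PathOutputCommitProtocol
```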

### Binding Parquet Output Committer

Enables Parquet files to work with any PathOutputCommitter implementation, not just ParquetOutputCommitter subclasses.

```scala { .api }
// Constructor
class BindingParquetOutputCommitter(
  path: org.apache.hadoop.fs.Path,
  context: org.apache.hadoop.mapreduce.TaskAttemptContext
) extends ParquetOutputCommitter with Logging with StreamCapabilities

// Key methods
def boundCommitter(): org.apache.hadoop.mapreduce.lib.output.PathOutputCommitter
def getWorkPath(): org.apache.hadoop.fs.Path
def setupTask(taskAttemptContext: org.apache.hadoop.mapreduce.TaskAttemptContext): Unit
def commitTask(taskAttemptContext: org.apache.hadoop.mapreduce.TaskAttemptContext): Unit
def abortTask(taskAttemptContext: org.apache.hadoop.mapreduce.TaskAttemptContext): Unit
def setupJob(jobContext: org.apache.hadoop.mapreduce.JobContext): Unit
def commitJob(jobContext: org.apache.hadoop.mapreduce.JobContext): Unit
def abortJob(jobContext: org.apache.hadoop.mapreduce.JobContext, state: org.apache.hadoop.mapreduce.JobStatus.State): Unit
def needsTaskCommit(taskAttemptContext: org.apache.hadoop.mapreduce.TaskAttemptContext): Boolean
def cleanupJob(jobContext: org.apache.hadoop.mapreduce.JobContext): Unit
def isCommitJobRepeatable(jobContext: org.apache.hadoop.mapreduce.JobContext): Boolean
def recoverTask(taskAttemptContext: org.apache.hadoop.mapreduce.TaskAttemptContext): Unit
def isRecoverySupported: Boolean
def isRecoverySupported(jobContext: org.apache.hadoop.mapreduce.JobContext): Boolean
def hasCapability(capability: String): Boolean
```
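Because Spark's Parquet writer expects a `ParquetOutputCommitter` subclass, cloud committers are typically enabled for Parquet output by pointing Spark at this binding class; a hedged `spark-defaults.conf` fragment:

```properties
# Let Parquet writes delegate to whichever PathOutputCommitter is configured
spark.sql.parquet.output.committer.class=org.apache.spark.internal.io.cloud.BindingParquetOutputCommitter
```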

## Types

### Core Hadoop Types

```scala { .api }
// From org.apache.hadoop.fs
type Path = org.apache.hadoop.fs.Path
type FSDataOutputStream = org.apache.hadoop.fs.FSDataOutputStream
type FileContext = org.apache.hadoop.fs.FileContext
type StreamCapabilities = org.apache.hadoop.fs.StreamCapabilities

// From org.apache.hadoop.conf
type Configuration = org.apache.hadoop.conf.Configuration

// From org.apache.hadoop.mapreduce
type TaskAttemptContext = org.apache.hadoop.mapreduce.TaskAttemptContext
type JobContext = org.apache.hadoop.mapreduce.JobContext
type JobStatus = org.apache.hadoop.mapreduce.JobStatus

// From org.apache.hadoop.mapreduce.lib.output
type PathOutputCommitter = org.apache.hadoop.mapreduce.lib.output.PathOutputCommitter

// From org.apache.parquet.hadoop
type ParquetOutputCommitter = org.apache.parquet.hadoop.ParquetOutputCommitter

// Spark internal types
type Logging = org.apache.spark.internal.Logging
type FileNameSpec = org.apache.spark.internal.io.FileNameSpec
type HadoopMapReduceCommitProtocol = org.apache.spark.internal.io.HadoopMapReduceCommitProtocol
type CancellableFSDataOutputStream = org.apache.spark.sql.execution.streaming.CheckpointFileManager.CancellableFSDataOutputStream
type AbstractFileContextBasedCheckpointFileManager = org.apache.spark.sql.execution.streaming.AbstractFileContextBasedCheckpointFileManager
```

## Integration Testing

The package supports integration testing against real cloud infrastructure through the `IntegrationTestSuite` tag. Tests can be configured with environment variables:

### AWS S3 Testing

- `S3A_PATH`: S3 bucket path for testing
- `AWS_ACCESS_KEY_ID`: AWS access key
- `AWS_SECRET_ACCESS_KEY`: AWS secret key
- `AWS_SESSION_TOKEN`: Optional session token
- `AWS_ENDPOINT_URL`: Optional custom endpoint
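For example, an S3A test environment might be prepared like this before running the suite; the bucket path and credential values are placeholders:

```bash
export S3A_PATH=s3a://my-test-bucket/spark-cloud-tests   # placeholder bucket path
export AWS_ACCESS_KEY_ID=placeholder-access-key          # placeholder credentials
export AWS_SECRET_ACCESS_KEY=placeholder-secret-key
```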

### Running Integration Tests

```bash
mvn test -Pintegration-test
```

## Architecture Notes

This package serves as a bridge between Spark's internal streaming and batch processing engines and cloud storage systems. It:

1. **Extends Spark's checkpoint capabilities** to support cloud storage with atomic operations
2. **Provides pluggable commit protocols** that work with various cloud storage committers
3. **Enables Parquet integration** with cloud-specific optimizations
4. **Bundles necessary dependencies** for cloud storage access in a single artifact

The package is designed to be used internally by Spark's SQL engine, Structured Streaming, and batch processing components when writing to cloud storage systems. It ensures data consistency and reliability through proven Hadoop cloud storage patterns while maintaining compatibility with Spark's distributed processing model.