# Apache Spark Avro Connector

Apache Spark Avro connector provides seamless integration between Apache Spark SQL and the Apache Avro data format, enabling efficient reading, writing, and processing of Avro files with automatic schema evolution support and built-in compression capabilities.

## Package Information

- **Package Name**: spark-avro_2.12
- **Package Type**: Maven
- **Coordinate**: org.apache.spark:spark-avro_2.12
- **Version**: 3.5.6
- **Language**: Scala (with Java interoperability)
- **Installation**: Add the dependency to your Maven pom.xml or SBT build definition:

Maven:

```xml
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-avro_2.12</artifactId>
  <version>3.5.6</version>
</dependency>
```

SBT:

```scala
libraryDependencies += "org.apache.spark" %% "spark-avro" % "3.5.6"
```

## Core Imports

```scala
import org.apache.spark.sql.avro.functions._
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.DataFrame
```

For the DataSource API:

```scala
import org.apache.spark.sql.avro.AvroOptions
```

## Basic Usage

### Reading Avro Files

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("AvroExample")
  .getOrCreate()

// Read Avro files using the DataSource API
val df = spark.read
  .format("avro")
  .load("path/to/avro/files")

df.show()
```

### Writing Avro Files

```scala
// Write DataFrame to Avro format with Snappy compression
df.write
  .format("avro")
  .option("compression", "snappy")
  .save("path/to/output")
```

### Processing Binary Avro Data

```scala
import org.apache.spark.sql.avro.functions._
import org.apache.spark.sql.functions._

// Avro schema describing the binary payload
val schema = """
{
  "type": "record",
  "name": "User",
  "fields": [
    {"name": "name", "type": "string"},
    {"name": "age", "type": "int"}
  ]
}
"""

// Convert binary Avro column to structured data
val decodedDF = df.select(
  from_avro(col("avro_bytes"), schema).as("user_data")
)

// Convert structured data to binary Avro
val encodedDF = df.select(
  to_avro(struct(col("name"), col("age"))).as("avro_bytes")
)
```

## Architecture

The Spark Avro connector is built around several key components:

- **DataSource Integration**: Native Spark SQL DataSource v1 and v2 implementations for seamless file I/O
- **Function API**: SQL functions for binary Avro data transformation (`from_avro`, `to_avro`)
- **Schema Conversion**: Bidirectional schema mapping between Avro and Spark SQL data types
- **Serialization Engine**: High-performance Catalyst expression-based serializers and deserializers
- **Configuration System**: Comprehensive options for compression, schema evolution, and compatibility settings

## Capabilities

### File I/O Operations

DataSource API integration for reading and writing Avro files directly through Spark SQL, with automatic schema inference and support for partitioned datasets.

```scala { .api }
// DataSource read/write through format("avro")
spark.read.format("avro"): DataFrameReader
DataFrame.write.format("avro"): DataFrameWriter[Row]
```
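
The partitioned-dataset support can be sketched end to end; the `year` column, the `/tmp` path, and the local master are illustrative assumptions, and spark-avro must be on the classpath:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("PartitionedAvro")
  .master("local[*]")
  .getOrCreate()
import spark.implicits._

// Illustrative dataset; the "year" column and output path are assumptions
val events = Seq(("a", 2023), ("b", 2024)).toDF("id", "year")

// partitionBy creates one subdirectory per value (year=2023/, year=2024/)
events.write
  .format("avro")
  .partitionBy("year")
  .mode("overwrite")
  .save("/tmp/events_avro")

// Partition discovery restores "year" as a column on read
val partitioned = spark.read.format("avro").load("/tmp/events_avro")
```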

[File Operations](./file-operations.md)

### Binary Data Processing

Core functions for converting between binary Avro data and Spark SQL structures, enabling processing of Avro-encoded columns within DataFrames.

```scala { .api }
def from_avro(data: Column, jsonFormatSchema: String): Column
def from_avro(data: Column, jsonFormatSchema: String, options: java.util.Map[String, String]): Column
def to_avro(data: Column): Column
def to_avro(data: Column, jsonFormatSchema: String): Column
```
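
A round-trip sketch of these functions, assuming a local SparkSession with spark-avro on the classpath (the dataset and column names are illustrative):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.avro.functions._
import org.apache.spark.sql.functions._

val spark = SparkSession.builder()
  .appName("AvroRoundTrip")
  .master("local[*]")
  .getOrCreate()
import spark.implicits._

val schema =
  """{"type": "record", "name": "User", "fields": [
    |  {"name": "name", "type": "string"},
    |  {"name": "age", "type": "int"}
    |]}""".stripMargin

// Encode rows to binary Avro, then decode them back; the options overload
// controls parse behavior (e.g. "mode" -> "PERMISSIVE" nulls out bad records)
val users = Seq(("alice", 30), ("bob", 25)).toDF("name", "age")
val encoded = users.select(to_avro(struct(col("name"), col("age"))).as("avro_bytes"))
val decoded = encoded
  .select(from_avro(col("avro_bytes"), schema,
    java.util.Collections.singletonMap("mode", "PERMISSIVE")).as("user"))
  .select("user.*")
```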

[Binary Data Functions](./binary-functions.md)

### Schema Conversion

Utilities for converting between Avro schemas and Spark SQL schemas, supporting complex nested types and logical type mappings.

```scala { .api }
object SchemaConverters {
  case class SchemaType(dataType: DataType, nullable: Boolean)

  def toSqlType(avroSchema: Schema): SchemaType
  def toSqlType(avroSchema: Schema, options: Map[String, String]): SchemaType
  def toAvroType(
    catalystType: DataType,
    nullable: Boolean,
    recordName: String,
    nameSpace: String
  ): Schema
}
```
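
A small usage sketch of the bidirectional conversion; the record name and namespace passed to `toAvroType` are illustrative:

```scala
import org.apache.avro.Schema
import org.apache.spark.sql.avro.SchemaConverters
import org.apache.spark.sql.types.StructType

// Avro -> Spark SQL: parse an Avro schema and convert it
val avroSchema = new Schema.Parser().parse(
  """{"type": "record", "name": "User", "fields": [
    |  {"name": "name", "type": "string"},
    |  {"name": "age", "type": "int"}
    |]}""".stripMargin
)
val sqlType = SchemaConverters.toSqlType(avroSchema)
// sqlType.dataType is a StructType with fields name: string, age: int

// Spark SQL -> Avro: convert the Catalyst type back to an Avro record schema
val avroBack = SchemaConverters.toAvroType(
  sqlType.dataType,
  nullable = false,
  recordName = "User",
  nameSpace = "com.example"
)
```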

[Schema Conversion](./schema-conversion.md)

### Configuration Options

Comprehensive configuration system for controlling Avro processing behavior, including compression, schema evolution, field matching, and error handling.

```scala { .api }
object AvroOptions {
  val COMPRESSION: String
  val RECORD_NAME: String
  val RECORD_NAMESPACE: String
  val AVRO_SCHEMA: String
  val AVRO_SCHEMA_URL: String
  val POSITIONAL_FIELD_MATCHING: String
  val DATETIME_REBASE_MODE: String

  def apply(parameters: Map[String, String]): AvroOptions
}
```
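
These options are normally passed as string keys through the DataSource API; a sketch under the same local-session assumption as above (paths and option values are illustrative):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("AvroOptionsDemo")
  .master("local[*]")
  .getOrCreate()
import spark.implicits._

val users = Seq(("alice", 30)).toDF("name", "age")

// Write with an explicit codec and Avro record naming
users.write
  .format("avro")
  .option("compression", "deflate")          // e.g. snappy, deflate, uncompressed
  .option("recordName", "User")
  .option("recordNamespace", "com.example")
  .mode("overwrite")
  .save("/tmp/users_avro")

// Read back with an explicit datetime rebase policy
val readBack = spark.read
  .format("avro")
  .option("datetimeRebaseMode", "CORRECTED")
  .load("/tmp/users_avro")
```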

[Configuration](./configuration.md)

### Utility Functions

Core utility functions for schema inference, type validation, and write preparation.

```scala { .api }
object AvroUtils {
  def inferSchema(
    spark: SparkSession,
    options: Map[String, String],
    files: Seq[FileStatus]
  ): Option[StructType]

  def supportsDataType(dataType: DataType): Boolean

  def prepareWrite(
    sqlConf: SQLConf,
    job: Job,
    options: Map[String, String],
    dataSchema: StructType
  ): OutputWriterFactory
}
```

## Types

### Core Expression Types

```scala { .api }
class AvroDataToCatalyst(
  child: Expression,
  jsonFormatSchema: String,
  options: Map[String, String]
) extends UnaryExpression

class CatalystDataToAvro(
  child: Expression,
  jsonFormatSchema: Option[String]
) extends UnaryExpression
```

### Option Types

```scala { .api }
class AvroOptions(
  parameters: CaseInsensitiveMap[String],
  conf: Configuration
) extends FileSourceOptions(parameters) {
  val schema: Option[Schema]
  val positionalFieldMatching: Boolean
  val recordName: String
  val recordNamespace: String
  val compression: String
  val parseMode: ParseMode
  val datetimeRebaseModeInRead: String
}
```

### Utility Types

```scala { .api }
// Note: part of the AvroUtils object
case class AvroMatchedField(
  catalystField: StructField,
  catalystPosition: Int,
  avroField: Schema.Field
)

class AvroSchemaHelper(
  avroSchema: Schema,
  catalystSchema: StructType,
  avroPath: Seq[String],
  catalystPath: Seq[String],
  positionalFieldMatch: Boolean
) {
  val matchedFields: Seq[AvroMatchedField]
  def getAvroField(fieldName: String, catalystPos: Int): Option[Schema.Field]
  def validateNoExtraCatalystFields(ignoreNullable: Boolean): Unit
  def validateNoExtraAvroFields(): Unit
}
```

## Error Handling

The connector throws specific exceptions for schema incompatibilities and processing errors:

```scala { .api }
// Private to the avro package
private[avro] class IncompatibleSchemaException(
  msg: String,
  ex: Throwable = null
) extends Exception(msg, ex)

private[avro] class UnsupportedAvroTypeException(msg: String) extends Exception(msg)
```

Common error scenarios:

- Schema mismatch between Avro and Spark SQL schemas
- Invalid JSON schema format
- Unsupported data type conversions
- Corrupt or unreadable Avro files
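
Because these exception types are package-private, callers typically handle malformed records through the `mode` parse option instead of catching them; a sketch under the same local-session assumption, with a deliberately fabricated corrupt payload:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.avro.functions._
import org.apache.spark.sql.functions._

val spark = SparkSession.builder()
  .appName("AvroParseModes")
  .master("local[*]")
  .getOrCreate()
import spark.implicits._

val schema =
  """{"type": "record", "name": "User", "fields": [{"name": "name", "type": "string"}]}"""

// A deliberately malformed binary payload
val corrupt = Seq(Array[Byte](1, 2, 3)).toDF("avro_bytes")

// FAILFAST (the default) raises an error on malformed records;
// PERMISSIVE yields null for the whole record so the job can continue
val tolerant = corrupt.select(
  from_avro(col("avro_bytes"), schema,
    java.util.Collections.singletonMap("mode", "PERMISSIVE")).as("user")
)
```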

## Deprecated Functionality

### Package Object Functions (Deprecated since 3.0.0)

The package object provides deprecated convenience functions that delegate to the main functions API:

```scala { .api }
package org.apache.spark.sql.avro {
  @deprecated("Please use 'org.apache.spark.sql.avro.functions.from_avro' instead.", "3.0.0")
  def from_avro(data: Column, jsonFormatSchema: String): Column

  @deprecated("Please use 'org.apache.spark.sql.avro.functions.to_avro' instead.", "3.0.0")
  def to_avro(data: Column): Column
}
```

**Migration Example:**

```scala
// Old, deprecated usage
import org.apache.spark.sql.avro._
val result = from_avro(col("data"), schema)

// New, recommended usage
import org.apache.spark.sql.avro.functions._
val result = from_avro(col("data"), schema)
```

### Java Integration Components

For Java-based MapReduce integrations:

```scala { .api }
class SparkAvroKeyOutputFormat extends OutputFormat[AvroKey[GenericRecord], NullWritable]
```