or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

data-sources-sinks.md execution-environment.md grouping-aggregation.md hadoop-integration.md index.md iterations.md joins-cogroups.md transformations.md type-system.md

docs/hadoop-integration.md

0

# Hadoop Integration

1

2

Apache Flink Scala API provides native integration with Hadoop MapReduce and MapRed input/output formats, enabling seamless interoperability with existing Hadoop-based data processing pipelines and file systems.

3

4

## Hadoop Input Formats

5

6

### MapReduce Input Formats

7

8

Integration with the newer Hadoop MapReduce API (org.apache.hadoop.mapreduce).

9

10

```scala { .api }

11

class ExecutionEnvironment {

12

// Read using MapReduce InputFormat

13

def readHadoopFile[K: ClassTag : TypeInformation, V: ClassTag : TypeInformation](

14

inputFormat: MapreduceInputFormat[K, V],

15

keyClass: Class[K],

16

valueClass: Class[V],

17

inputPath: String

18

): DataSet[(K, V)]

19

20

// Read with job configuration

21

def readHadoopFile[K: ClassTag : TypeInformation, V: ClassTag : TypeInformation](

22

inputFormat: MapreduceInputFormat[K, V],

23

keyClass: Class[K],

24

valueClass: Class[V],

25

inputPath: String,

26

job: Job

27

): DataSet[(K, V)]

28

}

29

```

30

31

### MapRed Input Formats

32

33

Integration with the legacy Hadoop MapRed API (org.apache.hadoop.mapred).

34

35

```scala { .api }

36

class ExecutionEnvironment {

37

// Read using MapRed InputFormat

38

def readHadoopFile[K: ClassTag : TypeInformation, V: ClassTag : TypeInformation](

39

inputFormat: MapredInputFormat[K, V],

40

keyClass: Class[K],

41

valueClass: Class[V],

42

inputPath: String

43

): DataSet[(K, V)]

44

45

// Read with job configuration

46

def readHadoopFile[K: ClassTag : TypeInformation, V: ClassTag : TypeInformation](

47

inputFormat: MapredInputFormat[K, V],

48

keyClass: Class[K],

49

valueClass: Class[V],

50

inputPath: String,

51

jobConf: JobConf

52

): DataSet[(K, V)]

53

}

54

```

55

56

## Hadoop Output Formats

57

58

### MapReduce Output Formats

59

60

```scala { .api }

61

// Hadoop MapReduce output format wrapper

62

class HadoopOutputFormat[K, V](

63

outputFormat: MapreduceOutputFormat[K, V],

64

job: Job

65

) extends OutputFormat[(K, V)]

66

67

// Usage in DataSet

68

class DataSet[T] {

69

def output(outputFormat: OutputFormat[T]): DataSink[T]

70

}

71

```

72

73

### MapRed Output Formats

74

75

```scala { .api }

76

// Hadoop MapRed output format wrapper

77

class HadoopOutputFormat[K, V](

78

outputFormat: MapredOutputFormat[K, V],

79

jobConf: JobConf

80

) extends OutputFormat[(K, V)]

81

```

82

83

## File Format Support

84

85

### Text Files

86

87

```scala { .api }

88

import org.apache.hadoop.io.{LongWritable, Text}

89

import org.apache.hadoop.mapreduce.lib.input.TextInputFormat

90

91

class ExecutionEnvironment {

92

// Read text files using Hadoop TextInputFormat

93

def readHadoopFile(

94

inputFormat: TextInputFormat,

95

keyClass: Class[LongWritable],

96

valueClass: Class[Text],

97

inputPath: String

98

): DataSet[(LongWritable, Text)]

99

}

100

```

101

102

### Sequence Files

103

104

```scala { .api }

105

import org.apache.hadoop.io.{IntWritable, Text}

106

import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat

107

108

class ExecutionEnvironment {

109

// Read sequence files

110

def readHadoopFile(

111

inputFormat: SequenceFileInputFormat[IntWritable, Text],

112

keyClass: Class[IntWritable],

113

valueClass: Class[Text],

114

inputPath: String

115

): DataSet[(IntWritable, Text)]

116

}

117

```

118

119

## Usage Examples

120

121

### Reading Text Files with MapReduce API

122

123

```scala

124

import org.apache.flink.api.scala._

125

import org.apache.hadoop.io.{LongWritable, Text}

126

import org.apache.hadoop.mapreduce.lib.input.TextInputFormat

127

import org.apache.hadoop.mapreduce.Job

128

129

val env = ExecutionEnvironment.getExecutionEnvironment

130

131

// Create job configuration

132

val job = Job.getInstance()

133

job.getConfiguration.set("mapreduce.input.fileinputformat.inputdir", "/path/to/input")

134

135

// Read text file using Hadoop TextInputFormat

136

val hadoopData = env.readHadoopFile(

137

new TextInputFormat(),

138

classOf[LongWritable],

139

classOf[Text],

140

"/path/to/input",

141

job

142

)

143

144

// Convert to Scala types and process

145

val lines = hadoopData.map { case (offset, text) => text.toString }

146

val words = lines.flatMap(_.split("\\s+"))

147

val wordCounts = words

148

.map((_, 1))

149

.groupBy(0)

150

.sum(1)

151

152

wordCounts.print()

153

```

154

155

### Reading Text Files with MapRed API

156

157

```scala

158

import org.apache.flink.api.scala._

159

import org.apache.hadoop.io.{LongWritable, Text}

160

import org.apache.hadoop.mapred.{TextInputFormat, JobConf}

161

162

val env = ExecutionEnvironment.getExecutionEnvironment

163

164

// Create job configuration

165

val jobConf = new JobConf()

166

jobConf.setInputFormat(classOf[TextInputFormat])

167

168

// Read using legacy MapRed API

169

val hadoopData = env.readHadoopFile(

170

new TextInputFormat(),

171

classOf[LongWritable],

172

classOf[Text],

173

"/path/to/input",

174

jobConf

175

)

176

177

// Process the data

178

val processedData = hadoopData.map { case (key, value) =>

179

s"Line at offset ${key.get()}: ${value.toString}"

180

}

181

182

processedData.writeAsText("/path/to/output")

183

```

184

185

### Reading Sequence Files

186

187

```scala

188

import org.apache.flink.api.scala._

189

import org.apache.hadoop.io.{IntWritable, Text}

190

import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat

191

import org.apache.hadoop.mapreduce.Job

192

193

val env = ExecutionEnvironment.getExecutionEnvironment

194

195

val job = Job.getInstance()

196

197

// Read sequence file

198

val sequenceData = env.readHadoopFile(

199

new SequenceFileInputFormat[IntWritable, Text](),

200

classOf[IntWritable],

201

classOf[Text],

202

"/path/to/sequence/files",

203

job

204

)

205

206

// Process sequence file data

207

val processedSequence = sequenceData.map { case (intKey, textValue) =>

208

(intKey.get(), textValue.toString.toUpperCase)

209

}

210

211

processedSequence.print()

212

```

213

214

### Custom Hadoop Input Format

215

216

```scala

217

import org.apache.flink.api.scala._

218

import org.apache.hadoop.io.{Writable, LongWritable}

219

import org.apache.hadoop.mapreduce.{InputFormat, InputSplit, JobContext, RecordReader, TaskAttemptContext}

220

221

// Custom Writable class

222

class CustomWritable extends Writable {

223

var data: String = ""

224

225

def write(out: java.io.DataOutput): Unit = {

226

out.writeUTF(data)

227

}

228

229

def readFields(in: java.io.DataInput): Unit = {

230

data = in.readUTF()

231

}

232

}

233

234

// Custom InputFormat

235

class CustomInputFormat extends InputFormat[LongWritable, CustomWritable] {

236

def createRecordReader(split: InputSplit, context: TaskAttemptContext): RecordReader[LongWritable, CustomWritable] = {

237

// Implementation details...

238

null

239

}

240

241

def getSplits(context: JobContext): java.util.List[InputSplit] = {

242

// Implementation details...

243

null

244

}

245

}

246

247

// Usage

248

val env = ExecutionEnvironment.getExecutionEnvironment

249

250

val customData = env.readHadoopFile(

251

new CustomInputFormat(),

252

classOf[LongWritable],

253

classOf[CustomWritable],

254

"/path/to/custom/data"

255

)

256

257

val processed = customData.map { case (key, custom) =>

258

s"${key.get()}: ${custom.data}"

259

}

260

```

261

262

### Writing to Hadoop Output Formats

263

264

```scala

265

import org.apache.flink.api.scala._

266

import org.apache.flink.api.scala.hadoop.mapreduce.HadoopOutputFormat

267

import org.apache.hadoop.io.{LongWritable, Text}

268

import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat

269

import org.apache.hadoop.mapreduce.Job

270

271

val env = ExecutionEnvironment.getExecutionEnvironment

272

273

// Create some data

274

val data = env.fromElements(

275

(new LongWritable(1L), new Text("first line")),

276

(new LongWritable(2L), new Text("second line")),

277

(new LongWritable(3L), new Text("third line"))

278

)

279

280

// Configure Hadoop output

281

val job = Job.getInstance()

282

job.getConfiguration.set("mapreduce.output.fileoutputformat.outputdir", "/path/to/output")

283

284

val hadoopOutputFormat = new HadoopOutputFormat[LongWritable, Text](

285

new TextOutputFormat[LongWritable, Text](),

286

job

287

)

288

289

// Write using Hadoop output format

290

data.output(hadoopOutputFormat)

291

292

env.execute("Hadoop Output Example")

293

```

294

295

### Integrating with HDFS

296

297

```scala

298

import org.apache.flink.api.scala._

299

import org.apache.hadoop.conf.Configuration

300

import org.apache.hadoop.fs.{FileSystem, Path}

301

import org.apache.hadoop.io.{LongWritable, Text}

302

import org.apache.hadoop.mapreduce.lib.input.TextInputFormat

303

304

val env = ExecutionEnvironment.getExecutionEnvironment

305

306

// Configure HDFS access

307

val hadoopConf = new Configuration()

308

hadoopConf.set("fs.defaultFS", "hdfs://namenode:8020")

309

hadoopConf.set("hadoop.job.ugi", "username,groupname")

310

311

// Read from HDFS

312

val hdfsData = env.readHadoopFile(

313

new TextInputFormat(),

314

classOf[LongWritable],

315

classOf[Text],

316

"hdfs://namenode:8020/path/to/data"

317

)

318

319

// Process and write back to HDFS

320

val result = hdfsData.map { case (offset, text) =>

321

text.toString.toUpperCase

322

}

323

324

result.writeAsText("hdfs://namenode:8020/path/to/output")

325

326

env.execute("HDFS Integration Example")

327

```

328

329

### Parquet File Integration

330

331

```scala

332

import org.apache.flink.api.scala._

333

import org.apache.hadoop.mapreduce.Job

334

import org.apache.parquet.hadoop.ParquetInputFormat

335

import org.apache.parquet.hadoop.example.GroupReadSupport

336

import org.apache.parquet.example.data.Group

337

338

val env = ExecutionEnvironment.getExecutionEnvironment

339

340

// Configure Parquet reading

341

val job = Job.getInstance()

342

ParquetInputFormat.setReadSupportClass(job, classOf[GroupReadSupport])

343

344

// Read Parquet files (simplified example)

345

// Note: Actual Parquet integration requires additional setup

346

val parquetData = env.readHadoopFile(

347

new ParquetInputFormat[Group](),

348

classOf[Void],

349

classOf[Group],

350

"/path/to/parquet/files",

351

job

352

)

353

354

// Process Parquet data

355

val processedParquet = parquetData.map { case (_, group) =>

356

// Extract fields from Parquet Group

357

val field1 = group.getString("field1", 0)

358

val field2 = group.getInteger("field2", 0)

359

(field1, field2)

360

}

361

362

processedParquet.print()

363

```

364

365

### Configuration Best Practices

366

367

```scala

368

import org.apache.flink.api.scala._

369

import org.apache.hadoop.conf.Configuration

370

import org.apache.hadoop.mapreduce.Job

371

372

val env = ExecutionEnvironment.getExecutionEnvironment

373

374

// Create Hadoop configuration with custom settings

375

val hadoopConf = new Configuration()

376

377

// HDFS settings

378

hadoopConf.set("fs.defaultFS", "hdfs://namenode:8020")

379

hadoopConf.set("dfs.replication", "3")

380

381

// MapReduce settings

382

hadoopConf.set("mapreduce.job.reduces", "4")

383

hadoopConf.set("mapreduce.map.memory.mb", "2048")

384

385

// Security settings (if using Kerberos)

386

hadoopConf.set("hadoop.security.authentication", "kerberos")

387

hadoopConf.set("hadoop.security.authorization", "true")

388

389

// Create job with custom configuration

390

val job = Job.getInstance(hadoopConf)

391

392

// Use configuration in Flink operations

393

val data = env.readTextFile("hdfs://namenode:8020/input/data.txt")

394

// ... process data ...

395

data.writeAsText("hdfs://namenode:8020/output/results")

396

397

env.execute("Hadoop Configuration Example")

398

```

399

400

## Performance Considerations

401

402

### Optimization Tips

403

404

1. **Use Appropriate Input Splits**: Configure input split size for optimal parallelism

405

2. **Leverage Data Locality**: Ensure Flink can access Hadoop data locality information

406

3. **Configure Memory Settings**: Tune Hadoop and Flink memory settings for large datasets

407

4. **Use Compression**: Enable compression for better I/O performance with large files

408

5. **Monitor Serialization**: Be aware of Hadoop Writable serialization overhead

409

410

### Common Patterns

411

412

1. **ETL Pipelines**: Read from Hadoop sources, transform in Flink, write to Hadoop sinks

413

2. **Data Migration**: Move data between different Hadoop clusters or formats

414

3. **Hybrid Processing**: Combine Hadoop batch processing with Flink stream processing

415

4. **Legacy Integration**: Integrate Flink with existing Hadoop-based data workflows