# Hadoop Integration

The Apache Flink Scala API integrates natively with Hadoop MapReduce and MapRed input/output formats, enabling interoperability with existing Hadoop-based data processing pipelines and file systems.

## Hadoop Input Formats

### MapReduce Input Formats

Integration with the newer Hadoop MapReduce API (`org.apache.hadoop.mapreduce`).
```scala { .api }
class ExecutionEnvironment {
  // Read using a MapReduce InputFormat
  def readHadoopFile[K: ClassTag : TypeInformation, V: ClassTag : TypeInformation](
    inputFormat: MapreduceInputFormat[K, V],
    keyClass: Class[K],
    valueClass: Class[V],
    inputPath: String
  ): DataSet[(K, V)]

  // Read with an explicit job configuration
  def readHadoopFile[K: ClassTag : TypeInformation, V: ClassTag : TypeInformation](
    inputFormat: MapreduceInputFormat[K, V],
    keyClass: Class[K],
    valueClass: Class[V],
    inputPath: String,
    job: Job
  ): DataSet[(K, V)]
}
```

### MapRed Input Formats

Integration with the legacy Hadoop MapRed API (`org.apache.hadoop.mapred`).
```scala { .api }
class ExecutionEnvironment {
  // Read using a MapRed InputFormat
  def readHadoopFile[K: ClassTag : TypeInformation, V: ClassTag : TypeInformation](
    inputFormat: MapredInputFormat[K, V],
    keyClass: Class[K],
    valueClass: Class[V],
    inputPath: String
  ): DataSet[(K, V)]

  // Read with an explicit job configuration
  def readHadoopFile[K: ClassTag : TypeInformation, V: ClassTag : TypeInformation](
    inputFormat: MapredInputFormat[K, V],
    keyClass: Class[K],
    valueClass: Class[V],
    inputPath: String,
    jobConf: JobConf
  ): DataSet[(K, V)]
}
```

## Hadoop Output Formats

### MapReduce Output Formats
```scala { .api }
// Hadoop MapReduce output format wrapper
class HadoopOutputFormat[K, V](
  outputFormat: MapreduceOutputFormat[K, V],
  job: Job
) extends OutputFormat[(K, V)]

// Usage on a DataSet of (K, V) pairs
class DataSet[T] {
  def output(outputFormat: OutputFormat[T]): DataSink[T]
}
```

### MapRed Output Formats
```scala { .api }
// Hadoop MapRed output format wrapper
class HadoopOutputFormat[K, V](
  outputFormat: MapredOutputFormat[K, V],
  jobConf: JobConf
) extends OutputFormat[(K, V)]
```

## File Format Support

### Text Files
```scala { .api }
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat

class ExecutionEnvironment {
  // Read text files using Hadoop's TextInputFormat
  def readHadoopFile(
    inputFormat: TextInputFormat,
    keyClass: Class[LongWritable],
    valueClass: Class[Text],
    inputPath: String
  ): DataSet[(LongWritable, Text)]
}
```

### Sequence Files
```scala { .api }
import org.apache.hadoop.io.{IntWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat

class ExecutionEnvironment {
  // Read sequence files
  def readHadoopFile(
    inputFormat: SequenceFileInputFormat[IntWritable, Text],
    keyClass: Class[IntWritable],
    valueClass: Class[Text],
    inputPath: String
  ): DataSet[(IntWritable, Text)]
}
```

## Usage Examples

### Reading Text Files with MapReduce API
```scala
import org.apache.flink.api.scala._
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.hadoop.mapreduce.Job

val env = ExecutionEnvironment.getExecutionEnvironment

// Create the job configuration
val job = Job.getInstance()
job.getConfiguration.set("mapreduce.input.fileinputformat.inputdir", "/path/to/input")

// Read a text file using Hadoop's TextInputFormat
val hadoopData = env.readHadoopFile(
  new TextInputFormat(),
  classOf[LongWritable],
  classOf[Text],
  "/path/to/input",
  job
)

// Convert to Scala types and process
val lines = hadoopData.map { case (offset, text) => text.toString }
val words = lines.flatMap(_.split("\\s+"))
val wordCounts = words
  .map((_, 1))
  .groupBy(0)
  .sum(1)

wordCounts.print()
```
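For reference, the same word-count logic can be sketched on plain Scala collections, with no Flink or Hadoop dependency, to make clear what the `DataSet` transformations compute: the `groupBy(0).sum(1)` pair aggregates over tuple field indices, which `groupBy(identity)` plus a size count mirrors here.

```scala
// Plain-Scala word count on standard collections; mirrors the
// flatMap/groupBy/sum pipeline above without the DataSet API.
val lines = Seq("first line", "second line first")
val words = lines.flatMap(_.split("\\s+"))
val wordCounts = words
  .groupBy(identity)
  .map { case (word, occurrences) => (word, occurrences.size) }

println(wordCounts) // Map(first -> 2, line -> 2, second -> 1)
```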

### Reading Text Files with MapRed API
```scala
import org.apache.flink.api.scala._
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapred.{JobConf, TextInputFormat}

val env = ExecutionEnvironment.getExecutionEnvironment

// Create the job configuration
val jobConf = new JobConf()
jobConf.setInputFormat(classOf[TextInputFormat])

// Read using the legacy MapRed API
val hadoopData = env.readHadoopFile(
  new TextInputFormat(),
  classOf[LongWritable],
  classOf[Text],
  "/path/to/input",
  jobConf
)

// Process the data
val processedData = hadoopData.map { case (key, value) =>
  s"Line at offset ${key.get()}: ${value.toString}"
}

processedData.writeAsText("/path/to/output")
```

### Reading Sequence Files
```scala
import org.apache.flink.api.scala._
import org.apache.hadoop.io.{IntWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat
import org.apache.hadoop.mapreduce.Job

val env = ExecutionEnvironment.getExecutionEnvironment

val job = Job.getInstance()

// Read a sequence file
val sequenceData = env.readHadoopFile(
  new SequenceFileInputFormat[IntWritable, Text](),
  classOf[IntWritable],
  classOf[Text],
  "/path/to/sequence/files",
  job
)

// Process the sequence file data
val processedSequence = sequenceData.map { case (intKey, textValue) =>
  (intKey.get(), textValue.toString.toUpperCase)
}

processedSequence.print()
```

### Custom Hadoop Input Format
```scala
import org.apache.flink.api.scala._
import org.apache.hadoop.io.{LongWritable, Writable}
import org.apache.hadoop.mapreduce.{InputFormat, InputSplit, JobContext, RecordReader, TaskAttemptContext}

// Custom Writable class
class CustomWritable extends Writable {
  var data: String = ""

  def write(out: java.io.DataOutput): Unit = {
    out.writeUTF(data)
  }

  def readFields(in: java.io.DataInput): Unit = {
    data = in.readUTF()
  }
}

// Custom InputFormat
class CustomInputFormat extends InputFormat[LongWritable, CustomWritable] {
  def createRecordReader(split: InputSplit, context: TaskAttemptContext): RecordReader[LongWritable, CustomWritable] = {
    // Implementation details...
    null
  }

  // Note: getSplits takes a JobContext, not a TaskAttemptContext
  def getSplits(context: JobContext): java.util.List[InputSplit] = {
    // Implementation details...
    null
  }
}

// Usage
val env = ExecutionEnvironment.getExecutionEnvironment

val customData = env.readHadoopFile(
  new CustomInputFormat(),
  classOf[LongWritable],
  classOf[CustomWritable],
  "/path/to/custom/data"
)

val processed = customData.map { case (key, custom) =>
  s"${key.get()}: ${custom.data}"
}
```
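The `write`/`readFields` pair must be symmetric: whatever `write` emits, `readFields` must consume in the same order, since Hadoop reuses object instances during deserialization. A minimal round-trip sketch of that contract, using only `java.io` (a hypothetical `CustomRecord` stand-in so the example runs without the Hadoop dependency):

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}

// Hypothetical stand-in for the Writable contract shown above.
class CustomRecord {
  var data: String = ""
  def write(out: java.io.DataOutput): Unit = out.writeUTF(data)
  def readFields(in: java.io.DataInput): Unit = { data = in.readUTF() }
}

val original = new CustomRecord
original.data = "hello hadoop"

// Serialize to an in-memory buffer
val buffer = new ByteArrayOutputStream()
original.write(new DataOutputStream(buffer))

// Deserialize into a fresh instance
val restored = new CustomRecord
restored.readFields(new DataInputStream(new ByteArrayInputStream(buffer.toByteArray)))

println(restored.data) // hello hadoop
```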

### Writing to Hadoop Output Formats
```scala
import org.apache.flink.api.scala._
import org.apache.flink.api.scala.hadoop.mapreduce.HadoopOutputFormat
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat
import org.apache.hadoop.mapreduce.Job

val env = ExecutionEnvironment.getExecutionEnvironment

// Create some data
val data = env.fromElements(
  (new LongWritable(1L), new Text("first line")),
  (new LongWritable(2L), new Text("second line")),
  (new LongWritable(3L), new Text("third line"))
)

// Configure the Hadoop output
val job = Job.getInstance()
job.getConfiguration.set("mapreduce.output.fileoutputformat.outputdir", "/path/to/output")

val hadoopOutputFormat = new HadoopOutputFormat[LongWritable, Text](
  new TextOutputFormat[LongWritable, Text](),
  job
)

// Write using the Hadoop output format
data.output(hadoopOutputFormat)

env.execute("Hadoop Output Example")
```

### Integrating with HDFS
```scala
import org.apache.flink.api.scala._
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat

val env = ExecutionEnvironment.getExecutionEnvironment

// Configure HDFS access
val hadoopConf = new Configuration()
hadoopConf.set("fs.defaultFS", "hdfs://namenode:8020")

// Pass the configuration to the read via a Job instance
val job = Job.getInstance(hadoopConf)

// Read from HDFS
val hdfsData = env.readHadoopFile(
  new TextInputFormat(),
  classOf[LongWritable],
  classOf[Text],
  "hdfs://namenode:8020/path/to/data",
  job
)

// Process and write back to HDFS
val result = hdfsData.map { case (offset, text) =>
  text.toString.toUpperCase
}

result.writeAsText("hdfs://namenode:8020/path/to/output")

env.execute("HDFS Integration Example")
```

### Parquet File Integration
```scala
import org.apache.flink.api.scala._
import org.apache.hadoop.mapreduce.Job
import org.apache.parquet.hadoop.ParquetInputFormat
import org.apache.parquet.hadoop.example.GroupReadSupport
import org.apache.parquet.example.data.Group

val env = ExecutionEnvironment.getExecutionEnvironment

// Configure Parquet reading
val job = Job.getInstance()
ParquetInputFormat.setReadSupportClass(job, classOf[GroupReadSupport])

// Read Parquet files (simplified example)
// Note: actual Parquet integration requires additional setup
val parquetData = env.readHadoopFile(
  new ParquetInputFormat[Group](),
  classOf[Void],
  classOf[Group],
  "/path/to/parquet/files",
  job
)

// Process the Parquet data
val processedParquet = parquetData.map { case (_, group) =>
  // Extract fields from the Parquet Group
  val field1 = group.getString("field1", 0)
  val field2 = group.getInteger("field2", 0)
  (field1, field2)
}

processedParquet.print()
```

### Configuration Best Practices
```scala
import org.apache.flink.api.scala._
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapreduce.Job

val env = ExecutionEnvironment.getExecutionEnvironment

// Create a Hadoop configuration with custom settings
val hadoopConf = new Configuration()

// HDFS settings
hadoopConf.set("fs.defaultFS", "hdfs://namenode:8020")
hadoopConf.set("dfs.replication", "3")

// MapReduce settings
hadoopConf.set("mapreduce.job.reduces", "4")
hadoopConf.set("mapreduce.map.memory.mb", "2048")

// Security settings (if using Kerberos)
hadoopConf.set("hadoop.security.authentication", "kerberos")
hadoopConf.set("hadoop.security.authorization", "true")

// Create a job with the custom configuration; pass this job to the
// readHadoopFile / HadoopOutputFormat calls that should use it
val job = Job.getInstance(hadoopConf)

// Flink's own readTextFile / writeAsText access HDFS directly through
// Flink's file system support and do not consult the Job above
val data = env.readTextFile("hdfs://namenode:8020/input/data.txt")
// ... process data ...
data.writeAsText("hdfs://namenode:8020/output/results")

env.execute("Hadoop Configuration Example")
```

## Performance Considerations

### Optimization Tips

1. **Use Appropriate Input Splits**: Configure the input split size for optimal parallelism
2. **Leverage Data Locality**: Ensure Flink can access Hadoop's data locality information
3. **Configure Memory Settings**: Tune Hadoop and Flink memory settings for large datasets
4. **Use Compression**: Enable compression for better I/O performance with large files
5. **Monitor Serialization**: Be aware of Hadoop Writable serialization overhead
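As a sketch of tips 1 and 4, the split size and output compression can be set on the Hadoop `Configuration` passed to the job. The property names are standard Hadoop MapReduce keys; the values shown are illustrative, not recommendations.

```scala
import org.apache.hadoop.conf.Configuration

val conf = new Configuration()

// Tip 1: cap the input split size (in bytes) to control read parallelism
conf.set("mapreduce.input.fileinputformat.split.maxsize", "134217728") // 128 MB

// Tip 4: enable output compression for better I/O throughput
conf.set("mapreduce.output.fileoutputformat.compress", "true")
conf.set("mapreduce.output.fileoutputformat.compress.codec",
  "org.apache.hadoop.io.compress.SnappyCodec")
```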

### Common Patterns

1. **ETL Pipelines**: Read from Hadoop sources, transform in Flink, write to Hadoop sinks
2. **Data Migration**: Move data between different Hadoop clusters or formats
3. **Hybrid Processing**: Combine Hadoop batch processing with Flink stream processing
4. **Legacy Integration**: Integrate Flink with existing Hadoop-based data workflows