# Scala API

The Scala API provides native Scala bindings for Hadoop integration within Flink, offering idiomatic Scala interfaces with implicit type information, tuple syntax, and functional programming patterns.

## Overview

The Scala API mirrors the Java API functionality but provides Scala-friendly interfaces using native Scala tuples instead of Flink's Tuple2 class, implicit TypeInformation parameters, and object-oriented design patterns consistent with Scala conventions.

## HadoopInputs Object

The primary entry point for creating Hadoop InputFormat wrappers in Scala.

```scala { .api }
object HadoopInputs {

  // MapRed API methods
  def readHadoopFile[K, V](
      mapredInputFormat: MapredFileInputFormat[K, V],
      key: Class[K],
      value: Class[V],
      inputPath: String,
      job: JobConf)(implicit tpe: TypeInformation[(K, V)]): mapred.HadoopInputFormat[K, V];

  def readHadoopFile[K, V](
      mapredInputFormat: MapredFileInputFormat[K, V],
      key: Class[K],
      value: Class[V],
      inputPath: String)(implicit tpe: TypeInformation[(K, V)]): mapred.HadoopInputFormat[K, V];

  def readSequenceFile[K, V](
      key: Class[K],
      value: Class[V],
      inputPath: String)(implicit tpe: TypeInformation[(K, V)]): mapred.HadoopInputFormat[K, V];

  def createHadoopInput[K, V](
      mapredInputFormat: MapredInputFormat[K, V],
      key: Class[K],
      value: Class[V],
      job: JobConf)(implicit tpe: TypeInformation[(K, V)]): mapred.HadoopInputFormat[K, V];

  // MapReduce API methods
  def readHadoopFile[K, V](
      mapreduceInputFormat: MapreduceFileInputFormat[K, V],
      key: Class[K],
      value: Class[V],
      inputPath: String,
      job: Job)(implicit tpe: TypeInformation[(K, V)]): mapreduce.HadoopInputFormat[K, V];

  def readHadoopFile[K, V](
      mapreduceInputFormat: MapreduceFileInputFormat[K, V],
      key: Class[K],
      value: Class[V],
      inputPath: String)(implicit tpe: TypeInformation[(K, V)]): mapreduce.HadoopInputFormat[K, V];

  def createHadoopInput[K, V](
      mapreduceInputFormat: MapreduceInputFormat[K, V],
      key: Class[K],
      value: Class[V],
      job: Job)(implicit tpe: TypeInformation[(K, V)]): mapreduce.HadoopInputFormat[K, V];
}
```

## Scala HadoopInputFormat Classes

### MapRed HadoopInputFormat

```scala { .api }
@Public
class HadoopInputFormat[K, V] extends HadoopInputFormatBase[K, V, (K, V)] {

  // Constructor with JobConf
  def this(mapredInputFormat: InputFormat[K, V], keyClass: Class[K], valueClass: Class[V], job: JobConf);

  // Constructor with default JobConf
  def this(mapredInputFormat: InputFormat[K, V], keyClass: Class[K], valueClass: Class[V]);

  // Read next record as Scala tuple
  def nextRecord(reuse: (K, V)): (K, V);
}
```

### MapReduce HadoopInputFormat

```scala { .api }
@Public
class HadoopInputFormat[K, V] extends HadoopInputFormatBase[K, V, (K, V)] {

  // Constructor with Job
  def this(mapredInputFormat: InputFormat[K, V], keyClass: Class[K], valueClass: Class[V], job: Job);

  // Constructor with default Job
  def this(mapredInputFormat: InputFormat[K, V], keyClass: Class[K], valueClass: Class[V]);

  // Read next record as Scala tuple
  def nextRecord(reuse: (K, V)): (K, V);
}
```

## Scala HadoopOutputFormat Classes

### MapRed HadoopOutputFormat

```scala { .api }
@Public
class HadoopOutputFormat[K, V] extends HadoopOutputFormatBase[K, V, (K, V)] {

  // Constructor with JobConf
  def this(mapredOutputFormat: OutputFormat[K, V], job: JobConf);

  // Constructor with OutputCommitter and JobConf
  def this(
      mapredOutputFormat: OutputFormat[K, V],
      outputCommitterClass: Class[OutputCommitter],
      job: JobConf);

  // Write a record from Scala tuple
  def writeRecord(record: (K, V)): Unit;
}
```

## Usage Examples

### Basic Input/Output with Scala

```scala
import org.apache.flink.api.scala._
import org.apache.flink.hadoopcompatibility.scala.HadoopInputs
import org.apache.hadoop.mapred.{TextInputFormat, JobConf}
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.fs.Path

val env = ExecutionEnvironment.getExecutionEnvironment

// Read text files using Scala API
val input: DataSet[(LongWritable, Text)] = env.createInput(
  HadoopInputs.readHadoopFile(
    new TextInputFormat(),
    classOf[LongWritable],
    classOf[Text],
    "hdfs://namenode:port/input/path"
  )
)

// Process data with Scala operations
val lines: DataSet[String] = input.map(_._2.toString)
val words: DataSet[String] = lines.flatMap(_.split("\\s+"))
val wordCounts: DataSet[(String, Int)] = words
  .map((_, 1))
  .groupBy(0)
  .sum(1)
```

### Working with Sequence Files

```scala
import org.apache.hadoop.io.{IntWritable, Text}

// Read sequence files
val sequenceData: DataSet[(IntWritable, Text)] = env.createInput(
  HadoopInputs.readSequenceFile(
    classOf[IntWritable],
    classOf[Text],
    "hdfs://namenode:port/sequence/files"
  )
)

// Convert to native Scala types
val nativeData: DataSet[(Int, String)] = sequenceData.map {
  case (key, value) => (key.get(), value.toString)
}
```

### Custom Configuration

```scala
import org.apache.hadoop.mapred.JobConf

// Configure Hadoop job
val jobConf = new JobConf()
jobConf.set("mapreduce.input.fileinputformat.inputdir", "/custom/input/path")
jobConf.set("custom.property", "custom-value")
jobConf.setBoolean("custom.flag", true)

// Use custom configuration
val customInput: DataSet[(LongWritable, Text)] = env.createInput(
  HadoopInputs.readHadoopFile(
    new TextInputFormat(),
    classOf[LongWritable],
    classOf[Text],
    "/input/path",
    jobConf
  )
)
```

### Advanced Data Processing

```scala
import org.apache.hadoop.mapred.SequenceFileInputFormat
import org.apache.hadoop.io.{BytesWritable, Text}

// Process binary data
val binaryData: DataSet[(BytesWritable, Text)] = env.createInput(
  HadoopInputs.createHadoopInput(
    new SequenceFileInputFormat[BytesWritable, Text](),
    classOf[BytesWritable],
    classOf[Text],
    jobConf
  )
)

// Complex processing pipeline
val processedData = binaryData
  .filter(_._2.toString.nonEmpty)
  .map { case (bytes, text) =>
    (text.toString, bytes.getLength)
  }
  .groupBy(0)
  .reduce { (a, b) =>
    (a._1, a._2 + b._2)
  }
```

### Writing Output with Scala

```scala
import org.apache.flink.api.scala.hadoop.mapred.HadoopOutputFormat
import org.apache.hadoop.mapred.{TextOutputFormat, JobConf}
import org.apache.hadoop.io.{NullWritable, Text}
import org.apache.hadoop.fs.Path

// Configure output
val outputConf = new JobConf()
outputConf.setOutputFormat(classOf[TextOutputFormat[NullWritable, Text]])
outputConf.setOutputKeyClass(classOf[NullWritable])
outputConf.setOutputValueClass(classOf[Text])
TextOutputFormat.setOutputPath(outputConf, new Path("hdfs://namenode:port/output"))

// Create output format
val hadoopOutput = new HadoopOutputFormat(
  new TextOutputFormat[NullWritable, Text](),
  outputConf
)

// Prepare data for output
val outputData: DataSet[(NullWritable, Text)] = processedData.map {
  case (word, count) => (NullWritable.get(), new Text(s"$word: $count"))
}

// Write to Hadoop output
outputData.output(hadoopOutput)
env.execute("Scala Hadoop Integration")
```

### Working with Custom Writable Types

```scala
// Define a custom Writable type
class CustomRecord extends Writable {
  var id: Int = 0
  var name: String = ""
  var value: Double = 0.0

  def this(id: Int, name: String, value: Double) = {
    this()
    this.id = id
    this.name = name
    this.value = value
  }

  override def write(out: DataOutput): Unit = {
    out.writeInt(id)
    out.writeUTF(name)
    out.writeDouble(value)
  }

  override def readFields(in: DataInput): Unit = {
    id = in.readInt()
    name = in.readUTF()
    value = in.readDouble()
  }

  override def toString: String = s"CustomRecord($id, $name, $value)"
}

// Use custom Writable in Scala
val customData: DataSet[(LongWritable, CustomRecord)] = env.createInput(
  HadoopInputs.createHadoopInput(
    new SequenceFileInputFormat[LongWritable, CustomRecord](),
    classOf[LongWritable],
    classOf[CustomRecord],
    jobConf
  )
)

// Process custom records
val summary = customData
  .map(_._2) // Extract CustomRecord
  .groupBy(_.name)
  .reduce { (a, b) =>
    new CustomRecord(
      math.min(a.id, b.id),
      a.name,
      a.value + b.value
    )
  }
```

### Functional Programming Patterns

```scala
// Use Scala's functional programming features
val result = input
  .map(_._2.toString.toLowerCase.trim)
  .filter(_.nonEmpty)
  .flatMap(_.split("\\W+"))
  .filter(word => word.length > 2 && !stopWords.contains(word))
  .map((_, 1))
  .groupBy(0)
  .sum(1)
  .filter(_._2 > threshold)
  .sortPartition(1, Order.DESCENDING)

// Use pattern matching
val categorized = input.map {
  case (offset, text) if text.toString.startsWith("ERROR") =>
    ("error", text.toString)
  case (offset, text) if text.toString.startsWith("WARN") =>
    ("warning", text.toString)
  case (offset, text) =>
    ("info", text.toString)
}
```

### Type-Safe Configuration

```scala
// Type-safe configuration helpers
object HadoopConfig {
  def textInput(path: String): JobConf = {
    val conf = new JobConf()
    conf.setInputFormat(classOf[TextInputFormat])
    TextInputFormat.addInputPath(conf, new Path(path))
    conf
  }

  def sequenceOutput(path: String): JobConf = {
    val conf = new JobConf()
    conf.setOutputFormat(classOf[SequenceFileOutputFormat[_, _]])
    SequenceFileOutputFormat.setOutputPath(conf, new Path(path))
    conf
  }
}

// Use type-safe configuration
val input = env.createInput(
  HadoopInputs.createHadoopInput(
    new TextInputFormat(),
    classOf[LongWritable],
    classOf[Text],
    HadoopConfig.textInput("/input/path")
  )
)
```

### Error Handling in Scala

```scala
import scala.util.{Try, Success, Failure}

// Safe input creation
def createSafeInput(path: String): Option[DataSet[(LongWritable, Text)]] = {
  Try {
    env.createInput(
      HadoopInputs.readHadoopFile(
        new TextInputFormat(),
        classOf[LongWritable],
        classOf[Text],
        path
      )
    )
  } match {
    case Success(input) => Some(input)
    case Failure(exception) =>
      println(s"Failed to create input for path $path: ${exception.getMessage}")
      None
  }
}

// Use safe input creation
createSafeInput("/input/path") match {
  case Some(input) =>
    // Process input
    val result = input.map(_._2.toString).collect()
    println(s"Processed ${result.length} records")

  case None =>
    println("Failed to create input, using alternative processing")
    // Handle error case
}
```

## Integration with Flink Scala API Features

### DataSet Transformations

```scala
// Use Flink's rich transformation API with Hadoop inputs
val processedData = input
  .map(_._2.toString) // Extract text
  .flatMap(_.split("\\s+")) // Split into words
  .map(_.toLowerCase.replaceAll("[^a-z]", "")) // Clean words
  .filter(_.length > 2) // Filter short words
  .map((_, 1)) // Create word count pairs
  .groupBy(0) // Group by word
  .sum(1) // Sum counts
  .filter(_._2 > 5) // Filter rare words
  .sortPartition(1, Order.DESCENDING) // Sort by count
```

### Broadcast Variables

```scala
// Collect a small DataSet on the driver and share it via the closure.
// (For true Flink broadcast variables, attach the DataSet with
// withBroadcastSet and read it in a RichFunction's open() method.)
val stopWords = env.fromElements("the", "a", "an", "and", "or", "but")
val broadcastStopWords = stopWords.collect().toSet

val filteredWords = input
  .map(_._2.toString.toLowerCase)
  .flatMap(_.split("\\s+"))
  .filter(word => !broadcastStopWords.contains(word))
```

### Rich Functions

```scala
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.configuration.Configuration

class WordProcessor extends RichMapFunction[String, (String, Int)] {
  var stopWords: Set[String] = _

  override def open(parameters: Configuration): Unit = {
    // Initialize from broadcast variable or configuration
    stopWords = Set("the", "a", "an", "and", "or", "but")
  }

  override def map(word: String): (String, Int) = {
    val cleaned = word.toLowerCase.replaceAll("[^a-z]", "")
    if (cleaned.length > 2 && !stopWords.contains(cleaned)) {
      (cleaned, 1)
    } else {
      ("", 0) // Will be filtered out later
    }
  }
}

// Use rich function with Hadoop input
val wordCounts = input
  .map(_._2.toString)
  .flatMap(_.split("\\s+"))
  .map(new WordProcessor())
  .filter(_._1.nonEmpty)
  .groupBy(0)
  .sum(1)
```