Hadoop compatibility layer for Apache Flink, providing input/output format wrappers and utilities that integrate Hadoop MapReduce with Flink's DataSet and DataStream APIs.
—
The Scala API provides native Scala bindings for Hadoop integration within Flink, offering idiomatic Scala interfaces with implicit type information, tuple syntax, and functional programming patterns.
The Scala API mirrors the Java API functionality but provides Scala-friendly interfaces using native Scala tuples instead of Flink's Tuple2 class, implicit TypeInformation parameters, and object-oriented design patterns consistent with Scala conventions.
The primary entry point for creating Hadoop InputFormat wrappers in Scala.
/** Factory methods that wrap Hadoop input formats — both the legacy
  * `mapred` API and the newer `mapreduce` API — as Flink input formats
  * emitting native Scala (K, V) tuples.
  *
  * Every method takes an implicit TypeInformation[(K, V)], normally
  * provided by `import org.apache.flink.api.scala._`.
  */
object HadoopInputs {
// MapRed API methods
// Wrap a mapred FileInputFormat reading from inputPath, configured by an explicit JobConf.
def readHadoopFile[K, V](
mapredInputFormat: MapredFileInputFormat[K, V],
key: Class[K],
value: Class[V],
inputPath: String,
job: JobConf)(implicit tpe: TypeInformation[(K, V)]): mapred.HadoopInputFormat[K, V];
// Wrap a mapred FileInputFormat reading from inputPath with a default JobConf.
def readHadoopFile[K, V](
mapredInputFormat: MapredFileInputFormat[K, V],
key: Class[K],
value: Class[V],
inputPath: String)(implicit tpe: TypeInformation[(K, V)]): mapred.HadoopInputFormat[K, V];
// Read a Hadoop sequence file from inputPath; key/value classes must match the stored types.
def readSequenceFile[K, V](
key: Class[K],
value: Class[V],
inputPath: String)(implicit tpe: TypeInformation[(K, V)]): mapred.HadoopInputFormat[K, V];
// Wrap an arbitrary (not necessarily file-based) mapred InputFormat; configuration travels in the JobConf.
def createHadoopInput[K, V](
mapredInputFormat: MapredInputFormat[K, V],
key: Class[K],
value: Class[V],
job: JobConf)(implicit tpe: TypeInformation[(K, V)]): mapred.HadoopInputFormat[K, V];
// MapReduce API methods
// Wrap a mapreduce FileInputFormat reading from inputPath, configured by an explicit Job.
def readHadoopFile[K, V](
mapreduceInputFormat: MapreduceFileInputFormat[K, V],
key: Class[K],
value: Class[V],
inputPath: String,
job: Job)(implicit tpe: TypeInformation[(K, V)]): mapreduce.HadoopInputFormat[K, V];
// Wrap a mapreduce FileInputFormat reading from inputPath with a default Job.
def readHadoopFile[K, V](
mapreduceInputFormat: MapreduceFileInputFormat[K, V],
key: Class[K],
value: Class[V],
inputPath: String)(implicit tpe: TypeInformation[(K, V)]): mapreduce.HadoopInputFormat[K, V];
// Wrap an arbitrary mapreduce InputFormat; configuration travels in the Job.
def createHadoopInput[K, V](
mapreduceInputFormat: MapreduceInputFormat[K, V],
key: Class[K],
value: Class[V],
job: Job)(implicit tpe: TypeInformation[(K, V)]): mapreduce.HadoopInputFormat[K, V];
}@Public
/** Flink InputFormat wrapping a Hadoop mapred InputFormat; records are
  * surfaced as native Scala (K, V) tuples via HadoopInputFormatBase.
  */
class HadoopInputFormat[K, V] extends HadoopInputFormatBase[K, V, (K, V)] {
// Constructor with JobConf
def this(mapredInputFormat: InputFormat[K, V], keyClass: Class[K], valueClass: Class[V], job: JobConf);
// Constructor with default JobConf
def this(mapredInputFormat: InputFormat[K, V], keyClass: Class[K], valueClass: Class[V]);
// Read next record as Scala tuple; `reuse` allows object reuse between calls.
def nextRecord(reuse: (K, V)): (K, V);
}@Public
/** Flink InputFormat wrapping a Hadoop *mapreduce*-API InputFormat;
  * mirrors the mapred variant but is configured with a `Job` instead of
  * a `JobConf`. (Note: the parameter is still named `mapredInputFormat`.)
  */
class HadoopInputFormat[K, V] extends HadoopInputFormatBase[K, V, (K, V)] {
// Constructor with Job
def this(mapredInputFormat: InputFormat[K, V], keyClass: Class[K], valueClass: Class[V], job: Job);
// Constructor with default Job
def this(mapredInputFormat: InputFormat[K, V], keyClass: Class[K], valueClass: Class[V]);
// Read next record as Scala tuple; `reuse` allows object reuse between calls.
def nextRecord(reuse: (K, V)): (K, V);
}@Public
/** Flink OutputFormat that writes Scala (K, V) tuples through a Hadoop
  * mapred OutputFormat (shared machinery lives in HadoopOutputFormatBase).
  */
class HadoopOutputFormat[K, V] extends HadoopOutputFormatBase[K, V, (K, V)] {
// Constructor with JobConf
def this(mapredOutputFormat: OutputFormat[K, V], job: JobConf);
// Constructor with OutputCommitter and JobConf
def this(
mapredOutputFormat: OutputFormat[K, V],
outputCommitterClass: Class[OutputCommitter],
job: JobConf);
// Write a record from Scala tuple
def writeRecord(record: (K, V)): Unit;
}import org.apache.flink.api.scala._
import org.apache.flink.hadoopcompatibility.scala.HadoopInputs
import org.apache.hadoop.mapred.{TextInputFormat, JobConf}
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.fs.Path
// Obtain the batch execution environment — entry point for DataSet programs.
val env = ExecutionEnvironment.getExecutionEnvironment
// Read text files using Scala API.
// TextInputFormat (mapred) yields (byte offset, line) pairs as Hadoop writables.
val input: DataSet[(LongWritable, Text)] = env.createInput(
HadoopInputs.readHadoopFile(
new TextInputFormat(),
classOf[LongWritable],
classOf[Text],
"hdfs://namenode:port/input/path"
)
)
// Process data with Scala operations
// Drop the offset key; keep only the line text.
val lines: DataSet[String] = input.map(_._2.toString)
// Tokenise each line on whitespace.
val words: DataSet[String] = lines.flatMap(_.split("\\s+"))
// Classic word count: pair each word with 1, group by word (field 0), sum counts (field 1).
val wordCounts: DataSet[(String, Int)] = words
.map((_, 1))
.groupBy(0)
.sum(1)import org.apache.hadoop.io.{IntWritable, Text}
// Read sequence files.
// The declared key/value classes must match the types stored in the files.
val sequenceData: DataSet[(IntWritable, Text)] = env.createInput(
HadoopInputs.readSequenceFile(
classOf[IntWritable],
classOf[Text],
"hdfs://namenode:port/sequence/files"
)
)
// Convert to native Scala types
// Unwrap the Hadoop writables into plain Int/String values.
val nativeData: DataSet[(Int, String)] = sequenceData.map {
case (key, value) => (key.get(), value.toString)
}import org.apache.hadoop.mapred.JobConf
// Configure Hadoop job
val jobConf = new JobConf()
// NOTE(review): "mapreduce.input.fileinputformat.inputdir" is the new-API
// (mapreduce) property name; the legacy mapred equivalent is
// "mapred.input.dir" — confirm which one the wrapped format honours.
jobConf.set("mapreduce.input.fileinputformat.inputdir", "/custom/input/path")
// Arbitrary custom properties travel with the JobConf into the tasks.
jobConf.set("custom.property", "custom-value")
jobConf.setBoolean("custom.flag", true)
// Use custom configuration: pass both an explicit input path and the JobConf.
val customInput: DataSet[(LongWritable, Text)] = env.createInput(
HadoopInputs.readHadoopFile(
new TextInputFormat(),
classOf[LongWritable],
classOf[Text],
"/input/path",
jobConf
)
)import org.apache.hadoop.mapred.SequenceFileInputFormat
import org.apache.hadoop.io.{BytesWritable, Text}
// Process binary data.
// createHadoopInput takes a generic mapred InputFormat; its configuration
// (e.g. input paths) must already be present in the supplied jobConf.
val binaryData: DataSet[(BytesWritable, Text)] = env.createInput(
HadoopInputs.createHadoopInput(
new SequenceFileInputFormat[BytesWritable, Text](),
classOf[BytesWritable],
classOf[Text],
jobConf
)
)
// Complex processing pipeline
val processedData = binaryData
// Drop records with an empty text payload.
.filter(_._2.toString.nonEmpty)
// Key by the text; keep the byte length of the binary payload.
.map { case (bytes, text) =>
(text.toString, bytes.getLength)
}
.groupBy(0)
// Sum payload lengths per text key.
.reduce { (a, b) =>
(a._1, a._2 + b._2)
}import org.apache.flink.api.scala.hadoop.mapred.HadoopOutputFormat
import org.apache.hadoop.mapred.{TextOutputFormat, JobConf}
import org.apache.hadoop.io.{NullWritable, Text}
import org.apache.hadoop.fs.Path
// Configure output.
// setOutputPath is a Java static declared on FileOutputFormat and only
// inherited by TextOutputFormat; Scala cannot reference inherited Java
// statics through a subclass, so call the declaring class directly.
import org.apache.hadoop.mapred.FileOutputFormat
val outputConf = new JobConf()
outputConf.setOutputFormat(classOf[TextOutputFormat[NullWritable, Text]])
outputConf.setOutputKeyClass(classOf[NullWritable])
outputConf.setOutputValueClass(classOf[Text])
FileOutputFormat.setOutputPath(outputConf, new Path("hdfs://namenode:port/output"))
// Create the Flink wrapper around Hadoop's TextOutputFormat.
val hadoopOutput = new HadoopOutputFormat(
new TextOutputFormat[NullWritable, Text](),
outputConf
)
// Prepare data for output: render each (word, count) as one Text line;
// NullWritable keys suppress the key column in the text output.
val outputData: DataSet[(NullWritable, Text)] = processedData.map {
case (word, count) => (NullWritable.get(), new Text(s"$word: $count"))
}
// Write to Hadoop output and run the job.
outputData.output(hadoopOutput)
env.execute("Scala Hadoop Integration")// Define a custom Writable type
// A custom Hadoop Writable carrying an id, a name and a numeric value.
// Hadoop instantiates writables via the no-arg (primary) constructor and
// then populates them through readFields.
class CustomRecord extends Writable {
// Fields are mutable because Hadoop reuses and refills Writable instances.
var id: Int = 0
var name: String = ""
var value: Double = 0.0
// Convenience constructor initialising all three fields at once.
def this(id: Int, name: String, value: Double) = {
this()
this.id = id
this.name = name
this.value = value
}
// Serialise the fields; the write order must mirror readFields exactly.
override def write(out: DataOutput): Unit = {
out.writeInt(id)
out.writeUTF(name)
out.writeDouble(value)
}
// Deserialise the fields in the same order they were written.
override def readFields(in: DataInput): Unit = {
id = in.readInt()
name = in.readUTF()
value = in.readDouble()
}
override def toString: String = s"CustomRecord($id, $name, $value)"
}
// Use custom Writable in Scala.
// The wrapper needs the concrete key/value classes so it can instantiate
// and deserialise the writables at runtime.
val customData: DataSet[(LongWritable, CustomRecord)] = env.createInput(
HadoopInputs.createHadoopInput(
new SequenceFileInputFormat[LongWritable, CustomRecord](),
classOf[LongWritable],
classOf[CustomRecord],
jobConf
)
)
// Process custom records
val summary = customData
.map(_._2) // Extract CustomRecord
.groupBy(_.name)
// Per name: keep the smallest id and accumulate the values.
.reduce { (a, b) =>
new CustomRecord(
math.min(a.id, b.id),
a.name,
a.value + b.value
)
}// Use Scala's functional programming features
// NOTE(review): `stopWords` (as a Set[String]) and `threshold` are assumed
// to be in scope here — confirm their definitions. `Order` presumably comes
// from org.apache.flink.api.common.operators.Order; verify the import.
val result = input
.map(_._2.toString.toLowerCase.trim)
.filter(_.nonEmpty)
.flatMap(_.split("\\W+"))
.filter(word => word.length > 2 && !stopWords.contains(word))
.map((_, 1))
.groupBy(0)
.sum(1)
.filter(_._2 > threshold)
.sortPartition(1, Order.DESCENDING)
// Use pattern matching.
// Guards classify each line by prefix; the final unguarded case is the
// catch-all, so the match is exhaustive.
val categorized = input.map {
case (offset, text) if text.toString.startsWith("ERROR") =>
("error", text.toString)
case (offset, text) if text.toString.startsWith("WARN") =>
("warning", text.toString)
case (offset, text) =>
("info", text.toString)
}// Type-safe configuration helpers
/** Type-safe helpers that build pre-configured Hadoop JobConf instances.
  *
  * Note: addInputPath/setOutputPath are Java statics declared on
  * FileInputFormat/FileOutputFormat. Scala cannot reference inherited Java
  * statics through a subclass (e.g. `TextInputFormat.addInputPath` does not
  * compile), so the declaring classes are referenced directly below.
  */
object HadoopConfig {
  /** JobConf configured to read text files from `path` (mapred API). */
  def textInput(path: String): JobConf = {
    import org.apache.hadoop.mapred.FileInputFormat
    val conf = new JobConf()
    conf.setInputFormat(classOf[TextInputFormat])
    FileInputFormat.addInputPath(conf, new Path(path))
    conf
  }
  /** JobConf configured to write sequence files to `path` (mapred API). */
  def sequenceOutput(path: String): JobConf = {
    import org.apache.hadoop.mapred.FileOutputFormat
    val conf = new JobConf()
    conf.setOutputFormat(classOf[SequenceFileOutputFormat[_, _]])
    FileOutputFormat.setOutputPath(conf, new Path(path))
    conf
  }
}
// Use type-safe configuration.
// NOTE(review): this re-declares `input` from an earlier snippet — fine in
// isolation, but rename if the snippets are pasted into a single scope.
val input = env.createInput(
HadoopInputs.createHadoopInput(
new TextInputFormat(),
classOf[LongWritable],
classOf[Text],
HadoopConfig.textInput("/input/path")
)
)import scala.util.{Try, Success, Failure}
// Safe input creation: wraps input-format construction in Try and turns
// any failure into None after logging the cause.
def createSafeInput(path: String): Option[DataSet[(LongWritable, Text)]] = {
  // Capture any exception thrown while building the input format.
  val attempt = Try {
    env.createInput(
      HadoopInputs.readHadoopFile(
        new TextInputFormat(),
        classOf[LongWritable],
        classOf[Text],
        path
      )
    )
  }
  // fold: first function handles Failure, second handles Success.
  attempt.fold(
    exception => {
      println(s"Failed to create input for path $path: ${exception.getMessage}")
      None
    },
    created => Some(created)
  )
}
// Use safe input creation
createSafeInput("/input/path") match {
case Some(input) =>
// Process input.
// collect() triggers execution and pulls the results to the driver.
val result = input.map(_._2.toString).collect()
println(s"Processed ${result.length} records")
case None =>
println("Failed to create input, using alternative processing")
// Handle error case
}// Use Flink's rich transformation API with Hadoop inputs
// NOTE(review): re-declares `processedData` from an earlier snippet —
// keep these examples in separate scopes.
val processedData = input
.map(_._2.toString) // Extract text
.flatMap(_.split("\\s+")) // Split into words
.map(_.toLowerCase.replaceAll("[^a-z]", "")) // Clean words
.filter(_.length > 2) // Filter short words
.map((_, 1)) // Create word count pairs
.groupBy(0) // Group by word
.sum(1) // Sum counts
.filter(_._2 > 5) // Filter rare words
.sortPartition(1, Order.DESCENDING) // Sort by count// Use broadcast variables with Hadoop data
// NOTE(review): this is not Flink's broadcast-variable mechanism —
// collect() materialises the stop words on the driver and the resulting
// Set is simply captured in the filter closure.
val stopWords = env.fromElements("the", "a", "an", "and", "or", "but")
val broadcastStopWords = stopWords.collect().toSet
val filteredWords = input
.map(_._2.toString.toLowerCase)
.flatMap(_.split("\\s+"))
.filter(word => !broadcastStopWords.contains(word))import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.configuration.Configuration
// Rich function that normalises words into (word, 1) count pairs,
// rejecting stop words and very short tokens with a ("", 0) sentinel.
class WordProcessor extends RichMapFunction[String, (String, Int)] {
  var stopWords: Set[String] = _

  override def open(parameters: Configuration): Unit = {
    // Initialize from broadcast variable or configuration
    stopWords = Set("the", "a", "an", "and", "or", "but")
  }

  override def map(word: String): (String, Int) = {
    // Strip everything except lowercase letters before filtering.
    val normalized = word.toLowerCase.replaceAll("[^a-z]", "")
    val keep = normalized.length > 2 && !stopWords.contains(normalized)
    // ("", 0) marks rejected words; downstream filters drop empty keys.
    if (keep) (normalized, 1) else ("", 0)
  }
}
// Use rich function with Hadoop input.
// NOTE(review): re-declares `wordCounts` from an earlier snippet.
val wordCounts = input
.map(_._2.toString)
.flatMap(_.split("\\s+"))
// WordProcessor normalises words and emits ("", 0) for rejected tokens.
.map(new WordProcessor())
// Drop the sentinel empty-word pairs before aggregating.
.filter(_._1.nonEmpty)
.groupBy(0)
.sum(1)Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-hadoop-compatibility-2-11