The Apache Flink Scala API integrates natively with both the newer Hadoop MapReduce (org.apache.hadoop.mapreduce) and the legacy MapRed (org.apache.hadoop.mapred) input/output formats, enabling interoperability with existing Hadoop-based data processing pipelines and file systems.
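These wrappers generally require the flink-hadoop-compatibility module and a Hadoop client on the classpath. The sbt snippet below is only a sketch; the exact artifact names and the version placeholders depend on your Flink and Hadoop releases.
// build.sbt (sketch) -- fill in the versions for your Flink and Hadoop releases
libraryDependencies ++= Seq(
"org.apache.flink" %% "flink-scala" % "<flink-version>",
"org.apache.flink" %% "flink-hadoop-compatibility" % "<flink-version>",
"org.apache.hadoop" % "hadoop-client" % "<hadoop-version>"
)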
Integration with the newer Hadoop MapReduce API (org.apache.hadoop.mapreduce).
class ExecutionEnvironment {
// Read using MapReduce InputFormat
def readHadoopFile[K: ClassTag : TypeInformation, V: ClassTag : TypeInformation](
inputFormat: MapreduceInputFormat[K, V],
keyClass: Class[K],
valueClass: Class[V],
inputPath: String
): DataSet[(K, V)]
// Read with job configuration
def readHadoopFile[K: ClassTag : TypeInformation, V: ClassTag : TypeInformation](
inputFormat: MapreduceInputFormat[K, V],
keyClass: Class[K],
valueClass: Class[V],
inputPath: String,
job: Job
): DataSet[(K, V)]
}

Integration with the legacy Hadoop MapRed API (org.apache.hadoop.mapred).
class ExecutionEnvironment {
// Read using MapRed InputFormat
def readHadoopFile[K: ClassTag : TypeInformation, V: ClassTag : TypeInformation](
inputFormat: MapredInputFormat[K, V],
keyClass: Class[K],
valueClass: Class[V],
inputPath: String
): DataSet[(K, V)]
// Read with job configuration
def readHadoopFile[K: ClassTag : TypeInformation, V: ClassTag : TypeInformation](
inputFormat: MapredInputFormat[K, V],
keyClass: Class[K],
valueClass: Class[V],
inputPath: String,
jobConf: JobConf
): DataSet[(K, V)]
}

For output, Flink wraps Hadoop OutputFormat implementations so a DataSet of (K, V) pairs can be written through them.
// Hadoop MapReduce output format wrapper
class HadoopOutputFormat[K, V](
outputFormat: MapreduceOutputFormat[K, V],
job: Job
) extends OutputFormat[(K, V)]
// Usage in DataSet
class DataSet[T] {
// Write a DataSet (here T = (K, V)) using any Flink OutputFormat, including the Hadoop wrappers
def output(outputFormat: OutputFormat[T]): DataSink[T]
}

// Hadoop MapRed output format wrapper
class HadoopOutputFormat[K, V](
outputFormat: MapredOutputFormat[K, V],
jobConf: JobConf
) extends OutputFormat[(K, V)]

(A usage sketch of the MapRed output wrapper appears at the end of this section.)

Specialized readHadoopFile signature for reading text files with Hadoop's TextInputFormat (newer MapReduce API):
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
class ExecutionEnvironment {
// Read text files using Hadoop TextInputFormat
def readHadoopFile(
inputFormat: TextInputFormat,
keyClass: Class[LongWritable],
valueClass: Class[Text],
inputPath: String
): DataSet[(LongWritable, Text)]
}

Specialized readHadoopFile signature for reading Hadoop SequenceFiles:
import org.apache.hadoop.io.{IntWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat
class ExecutionEnvironment {
// Read sequence files
def readHadoopFile(
inputFormat: SequenceFileInputFormat[IntWritable, Text],
keyClass: Class[IntWritable],
valueClass: Class[Text],
inputPath: String
): DataSet[(IntWritable, Text)]
}

A complete word count example using the Hadoop TextInputFormat (newer MapReduce API):
import org.apache.flink.api.scala._
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.hadoop.mapreduce.Job
val env = ExecutionEnvironment.getExecutionEnvironment
// Create job configuration
val job = Job.getInstance()
job.getConfiguration.set("mapreduce.input.fileinputformat.inputdir", "/path/to/input")
// Read text file using Hadoop TextInputFormat
val hadoopData = env.readHadoopFile(
new TextInputFormat(),
classOf[LongWritable],
classOf[Text],
"/path/to/input",
job
)
// Convert to Scala types and process
val lines = hadoopData.map { case (offset, text) => text.toString }
val words = lines.flatMap(_.split("\\s+"))
val wordCounts = words
.map((_, 1))
.groupBy(0)
.sum(1)
wordCounts.print()

Reading with the legacy MapRed API:
import org.apache.flink.api.scala._
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapred.{TextInputFormat, JobConf}
val env = ExecutionEnvironment.getExecutionEnvironment
// Create job configuration
val jobConf = new JobConf()
jobConf.setInputFormat(classOf[TextInputFormat])
// Read using legacy MapRed API
val hadoopData = env.readHadoopFile(
new TextInputFormat(),
classOf[LongWritable],
classOf[Text],
"/path/to/input",
jobConf
)
// Process the data
val processedData = hadoopData.map { case (key, value) =>
s"Line at offset ${key.get()}: ${value.toString}"
}
processedData.writeAsText("/path/to/output")
env.execute("MapRed Input Example")

Reading Hadoop SequenceFiles with the newer MapReduce API:
import org.apache.flink.api.scala._
import org.apache.hadoop.io.{IntWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat
import org.apache.hadoop.mapreduce.Job
val env = ExecutionEnvironment.getExecutionEnvironment
val job = Job.getInstance()
// Read sequence file
val sequenceData = env.readHadoopFile(
new SequenceFileInputFormat[IntWritable, Text](),
classOf[IntWritable],
classOf[Text],
"/path/to/sequence/files",
job
)
// Process sequence file data
val processedSequence = sequenceData.map { case (intKey, textValue) =>
(intKey.get(), textValue.toString.toUpperCase)
}
processedSequence.print()

Using a custom Writable type with a custom Hadoop InputFormat:
import org.apache.flink.api.scala._
import org.apache.hadoop.io.{Writable, LongWritable}
import org.apache.hadoop.mapreduce.{InputFormat, InputSplit, JobContext, RecordReader, TaskAttemptContext}
// Custom Writable class
class CustomWritable extends Writable {
var data: String = ""
def write(out: java.io.DataOutput): Unit = {
out.writeUTF(data)
}
def readFields(in: java.io.DataInput): Unit = {
data = in.readUTF()
}
}
// Custom InputFormat
class CustomInputFormat extends InputFormat[LongWritable, CustomWritable] {
def createRecordReader(split: InputSplit, context: TaskAttemptContext): RecordReader[LongWritable, CustomWritable] = {
// Implementation details: return a RecordReader producing (LongWritable, CustomWritable) pairs
???
}
def getSplits(context: JobContext): java.util.List[InputSplit] = {
// Implementation details: compute the input splits for the job
???
}
}
// Usage
val env = ExecutionEnvironment.getExecutionEnvironment
val customData = env.readHadoopFile(
new CustomInputFormat(),
classOf[LongWritable],
classOf[CustomWritable],
"/path/to/custom/data"
)
val processed = customData.map { case (key, custom) =>
s"${key.get()}: ${custom.data}"
}

Writing with the Hadoop MapReduce output format wrapper:
import org.apache.flink.api.scala._
import org.apache.flink.api.scala.hadoop.mapreduce.HadoopOutputFormat
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat
import org.apache.hadoop.mapreduce.Job
val env = ExecutionEnvironment.getExecutionEnvironment
// Create some data
val data = env.fromElements(
(new LongWritable(1L), new Text("first line")),
(new LongWritable(2L), new Text("second line")),
(new LongWritable(3L), new Text("third line"))
)
// Configure Hadoop output
val job = Job.getInstance()
job.getConfiguration.set("mapreduce.output.fileoutputformat.outputdir", "/path/to/output")
val hadoopOutputFormat = new HadoopOutputFormat[LongWritable, Text](
new TextOutputFormat[LongWritable, Text](),
job
)
// Write using Hadoop output format
data.output(hadoopOutputFormat)
env.execute("Hadoop Output Example")import org.apache.flink.api.scala._
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
val env = ExecutionEnvironment.getExecutionEnvironment
// Configure HDFS access
val hadoopConf = new Configuration()
hadoopConf.set("fs.defaultFS", "hdfs://namenode:8020")
hadoopConf.set("hadoop.job.ugi", "username,groupname")
// Read from HDFS
val hdfsData = env.readHadoopFile(
new TextInputFormat(),
classOf[LongWritable],
classOf[Text],
"hdfs://namenode:8020/path/to/data"
)
// Process and write back to HDFS
val result = hdfsData.map { case (offset, text) =>
text.toString.toUpperCase
}
result.writeAsText("hdfs://namenode:8020/path/to/output")
env.execute("HDFS Integration Example")import org.apache.flink.api.scala._
import org.apache.hadoop.mapreduce.Job
import org.apache.parquet.hadoop.ParquetInputFormat
import org.apache.parquet.hadoop.example.GroupReadSupport
import org.apache.parquet.example.data.Group
val env = ExecutionEnvironment.getExecutionEnvironment
// Configure Parquet reading
val job = Job.getInstance()
ParquetInputFormat.setReadSupportClass(job, classOf[GroupReadSupport])
// Read Parquet files (simplified example)
// Note: Actual Parquet integration requires additional setup
val parquetData = env.readHadoopFile(
new ParquetInputFormat[Group](),
classOf[Void],
classOf[Group],
"/path/to/parquet/files",
job
)
// Process Parquet data
val processedParquet = parquetData.map { case (_, group) =>
// Extract fields from Parquet Group
val field1 = group.getString("field1", 0)
val field2 = group.getInteger("field2", 0)
(field1, field2)
}
processedParquet.print()

Passing a custom Hadoop Configuration to the job:
import org.apache.flink.api.scala._
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapreduce.Job
val env = ExecutionEnvironment.getExecutionEnvironment
// Create Hadoop configuration with custom settings
val hadoopConf = new Configuration()
// HDFS settings
hadoopConf.set("fs.defaultFS", "hdfs://namenode:8020")
hadoopConf.set("dfs.replication", "3")
// MapReduce settings
hadoopConf.set("mapreduce.job.reduces", "4")
hadoopConf.set("mapreduce.map.memory.mb", "2048")
// Security settings (if using Kerberos)
hadoopConf.set("hadoop.security.authentication", "kerberos")
hadoopConf.set("hadoop.security.authorization", "true")
// Create a job with the custom configuration; pass this job to readHadoopFile or HadoopOutputFormat
val job = Job.getInstance(hadoopConf)
// Flink's own sources and sinks pick up the cluster's Hadoop configuration from the
// environment (typically HADOOP_CONF_DIR) rather than from a Configuration built in user code
val data = env.readTextFile("hdfs://namenode:8020/input/data.txt")
// ... process data ...
data.writeAsText("hdfs://namenode:8020/output/results")
env.execute("Hadoop Configuration Example")