Hadoop Integration

The Apache Flink Scala API provides native integration with both Hadoop input/output format APIs, the newer MapReduce API (org.apache.hadoop.mapreduce) and the legacy MapRed API (org.apache.hadoop.mapred), enabling seamless interoperability with existing Hadoop-based data processing pipelines and file systems.
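
The wrappers used below ship in Flink's flink-hadoop-compatibility module, which must be on the classpath alongside the core Scala API. A minimal build sketch, assuming an sbt project; flinkVersion and hadoopVersion are placeholder values defined elsewhere, not part of this documentation:

// build.sbt (sketch) -- flinkVersion and hadoopVersion are placeholders
libraryDependencies ++= Seq(
  "org.apache.flink"  %% "flink-scala"                % flinkVersion,
  "org.apache.flink"  %% "flink-hadoop-compatibility" % flinkVersion,
  // Hadoop client classes (Job, JobConf, Writables); often provided by the cluster
  "org.apache.hadoop"  % "hadoop-client"              % hadoopVersion % Provided
)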

Hadoop Input Formats

MapReduce Input Formats

Integration with the newer Hadoop MapReduce API (org.apache.hadoop.mapreduce).

class ExecutionEnvironment {
  // Read using MapReduce InputFormat
  def readHadoopFile[K: ClassTag : TypeInformation, V: ClassTag : TypeInformation](
    inputFormat: MapreduceInputFormat[K, V],
    keyClass: Class[K],
    valueClass: Class[V], 
    inputPath: String
  ): DataSet[(K, V)]
  
  // Read with job configuration
  def readHadoopFile[K: ClassTag : TypeInformation, V: ClassTag : TypeInformation](
    inputFormat: MapreduceInputFormat[K, V],
    keyClass: Class[K],
    valueClass: Class[V],
    inputPath: String,
    job: Job
  ): DataSet[(K, V)]
}

MapRed Input Formats

Integration with the legacy Hadoop MapRed API (org.apache.hadoop.mapred).

class ExecutionEnvironment {
  // Read using MapRed InputFormat
  def readHadoopFile[K: ClassTag : TypeInformation, V: ClassTag : TypeInformation](
    inputFormat: MapredInputFormat[K, V],
    keyClass: Class[K],
    valueClass: Class[V],
    inputPath: String
  ): DataSet[(K, V)]
  
  // Read with job configuration
  def readHadoopFile[K: ClassTag : TypeInformation, V: ClassTag : TypeInformation](
    inputFormat: MapredInputFormat[K, V], 
    keyClass: Class[K],
    valueClass: Class[V],
    inputPath: String,
    jobConf: JobConf
  ): DataSet[(K, V)]
}

Hadoop Output Formats

MapReduce Output Formats

// Hadoop MapReduce output format wrapper
class HadoopOutputFormat[K, V](
  outputFormat: MapreduceOutputFormat[K, V],
  job: Job
) extends OutputFormat[(K, V)]

// Usage on a DataSet of (K, V) tuples
class DataSet[T] {
  def output(outputFormat: OutputFormat[T]): DataSink[T]
}

MapRed Output Formats

// Hadoop MapRed output format wrapper
class HadoopOutputFormat[K, V](
  outputFormat: MapredOutputFormat[K, V],
  jobConf: JobConf
) extends OutputFormat[(K, V)]
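
The full write example later in this page covers only the MapReduce wrapper; here is a minimal matching sketch for the MapRed side, assuming the org.apache.flink.api.scala.hadoop.mapred.HadoopOutputFormat wrapper from the Hadoop compatibility module and Hadoop's legacy TextOutputFormat:

import org.apache.flink.api.scala._
import org.apache.flink.api.scala.hadoop.mapred.HadoopOutputFormat
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapred.{FileOutputFormat, JobConf, TextOutputFormat}

val env = ExecutionEnvironment.getExecutionEnvironment

val data = env.fromElements(
  (new LongWritable(1L), new Text("first line")),
  (new LongWritable(2L), new Text("second line"))
)

// Configure the legacy output format through a JobConf
val jobConf = new JobConf()
FileOutputFormat.setOutputPath(jobConf, new Path("/path/to/output"))

val mapredOutput = new HadoopOutputFormat[LongWritable, Text](
  new TextOutputFormat[LongWritable, Text](),
  jobConf
)

// Write the (key, value) tuples using the MapRed output format
data.output(mapredOutput)

env.execute("MapRed Output Example")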

File Format Support

Text Files

import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat

class ExecutionEnvironment {
  // Read text files using Hadoop TextInputFormat
  def readHadoopFile(
    inputFormat: TextInputFormat,
    keyClass: Class[LongWritable],
    valueClass: Class[Text],
    inputPath: String
  ): DataSet[(LongWritable, Text)]
}

Sequence Files

import org.apache.hadoop.io.{IntWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat

class ExecutionEnvironment {
  // Read sequence files
  def readHadoopFile(
    inputFormat: SequenceFileInputFormat[IntWritable, Text],
    keyClass: Class[IntWritable], 
    valueClass: Class[Text],
    inputPath: String
  ): DataSet[(IntWritable, Text)]
}

Usage Examples

Reading Text Files with MapReduce API

import org.apache.flink.api.scala._
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.hadoop.mapreduce.Job

val env = ExecutionEnvironment.getExecutionEnvironment

// Create job configuration
val job = Job.getInstance()
job.getConfiguration.set("mapreduce.input.fileinputformat.inputdir", "/path/to/input")

// Read text file using Hadoop TextInputFormat
val hadoopData = env.readHadoopFile(
  new TextInputFormat(),
  classOf[LongWritable],
  classOf[Text],
  "/path/to/input",
  job
)

// Convert to Scala types and process
val lines = hadoopData.map { case (offset, text) => text.toString }
val words = lines.flatMap(_.split("\\s+"))
val wordCounts = words
  .map((_, 1))
  .groupBy(0)
  .sum(1)

wordCounts.print()

Reading Text Files with MapRed API

import org.apache.flink.api.scala._
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapred.{TextInputFormat, JobConf}

val env = ExecutionEnvironment.getExecutionEnvironment

// Create job configuration
val jobConf = new JobConf()
jobConf.setInputFormat(classOf[TextInputFormat])

// Read using legacy MapRed API
val hadoopData = env.readHadoopFile(
  new TextInputFormat(),
  classOf[LongWritable],
  classOf[Text],
  "/path/to/input",
  jobConf
)

// Process the data
val processedData = hadoopData.map { case (key, value) =>
  s"Line at offset ${key.get()}: ${value.toString}"
}

processedData.writeAsText("/path/to/output")

Reading Sequence Files

import org.apache.flink.api.scala._
import org.apache.hadoop.io.{IntWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat
import org.apache.hadoop.mapreduce.Job

val env = ExecutionEnvironment.getExecutionEnvironment

val job = Job.getInstance()

// Read sequence file
val sequenceData = env.readHadoopFile(
  new SequenceFileInputFormat[IntWritable, Text](),
  classOf[IntWritable],
  classOf[Text],
  "/path/to/sequence/files",
  job
)

// Process sequence file data
val processedSequence = sequenceData.map { case (intKey, textValue) =>
  (intKey.get(), textValue.toString.toUpperCase)
}

processedSequence.print()

Custom Hadoop Input Format

import org.apache.flink.api.scala._
import org.apache.hadoop.io.{Writable, LongWritable}
import org.apache.hadoop.mapreduce.{InputFormat, InputSplit, JobContext, RecordReader, TaskAttemptContext}

// Custom Writable class
class CustomWritable extends Writable {
  var data: String = ""
  
  def write(out: java.io.DataOutput): Unit = {
    out.writeUTF(data)
  }
  
  def readFields(in: java.io.DataInput): Unit = {
    data = in.readUTF()
  }
}

// Custom InputFormat
class CustomInputFormat extends InputFormat[LongWritable, CustomWritable] {
  def createRecordReader(split: InputSplit, context: TaskAttemptContext): RecordReader[LongWritable, CustomWritable] = {
    // Implementation details...
    null
  }
  
  def getSplits(context: JobContext): java.util.List[InputSplit] = {
    // Implementation details...
    null
  }
}

// Usage
val env = ExecutionEnvironment.getExecutionEnvironment

val customData = env.readHadoopFile(
  new CustomInputFormat(),
  classOf[LongWritable],
  classOf[CustomWritable],
  "/path/to/custom/data"
)

val processed = customData.map { case (key, custom) =>
  s"${key.get()}: ${custom.data}"
}

Writing to Hadoop Output Formats

import org.apache.flink.api.scala._
import org.apache.flink.api.scala.hadoop.mapreduce.HadoopOutputFormat
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat
import org.apache.hadoop.mapreduce.Job

val env = ExecutionEnvironment.getExecutionEnvironment

// Create some data
val data = env.fromElements(
  (new LongWritable(1L), new Text("first line")),
  (new LongWritable(2L), new Text("second line")),
  (new LongWritable(3L), new Text("third line"))
)

// Configure Hadoop output
val job = Job.getInstance()
job.getConfiguration.set("mapreduce.output.fileoutputformat.outputdir", "/path/to/output")

val hadoopOutputFormat = new HadoopOutputFormat[LongWritable, Text](
  new TextOutputFormat[LongWritable, Text](),
  job
)

// Write using Hadoop output format
data.output(hadoopOutputFormat)

env.execute("Hadoop Output Example")

Integrating with HDFS

import org.apache.flink.api.scala._
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat

val env = ExecutionEnvironment.getExecutionEnvironment

// Configure HDFS access
val hadoopConf = new Configuration()
hadoopConf.set("fs.defaultFS", "hdfs://namenode:8020")
hadoopConf.set("hadoop.job.ugi", "username,groupname")

// Read from HDFS
val hdfsData = env.readHadoopFile(
  new TextInputFormat(),
  classOf[LongWritable], 
  classOf[Text],
  "hdfs://namenode:8020/path/to/data"
)

// Process and write back to HDFS
val result = hdfsData.map { case (offset, text) =>
  text.toString.toUpperCase
}

result.writeAsText("hdfs://namenode:8020/path/to/output")

env.execute("HDFS Integration Example")

Parquet File Integration

import org.apache.flink.api.scala._
import org.apache.hadoop.mapreduce.Job
import org.apache.parquet.hadoop.ParquetInputFormat
import org.apache.parquet.hadoop.example.GroupReadSupport
import org.apache.parquet.example.data.Group

val env = ExecutionEnvironment.getExecutionEnvironment

// Configure Parquet reading
val job = Job.getInstance()
ParquetInputFormat.setReadSupportClass(job, classOf[GroupReadSupport])

// Read Parquet files (simplified example)
// Note: Actual Parquet integration requires additional setup
val parquetData = env.readHadoopFile(
  new ParquetInputFormat[Group](),
  classOf[Void],
  classOf[Group],
  "/path/to/parquet/files",
  job
)

// Process Parquet data
val processedParquet = parquetData.map { case (_, group) =>
  // Extract fields from Parquet Group
  val field1 = group.getString("field1", 0)
  val field2 = group.getInteger("field2", 0)
  (field1, field2)
}

processedParquet.print()

Configuration Best Practices

import org.apache.flink.api.scala._
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapreduce.Job

val env = ExecutionEnvironment.getExecutionEnvironment

// Create Hadoop configuration with custom settings
val hadoopConf = new Configuration()

// HDFS settings
hadoopConf.set("fs.defaultFS", "hdfs://namenode:8020")
hadoopConf.set("dfs.replication", "3")

// MapReduce settings
hadoopConf.set("mapreduce.job.reduces", "4")
hadoopConf.set("mapreduce.map.memory.mb", "2048")

// Security settings (if using Kerberos)
hadoopConf.set("hadoop.security.authentication", "kerberos")
hadoopConf.set("hadoop.security.authorization", "true")

// Create a job with the custom configuration and pass it to the
// readHadoopFile / HadoopOutputFormat calls that should use these settings
val job = Job.getInstance(hadoopConf)

// Plain Flink file I/O can address the same cluster directly by URI
val data = env.readTextFile("hdfs://namenode:8020/input/data.txt")
// ... process data ...
data.writeAsText("hdfs://namenode:8020/output/results")

env.execute("Hadoop Configuration Example")

Performance Considerations

Optimization Tips

  1. Use Appropriate Input Splits: Configure input split size for optimal parallelism (see the sketch after this list)
  2. Leverage Data Locality: Ensure Flink can access Hadoop data locality information
  3. Configure Memory Settings: Tune Hadoop and Flink memory settings for large datasets
  4. Use Compression: Enable compression for better I/O performance with large files
  5. Monitor Serialization: Be aware of Hadoop Writable serialization overhead
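
A configuration sketch for tips 1 and 4, using the standard Hadoop helpers for split size and output compression; the sizes and codec are illustrative values, not recommendations:

import org.apache.hadoop.io.compress.GzipCodec
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat

val job = Job.getInstance()

// Tip 1: control input split size (and therefore read parallelism)
FileInputFormat.setMinInputSplitSize(job, 64L * 1024 * 1024)   // 64 MB
FileInputFormat.setMaxInputSplitSize(job, 256L * 1024 * 1024)  // 256 MB

// Tip 4: compress output files to reduce I/O
FileOutputFormat.setCompressOutput(job, true)
FileOutputFormat.setOutputCompressorClass(job, classOf[GzipCodec])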

Common Patterns

  1. ETL Pipelines: Read from Hadoop sources, transform in Flink, write to Hadoop sinks (sketched below)
  2. Data Migration: Move data between different Hadoop clusters or formats
  3. Hybrid Processing: Combine Hadoop batch processing with Flink stream processing
  4. Legacy Integration: Integrate Flink with existing Hadoop-based data workflows
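
A compact end-to-end sketch of pattern 1, stitching together the read and write examples from earlier sections; the cluster address and paths are placeholders:

import org.apache.flink.api.scala._
import org.apache.flink.api.scala.hadoop.mapreduce.HadoopOutputFormat
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat

val env = ExecutionEnvironment.getExecutionEnvironment

// Source: read raw lines from HDFS using the Hadoop TextInputFormat
val inputJob = Job.getInstance()
val raw = env.readHadoopFile(
  new TextInputFormat(),
  classOf[LongWritable],
  classOf[Text],
  "hdfs://namenode:8020/input/events",
  inputJob
)

// Transform: keep non-empty lines and re-key them by line length
val transformed = raw
  .map { case (_, text) => text.toString.trim }
  .filter(_.nonEmpty)
  .map(line => (new LongWritable(line.length), new Text(line)))

// Sink: write back to HDFS through the Hadoop TextOutputFormat
val outputJob = Job.getInstance()
outputJob.getConfiguration.set(
  "mapreduce.output.fileoutputformat.outputdir",
  "hdfs://namenode:8020/output/events"
)
transformed.output(new HadoopOutputFormat[LongWritable, Text](
  new TextOutputFormat[LongWritable, Text](),
  outputJob
))

env.execute("Hadoop ETL Pipeline")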