CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/maven-org-apache-flink--flink-hadoop-compatibility-2-11

Hadoop compatibility layer for Apache Flink providing input/output format wrappers and utilities to integrate Hadoop MapReduce with Flink's DataSet and DataStream APIs

Pending
Overview
Eval results
Files

docs/scala-api.md

Scala API

The Scala API provides native Scala bindings for Hadoop integration within Flink, offering idiomatic Scala interfaces with implicit type information, tuple syntax, and functional programming patterns.

Overview

The Scala API mirrors the Java API functionality but provides Scala-friendly interfaces using native Scala tuples instead of Flink's Tuple2 class, implicit TypeInformation parameters, and object-oriented design patterns consistent with Scala conventions.

HadoopInputs Object

The primary entry point for creating Hadoop InputFormat wrappers in Scala.

object HadoopInputs {
  
  // MapRed API methods (legacy org.apache.hadoop.mapred)

  // Wrap a file-based mapred InputFormat reading from inputPath, using the
  // supplied JobConf. The implicit TypeInformation[(K, V)] is materialized by
  // the Flink Scala API's type-inference macros at the call site.
  def readHadoopFile[K, V](
    mapredInputFormat: MapredFileInputFormat[K, V],
    key: Class[K],
    value: Class[V],
    inputPath: String,
    job: JobConf)(implicit tpe: TypeInformation[(K, V)]): mapred.HadoopInputFormat[K, V];

  // As above, but with a default (empty) JobConf.
  def readHadoopFile[K, V](
    mapredInputFormat: MapredFileInputFormat[K, V],
    key: Class[K],
    value: Class[V],
    inputPath: String)(implicit tpe: TypeInformation[(K, V)]): mapred.HadoopInputFormat[K, V];

  // Read a Hadoop SequenceFile from inputPath; the SequenceFileInputFormat
  // is created internally, so only key/value classes and the path are needed.
  def readSequenceFile[K, V](
    key: Class[K],
    value: Class[V],
    inputPath: String)(implicit tpe: TypeInformation[(K, V)]): mapred.HadoopInputFormat[K, V];

  // Wrap an arbitrary (not necessarily file-based) mapred InputFormat; any
  // input location must already be configured on the JobConf.
  def createHadoopInput[K, V](
    mapredInputFormat: MapredInputFormat[K, V],
    key: Class[K],
    value: Class[V],
    job: JobConf)(implicit tpe: TypeInformation[(K, V)]): mapred.HadoopInputFormat[K, V];

  // MapReduce API methods (new org.apache.hadoop.mapreduce)

  // Wrap a file-based mapreduce InputFormat reading from inputPath with the given Job.
  def readHadoopFile[K, V](
    mapreduceInputFormat: MapreduceFileInputFormat[K, V],
    key: Class[K],
    value: Class[V],
    inputPath: String,
    job: Job)(implicit tpe: TypeInformation[(K, V)]): mapreduce.HadoopInputFormat[K, V];

  // As above, but with a default Job instance.
  def readHadoopFile[K, V](
    mapreduceInputFormat: MapreduceFileInputFormat[K, V],
    key: Class[K],
    value: Class[V],
    inputPath: String)(implicit tpe: TypeInformation[(K, V)]): mapreduce.HadoopInputFormat[K, V];

  // Wrap an arbitrary mapreduce InputFormat; the input location comes from the Job.
  def createHadoopInput[K, V](
    mapreduceInputFormat: MapreduceInputFormat[K, V],
    key: Class[K],
    value: Class[V],
    job: Job)(implicit tpe: TypeInformation[(K, V)]): mapreduce.HadoopInputFormat[K, V];
}

Scala HadoopInputFormat Classes

MapRed HadoopInputFormat

@Public
@Public
class HadoopInputFormat[K, V] extends HadoopInputFormatBase[K, V, (K, V)] {
  
  // Constructor wrapping a mapred (legacy API) InputFormat with an explicit JobConf.
  def this(mapredInputFormat: InputFormat[K, V], keyClass: Class[K], valueClass: Class[V], job: JobConf);
  
  // Convenience constructor using a default (empty) JobConf.
  def this(mapredInputFormat: InputFormat[K, V], keyClass: Class[K], valueClass: Class[V]);
  
  // Read the next record, returned as a native Scala (key, value) tuple
  // rather than Flink's Java Tuple2.
  def nextRecord(reuse: (K, V)): (K, V);
}

MapReduce HadoopInputFormat

@Public
@Public
class HadoopInputFormat[K, V] extends HadoopInputFormatBase[K, V, (K, V)] {
  
  // Constructor wrapping a mapreduce (new API) InputFormat with an explicit Job.
  // Parameter renamed from the misleading `mapredInputFormat`: this is the
  // mapreduce-API variant, matching HadoopInputs.createHadoopInput's naming.
  def this(mapreduceInputFormat: InputFormat[K, V], keyClass: Class[K], valueClass: Class[V], job: Job);
  
  // Convenience constructor using a default Job instance.
  def this(mapreduceInputFormat: InputFormat[K, V], keyClass: Class[K], valueClass: Class[V]);
  
  // Read the next record, returned as a native Scala (key, value) tuple
  // rather than Flink's Java Tuple2.
  def nextRecord(reuse: (K, V)): (K, V);
}

Scala HadoopOutputFormat Classes

MapRed HadoopOutputFormat

@Public
@Public
class HadoopOutputFormat[K, V] extends HadoopOutputFormatBase[K, V, (K, V)] {
  
  // Constructor with JobConf; output format, key/value classes and target
  // path are all taken from the JobConf.
  def this(mapredOutputFormat: OutputFormat[K, V], job: JobConf);
  
  // Constructor that additionally fixes the OutputCommitter implementation to use.
  def this(
    mapredOutputFormat: OutputFormat[K, V], 
    outputCommitterClass: Class[OutputCommitter], 
    job: JobConf);
  
  // Write a record supplied as a native Scala (key, value) tuple.
  def writeRecord(record: (K, V)): Unit;
}

Usage Examples

Basic Input/Output with Scala

import org.apache.flink.api.scala._
import org.apache.flink.hadoopcompatibility.scala.HadoopInputs
import org.apache.hadoop.mapred.{TextInputFormat, JobConf}
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.fs.Path

val env = ExecutionEnvironment.getExecutionEnvironment

// Read text files using the Scala API.
// TextInputFormat yields (byte offset, line) pairs; the implicit
// TypeInformation[(LongWritable, Text)] is derived by the Scala API macros
// brought in via `import org.apache.flink.api.scala._`.
val input: DataSet[(LongWritable, Text)] = env.createInput(
  HadoopInputs.readHadoopFile(
    new TextInputFormat(),
    classOf[LongWritable],
    classOf[Text],
    "hdfs://namenode:port/input/path"
  )
)

// Process data with Scala operations: a classic word count.
val lines: DataSet[String] = input.map(_._2.toString)       // drop the byte offset
val words: DataSet[String] = lines.flatMap(_.split("\\s+")) // whitespace tokenization
val wordCounts: DataSet[(String, Int)] = words
  .map((_, 1))   // pair each word with an initial count of 1
  .groupBy(0)    // group by the word (tuple field 0)
  .sum(1)        // sum the counts (tuple field 1)

Working with Sequence Files

import org.apache.hadoop.io.{IntWritable, Text}

// Read sequence files.
// readSequenceFile configures the mapred SequenceFileInputFormat internally,
// so only the key/value classes and the path are required.
val sequenceData: DataSet[(IntWritable, Text)] = env.createInput(
  HadoopInputs.readSequenceFile(
    classOf[IntWritable],
    classOf[Text],
    "hdfs://namenode:port/sequence/files"
  )
)

// Unwrap the Hadoop Writable wrappers into native Scala types for downstream use.
val nativeData: DataSet[(Int, String)] = sequenceData.map {
  case (key, value) => (key.get(), value.toString)
}

Custom Configuration

import org.apache.hadoop.mapred.JobConf

// Configure the Hadoop job.
// Any property set on the JobConf is visible to the wrapped InputFormat,
// exactly as it would be in a plain Hadoop MapReduce job.
val jobConf = new JobConf()
jobConf.set("mapreduce.input.fileinputformat.inputdir", "/custom/input/path")
jobConf.set("custom.property", "custom-value")
jobConf.setBoolean("custom.flag", true)

// Use the custom configuration; the overload taking an explicit JobConf is chosen here.
val customInput: DataSet[(LongWritable, Text)] = env.createInput(
  HadoopInputs.readHadoopFile(
    new TextInputFormat(),
    classOf[LongWritable],
    classOf[Text],
    "/input/path",
    jobConf
  )
)

Advanced Data Processing

import org.apache.hadoop.mapred.SequenceFileInputFormat
import org.apache.hadoop.io.{BytesWritable, Text}

// Process binary data.
// createHadoopInput (rather than readHadoopFile) is used because the input
// location is expected to be carried by the supplied jobConf, not passed directly.
val binaryData: DataSet[(BytesWritable, Text)] = env.createInput(
  HadoopInputs.createHadoopInput(
    new SequenceFileInputFormat[BytesWritable, Text](),
    classOf[BytesWritable],
    classOf[Text],
    jobConf
  )
)

// Complex processing pipeline: total payload bytes per distinct text value.
val processedData = binaryData
  .filter(_._2.toString.nonEmpty)    // drop records whose text is empty
  .map { case (bytes, text) => 
    (text.toString, bytes.getLength) // (grouping key, payload size in bytes)
  }
  .groupBy(0)
  .reduce { (a, b) => 
    (a._1, a._2 + b._2)              // sum the sizes within each key group
  }

Writing Output with Scala

import org.apache.flink.api.scala.hadoop.mapred.HadoopOutputFormat
import org.apache.hadoop.mapred.{TextOutputFormat, JobConf}
import org.apache.hadoop.io.{NullWritable, Text}
import org.apache.hadoop.fs.Path

// Configure output: format class, key/value classes and target directory all
// live on the JobConf, mirroring a plain Hadoop job setup.
val outputConf = new JobConf()
outputConf.setOutputFormat(classOf[TextOutputFormat[NullWritable, Text]])
outputConf.setOutputKeyClass(classOf[NullWritable])
outputConf.setOutputValueClass(classOf[Text])
TextOutputFormat.setOutputPath(outputConf, new Path("hdfs://namenode:port/output"))

// Create the Flink wrapper around the Hadoop OutputFormat.
val hadoopOutput = new HadoopOutputFormat(
  new TextOutputFormat[NullWritable, Text](),
  outputConf
)

// Prepare data for output; NullWritable keys suppress the key column in the
// emitted text files.
val outputData: DataSet[(NullWritable, Text)] = processedData.map {
  case (word, count) => (NullWritable.get(), new Text(s"$word: $count"))
}

// Write to the Hadoop output and run the job (DataSet programs are lazy until
// execute() is called).
outputData.output(hadoopOutput)
env.execute("Scala Hadoop Integration")

Working with Custom Writable Types

// A custom Hadoop Writable carrying an id, a name and a numeric value.
// Hadoop instantiates Writables reflectively during deserialization, so the
// no-argument primary constructor is required.
class CustomRecord extends Writable {
  var id: Int = 0
  var name: String = ""
  var value: Double = 0.0
  
  // Convenience constructor for building fully-initialized records.
  def this(id: Int, name: String, value: Double) = {
    this()
    this.id = id
    this.name = name
    this.value = value
  }
  
  // Deserialization: fields must be read in exactly the order write() emits them.
  override def readFields(in: DataInput): Unit = {
    id = in.readInt()
    name = in.readUTF()
    value = in.readDouble()
  }
  
  // Serialization: this field order defines the on-disk record layout.
  override def write(out: DataOutput): Unit = {
    out.writeInt(id)
    out.writeUTF(name)
    out.writeDouble(value)
  }
  
  override def toString: String = s"CustomRecord($id, $name, $value)"
}

// Use the custom Writable in Scala; CustomRecord needs a no-arg constructor
// so Hadoop can instantiate it while reading the sequence file.
val customData: DataSet[(LongWritable, CustomRecord)] = env.createInput(
  HadoopInputs.createHadoopInput(
    new SequenceFileInputFormat[LongWritable, CustomRecord](),
    classOf[LongWritable],
    classOf[CustomRecord],
    jobConf
  )
)

// Process custom records: per name, keep the smallest id and sum the values.
val summary = customData
  .map(_._2) // Extract CustomRecord
  .groupBy(_.name)
  .reduce { (a, b) =>
    new CustomRecord(
      math.min(a.id, b.id),
      a.name,
      a.value + b.value
    )
  }

Functional Programming Patterns

// Use Scala's functional programming features for a filtered word count.
// NOTE(review): `stopWords` and `threshold` are assumed to be defined
// elsewhere in the surrounding program — confirm before copying verbatim.
val result = input
  .map(_._2.toString.toLowerCase.trim)
  .filter(_.nonEmpty)
  .flatMap(_.split("\\W+"))
  .filter(word => word.length > 2 && !stopWords.contains(word))
  .map((_, 1))
  .groupBy(0)
  .sum(1)
  .filter(_._2 > threshold)           // keep only sufficiently frequent words
  .sortPartition(1, Order.DESCENDING) // sorts within each partition only

// Use pattern matching to categorize log lines by severity prefix.
val categorized = input.map {
  case (offset, text) if text.toString.startsWith("ERROR") => 
    ("error", text.toString)
  case (offset, text) if text.toString.startsWith("WARN") => 
    ("warning", text.toString)
  case (offset, text) => 
    ("info", text.toString)           // default bucket for everything else
}

Type-Safe Configuration

// Factory for pre-configured JobConf instances, so example code does not
// repeat the same configuration boilerplate.
object HadoopConfig {
  // Builds a JobConf that reads plain text from `path` via the mapred TextInputFormat.
  def textInput(path: String): JobConf = {
    val jobConf = new JobConf()
    jobConf.setInputFormat(classOf[TextInputFormat])
    TextInputFormat.addInputPath(jobConf, new Path(path))
    jobConf
  }
  
  // Builds a JobConf that writes SequenceFiles under `path`.
  def sequenceOutput(path: String): JobConf = {
    val jobConf = new JobConf()
    jobConf.setOutputFormat(classOf[SequenceFileOutputFormat[_, _]])
    SequenceFileOutputFormat.setOutputPath(jobConf, new Path(path))
    jobConf
  }
}

// Use the type-safe configuration helper.
// createHadoopInput takes the input location from the JobConf, which
// HadoopConfig.textInput supplies via TextInputFormat.addInputPath.
val input = env.createInput(
  HadoopInputs.createHadoopInput(
    new TextInputFormat(),
    classOf[LongWritable],
    classOf[Text],
    HadoopConfig.textInput("/input/path")
  )
)

Error Handling in Scala

import scala.util.{Try, Success, Failure}

// Safely builds a Hadoop-backed DataSet for `path`, returning None (after
// logging the cause) when input-format construction fails. Note this only
// guards creation — DataSet programs are lazy, so execution-time failures
// surface later, when the job actually runs.
def createSafeInput(path: String): Option[DataSet[(LongWritable, Text)]] = {
  val attempt = Try {
    env.createInput(
      HadoopInputs.readHadoopFile(
        new TextInputFormat(),
        classOf[LongWritable],
        classOf[Text],
        path
      )
    )
  }
  attempt match {
    case Failure(exception) =>
      println(s"Failed to create input for path $path: ${exception.getMessage}")
      None
    case Success(dataSet) =>
      Some(dataSet)
  }
}

// Use the safe input creation helper; handle both outcomes explicitly.
createSafeInput("/input/path") match {
  case Some(input) => 
    // Process input. collect() materializes the DataSet on the client,
    // which triggers job execution.
    val result = input.map(_._2.toString).collect()
    println(s"Processed ${result.length} records")
    
  case None => 
    println("Failed to create input, using alternative processing")
    // Handle error case
}

Integration with Flink Scala API Features

DataSet Transformations

// Use Flink's rich transformation API with Hadoop inputs: a complete
// text-cleanup and word-count pipeline expressed as a single chain.
val processedData = input
  .map(_._2.toString)                    // Extract text
  .flatMap(_.split("\\s+"))             // Split into words
  .map(_.toLowerCase.replaceAll("[^a-z]", "")) // Clean words
  .filter(_.length > 2)                 // Filter short words
  .map((_, 1))                          // Create word count pairs
  .groupBy(0)                           // Group by word
  .sum(1)                               // Sum counts
  .filter(_._2 > 5)                     // Filter rare words
  .sortPartition(1, Order.DESCENDING)   // Sort by count (per partition only)

Broadcast Variables (Closure-Capture Variant)

// Share a small reference dataset with a downstream operator.
// NOTE(review): despite the names, this collects the data to the client and
// captures the resulting Set in the filter closure — it does not use Flink's
// broadcast-variable mechanism (withBroadcastSet + RichFunction). Confirm
// which behavior the documentation intends.
val stopWords = env.fromElements("the", "a", "an", "and", "or", "but")
val broadcastStopWords = stopWords.collect().toSet

val filteredWords = input
  .map(_._2.toString.toLowerCase)
  .flatMap(_.split("\\s+"))
  .filter(word => !broadcastStopWords.contains(word))

Rich Functions

import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.configuration.Configuration

// Rich function that normalizes a word and pairs it with a count of 1.
// Words that are too short or are stop words become a ("", 0) sentinel,
// which downstream operators are expected to filter out.
class WordProcessor extends RichMapFunction[String, (String, Int)] {
  // Initialized in open(), once per parallel task instance.
  var stopWords: Set[String] = _
  
  override def open(parameters: Configuration): Unit = {
    // Initialize from broadcast variable or configuration
    stopWords = Set("the", "a", "an", "and", "or", "but")
  }
  
  override def map(word: String): (String, Int) = {
    val normalized = word.toLowerCase.replaceAll("[^a-z]", "")
    val keep = normalized.length > 2 && !stopWords.contains(normalized)
    if (keep) (normalized, 1) else ("", 0) // sentinel is filtered out later
  }
}

// Use the rich function with a Hadoop input; the ("", 0) sentinel pairs
// emitted by WordProcessor are removed by the filter before aggregation.
val wordCounts = input
  .map(_._2.toString)
  .flatMap(_.split("\\s+"))
  .map(new WordProcessor())
  .filter(_._1.nonEmpty)
  .groupBy(0)
  .sum(1)

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-flink--flink-hadoop-compatibility-2-11

docs

configuration.md

index.md

input-formats.md

mapreduce-functions.md

output-formats.md

scala-api.md

type-system.md

tile.json