The Apache Flink Scala API provides a comprehensive type system that derives TypeInformation for Scala types automatically at compile time via macros, giving type safety and efficient serialization for distributed computation.
// Implicit type information generation via macro
implicit def createTypeInformation[T]: TypeInformation[T] = macro TypeUtils.createTypeInfo[T]
// Explicit type information for Nothing type
implicit val scalaNothingTypeInfo: TypeInformation[Nothing] = new ScalaNothingTypeInfo()

// Abstract base class for all type information
abstract class TypeInformation[T] {
  def getTypeClass: Class[T]
  def isBasicType: Boolean
  def isTupleType: Boolean
  def getArity: Int
  def getTotalFields: Int
  def createSerializer(config: ExecutionConfig): TypeSerializer[T]
}
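The implicit macro can be invoked directly to inspect the generated metadata. A minimal sketch; the Point case class here is a hypothetical example type:

import org.apache.flink.api.scala._
case class Point(x: Double, y: Double)
val pointInfo: TypeInformation[Point] = createTypeInformation[Point]
pointInfo.isTupleType // true: Flink models case classes as tuple types
pointInfo.getArity    // 2, one per constructor field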
// Type information for case classes
class CaseClassTypeInfo[T](
  clazz: Class[T],
  fieldTypes: Array[TypeInformation[_]],
  fieldNames: Array[String]
) extends TypeInformation[T]

// Type information, comparator, and serializer for Option values
class OptionTypeInfo[T](
  elemType: TypeInformation[T]
) extends TypeInformation[Option[T]]
class OptionTypeComparator[T](
  elemComparator: TypeComparator[T]
) extends TypeComparator[Option[T]]
class OptionSerializer[T](
  elemSerializer: TypeSerializer[T]
) extends TypeSerializer[Option[T]]
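An Option serializer can compose the element serializer with a presence flag. An illustrative flag-plus-payload sketch (the helper functions are hypothetical and not necessarily Flink's exact wire format):

import org.apache.flink.api.common.typeutils.TypeSerializer
import org.apache.flink.core.memory.{DataInputView, DataOutputView}
def serializeOption[T](record: Option[T], elem: TypeSerializer[T], target: DataOutputView): Unit =
  record match {
    case Some(v) => target.writeBoolean(true); elem.serialize(v, target)
    case None    => target.writeBoolean(false) // absent values need no payload
  }
def deserializeOption[T](elem: TypeSerializer[T], source: DataInputView): Option[T] =
  if (source.readBoolean()) Some(elem.deserialize(source)) else None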
// Type information and serializer for Either values
class EitherTypeInfo[A, B](
  leftType: TypeInformation[A],
  rightType: TypeInformation[B]
) extends TypeInformation[Either[A, B]]
class EitherSerializer[A, B](
  leftSerializer: TypeSerializer[A],
  rightSerializer: TypeSerializer[B]
) extends TypeSerializer[Either[A, B]]

// Type information and serializer for Try values
class TryTypeInfo[T](
  elemType: TypeInformation[T],
  throwableTypeInfo: TypeInformation[Throwable]
) extends TypeInformation[Try[T]]
class TrySerializer[T](
  elemSerializer: TypeSerializer[T],
  throwableSerializer: TypeSerializer[Throwable]
) extends TypeSerializer[Try[T]]

// Type information and serializer for Scala collections:
// T is the collection type, E the element type
class TraversableTypeInfo[T <: TraversableOnce[E], E](
  traversableClass: Class[T],
  elemType: TypeInformation[E]
) extends TypeInformation[T]
class TraversableSerializer[T <: TraversableOnce[E], E](
  elemSerializer: TypeSerializer[E],
  traversableClass: Class[T]
) extends TypeSerializer[T]

// Unit type support
class UnitTypeInfo extends TypeInformation[Unit]
class UnitSerializer extends TypeSerializer[Unit]

// Type information, serializer, and comparator for Enumeration values
class EnumValueTypeInfo[E <: Enumeration](
  enum: E
) extends TypeInformation[E#Value]
class EnumValueSerializer[E <: Enumeration](
  enum: E
) extends TypeSerializer[E#Value]
class EnumValueComparator[E <: Enumeration](
  enum: E
) extends TypeComparator[E#Value]
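An Enumeration value can be encoded compactly by its integer id and restored via Enumeration.apply. An illustrative sketch with hypothetical helper functions, not necessarily the exact format EnumValueSerializer uses:

import org.apache.flink.core.memory.{DataInputView, DataOutputView}
def serializeEnum[E <: Enumeration](enum: E, v: E#Value, target: DataOutputView): Unit =
  target.writeInt(v.id)
def deserializeEnum[E <: Enumeration](enum: E, source: DataInputView): E#Value =
  enum(source.readInt()) // Enumeration.apply looks the value up by id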
// Abstract base class for serializers
abstract class TypeSerializer[T] {
  def duplicate(): TypeSerializer[T]
  def createInstance(): T
  def copy(from: T): T
  def copy(from: T, reuse: T): T
  def getLength: Int
  def serialize(record: T, target: DataOutputView): Unit
  def deserialize(source: DataInputView): T
  def deserialize(reuse: T, source: DataInputView): T
}
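A minimal serialize/deserialize roundtrip using a serializer obtained from TypeInformation. A sketch assuming DataOutputSerializer and DataInputDeserializer (Flink's in-memory view implementations in org.apache.flink.core.memory) and reusing the hypothetical Point class from above:

import org.apache.flink.api.common.ExecutionConfig
import org.apache.flink.api.scala._
import org.apache.flink.core.memory.{DataInputDeserializer, DataOutputSerializer}
case class Point(x: Double, y: Double)
val serializer = createTypeInformation[Point].createSerializer(new ExecutionConfig)
val out = new DataOutputSerializer(64)
serializer.serialize(Point(1.0, 2.0), out)
val in = new DataInputDeserializer(out.getSharedBuffer, 0, out.length())
val restored = serializer.deserialize(in) // Point(1.0, 2.0)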
// Case class serialization
class CaseClassSerializer[T](
  clazz: Class[T],
  fieldSerializers: Array[TypeSerializer[_]]
) extends TypeSerializer[T]

// Serializer for the Nothing type
class NothingSerializer extends TypeSerializer[Nothing]

// Tuple serialization helper
class Tuple2CaseClassSerializer[T1, T2](
  clazz: Class[(T1, T2)],
  fieldSerializers: Array[TypeSerializer[_]]
) extends CaseClassSerializer[(T1, T2)]
object TypeUtils {
  // Macro entry point for creating type information
  def createTypeInfo[T]: TypeInformation[T] = macro TypeInformationGen.mkTypeInfo[T]

  // Helper for building Tuple2 type information from element types
  def createTuple2TypeInformation[T1, T2](
    t1: TypeInformation[T1],
    t2: TypeInformation[T2]
  ): TypeInformation[(T1, T2)]
}
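Tuple type information can also be composed by hand from element infos, combining the helper above with the implicit macro. A sketch assuming TypeUtils is importable from org.apache.flink.api.scala.typeutils:

import org.apache.flink.api.scala._
import org.apache.flink.api.scala.typeutils.TypeUtils
val pairInfo: TypeInformation[(String, Int)] =
  TypeUtils.createTuple2TypeInformation(createTypeInformation[String], createTypeInformation[Int])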
// Macro-based code generation for type information
object TypeInformationGen {
  def mkTypeInfo[T](c: Context)(implicit tag: c.WeakTypeTag[T]): c.Expr[TypeInformation[T]]
}

// Type analysis utilities
object TypeAnalyzer {
  def getParameterType(tpe: c.Type, clazz: Class[_]): c.Type
}

// AST generation utilities
object TreeGen {
  def mkMethodCall(receiver: c.Tree, method: String, args: c.Tree*): c.Tree
}

import org.apache.flink.api.scala._
val env = ExecutionEnvironment.getExecutionEnvironment
// Type information is automatically generated
case class Person(name: String, age: Int, city: String)
val people = env.fromElements(
  Person("Alice", 25, "New York"),
  Person("Bob", 30, "London")
)
// TypeInformation[Person] is automatically created via the macro
val names = people.map(_.name) // TypeInformation[String] is derived automatically
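Because CaseClassTypeInfo records field names, case class fields can also be referenced by name in keyed operations. A small hedged extension of the example above:

val countByCity = people.groupBy("city").reduceGroup(_.size)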

import org.apache.flink.api.scala._
val env = ExecutionEnvironment.getExecutionEnvironment
case class PersonWithMiddleName(
  firstName: String,
  middleName: Option[String],
  lastName: String
)
val people = env.fromElements(
  PersonWithMiddleName("John", Some("David"), "Smith"),
  PersonWithMiddleName("Jane", None, "Doe")
)
// Filter people with middle names
val withMiddleNames = people.filter(_.middleName.isDefined)
// Extract middle names with a default
val middleNames = people.map(p => p.middleName.getOrElse("N/A"))
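Since Option converts to a collection, flatMap can extract the present values and drop the Nones in one step. A hedged variation on the example above:

val onlyMiddleNames = people.flatMap(_.middleName.toList)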

import org.apache.flink.api.scala._
val env = ExecutionEnvironment.getExecutionEnvironment
// Either for error handling
case class ProcessingResult(
  id: String,
  result: Either[String, Int] // Left = error message, Right = success value
)
val results = env.fromElements(
  ProcessingResult("task1", Right(42)),
  ProcessingResult("task2", Left("Division by zero")),
  ProcessingResult("task3", Right(100))
)
// Separate successful and failed results; the filters guard the .get calls
val successful = results.filter(_.result.isRight).map(r => (r.id, r.result.right.get))
val failed = results.filter(_.result.isLeft).map(r => (r.id, r.result.left.get))
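An equivalent single-pass extraction with pattern matching avoids the .get calls entirely. A hedged alternative sketch, not part of the original example:

val successfulAlt = results.flatMap { r =>
  r.result match {
    case Right(value) => Some((r.id, value))
    case Left(_)      => None
  }
}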

import org.apache.flink.api.scala._
import scala.util.{Try, Success, Failure}
val env = ExecutionEnvironment.getExecutionEnvironment
case class SafeOperation(
  input: String,
  result: Try[Int]
)
val operations = env.fromElements(
  SafeOperation("42", Try("42".toInt)),
  SafeOperation("abc", Try("abc".toInt)), // this one becomes a Failure
  SafeOperation("100", Try("100".toInt))
)
// Process successful operations
val successResults = operations.flatMap { op =>
  op.result match {
    case Success(value) => Some((op.input, value))
    case Failure(_) => None
  }
}
// Count failures (count() triggers execution and returns a Long)
val failureCount = operations.map(_.result).filter(_.isFailure).count()
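Try also supports direct recovery with a default, which keeps every record instead of dropping failures. A hedged variation on the example above:

val withDefaults = operations.map(op => (op.input, op.result.getOrElse(-1)))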

import org.apache.flink.api.scala._
val env = ExecutionEnvironment.getExecutionEnvironment
case class PersonWithHobbies(
  name: String,
  hobbies: List[String]
)
val people = env.fromElements(
  PersonWithHobbies("Alice", List("reading", "swimming", "coding")),
  PersonWithHobbies("Bob", List("gaming", "cooking")),
  PersonWithHobbies("Charlie", List())
)
// Flatten hobbies into a single DataSet[String]
val allHobbies = people.flatMap(_.hobbies)
// People with more than one hobby
val socialPeople = people.filter(_.hobbies.length > 1)

import org.apache.flink.api.scala._
val env = ExecutionEnvironment.getExecutionEnvironment
// Complex nested case class
case class Address(street: String, city: String, zipCode: String)
case class Employee(
  id: Long,
  name: String,
  address: Address,
  salary: Option[Double],
  skills: List[String]
)
val employees = env.fromElements(
  Employee(
    1L,
    "Alice Johnson",
    Address("123 Main St", "New York", "10001"),
    Some(75000.0),
    List("Scala", "Flink", "Kafka")
  ),
  Employee(
    2L,
    "Bob Smith",
    Address("456 Oak Ave", "San Francisco", "94102"),
    None,
    List("Java", "Spring")
  )
)
// Type information is automatically generated for nested structures
val highSkilled = employees.filter(_.skills.length >= 3)
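Nested fields are addressable with dotted field expressions in keyed operations, the same way nested POJO fields are. A hedged sketch building on the example above:

val employeesByCity = employees.groupBy("address.city").reduceGroup(_.size)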

import org.apache.flink.api.scala._
object Status extends Enumeration {
  type Status = Value
  val Pending, Active, Inactive, Deleted = Value
}
case class User(id: Long, name: String, status: Status.Status)
val env = ExecutionEnvironment.getExecutionEnvironment
val users = env.fromElements(
  User(1L, "Alice", Status.Active),
  User(2L, "Bob", Status.Pending),
  User(3L, "Charlie", Status.Inactive)
)
// Filter by status
val activeUsers = users.filter(_.status == Status.Active)
// Group by status, keyed on the value's string form
val usersByStatus = users.groupBy(_.status.toString)

import org.apache.flink.api.scala._
import org.apache.flink.api.common.typeinfo.TypeInformation
import scala.reflect.ClassTag
val env = ExecutionEnvironment.getExecutionEnvironment
// Sometimes explicit type information is needed: generic helpers require
// TypeInformation (and ClassTag) context bounds
def processGenericData[T: TypeInformation: ClassTag](data: DataSet[T]): DataSet[String] = {
  data.map(_.toString)
}
val numbers = env.fromElements(1, 2, 3, 4, 5)
val strings = processGenericData(numbers)
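When inference needs a nudge, the context-bound implicits can also be passed explicitly. A hedged sketch building on the function above:

val explicitStrings = processGenericData(numbers)(createTypeInformation[Int], implicitly[ClassTag[Int]])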