Apache Flink Core runtime components, type system, and foundational APIs for stream processing applications
—
Apache Flink Core provides a rich set of utility classes and interfaces for common operations like I/O handling, memory management, filesystem operations, and general-purpose utilities. These components form the foundation for many Flink operations and can be used to build efficient applications.
The primary mechanism for emitting records in Flink functions.
import org.apache.flink.util.Collector;
// Using collector in FlatMapFunction
/** Splits each incoming line on whitespace and emits every non-empty token. */
public class TokenizerFunction implements FlatMapFunction<String, String> {
    @Override
    public void flatMap(String value, Collector<String> out) throws Exception {
        String[] tokens = value.split("\\s+");
        for (String token : tokens) {
            if (token.isEmpty()) {
                continue; // split can yield empty leading tokens
            }
            out.collect(token);
        }
    }
}
// Custom collector implementation
/**
 * Collector decorator that forwards only the records accepted by a predicate;
 * everything else is silently dropped. Closing this collector closes the
 * wrapped one.
 */
public class FilteringCollector<T> implements Collector<T> {
    private final Collector<T> downstream;
    private final Predicate<T> predicate;

    public FilteringCollector(Collector<T> delegate, Predicate<T> filter) {
        this.downstream = delegate;
        this.predicate = filter;
    }

    @Override
    public void collect(T record) {
        if (!predicate.test(record)) {
            return; // filtered out
        }
        downstream.collect(record);
    }

    @Override
    public void close() {
        downstream.close();
    }
}
// Batching collector
/**
 * Collector that buffers records and forwards them downstream as lists of at
 * most {@code batchSize} elements. {@link #close()} flushes any partial batch
 * before closing the downstream collector.
 */
public class BatchingCollector<T> implements Collector<T> {
    private final Collector<List<T>> downstream;
    private final int batchSize;
    private final List<T> pending;

    public BatchingCollector(Collector<List<T>> delegate, int batchSize) {
        this.downstream = delegate;
        this.batchSize = batchSize;
        this.pending = new ArrayList<>(batchSize);
    }

    @Override
    public void collect(T record) {
        pending.add(record);
        if (pending.size() >= batchSize) {
            emitBatch();
        }
    }

    @Override
    public void close() {
        emitBatch(); // flush any partial batch first
        downstream.close();
    }

    // Emits the buffered records as a defensive copy and clears the buffer;
    // no-op when the buffer is empty.
    private void emitBatch() {
        if (pending.isEmpty()) {
            return;
        }
        downstream.collect(new ArrayList<>(pending));
        pending.clear();
    }
}
Handle resource cleanup for iterators backed by native resources.
import org.apache.flink.util.CloseableIterator;
import org.apache.flink.util.CloseableIterable;
// File-based closeable iterator
/**
 * Streams the lines of a text file one at a time, releasing the underlying
 * reader via {@link #close()}. Uses a one-line look-ahead so {@code hasNext()}
 * can answer without consuming input.
 */
public class FileLineIterator implements CloseableIterator<String> {
    private final BufferedReader reader;
    private String nextLine;        // look-ahead; null means end of file
    private boolean closed = false; // once true, iteration stops

    public FileLineIterator(Path filePath) throws IOException {
        this.reader = Files.newBufferedReader(filePath);
        this.nextLine = reader.readLine(); // prime the look-ahead
    }

    @Override
    public boolean hasNext() {
        return !closed && nextLine != null;
    }

    @Override
    public String next() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        String current = nextLine;
        try {
            nextLine = reader.readLine();
        } catch (IOException e) {
            // UncheckedIOException (a RuntimeException subclass, so callers
            // catching RuntimeException still work) keeps the I/O nature of
            // the failure visible instead of a generic RuntimeException.
            throw new UncheckedIOException("Error reading file", e);
        }
        return current;
    }

    @Override
    public void close() throws IOException {
        if (!closed) { // idempotent
            closed = true;
            reader.close();
        }
    }
}
// Directory-based closeable iterable
/**
 * CloseableIterable over the direct children of a directory. Each call to
 * {@link #iterator()} opens a fresh listing; the returned iterator must be
 * closed to release the underlying stream.
 */
public class DirectoryIterable implements CloseableIterable<Path> {
    private final Path directory;

    public DirectoryIterable(Path directory) {
        this.directory = directory;
    }

    @Override
    public CloseableIterator<Path> iterator() {
        final Stream<Path> listing;
        try {
            listing = Files.list(directory);
        } catch (IOException e) {
            throw new RuntimeException("Error listing directory", e);
        }
        final Iterator<Path> entries = listing.iterator();
        return new CloseableIterator<Path>() {
            @Override
            public boolean hasNext() {
                return entries.hasNext();
            }

            @Override
            public Path next() {
                return entries.next();
            }

            @Override
            public void close() {
                listing.close(); // releases the directory stream
            }
        };
    }

    @Override
    public void close() throws IOException {
        // The iterable itself holds no open resources; only its iterators do.
    }
}
// Using closeable iterables safely
/**
 * Walks a directory with try-with-resources at every level so the directory
 * listing and each per-file line iterator are always released.
 */
public class CloseableIterableExample {
    public static void processFiles(Path directory) {
        try (DirectoryIterable iterable = new DirectoryIterable(directory);
                CloseableIterator<Path> files = iterable.iterator()) {
            while (files.hasNext()) {
                Path file = files.next();
                System.out.println("Processing file: " + file);
                if (!Files.isRegularFile(file)) {
                    continue; // skip sub-directories and special entries
                }
                // Each regular file gets its own closeable line iterator.
                try (FileLineIterator lines = new FileLineIterator(file)) {
                    while (lines.hasNext()) {
                        processLine(lines.next());
                    }
                }
            }
        } catch (IOException e) {
            System.err.println("Error processing files: " + e.getMessage());
        }
    }

    private static void processLine(String line) {
        // Process individual line
        System.out.println("Line: " + line);
    }
}
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.core.memory.HybridMemorySegment;
// Working with memory segments
/**
 * Examples of reading and writing primitives and byte ranges through Flink's
 * MemorySegment abstraction over a heap byte[].
 */
public class MemorySegmentExample {
    public static void basicMemorySegmentOperations() {
        // Wrap a heap buffer in a segment.
        byte[] backing = new byte[1024];
        MemorySegment segment = MemorySegment.wrap(backing);
        // Write primitives at fixed offsets (int@0, long@4, double@12).
        segment.putInt(0, 42);
        segment.putLong(4, 123456789L);
        segment.putDouble(12, 3.14159);
        // Write raw bytes starting at offset 20.
        byte[] payload = "Hello, Flink!".getBytes();
        segment.put(20, payload);
        // Read everything back from the same offsets.
        int intValue = segment.getInt(0);
        long longValue = segment.getLong(4);
        double doubleValue = segment.getDouble(12);
        byte[] readBack = new byte[payload.length];
        segment.get(20, readBack);
        String text = new String(readBack);
        System.out.println("Int: " + intValue);
        System.out.println("Long: " + longValue);
        System.out.println("Double: " + doubleValue);
        System.out.println("Text: " + text);
    }

    public static void memorySegmentUtilities() {
        MemorySegment first = MemorySegment.wrap(new byte[100]);
        MemorySegment second = MemorySegment.wrap(new byte[100]);
        // Fill first 50 bytes with 0xAA.
        first.put(0, (byte) 0xAA, 50);
        // Copy that prefix into the second segment.
        first.copyTo(0, second, 0, 50);
        // Byte-wise comparison of the two 50-byte prefixes.
        int comparison = first.compare(second, 0, 0, 50);
        System.out.println("Segments equal: " + (comparison == 0));
        // Exchange 50 bytes between the segments via a temp buffer.
        first.swapBytes(new byte[100], second, 0, 0, 50);
    }
}
import org.apache.flink.core.io.IOReadableWritable;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
// Custom serializable object
/**
 * Example IOReadableWritable record (name, value, timestamp) that serializes
 * itself field-by-field through Flink's DataOutputView/DataInputView.
 *
 * NOTE(review): write() encodes a null name as "" and read() maps "" back to
 * null, so a genuinely empty name does not round-trip (it becomes null).
 * Field order is name, value, timestamp; read() must mirror write() exactly.
 */
public class SerializableRecord implements IOReadableWritable {
private String name;
private int value;
private long timestamp;
public SerializableRecord() {
// Default constructor required
}
public SerializableRecord(String name, int value, long timestamp) {
this.name = name;
this.value = value;
this.timestamp = timestamp;
}
// Writes fields in a fixed order; a null name is stored as the empty string
// because writeUTF cannot encode null.
@Override
public void write(DataOutputView out) throws IOException {
out.writeUTF(name != null ? name : "");
out.writeInt(value);
out.writeLong(timestamp);
}
// Reads fields in the same order as write(); an empty string is converted
// back to null (see class NOTE about the empty-vs-null ambiguity).
@Override
public void read(DataInputView in) throws IOException {
this.name = in.readUTF();
this.value = in.readInt();
this.timestamp = in.readLong();
// Handle empty string case
if (this.name.isEmpty()) {
this.name = null;
}
}
// Getters and setters
public String getName() { return name; }
public void setName(String name) { this.name = name; }
public int getValue() { return value; }
public void setValue(int value) { this.value = value; }
public long getTimestamp() { return timestamp; }
public void setTimestamp(long timestamp) { this.timestamp = timestamp; }
@Override
public String toString() {
return "SerializableRecord{name='" + name + "', value=" + value +
", timestamp=" + timestamp + "}";
}
}
// I/O utilities for serialization
/**
 * Serialization helpers for IOReadableWritable objects, to byte arrays and to
 * files, via Flink's DataInputView/DataOutputView stream wrappers.
 */
public class IOUtils {
    /** Serializes the object into a byte array. */
    public static byte[] serialize(IOReadableWritable object) throws IOException {
        try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
                DataOutputStream dos = new DataOutputStream(baos)) {
            DataOutputView outputView = new DataOutputViewStreamWrapper(dos);
            object.write(outputView);
            return baos.toByteArray();
        }
    }

    /**
     * Deserializes an instance of {@code clazz} from {@code data}. The class
     * must have an accessible no-arg constructor.
     *
     * @throws IOException if reading fails or the instance cannot be created
     */
    public static <T extends IOReadableWritable> T deserialize(byte[] data,
            Class<T> clazz) throws IOException {
        try (ByteArrayInputStream bais = new ByteArrayInputStream(data);
                DataInputStream dis = new DataInputStream(bais)) {
            // Class.newInstance() is deprecated and rethrows checked constructor
            // exceptions undeclared; invoke the no-arg constructor explicitly.
            T object = clazz.getDeclaredConstructor().newInstance();
            DataInputView inputView = new DataInputViewStreamWrapper(dis);
            object.read(inputView);
            return object;
        } catch (ReflectiveOperationException e) {
            throw new IOException("Failed to instantiate object", e);
        }
    }

    /** Writes the object to a file, replacing any existing content. */
    public static void serializeToFile(IOReadableWritable object, Path filePath)
            throws IOException {
        // Files.newOutputStream returns OutputStream, not FileOutputStream —
        // the broader declared type fixes the original compile error.
        try (OutputStream fos = Files.newOutputStream(filePath);
                DataOutputStream dos = new DataOutputStream(fos)) {
            DataOutputView outputView = new DataOutputViewStreamWrapper(dos);
            object.write(outputView);
        }
    }

    /** Reads an instance of {@code clazz} back from a file. */
    public static <T extends IOReadableWritable> T deserializeFromFile(Path filePath,
            Class<T> clazz) throws IOException {
        // Files.newInputStream returns InputStream, not FileInputStream.
        try (InputStream fis = Files.newInputStream(filePath);
                DataInputStream dis = new DataInputStream(fis)) {
            T object = clazz.getDeclaredConstructor().newInstance();
            DataInputView inputView = new DataInputViewStreamWrapper(dis);
            object.read(inputView);
            return object;
        } catch (ReflectiveOperationException e) {
            throw new IOException("Failed to instantiate object", e);
        }
    }
}
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.fs.FSDataOutputStream;
// Filesystem operations
/**
 * Examples of Flink FileSystem operations: metadata queries, streaming reads
 * and writes, copying between filesystems, and a temp-file-then-rename write.
 * The FileSystem is resolved from each Path's scheme (hdfs://, file://, ...).
 */
public class FileSystemUtils {
// Queries existence, status, and a directory listing for an HDFS path.
public static void fileSystemOperations() throws IOException {
// Get filesystem for path
Path remotePath = new Path("hdfs://namenode:8020/data/input.txt");
FileSystem fs = remotePath.getFileSystem();
// Check if file/directory exists
boolean exists = fs.exists(remotePath);
System.out.println("File exists: " + exists);
// Get file status
if (exists) {
FileStatus status = fs.getFileStatus(remotePath);
System.out.println("File size: " + status.getLen());
System.out.println("Is directory: " + status.isDir());
System.out.println("Modification time: " + status.getModificationTime());
}
// List files in directory
Path directory = new Path("hdfs://namenode:8020/data/");
if (fs.exists(directory)) {
FileStatus[] files = fs.listStatus(directory);
for (FileStatus file : files) {
System.out.println("File: " + file.getPath() +
" (size: " + file.getLen() + ")");
}
}
}
// Streams a remote file line by line; both streams close via try-with-resources.
public static void readFromFileSystem() throws IOException {
Path inputPath = new Path("hdfs://namenode:8020/data/input.txt");
FileSystem fs = inputPath.getFileSystem();
try (FSDataInputStream inputStream = fs.open(inputPath);
BufferedReader reader = new BufferedReader(
new InputStreamReader(inputStream))) {
String line;
while ((line = reader.readLine()) != null) {
System.out.println("Read line: " + line);
}
}
}
// Overwrites the target file with two lines of text.
public static void writeToFileSystem() throws IOException {
Path outputPath = new Path("hdfs://namenode:8020/data/output.txt");
FileSystem fs = outputPath.getFileSystem();
try (FSDataOutputStream outputStream = fs.create(outputPath,
FileSystem.WriteMode.OVERWRITE);
PrintWriter writer = new PrintWriter(
new OutputStreamWriter(outputStream))) {
writer.println("Hello, Flink FileSystem!");
writer.println("Writing to: " + outputPath);
}
}
// Copies a local file to HDFS with an 8 KiB buffer; source and destination
// may live on different filesystems.
public static void copyFiles() throws IOException {
Path sourcePath = new Path("file:///local/source.txt");
Path destPath = new Path("hdfs://namenode:8020/data/copied.txt");
FileSystem sourceFs = sourcePath.getFileSystem();
FileSystem destFs = destPath.getFileSystem();
try (FSDataInputStream input = sourceFs.open(sourcePath);
FSDataOutputStream output = destFs.create(destPath,
FileSystem.WriteMode.OVERWRITE)) {
byte[] buffer = new byte[8192];
int bytesRead;
while ((bytesRead = input.read(buffer)) != -1) {
output.write(buffer, 0, bytesRead);
}
}
}
// Writes to a temp file and renames it into place.
// NOTE(review): rename atomicity depends on the FileSystem implementation
// (true for HDFS, generally NOT for object stores like S3) — confirm for
// the target filesystem before relying on this pattern.
public static void atomicFileOperations() throws IOException {
Path tempPath = new Path("hdfs://namenode:8020/data/temp_file.txt");
Path finalPath = new Path("hdfs://namenode:8020/data/final_file.txt");
FileSystem fs = tempPath.getFileSystem();
// Write to temporary file first
try (FSDataOutputStream output = fs.create(tempPath,
FileSystem.WriteMode.OVERWRITE);
PrintWriter writer = new PrintWriter(
new OutputStreamWriter(output))) {
writer.println("Critical data");
writer.println("Must be written atomically");
}
// Atomically rename to final location
if (fs.rename(tempPath, finalPath)) {
System.out.println("File written atomically");
} else {
System.err.println("Failed to rename file");
fs.delete(tempPath, false); // Clean up temp file
}
}
}import org.apache.flink.util.AbstractAutoCloseableRegistry;
import org.apache.flink.util.AutoCloseableAsync;
// Resource registry for managing multiple resources
/**
 * Registry that closes every registered Closeable on shutdown, attempting all
 * of them even if some fail: the first IOException is rethrown with any later
 * failures attached as suppressed exceptions.
 *
 * NOTE(review): assumes the parent class exposes getCloseableIterator()
 * returning an Iterator&lt;Closeable&gt; — confirm against the Flink version
 * in use; some AbstractAutoCloseableRegistry variants hand the resources to
 * doClose directly instead.
 */
public class ResourceManager extends AbstractAutoCloseableRegistry<Closeable, IOException> {
    @Override
    protected void doClose() throws IOException {
        IOException firstFailure = null;
        // An Iterator is not Iterable, so a for-each loop over
        // getCloseableIterator() does not compile; iterate explicitly.
        Iterator<Closeable> resources = getCloseableIterator();
        while (resources.hasNext()) {
            try {
                resources.next().close();
            } catch (IOException e) {
                if (firstFailure == null) {
                    firstFailure = e;
                } else {
                    firstFailure.addSuppressed(e);
                }
            }
        }
        if (firstFailure != null) {
            throw firstFailure;
        }
    }
}
// Async closeable implementation
/**
 * AutoCloseableAsync example: runs work on a single-threaded executor and
 * shuts it down asynchronously, waiting up to 30 seconds for in-flight tasks
 * before forcing termination.
 */
public class AsyncResource implements AutoCloseableAsync {
    private final ExecutorService worker;
    private volatile boolean closed = false;

    public AsyncResource() {
        this.worker = Executors.newSingleThreadExecutor();
    }

    /** Submits a unit of simulated work; rejected once the resource is closed. */
    public void doWork() {
        if (closed) {
            throw new IllegalStateException("Resource is closed");
        }
        worker.submit(() -> {
            try {
                // Simulate async work
                Thread.sleep(1000);
                System.out.println("Work completed");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
    }

    @Override
    public CompletableFuture<Void> closeAsync() {
        if (closed) {
            // Idempotent: subsequent calls complete immediately.
            return CompletableFuture.completedFuture(null);
        }
        closed = true;
        return CompletableFuture.runAsync(() -> {
            worker.shutdown();
            try {
                boolean terminated = worker.awaitTermination(30, TimeUnit.SECONDS);
                if (!terminated) {
                    worker.shutdownNow(); // give up on stragglers
                }
            } catch (InterruptedException e) {
                worker.shutdownNow();
                Thread.currentThread().interrupt();
            }
        });
    }
}
// Usage example
/**
 * Shows registry-based cleanup. Each resource is registered with the
 * ResourceManager immediately after construction, so a failure while opening
 * a later resource still lets the registry close the earlier ones. (The
 * previous create-all-then-register-all ordering leaked every already-open
 * resource if a later constructor threw.)
 */
public class ResourceManagementExample {
    public static void manageResources() {
        try (ResourceManager resourceManager = new ResourceManager()) {
            // Register each resource as soon as it exists so it is always
            // covered by the registry's close().
            FileInputStream fis1 = new FileInputStream("file1.txt");
            resourceManager.registerCloseable(fis1);
            FileInputStream fis2 = new FileInputStream("file2.txt");
            resourceManager.registerCloseable(fis2);
            Socket socket = new Socket("localhost", 8080);
            resourceManager.registerCloseable(socket);
            // Use resources
            processFiles(fis1, fis2);
            communicateWithServer(socket);
            // All registered resources are closed when the registry closes.
        } catch (IOException e) {
            System.err.println("Error managing resources: " + e.getMessage());
        }
    }

    public static void manageAsyncResources() {
        AsyncResource resource1 = new AsyncResource();
        AsyncResource resource2 = new AsyncResource();
        try {
            // Use resources
            resource1.doWork();
            resource2.doWork();
            // Close both resources concurrently and wait for completion.
            CompletableFuture<Void> closeAll = CompletableFuture.allOf(
                    resource1.closeAsync(),
                    resource2.closeAsync());
            closeAll.get(1, TimeUnit.MINUTES);
            System.out.println("All async resources closed");
        } catch (Exception e) {
            System.err.println("Error managing async resources: " + e.getMessage());
        }
    }

    private static void processFiles(FileInputStream... streams) {
        // Process files
    }

    private static void communicateWithServer(Socket socket) {
        // Communicate with server
    }
}
import org.apache.flink.util.RefCounted;
// Reference counted resource
/**
 * RefCounted example resource. The count starts at 1 (the creator's
 * reference); retain()/release() adjust it with compare-and-set loops so a
 * thread can never resurrect the resource after the count reached zero. (The
 * original check-then-increment had a window where retain() could race with
 * the final release() and succeed on an already-disposed resource.)
 */
public class RefCountedResource implements RefCounted {
    private final AtomicInteger refCount = new AtomicInteger(1);
    private final String resourceName;
    private volatile boolean disposed = false;

    public RefCountedResource(String resourceName) {
        this.resourceName = resourceName;
        System.out.println("Created resource: " + resourceName);
    }

    @Override
    public void retain() {
        while (true) {
            int current = refCount.get();
            // Refuse to retain once the count has hit zero: the resource is
            // (being) disposed and must not come back to life.
            if (current <= 0 || disposed) {
                throw new IllegalStateException("Resource already disposed: " + resourceName);
            }
            if (refCount.compareAndSet(current, current + 1)) {
                System.out.println("Retained " + resourceName + ", ref count: " + (current + 1));
                return;
            }
        }
    }

    @Override
    public boolean release() {
        while (true) {
            int current = refCount.get();
            if (current <= 0 || disposed) {
                // Already disposed (or being disposed); nothing to release.
                return false;
            }
            if (refCount.compareAndSet(current, current - 1)) {
                int remaining = current - 1;
                System.out.println("Released " + resourceName + ", ref count: " + remaining);
                if (remaining == 0) {
                    dispose();
                    return true;
                }
                // CAS from a positive value can never go negative, so the old
                // negative-count error branch is unreachable and was removed.
                return false;
            }
        }
    }

    /** Uses the resource; fails if it has already been disposed. */
    public void use() {
        if (disposed) {
            throw new IllegalStateException("Cannot use disposed resource: " + resourceName);
        }
        System.out.println("Using resource: " + resourceName);
        // Simulate resource usage
    }

    // Idempotent teardown, reached when the count drops to zero.
    private void dispose() {
        if (!disposed) {
            disposed = true;
            System.out.println("Disposed resource: " + resourceName);
            // Cleanup logic here
        }
    }

    public boolean isDisposed() {
        return disposed;
    }
}
// Reference counted resource manager
/**
 * Demonstrates reference counting: two consumer threads each retain/release a
 * shared resource, and the creator releases its own reference once both have
 * finished, which disposes the resource.
 */
public class RefCountedResourceManager {
    public static void demonstrateRefCounting() {
        RefCountedResource resource = new RefCountedResource("SharedBuffer");
        // Simulate multiple consumers with different working times.
        Thread consumer1 = newConsumer(resource, 1000);
        Thread consumer2 = newConsumer(resource, 1500);
        consumer1.start();
        consumer2.start();
        try {
            consumer1.join();
            consumer2.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        // Drop the creator's reference; the count reaches zero and disposes.
        boolean disposed = resource.release();
        System.out.println("Resource disposed: " + disposed);
    }

    /** Builds a consumer thread that retains, works for workMillis, then releases. */
    private static Thread newConsumer(RefCountedResource resource, long workMillis) {
        return new Thread(() -> {
            resource.retain();
            try {
                resource.use();
                Thread.sleep(workMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                resource.release();
            }
        });
    }
}
import org.apache.flink.util.function.FunctionWithException;
import org.apache.flink.util.function.ConsumerWithException;
// Functional interfaces that can throw exceptions
/**
 * Adapters from Flink's exception-throwing functional interfaces to the
 * standard java.util.function types, wrapping checked exceptions in
 * RuntimeException so they can be used inside stream pipelines.
 */
public class FunctionalUtilities {
    /** Adapts a FunctionWithException to a plain Function. */
    public static <T, R> Function<T, R> wrapFunction(FunctionWithException<T, R, Exception> function) {
        return value -> {
            try {
                return function.apply(value);
            } catch (Exception e) {
                throw new RuntimeException("Function execution failed", e);
            }
        };
    }

    /** Adapts a ConsumerWithException to a plain Consumer. */
    public static <T> Consumer<T> wrapConsumer(ConsumerWithException<T, Exception> consumer) {
        return value -> {
            try {
                consumer.accept(value);
            } catch (Exception e) {
                throw new RuntimeException("Consumer execution failed", e);
            }
        };
    }

    public static void functionalExceptionHandling() {
        List<String> filePaths = Arrays.asList("file1.txt", "file2.txt", "file3.txt");
        // Count lines per file via a wrapped, exception-throwing mapper.
        Function<String, Integer> countLines = wrapFunction(path -> {
            try (BufferedReader reader = Files.newBufferedReader(Paths.get(path))) {
                return (int) reader.lines().count();
            }
        });
        List<Integer> lineCounts = filePaths.stream()
                .map(countLines)
                .collect(Collectors.toList());
        System.out.println("Line counts: " + lineCounts);
        // Report lines per file via a wrapped, exception-throwing consumer.
        filePaths.forEach(wrapConsumer(path -> {
            try (BufferedReader reader = Files.newBufferedReader(Paths.get(path))) {
                long lines = reader.lines().count();
                System.out.println(path + " has " + lines + " lines");
            }
        }));
    }
}
import org.apache.flink.util.Visitor;
import org.apache.flink.util.Visitable;
// Tree node that supports visitor pattern
/**
 * Base class for nodes in a visitable tree; subclasses implement accept() to
 * control traversal order (see LeafNode and BranchNode below).
 */
public abstract class TreeNode implements Visitable<TreeNode> {
// Display name of the node, fixed at construction.
protected final String name;
public TreeNode(String name) {
this.name = name;
}
public String getName() {
return name;
}
}
/** Terminal tree node carrying a string value. */
public class LeafNode extends TreeNode {
    private final String value;

    public LeafNode(String name, String value) {
        super(name);
        this.value = value;
    }

    /** Returns the payload stored at this leaf. */
    public String getValue() {
        return value;
    }

    @Override
    public void accept(Visitor<TreeNode> visitor) {
        // Leaves have no children; just visit this node.
        visitor.visit(this);
    }
}
/** Interior tree node holding an ordered list of children. */
public class BranchNode extends TreeNode {
    private final List<TreeNode> children = new ArrayList<>();

    public BranchNode(String name) {
        super(name);
    }

    public void addChild(TreeNode child) {
        children.add(child);
    }

    public List<TreeNode> getChildren() {
        return children;
    }

    @Override
    public void accept(Visitor<TreeNode> visitor) {
        // Visit this branch first, then recurse into each child in order.
        visitor.visit(this);
        children.forEach(child -> child.accept(visitor));
    }
}
// Visitor implementations
/**
 * Prints every node during traversal, intending to indent children under
 * their parent branch.
 *
 * NOTE(review): the indentation never actually nests. BranchNode.accept()
 * visits the branch and only AFTER visit() returns does it recurse into the
 * children, so the depth++ / depth-- below cancel out within a single call
 * and depth stays 0 for every node. Fixing this needs either pre/post-visit
 * callbacks on the Visitor interface or the depth passed along the traversal.
 */
public class PrintVisitor implements Visitor<TreeNode> {
private int depth = 0;
@Override
public void visit(TreeNode node) {
String indent = " ".repeat(depth);
if (node instanceof LeafNode) {
LeafNode leaf = (LeafNode) node;
System.out.println(indent + leaf.getName() + ": " + leaf.getValue());
} else if (node instanceof BranchNode) {
BranchNode branch = (BranchNode) node;
System.out.println(indent + branch.getName() + "/");
depth++;
// Children will be visited automatically
depth--;
}
}
}
/** Tallies how many leaf and branch nodes a traversal touches. */
public class CountingVisitor implements Visitor<TreeNode> {
    private int leaves;
    private int branches;

    @Override
    public void visit(TreeNode node) {
        if (node instanceof LeafNode) {
            leaves++;
            return;
        }
        if (node instanceof BranchNode) {
            branches++;
        }
    }

    public int getLeafCount() {
        return leaves;
    }

    public int getBranchCount() {
        return branches;
    }
}
// Usage example
/**
 * Builds a small config/data tree and runs two visitors over it: one that
 * prints the structure and one that counts node kinds.
 */
public class VisitorPatternExample {
    public static void demonstrateVisitorPattern() {
        // Build the tree: root -> {config, data}, each with two leaves.
        BranchNode root = new BranchNode("root");
        root.addChild(branchOf("config",
                new LeafNode("host", "localhost"),
                new LeafNode("port", "8080")));
        root.addChild(branchOf("data",
                new LeafNode("input", "/data/input"),
                new LeafNode("output", "/data/output")));
        // Use visitors
        System.out.println("Tree structure:");
        root.accept(new PrintVisitor());
        CountingVisitor counter = new CountingVisitor();
        root.accept(counter);
        System.out.println("Branches: " + counter.getBranchCount());
        System.out.println("Leaves: " + counter.getLeafCount());
    }

    /** Builds a branch node pre-populated with the given children. */
    private static BranchNode branchOf(String name, TreeNode... children) {
        BranchNode branch = new BranchNode(name);
        for (TreeNode child : children) {
            branch.addChild(child);
        }
        return branch;
    }
}
Apache Flink's utility classes provide essential building blocks for efficient resource management, I/O operations, and common programming patterns. By leveraging these utilities, you can build more robust and efficient Flink applications while following established patterns for resource cleanup, exception handling, and data processing.
Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-core