Apache Flink Streaming Java API - Core library for building streaming data processing applications in Java, providing DataStream API, windowing operations, state management, event time processing, and fault-tolerant stream processing capabilities
—
Process functions provide the most flexible way to process streams in Apache Flink, offering access to timers, state, side outputs, and watermarks. They enable complex event processing patterns and stateful computations.
ProcessFunction is the base process function for transforming elements, with access to a processing context and timers.
/**
* Process each element with access to context and timer services
* @param value - input element
* @param ctx - processing context
* @param out - collector for output elements
*/
abstract void processElement(I value, Context ctx, Collector<O> out) throws Exception;
/**
* Handle timer events
* @param timestamp - timer timestamp
* @param ctx - timer context
* @param out - collector for output elements
*/
void onTimer(long timestamp, OnTimerContext ctx, Collector<O> out) throws Exception;
Usage Examples:
DataStream<String> result = input.process(new ProcessFunction<Event, String>() {
/**
 * Emits a "Processed: ..." record for the incoming event and then schedules a
 * processing-time timer 60000 ms (one minute) after the current processing time.
 *
 * @param event the input element
 * @param ctx processing context giving access to the timer service
 * @param out collector for output elements
 */
@Override
public void processElement(Event event, Context ctx, Collector<String> out) {
    final String processed = "Processed: " + event.getValue();
    out.collect(processed);

    // NOTE(review): Flink only supports registering timers on keyed streams —
    // confirm that `input` is keyed before relying on this timer.
    final long fireAt = ctx.timerService().currentProcessingTime() + 60000;
    ctx.timerService().registerProcessingTimeTimer(fireAt);
}
/**
 * Reports the timestamp of the timer that fired.
 *
 * @param timestamp the timer timestamp
 * @param ctx timer context
 * @param out collector for output elements
 */
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) {
    final String message = "Timer fired at: " + timestamp;
    out.collect(message);
}
});
KeyedProcessFunction is the process function for keyed streams, with access to keyed state and per-key timers.
/**
* Process each element in a keyed stream
* @param value - input element
* @param ctx - keyed processing context with access to current key
* @param out - collector for output elements
*/
abstract void processElement(I value, Context ctx, Collector<O> out) throws Exception;
/**
* Handle timer events for keyed streams
* @param timestamp - timer timestamp
* @param ctx - keyed timer context
* @param out - collector for output elements
*/
void onTimer(long timestamp, OnTimerContext ctx, Collector<O> out) throws Exception;
Usage Examples:
DataStream<Alert> alerts = keyedStream.process(
new KeyedProcessFunction<String, Event, Alert>() {
private ValueState<Long> lastEventTime;
private ValueState<Integer> eventCount;
/**
 * Obtains the two value states ("lastEventTime" and "eventCount") from the
 * runtime context when the function is opened.
 */
@Override
public void open(Configuration parameters) {
    lastEventTime = getRuntimeContext().getState(
        new ValueStateDescriptor<>("lastEventTime", Long.class));
    eventCount = getRuntimeContext().getState(
        new ValueStateDescriptor<>("eventCount", Integer.class));
}
/**
 * Tracks the timestamp and count of events for the current key, emits an
 * alert when two consecutive events are less than 1000 ms apart, and
 * registers an event-time cleanup timer 300000 ms after the event.
 *
 * @param event the input element
 * @param ctx keyed processing context (current key, timer service)
 * @param out collector for alerts
 */
@Override
public void processElement(Event event, Context ctx, Collector<Alert> out) throws Exception {
    final long eventTime = event.getTimestamp();
    final Long previousTime = lastEventTime.value();
    final Integer storedCount = eventCount.value();
    // No count stored yet means this is the first event seen for the key.
    final int seenSoFar = (storedCount == null) ? 0 : storedCount;

    lastEventTime.update(eventTime);
    eventCount.update(seenSoFar + 1);

    final boolean rapid = previousTime != null && eventTime - previousTime < 1000;
    if (rapid) {
        out.collect(new Alert("Rapid events for key: " + ctx.getCurrentKey()));
    }

    // Schedule state cleanup relative to this event.
    ctx.timerService().registerEventTimeTimer(eventTime + 300000);
}
/**
 * Clears the state for the current key once the cleanup deadline of the most
 * recently stored event has been reached.
 *
 * @param timestamp the timer timestamp
 * @param ctx keyed timer context
 * @param out collector for output elements (unused: cleanup emits nothing)
 * @throws Exception if reading the stored event time fails
 */
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<Alert> out) throws Exception {
    Long lastTime = lastEventTime.value();
    // processElement registers a timer (event timestamp + 300000) for every
    // event, so a timer belonging to an older event can fire while newer
    // events are still held in state. Clearing unconditionally would wipe the
    // state of an active key; only clean up when this timer is at or past the
    // deadline of the latest stored event (or no event time is stored at all).
    if (lastTime == null || timestamp >= lastTime + 300000) {
        lastEventTime.clear();
        eventCount.clear();
    }
}
}
);
CoProcessFunction is the process function for connected streams, enabling joint processing of two different stream types.
/**
* Process element from first stream
* @param value - element from first stream
* @param ctx - processing context
* @param out - collector for output elements
*/
abstract void processElement1(IN1 value, Context ctx, Collector<OUT> out) throws Exception;
/**
* Process element from second stream
* @param value - element from second stream
* @param ctx - processing context
* @param out - collector for output elements
*/
abstract void processElement2(IN2 value, Context ctx, Collector<OUT> out) throws Exception;
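/**
* Handle timer events for connected streams
* @param timestamp - timer timestamp
* @param ctx - timer context
* @param out - collector for output elements
*/
void onTimer(long timestamp, OnTimerContext ctx, Collector<OUT> out) throws Exception;
Usage Examples: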
ConnectedStreams<Order, Payment> connected = orders.connect(payments);
DataStream<Transaction> transactions = connected.keyBy(
order -> order.getOrderId(),
payment -> payment.getOrderId()
).process(new CoProcessFunction<Order, Payment, Transaction>() {
private ValueState<Order> orderState;
private ValueState<Payment> paymentState;
/**
 * Obtains the "order" and "payment" value states from the runtime context
 * when the function is opened.
 */
@Override
public void open(Configuration parameters) {
    ValueStateDescriptor<Order> orderDescriptor =
        new ValueStateDescriptor<>("order", Order.class);
    ValueStateDescriptor<Payment> paymentDescriptor =
        new ValueStateDescriptor<>("payment", Payment.class);

    orderState = getRuntimeContext().getState(orderDescriptor);
    paymentState = getRuntimeContext().getState(paymentDescriptor);
}
/**
 * Handles an order from the first stream: if a payment is already stored,
 * emits the combined transaction and clears the payment; otherwise stores the
 * order and registers an event-time timeout 300000 ms after the order.
 *
 * @param order element from the first stream
 * @param ctx processing context
 * @param out collector for transactions
 */
@Override
public void processElement1(Order order, Context ctx, Collector<Transaction> out) throws Exception {
    final Payment pendingPayment = paymentState.value();

    if (pendingPayment == null) {
        // No payment yet: remember the order and arm the timeout.
        orderState.update(order);
        ctx.timerService().registerEventTimeTimer(order.getTimestamp() + 300000);
        return;
    }

    // Payment arrived first: the pair is complete.
    out.collect(new Transaction(order, pendingPayment));
    paymentState.clear();
}
/**
 * Handles a payment from the second stream: if an order is already stored,
 * emits the combined transaction and clears the order; otherwise stores the
 * payment and registers an event-time timeout 300000 ms after the payment.
 *
 * @param payment element from the second stream
 * @param ctx processing context
 * @param out collector for transactions
 */
@Override
public void processElement2(Payment payment, Context ctx, Collector<Transaction> out) throws Exception {
    final Order pendingOrder = orderState.value();

    if (pendingOrder == null) {
        // No order yet: remember the payment and arm the timeout.
        paymentState.update(payment);
        ctx.timerService().registerEventTimeTimer(payment.getTimestamp() + 300000);
        return;
    }

    // Order arrived first: the pair is complete.
    out.collect(new Transaction(pendingOrder, payment));
    orderState.clear();
}
/**
 * Timeout handler: drops whatever order or payment is still stored.
 * Nothing is emitted for the unmatched element.
 *
 * @param timestamp the timer timestamp
 * @param ctx timer context
 * @param out collector for output elements (unused)
 */
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<Transaction> out) {
    // Either side may still be waiting for its counterpart; clear both.
    orderState.clear();
    paymentState.clear();
}
});
KeyedCoProcessFunction is the process function for keyed connected streams, combining the capabilities of CoProcessFunction with keyed state management and timer functionality.
/**
* Process element from first keyed stream
* @param value - element from first stream
* @param ctx - processing context with key access
* @param out - collector for output elements
*/
abstract void processElement1(IN1 value, Context ctx, Collector<OUT> out) throws Exception;
/**
* Process element from second keyed stream
* @param value - element from second stream
* @param ctx - processing context with key access
* @param out - collector for output elements
*/
abstract void processElement2(IN2 value, Context ctx, Collector<OUT> out) throws Exception;
/**
* Handle timer events for keyed connected streams
* @param timestamp - timer timestamp
* @param ctx - timer context with key access
* @param out - collector for output elements
*/
void onTimer(long timestamp, OnTimerContext ctx, Collector<OUT> out) throws Exception;
/**
* Context for keyed connected stream processing
*/
abstract class Context {
/**
* Get timestamp of current element
*/
abstract Long timestamp();
/**
* Get timer service for registering timers
*/
abstract TimerService timerService();
/**
* Output to side output
*/
abstract <X> void output(OutputTag<X> outputTag, X value);
/**
* Get current key
*/
abstract K getCurrentKey();
}
Usage Examples:
ConnectedStreams<Order, Shipment> connected = orders.connect(shipments);
DataStream<OrderStatus> statusUpdates = connected.keyBy(
order -> order.getOrderId(),
shipment -> shipment.getOrderId()
).process(new KeyedCoProcessFunction<String, Order, Shipment, OrderStatus>() {
private ValueState<Order> orderState;
private ValueState<Shipment> shipmentState;
/**
 * Obtains the "order" and "shipment" value states from the runtime context
 * when the function is opened.
 */
@Override
public void open(Configuration parameters) {
    orderState = getRuntimeContext().getState(
        new ValueStateDescriptor<>("order", Order.class));
    shipmentState = getRuntimeContext().getState(
        new ValueStateDescriptor<>("shipment", Shipment.class));
}
/**
 * Handles an order from the first stream.
 *
 * <p>If a shipment for the same key is already stored, emits a "SHIPPED"
 * status and clears the shipment. Otherwise stores the order, emits a
 * "PROCESSING" status and registers an event-time timeout 24 hours after the
 * order time.
 *
 * @param order element from the first stream
 * @param ctx processing context with key access
 * @param out collector for status updates
 * @throws Exception if state access fails
 */
@Override
public void processElement1(Order order, Context ctx, Collector<OrderStatus> out) throws Exception {
    String orderId = ctx.getCurrentKey();
    Shipment shipment = shipmentState.value();
    if (shipment != null) {
        // Order and shipment both available. The order is deliberately NOT
        // written to state on this path: no timer is registered here, so a
        // stored order would never be cleared and a later shipment for the
        // same key would be matched against it again.
        out.collect(new OrderStatus(orderId, "SHIPPED", order, shipment));
        shipmentState.clear();
    } else {
        // Order received, waiting for shipment: keep it until the shipment
        // arrives or the timeout fires.
        orderState.update(order);
        out.collect(new OrderStatus(orderId, "PROCESSING", order, null));
        // Set timeout for order processing
        ctx.timerService().registerEventTimeTimer(order.getOrderTime() + Duration.ofHours(24).toMillis());
    }
}
/**
 * Handles a shipment from the second stream: if an order is stored for the
 * key, emits "SHIPPED" and clears the order; otherwise stores the shipment
 * and emits "SHIPPED_EARLY".
 *
 * @param shipment element from the second stream
 * @param ctx processing context with key access
 * @param out collector for status updates
 */
@Override
public void processElement2(Shipment shipment, Context ctx, Collector<OrderStatus> out) throws Exception {
    final String key = ctx.getCurrentKey();
    final Order pendingOrder = orderState.value();

    if (pendingOrder == null) {
        // Shipment arrived before its order (unusual case): hold on to it.
        shipmentState.update(shipment);
        out.collect(new OrderStatus(key, "SHIPPED_EARLY", null, shipment));
        return;
    }

    // Order was already waiting: the pair is complete.
    out.collect(new OrderStatus(key, "SHIPPED", pendingOrder, shipment));
    orderState.clear();
}
/**
 * Timeout handler: if an order is still stored for the key, emits a "TIMEOUT"
 * status and clears it; any stored shipment is cleared in every case.
 *
 * @param timestamp the timer timestamp
 * @param ctx timer context with key access
 * @param out collector for status updates
 */
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<OrderStatus> out) throws Exception {
    final String key = ctx.getCurrentKey();
    final Order expiredOrder = orderState.value();

    if (expiredOrder != null) {
        // No shipment was received before the timer fired.
        out.collect(new OrderStatus(key, "TIMEOUT", expiredOrder, null));
        orderState.clear();
    }

    // Drop any shipment that is still being held.
    shipmentState.clear();
}
});
ProcessWindowFunction is the process function for windowed streams, with access to window metadata and state.
/**
* Process all elements in a window
* @param key - window key
* @param context - window context with metadata
* @param elements - all elements in the window
* @param out - collector for output elements
*/
abstract void process(KEY key, Context context, Iterable<IN> elements, Collector<OUT> out) throws Exception;
/**
* Clear any window state when window is purged
* @param context - window context
*/
void clear(Context context) throws Exception;
Usage Examples:
DataStream<WindowResult> windowResults = keyedStream
.timeWindow(Time.minutes(5))
.process(new ProcessWindowFunction<Event, WindowResult, String, TimeWindow>() {
/**
 * Aggregates all events of one window into a single {@code WindowResult}:
 * element count, average value, smallest and largest event timestamp, plus
 * the window bounds and the current watermark taken from the context.
 *
 * @param key the window key
 * @param context window context with metadata
 * @param elements all elements in the window
 * @param out collector for the window result
 */
@Override
public void process(
        String key,
        Context context,
        Iterable<Event> elements,
        Collector<WindowResult> out) throws Exception {
    int eventCount = 0;
    double total = 0.0;
    long earliest = Long.MAX_VALUE;
    long latest = Long.MIN_VALUE;

    for (Event current : elements) {
        eventCount += 1;
        total += current.getValue();
        earliest = Math.min(earliest, current.getTimestamp());
        latest = Math.max(latest, current.getTimestamp());
    }

    final double average = total / eventCount;
    out.collect(new WindowResult(
        key,
        context.window().getStart(),
        context.window().getEnd(),
        eventCount,
        average,
        earliest,
        latest,
        context.currentWatermark()));
}
});
Side outputs: use side outputs to emit multiple types of data from a single process function.
// Emit to side output within process function
ctx.output(OutputTag<X> outputTag, X value);
// Retrieve side output from operator result
DataStream<X> getSideOutput(OutputTag<X> sideOutputTag);
Usage Examples:
// Define side output tags
final OutputTag<String> errorTag = new OutputTag<String>("errors"){};
final OutputTag<String> warningTag = new OutputTag<String>("warnings"){};
SingleOutputStreamOperator<String> mainStream = input.process(
new ProcessFunction<Event, String>() {
/**
 * Routes each event by severity: errors go to the {@code errorTag} side
 * output, warnings to the {@code warningTag} side output, and everything
 * else to the main output.
 *
 * @param event the input element
 * @param ctx processing context used to emit to side outputs
 * @param out collector for the main output
 */
@Override
public void processElement(Event event, Context ctx, Collector<String> out) {
    if (event.isError()) {
        ctx.output(errorTag, "Error: " + event.getMessage());
        return;
    }
    if (event.isWarning()) {
        ctx.output(warningTag, "Warning: " + event.getMessage());
        return;
    }
    out.collect("Info: " + event.getMessage());
}
}
);
// Get side output streams
DataStream<String> errors = mainStream.getSideOutput(errorTag);
DataStream<String> warnings = mainStream.getSideOutput(warningTag);
Timer services: access timer services for time-based processing and cleanup.
// Timer service methods available in process function context
TimerService timerService();
// Timer service interface
interface TimerService {
long currentProcessingTime();
long currentWatermark();
void registerProcessingTimeTimer(long time);
void registerEventTimeTimer(long time);
void deleteProcessingTimeTimer(long time);
void deleteEventTimeTimer(long time);
}
Usage Examples:
/**
 * Example keyed process function that registers one processing-time timer and
 * one event-time timer per element and reports which kind of timer fired.
 */
public class TimerExampleFunction extends KeyedProcessFunction<String, Event, String> {

    /**
     * Reads the current processing time and watermark, then registers a
     * processing-time timer 60000 ms ahead and an event-time timer 300000 ms
     * after the event timestamp.
     */
    @Override
    public void processElement(Event event, Context ctx, Collector<String> out) throws Exception {
        final TimerService timers = ctx.timerService();

        // Current notions of time as seen by the timer service.
        final long now = timers.currentProcessingTime();
        final long watermark = timers.currentWatermark();

        // One minute from now in processing time.
        timers.registerProcessingTimeTimer(now + 60000);
        // Five minutes after the event in event time.
        timers.registerEventTimeTimer(event.getTimestamp() + 300000);

        // To delete a timer later, its timestamp would have to be remembered
        // (for example in state).
    }

    /**
     * Emits a message naming the time domain of the timer that fired together
     * with its timestamp.
     */
    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
        final boolean processingTime = ctx.timeDomain() == TimeDomain.PROCESSING_TIME;
        final String prefix = processingTime
            ? "Processing time timer fired at: "
            : "Event time timer fired at: ";
        out.collect(prefix + timestamp);
    }
}
// Base process function
/**
 * Base process function for transforming elements with access to a context
 * and timers.
 *
 * @param <I> type of the input elements
 * @param <O> type of the output elements
 */
abstract class ProcessFunction<I, O> extends AbstractRichFunction {
/** Process each element with access to context and timer services. */
abstract void processElement(I value, Context ctx, Collector<O> out) throws Exception;
/** Handle timer events; {@code timestamp} is the timer timestamp. */
void onTimer(long timestamp, OnTimerContext ctx, Collector<O> out) throws Exception;
/** Processing context passed to {@code processElement}. */
abstract class Context {
/** Get timestamp of current element. */
abstract Long timestamp();
/** Get timer service for registering timers. */
abstract TimerService timerService();
/** Output to side output. */
abstract <X> void output(OutputTag<X> outputTag, X value);
}
/** Context passed to {@code onTimer}; adds the time domain of the timer. */
abstract class OnTimerContext extends Context {
/** Time domain of the timer (a {@code TimeDomain} value). */
abstract TimeDomain timeDomain();
}
}
// Keyed process function
/**
 * Process function for keyed streams with access to keyed state and per-key
 * timers.
 *
 * @param <K> type of the key
 * @param <I> type of the input elements
 * @param <O> type of the output elements
 */
abstract class KeyedProcessFunction<K, I, O> extends AbstractRichFunction {
/** Process each element in a keyed stream. */
abstract void processElement(I value, Context ctx, Collector<O> out) throws Exception;
/** Handle timer events for keyed streams; {@code timestamp} is the timer timestamp. */
void onTimer(long timestamp, OnTimerContext ctx, Collector<O> out) throws Exception;
/** Keyed processing context with access to the current key. */
abstract class Context {
/** Get timestamp of current element. */
abstract Long timestamp();
/** Get timer service for registering timers. */
abstract TimerService timerService();
/** Output to side output. */
abstract <X> void output(OutputTag<X> outputTag, X value);
/** Get current key. */
abstract K getCurrentKey();
}
/** Keyed timer context passed to {@code onTimer}; adds the time domain of the timer. */
abstract class OnTimerContext extends Context {
/** Time domain of the timer (a {@code TimeDomain} value). */
abstract TimeDomain timeDomain();
}
}
// Co-process function
/**
 * Process function for connected streams, enabling joint processing of two
 * different stream types.
 *
 * @param <IN1> type of the elements of the first stream
 * @param <IN2> type of the elements of the second stream
 * @param <OUT> type of the output elements
 */
abstract class CoProcessFunction<IN1, IN2, OUT> extends AbstractRichFunction {
/** Process element from first stream. */
abstract void processElement1(IN1 value, Context ctx, Collector<OUT> out) throws Exception;
/** Process element from second stream. */
abstract void processElement2(IN2 value, Context ctx, Collector<OUT> out) throws Exception;
/** Handle timer events for connected streams. */
void onTimer(long timestamp, OnTimerContext ctx, Collector<OUT> out) throws Exception;
/** Processing context passed to {@code processElement1} and {@code processElement2}. */
abstract class Context {
/** Get timestamp of current element. */
abstract Long timestamp();
/** Get timer service for registering timers. */
abstract TimerService timerService();
/** Output to side output. */
abstract <X> void output(OutputTag<X> outputTag, X value);
}
/** Context passed to {@code onTimer}; adds the time domain of the timer. */
abstract class OnTimerContext extends Context {
/** Time domain of the timer (a {@code TimeDomain} value). */
abstract TimeDomain timeDomain();
}
}
// Process window function
/**
 * Process function for windowed streams with access to window metadata and
 * state.
 *
 * @param <IN> type of the input elements
 * @param <OUT> type of the output elements
 * @param <KEY> type of the window key
 * @param <W> type of the window
 */
abstract class ProcessWindowFunction<IN, OUT, KEY, W extends Window> extends AbstractRichFunction {
/** Process all elements in a window. */
abstract void process(KEY key, Context context, Iterable<IN> elements, Collector<OUT> out) throws Exception;
/** Clear any window state when window is purged. */
void clear(Context context) throws Exception;
/** Window context with metadata about the window being evaluated. */
abstract class Context implements Serializable {
/** The window being evaluated. */
abstract W window();
abstract long currentProcessingTime();
abstract long currentWatermark();
// NOTE(review): presumably state scoped to the current window — confirm against the Flink Javadoc.
abstract KeyedStateStore windowState();
// NOTE(review): presumably state scoped to the key across windows — confirm against the Flink Javadoc.
abstract KeyedStateStore globalState();
/** Output to side output. */
abstract <X> void output(OutputTag<X> outputTag, X value);
}
}// Timer service
/**
 * Timer service available from a process function context; used to query the
 * current time and to register or delete timers.
 */
interface TimerService {
/** Current processing time. */
long currentProcessingTime();
/** Current watermark. */
long currentWatermark();
/** Register a processing-time timer for the given time. */
void registerProcessingTimeTimer(long time);
/** Register an event-time timer for the given time. */
void registerEventTimeTimer(long time);
/** Delete the processing-time timer registered for the given time. */
void deleteProcessingTimeTimer(long time);
/** Delete the event-time timer registered for the given time. */
void deleteEventTimeTimer(long time);
}
// Time domain
/**
 * Domain of a timer, as returned by {@code OnTimerContext.timeDomain()};
 * lets {@code onTimer} tell event-time timers from processing-time timers.
 */
enum TimeDomain {
EVENT_TIME,
PROCESSING_TIME
}
// Output tag for side outputs
/**
 * Tag identifying a side output; passed to {@code Context.output} when
 * emitting and to {@code getSideOutput} when retrieving the side stream.
 *
 * @param <T> type of the elements emitted to the side output
 */
class OutputTag<T> {
/** Create a tag with the given id. */
public OutputTag(String id);
/** Create a tag with the given id and explicit type information. */
public OutputTag(String id, TypeInformation<T> typeInfo);
String getId();
TypeInformation<T> getTypeInfo();
}
// Collector interface
interface Collector<T> {
void collect(T record);
void close();
}
Install with Tessl CLI:
npx tessl i tessl/maven-org-apache-flink--flink-streaming-java-2-11