tessl/maven-org-apache-flink--flink-connector-nifi

A deprecated Apache Flink connector that enables integration with Apache NiFi through the NiFi Site-to-Site protocol for bidirectional streaming data exchange.

Workspace: tessl
Visibility: Public
Describes: mavenpkg:maven/org.apache.flink/flink-connector-nifi@1.15.x

To install, run

npx @tessl/cli install tessl/maven-org-apache-flink--flink-connector-nifi@1.15.0

Apache Flink NiFi Connector

Apache Flink NiFi Connector provides integration between Apache Flink and Apache NiFi through the NiFi Site-to-Site protocol. This connector enables bidirectional streaming data exchange between Flink applications and NiFi clusters, supporting both data ingestion (source) and data publishing (sink) operations.

Status: This connector is deprecated and will be removed in a future Flink release.

Package Information

  • Package Name: flink-connector-nifi
  • Package Type: maven
  • Language: Java
  • Installation:
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-nifi</artifactId>
      <version>1.15.4</version>
    </dependency>

Core Imports

import org.apache.flink.streaming.connectors.nifi.NiFiSource;
import org.apache.flink.streaming.connectors.nifi.NiFiSink;
import org.apache.flink.streaming.connectors.nifi.NiFiDataPacket;
import org.apache.flink.streaming.connectors.nifi.StandardNiFiDataPacket;
import org.apache.flink.streaming.connectors.nifi.NiFiDataPacketBuilder;

// NiFi Site-to-Site client imports
import org.apache.nifi.remote.client.SiteToSiteClient;
import org.apache.nifi.remote.client.SiteToSiteClientConfig;

// Additional required imports for examples
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.concurrent.TimeUnit;
import org.apache.flink.api.common.functions.RuntimeContext;

Basic Usage

Source Example - Reading from NiFi

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.connectors.nifi.NiFiSource;
import org.apache.flink.streaming.connectors.nifi.NiFiDataPacket;
import org.apache.nifi.remote.client.SiteToSiteClient;
import org.apache.nifi.remote.client.SiteToSiteClientConfig;
import java.nio.charset.StandardCharsets;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Configure NiFi client
SiteToSiteClientConfig clientConfig = new SiteToSiteClient.Builder()
    .url("http://localhost:8080/nifi")
    .portName("Data for Flink")
    .requestBatchCount(5)
    .buildConfig();

// Create source
NiFiSource nifiSource = new NiFiSource(clientConfig);
DataStream<NiFiDataPacket> stream = env.addSource(nifiSource).setParallelism(2);

// Process data packets
DataStream<String> content = stream.map(packet -> 
    new String(packet.getContent(), StandardCharsets.UTF_8)
);

env.execute("NiFi Source Job");
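
The map step above reads only the packet content. As a minimal sketch of using the attributes as well, the helper below combines the two values that a NiFiDataPacket exposes through getContent() and getAttributes(). PacketText and the "<unnamed>" fallback are hypothetical names chosen for illustration, and the sketch assumes the FlowFile carries NiFi's standard "filename" attribute and UTF-8 text content.

```java
import java.nio.charset.StandardCharsets;
import java.util.Map;

/** Hypothetical helper, not part of the connector. */
final class PacketText {
    private PacketText() {}

    /**
     * Decodes FlowFile content as UTF-8 and prefixes it with the
     * "filename" attribute, or a placeholder when the attribute is absent.
     */
    static String describe(byte[] content, Map<String, String> attributes) {
        String body = new String(content, StandardCharsets.UTF_8);
        String name = attributes.getOrDefault("filename", "<unnamed>");
        return name + ": " + body;
    }
}
```

In the job above, this could be applied as stream.map(p -> PacketText.describe(p.getContent(), p.getAttributes())).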

Sink Example - Writing to NiFi

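import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.nifi.NiFiSink;
import org.apache.flink.streaming.connectors.nifi.NiFiDataPacketBuilder;
import org.apache.flink.streaming.connectors.nifi.StandardNiFiDataPacket;
import org.apache.nifi.remote.client.SiteToSiteClient;
import org.apache.nifi.remote.client.SiteToSiteClientConfig;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();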

// Configure NiFi client for sink
SiteToSiteClientConfig clientConfig = new SiteToSiteClient.Builder()
    .url("http://localhost:8080/nifi")
    .portName("Data from Flink")
    .buildConfig();

// Create data packet builder
NiFiDataPacketBuilder<String> builder = (data, ctx) -> 
    new StandardNiFiDataPacket(
        data.getBytes(StandardCharsets.UTF_8), 
        new HashMap<String, String>()
    );

// Create sink
NiFiSink<String> nifiSink = new NiFiSink<>(clientConfig, builder);

// Add to stream
DataStream<String> dataStream = env.fromElements("data1", "data2", "data3");
dataStream.addSink(nifiSink);

env.execute("NiFi Sink Job");

Architecture

The NiFi connector is built around several key components:

  • NiFi Site-to-Site Protocol: Uses NiFi's native Site-to-Site client for reliable data transfer
  • Transaction Management: Implements NiFi transaction patterns with confirm/complete cycles for reliability
  • Data Packet Abstraction: Wraps NiFi FlowFiles in the NiFiDataPacket interface for Flink processing
  • Parallel Processing: Supports Flink's parallel execution with configurable parallelism
  • Builder Pattern: Uses NiFiDataPacketBuilder for flexible data conversion in sink operations
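
The confirm/complete cycle mentioned under Transaction Management can be sketched in plain Java. This is a simplified model under stated assumptions, not the connector's code: ReceiveTxn, ReceiveCycle, and FakeTxn are hypothetical stand-ins, and the real NiFi transaction type carries more operations than the three modeled here.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Simplified stand-in for a Site-to-Site receive transaction (hypothetical). */
interface ReceiveTxn {
    byte[] receive();   // next packet, or null when no more data is available
    void confirm();     // verify that everything received so far arrived intact
    void complete();    // signal that the transferred data may be released
}

final class ReceiveCycle {
    private ReceiveCycle() {}

    /** Drains one transaction: receive until null, then confirm, then complete. */
    static List<byte[]> drain(ReceiveTxn txn) {
        List<byte[]> batch = new ArrayList<>();
        for (byte[] p = txn.receive(); p != null; p = txn.receive()) {
            batch.add(p);
        }
        txn.confirm();
        // A source would hand the batch to Flink at this point.
        txn.complete();
        return batch;
    }
}

/** In-memory fake that records the order of calls. */
final class FakeTxn implements ReceiveTxn {
    private final Deque<byte[]> pending = new ArrayDeque<>();
    final List<String> calls = new ArrayList<>();

    FakeTxn(byte[]... packets) {
        for (byte[] p : packets) {
            pending.add(p);
        }
    }

    @Override public byte[] receive() { calls.add("receive"); return pending.poll(); }
    @Override public void confirm() { calls.add("confirm"); }
    @Override public void complete() { calls.add("complete"); }
}
```

Recording the call order in the fake makes the ordering visible: every receive happens before confirm, and complete comes last.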

Capabilities

Data Source Operations

Pull data from NiFi output ports using the Site-to-Site protocol.

/**
 * A source that pulls data from Apache NiFi using the NiFi Site-to-Site client.
 * Produces NiFiDataPackets which encapsulate NiFi FlowFile content and attributes.
 * 
 * @deprecated The NiFi Source has been deprecated and will be removed in a future Flink release.
 */
@Deprecated
public class NiFiSource extends RichParallelSourceFunction<NiFiDataPacket> {
    
    /**
     * Constructs a new NiFiSource using the given client config and default wait time of 1000ms.
     * 
     * @param clientConfig the configuration for building a NiFi SiteToSiteClient
     */
    public NiFiSource(SiteToSiteClientConfig clientConfig);
    
    /**
     * Constructs a new NiFiSource using the given client config and wait time.
     * 
     * @param clientConfig the configuration for building a NiFi SiteToSiteClient
     * @param waitTimeMs the amount of time to wait (in milliseconds) if no data is available
     */
    public NiFiSource(SiteToSiteClientConfig clientConfig, long waitTimeMs);
    
    /**
     * Opens the source and creates the SiteToSiteClient.
     * 
     * @param parameters the configuration parameters from Flink
     * @throws Exception if the SiteToSiteClient cannot be created
     */
    @Override
    public void open(Configuration parameters) throws Exception;
    
    /**
     * Main execution loop for pulling data from NiFi.
     * Continuously pulls data packets and emits them to the Flink stream.
     * 
     * @param ctx the source context for collecting data packets
     * @throws Exception if errors occur during data transfer
     */
    @Override
    public void run(SourceContext<NiFiDataPacket> ctx) throws Exception;
    
    /**
     * Cancels the running source.
     */
    @Override
    public void cancel();
    
    /**
     * Closes the source and client connection.
     */
    @Override
    public void close() throws Exception;
}

Data Sink Operations

Send data to NiFi input ports using the Site-to-Site protocol.

/**
 * A sink that delivers data to Apache NiFi using the NiFi Site-to-Site client.
 * Requires a NiFiDataPacketBuilder to create NiFiDataPacket instances from incoming data.
 * 
 * @param <T> the type of input data
 * @deprecated The NiFi Sink has been deprecated and will be removed in a future Flink release.
 */
@Deprecated
public class NiFiSink<T> extends RichSinkFunction<T> {
    
    /**
     * Construct a new NiFiSink with the given client config and NiFiDataPacketBuilder.
     * 
     * @param clientConfig the configuration for building a NiFi SiteToSiteClient
     * @param builder a builder to produce NiFiDataPackets from incoming data
     */
    public NiFiSink(SiteToSiteClientConfig clientConfig, NiFiDataPacketBuilder<T> builder);
    
    /**
     * Opens the sink and creates the SiteToSiteClient.
     * 
     * @param parameters the configuration parameters from Flink
     * @throws Exception if the SiteToSiteClient cannot be created
     */
    @Override
    public void open(Configuration parameters) throws Exception;
    
    /**
     * Processes and sends individual records to NiFi.
     * Converts input data to NiFiDataPacket using the builder and sends to NiFi.
     * 
     * @param value the record to send to NiFi
     * @throws Exception if data cannot be sent or transaction fails
     */
    @Override
    public void invoke(T value) throws Exception;
    
    /**
     * Closes the sink and client connection.
     */
    @Override
    public void close() throws Exception;
}

Data Packet Interface

Core interface for wrapping NiFi FlowFile data and attributes.

/**
 * The NiFiDataPacket provides a packaging around a NiFi FlowFile.
 * It wraps both a FlowFile's content and its attributes for processing by Flink.
 */
public interface NiFiDataPacket {
    
    /**
     * @return the contents of a NiFi FlowFile as byte array
     */
    byte[] getContent();
    
    /**
     * @return a Map of attributes that are associated with the NiFi FlowFile
     */
    Map<String, String> getAttributes();
}

Standard Data Packet Implementation

Standard implementation of the NiFiDataPacket interface.

/**
 * An implementation of NiFiDataPacket that stores content and attributes.
 */
public class StandardNiFiDataPacket implements NiFiDataPacket, Serializable {
    
    /**
     * Constructs a StandardNiFiDataPacket with content and attributes.
     * 
     * @param content the FlowFile content as byte array
     * @param attributes the FlowFile attributes as string map
     */
    public StandardNiFiDataPacket(byte[] content, Map<String, String> attributes);
    
    /**
     * @return the contents of the FlowFile
     */
    @Override
    public byte[] getContent();
    
    /**
     * @return the attributes of the FlowFile
     */
    @Override
    public Map<String, String> getAttributes();
}

Data Packet Builder

Builder interface for creating NiFiDataPackets from input data in sink operations.

/**
 * A function that can create a NiFiDataPacket from an incoming instance of the given type.
 * Used by NiFiSink to convert Flink stream data to NiFi FlowFile format.
 * 
 * @param <T> the type of input data to convert
 */
public interface NiFiDataPacketBuilder<T> extends Function, Serializable {
    
    /**
     * Creates a NiFiDataPacket from input data.
     * 
     * @param t the input data to convert
     * @param ctx the Flink runtime context
     * @return NiFiDataPacket containing the converted data
     */
    NiFiDataPacket createNiFiDataPacket(T t, RuntimeContext ctx);
}
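
A builder usually needs to produce both the content bytes and a set of attributes. The sketch below prepares those two values for a String record; PacketParts and the "flink.subtask" attribute name are hypothetical, while "mime.type" is one of NiFi's standard attribute names. It is kept free of Flink types so it can be read and run on its own.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical holder for the two constructor arguments of StandardNiFiDataPacket. */
final class PacketParts {
    final byte[] content;
    final Map<String, String> attributes;

    private PacketParts(byte[] content, Map<String, String> attributes) {
        this.content = content;
        this.attributes = attributes;
    }

    /** Encodes the record as UTF-8 and tags it with a MIME type and the producing subtask. */
    static PacketParts fromRecord(String record, int subtaskIndex) {
        Map<String, String> attrs = new HashMap<>();
        attrs.put("mime.type", "text/plain");
        // "flink.subtask" is an illustrative attribute name, not a convention of the connector.
        attrs.put("flink.subtask", Integer.toString(subtaskIndex));
        return new PacketParts(record.getBytes(StandardCharsets.UTF_8), attrs);
    }
}
```

Inside createNiFiDataPacket(t, ctx), the subtask index would presumably come from ctx.getIndexOfThisSubtask(), and the result would be passed on as new StandardNiFiDataPacket(parts.content, parts.attributes).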

Configuration Requirements

Required Dependencies

<dependency>
    <groupId>org.apache.nifi</groupId>
    <artifactId>nifi-site-to-site-client</artifactId>
    <version>1.14.0</version>
</dependency>

Site-to-Site Client Configuration

/**
 * Configuration for NiFi Site-to-Site client connection.
 * Built using SiteToSiteClient.Builder fluent interface.
 */
SiteToSiteClientConfig clientConfig = new SiteToSiteClient.Builder()
    .url("http://localhost:8080/nifi")          // NiFi instance URL
    .portName("Port Name")                       // Input/Output port name
    .requestBatchCount(5)                        // Optional: batch size for requests
    .timeout(30, TimeUnit.SECONDS)               // Optional: connection timeout
    .buildConfig();

/**
 * Builder for creating SiteToSiteClient configurations (selected methods).
 * Builder is a static class nested inside SiteToSiteClient, which is part of
 * the Apache NiFi site-to-site-client library.
 */
public static class Builder {
    public Builder url(String url);
    public Builder portName(String portName);
    public Builder requestBatchCount(int count);
    public Builder timeout(long timeout, TimeUnit unit);
    public Builder keystoreFilename(String keystoreFilename);
    public Builder keystorePass(String keystorePass);
    public Builder truststoreFilename(String truststoreFilename);
    public Builder truststorePass(String truststorePass);
    public SiteToSiteClient build();
    public SiteToSiteClientConfig buildConfig();
    public Builder fromConfig(SiteToSiteClientConfig config);
}

/**
 * Configuration interface for NiFi Site-to-Site client (selected methods).
 * Contains connection parameters and SSL settings.
 */
public interface SiteToSiteClientConfig {
    String getUrl();
    String getPortName();
    int getPreferredBatchCount();
    long getTimeout(TimeUnit timeUnit);
    String getKeystoreFilename();
    String getKeystorePassword();
    String getTruststoreFilename();
    String getTruststorePassword();
}

Common Configuration Options

  • url: Base URL of the NiFi instance (required)
  • portName: Name of the NiFi input or output port (required)
  • requestBatchCount: Preferred number of FlowFiles to request per transaction when receiving (optional; larger batches can reduce per-transaction overhead)
  • timeout: Connection and data transfer timeout (optional)
  • keystoreFilename/truststoreFilename: SSL configuration for secure connections (optional)
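
One way to keep these options out of job code is to load them from a properties file and validate the required ones before building the client config. The sketch below is an illustration only: NiFiSettings, the property keys, and the fallback values (1 and 30) are assumptions of this example, not names or defaults defined by the connector or by NiFi.

```java
import java.util.Properties;

/** Hypothetical holder for the options listed above. */
final class NiFiSettings {
    final String url;
    final String portName;
    final int requestBatchCount;
    final long timeoutSeconds;

    private NiFiSettings(String url, String portName, int requestBatchCount, long timeoutSeconds) {
        this.url = url;
        this.portName = portName;
        this.requestBatchCount = requestBatchCount;
        this.timeoutSeconds = timeoutSeconds;
    }

    /** Reads settings from properties; url and port name are required. */
    static NiFiSettings from(Properties props) {
        String url = require(props, "nifi.url");
        String portName = require(props, "nifi.port.name");
        // Fallback values are choices of this sketch.
        int batch = Integer.parseInt(props.getProperty("nifi.request.batch.count", "1"));
        long timeout = Long.parseLong(props.getProperty("nifi.timeout.seconds", "30"));
        if (batch < 1) {
            throw new IllegalArgumentException("nifi.request.batch.count must be at least 1");
        }
        return new NiFiSettings(url, portName, batch, timeout);
    }

    private static String require(Properties props, String key) {
        String value = props.getProperty(key);
        if (value == null || value.trim().isEmpty()) {
            throw new IllegalArgumentException("Missing required setting: " + key);
        }
        return value.trim();
    }
}
```

The resulting fields map directly onto the builder calls shown earlier: url(...), portName(...), requestBatchCount(...), and timeout(..., TimeUnit.SECONDS).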

Error Handling

Source Error Handling

  • Transaction Creation Failures: Source logs warnings and retries with configured wait time
  • No Data Available: Source waits and retries, does not throw exceptions
  • Connection Issues: Handled by underlying NiFi Site-to-Site client with automatic retries

Sink Error Handling

  • Transaction Creation Failures: Throws IllegalStateException if transaction cannot be created
  • Send Failures: Transaction exceptions propagate to Flink's fault tolerance mechanisms
  • Connection Issues: Handled by underlying NiFi Site-to-Site client
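
The source waits and retries when no transaction can be created, while the sink fails immediately with IllegalStateException. If you wrap Site-to-Site calls yourself, for example in a custom sink, a bounded wait-and-retry loop is one way to get the source's behavior without retrying forever. The sketch below is a generic helper under that assumption; Retry and withRetries are hypothetical names, and nothing like it ships with the connector.

```java
import java.util.function.Supplier;

/** Hypothetical bounded-retry helper for actions that fail with IllegalStateException. */
final class Retry {
    private Retry() {}

    /**
     * Runs the action up to maxAttempts times, sleeping waitMs between attempts.
     * Rethrows the last IllegalStateException if every attempt fails.
     */
    static <T> T withRetries(Supplier<T> action, int maxAttempts, long waitMs) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        IllegalStateException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return action.get();
            } catch (IllegalStateException e) {
                last = e;
                if (attempt < maxAttempts) {
                    try {
                        Thread.sleep(waitMs);
                    } catch (InterruptedException ie) {
                        // Preserve the interrupt and stop retrying.
                        Thread.currentThread().interrupt();
                        throw e;
                    }
                }
            }
        }
        throw last;
    }
}
```

Keeping the attempt count bounded lets a persistent failure still surface to Flink, which can then apply its own restart strategy.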

Common Exceptions

// Sink operation failures
IllegalStateException - Unable to create NiFi transaction for sending data

// General connection and I/O issues handled by:
// - org.apache.nifi.remote.exception.TransmissionException
// - java.io.IOException for network connectivity issues

Processing Guarantees

  • Delivery Semantics: The classes documented above expose no checkpoint hooks, so exactly-once delivery should not be assumed; after a failure and restart, records may be lost or duplicated unless downstream handling is idempotent
  • Transaction Safety: Uses NiFi Site-to-Site transaction confirm/complete pattern for reliability
  • Parallel Processing: Supports Flink parallel execution with independent transactions per task
  • Fault Tolerance: Restart and recovery are driven by Flink; the connector itself stores no state in checkpoints
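
Where records can be replayed after a restart, a consumer may want to drop repeats. The sketch below keeps a bounded window of recently seen keys; it assumes each FlowFile carries a stable identifier such as NiFi's standard "uuid" attribute. RecentKeys is a hypothetical helper, it is not thread-safe, and its contents are held in memory only, so it narrows the window for duplicates rather than eliminating them.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Remembers the most recently offered keys, up to a fixed capacity (hypothetical helper). */
final class RecentKeys {
    private final Map<String, Boolean> seen;

    RecentKeys(final int capacity) {
        // Access-ordered map that evicts the least recently used key once over capacity.
        this.seen = new LinkedHashMap<String, Boolean>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<String, Boolean> eldest) {
                return size() > capacity;
            }
        };
    }

    /** Returns true the first time a key is offered, false for a repeat still in the window. */
    boolean firstTime(String key) {
        return seen.put(key, Boolean.TRUE) == null;
    }
}
```

A filter step could then keep a packet only when firstTime(packet.getAttributes().get("uuid")) returns true.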

Migration Notes

Since this connector is deprecated, consider these alternatives:

  • Custom Flink Connectors: Implement custom source/sink using NiFi REST API or MiNiFi
  • Message Queue Integration: Use Kafka, Pulsar, or other message systems as intermediary
  • File-based Integration: Use shared storage (HDFS, S3) for data exchange
  • Direct API Integration: Use NiFi's REST API for control plane operations

Limitations

  • Deprecated Status: No new features or active development
  • NiFi Version Compatibility: Tied to specific NiFi site-to-site-client version (1.14.0)
  • Configuration Complexity: Requires proper NiFi port configuration and network connectivity
  • Limited Error Visibility: Some errors may be hidden in NiFi Site-to-Site client logging