tessl/maven-org-apache-flink--flink-connector-nifi-2-10

Apache Flink connector for NiFi that enables streaming data integration between Flink and Apache NiFi using the Site-to-Site protocol

Workspace: tessl
Visibility: Public
Describes: mavenpkg:maven/org.apache.flink/flink-connector-nifi_2.10@1.3.x

To install, run

npx @tessl/cli install tessl/maven-org-apache-flink--flink-connector-nifi-2-10@1.3.0


Flink NiFi Connector

Apache Flink connector for NiFi that enables streaming data integration between Flink and Apache NiFi using the Site-to-Site protocol. This connector provides both source and sink components for bidirectional data transfer between Flink streaming jobs and NiFi dataflows.

Package Information

  • Package Name: flink-connector-nifi_2.10
  • Group ID: org.apache.flink
  • Language: Java
  • Maven Dependency:
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-nifi_2.10</artifactId>
      <version>1.3.3</version>
    </dependency>

Core Imports

import org.apache.flink.streaming.connectors.nifi.NiFiSource;
import org.apache.flink.streaming.connectors.nifi.NiFiSink;
import org.apache.flink.streaming.connectors.nifi.NiFiDataPacket;
import org.apache.flink.streaming.connectors.nifi.NiFiDataPacketBuilder;
import org.apache.flink.streaming.connectors.nifi.StandardNiFiDataPacket;
import org.apache.nifi.remote.client.SiteToSiteClient;
import org.apache.nifi.remote.client.SiteToSiteClientConfig;
import org.apache.flink.api.common.functions.Function;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.functions.StoppableFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.configuration.ConfigConstants;
import java.io.Serializable;
import java.nio.charset.Charset;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;

Basic Usage

NiFi Source Example

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.nifi.NiFiSource;
import org.apache.flink.streaming.connectors.nifi.NiFiDataPacket;
import org.apache.nifi.remote.client.SiteToSiteClient;
import org.apache.nifi.remote.client.SiteToSiteClientConfig;
import java.nio.charset.Charset;

// Create Flink streaming environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Configure NiFi Site-to-Site client
SiteToSiteClientConfig clientConfig = new SiteToSiteClient.Builder()
    .url("http://localhost:8080/nifi")
    .portName("Data for Flink")
    .requestBatchCount(5)
    .buildConfig();

// Create NiFi source
NiFiSource nifiSource = new NiFiSource(clientConfig);
DataStream<NiFiDataPacket> sourceStream = env.addSource(nifiSource).setParallelism(2);

// Process NiFi data packets
DataStream<String> processedStream = sourceStream.map(packet -> {
    return new String(packet.getContent(), Charset.defaultCharset());
});

NiFi Sink Example

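import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.nifi.NiFiDataPacket;
import org.apache.flink.streaming.connectors.nifi.NiFiDataPacketBuilder;
import org.apache.flink.streaming.connectors.nifi.NiFiSink;
import org.apache.flink.streaming.connectors.nifi.StandardNiFiDataPacket;
import org.apache.nifi.remote.client.SiteToSiteClient;
import org.apache.nifi.remote.client.SiteToSiteClientConfig;
import java.util.HashMap;

// Create Flink streaming environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Configure NiFi Site-to-Site client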
SiteToSiteClientConfig clientConfig = new SiteToSiteClient.Builder()
    .url("http://localhost:8080/nifi")
    .portName("Data from Flink")
    .buildConfig();

// Create custom data packet builder
NiFiDataPacketBuilder<String> builder = new NiFiDataPacketBuilder<String>() {
    @Override
    public NiFiDataPacket createNiFiDataPacket(String data, RuntimeContext ctx) {
        return new StandardNiFiDataPacket(
            data.getBytes(ConfigConstants.DEFAULT_CHARSET),
            new HashMap<String, String>()
        );
    }
};

// Add sink to data stream
DataStream<String> dataStream = env.fromElements("one", "two", "three");
dataStream.addSink(new NiFiSink<>(clientConfig, builder));

Architecture

The connector follows Flink's standard source/sink pattern and integrates with NiFi's Site-to-Site protocol:

  • NiFiSource: Pulls data from NiFi output ports using a Site-to-Site client
  • NiFiSink: Sends data to NiFi input ports via Site-to-Site protocol
  • NiFiDataPacket: Encapsulates NiFi FlowFile content and attributes
  • NiFiDataPacketBuilder: Transforms Flink data into NiFi-compatible format
  • Site-to-Site Integration: Uses NiFi's native remote protocol for reliable data transfer

Capabilities

NiFi Data Source

Streams data from Apache NiFi into Flink using the Site-to-Site protocol. The source function connects to NiFi output ports and pulls data packets containing FlowFile content and attributes.

public class NiFiSource extends RichParallelSourceFunction<NiFiDataPacket> implements StoppableFunction {
    
    private static final long serialVersionUID = 1L;
    private static final long DEFAULT_WAIT_TIME_MS = 1000;
    private final SiteToSiteClientConfig clientConfig;
    private final long waitTimeMs;
    private transient SiteToSiteClient client;
    private volatile boolean isRunning = true;
    
    /**
     * Constructs a new NiFiSource using the given client config and the default wait time of 1000 ms.
     *
     * @param clientConfig the configuration for building a NiFi SiteToSiteClient
     */
    public NiFiSource(SiteToSiteClientConfig clientConfig);
    
    /**
     * Constructs a new NiFiSource using the given client config and wait time.
     *
     * @param clientConfig the configuration for building a NiFi SiteToSiteClient
     * @param waitTimeMs the amount of time to wait (in milliseconds) if no data is available to pull from NiFi
     */
    public NiFiSource(SiteToSiteClientConfig clientConfig, long waitTimeMs);
    
    @Override
    public void open(Configuration parameters) throws Exception;
    
    @Override
    public void run(SourceContext<NiFiDataPacket> ctx) throws Exception;
    
    @Override
    public void cancel();
    
    @Override
    public void close() throws Exception;
    
    @Override
    public void stop();
}

NiFi Data Sink

Sends data from Flink to Apache NiFi using the Site-to-Site protocol. The sink function connects to NiFi input ports and sends data packets created by a custom builder.

public class NiFiSink<T> extends RichSinkFunction<T> {
    
    private SiteToSiteClient client;
    private SiteToSiteClientConfig clientConfig;
    private NiFiDataPacketBuilder<T> builder;
    
    /**
     * Construct a new NiFiSink with the given client config and NiFiDataPacketBuilder.
     *
     * @param clientConfig the configuration for building a NiFi SiteToSiteClient
     * @param builder a builder to produce NiFiDataPackets from incoming data
     */
    public NiFiSink(SiteToSiteClientConfig clientConfig, NiFiDataPacketBuilder<T> builder);
    
    @Override
    public void open(Configuration parameters) throws Exception;
    
    @Override
    public void invoke(T value) throws Exception;
    
    @Override
    public void close() throws Exception;
}

Data Packet Interface

Represents a NiFi FlowFile with content and attributes, providing access to both the data payload and metadata associated with the FlowFile.

public interface NiFiDataPacket {
    
    /**
     * @return the contents of a NiFi FlowFile
     */
    byte[] getContent();
    
    /**
     * @return a Map of attributes that are associated with the NiFi FlowFile
     */
    Map<String, String> getAttributes();
}
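
As an illustration of consuming the two accessors above, the following is a minimal sketch that decodes the content as text and looks up an attribute with a fallback. It declares a local stand-in for the interface so that it compiles without the connector on the classpath. `PacketReader` and its methods are hypothetical helpers, and treating the content as UTF-8 is an assumption about the upstream NiFi flow.

```java
import java.nio.charset.StandardCharsets;
import java.util.Map;

// Local stand-in mirroring the two-method interface above, so this sketch
// compiles without the Flink connector on the classpath.
interface NiFiDataPacket {
    byte[] getContent();
    Map<String, String> getAttributes();
}

// Hypothetical helper, not part of the connector.
final class PacketReader {
    private PacketReader() {}

    // Decodes the FlowFile content as text.
    // Assumption: the NiFi flow emits UTF-8 encoded content.
    static String contentAsText(NiFiDataPacket packet) {
        return new String(packet.getContent(), StandardCharsets.UTF_8);
    }

    // Returns the named attribute, or the fallback when the attribute
    // (or the whole attribute map) is absent.
    static String attributeOrDefault(NiFiDataPacket packet, String key, String fallback) {
        Map<String, String> attributes = packet.getAttributes();
        if (attributes == null) {
            return fallback;
        }
        String value = attributes.get(key);
        return value != null ? value : fallback;
    }
}
```

With the real interface imported instead of the stand-in, helpers like these could be called from a map function on the source stream.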

Standard Data Packet Implementation

Concrete implementation of NiFiDataPacket that stores FlowFile content as a byte array and attributes as a string map.

public class StandardNiFiDataPacket implements NiFiDataPacket, Serializable {
    
    private static final long serialVersionUID = 6364005260220243322L;
    private final byte[] content;
    private final Map<String, String> attributes;
    
    public StandardNiFiDataPacket(final byte[] content, final Map<String, String> attributes);
    
    @Override
    public byte[] getContent();
    
    @Override
    public Map<String, String> getAttributes();
}

Data Packet Builder Interface

Functional interface for transforming Flink data into NiFiDataPacket objects. Custom implementations define how to convert stream elements into NiFi-compatible data packets.

public interface NiFiDataPacketBuilder<T> extends Function, Serializable {
    
    NiFiDataPacket createNiFiDataPacket(T t, RuntimeContext ctx);
}
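
A builder usually has to decide two things: how to serialize the element and which attributes to attach. The sketch below prepares both pieces with the JDK alone. `PacketParts` is a hypothetical helper, UTF-8 is an assumed encoding, and `flink.subtask.index` is an invented attribute name (`mime.type` is a standard NiFi attribute).

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper, not part of the connector: prepares the two arguments
// that the StandardNiFiDataPacket constructor expects.
final class PacketParts {
    private PacketParts() {}

    // Serializes the stream element as UTF-8 bytes (assumed encoding).
    static byte[] content(String value) {
        return value.getBytes(StandardCharsets.UTF_8);
    }

    // Builds the attribute map for one packet.
    // "mime.type" is a standard NiFi attribute; "flink.subtask.index" is an
    // invented name used only for this illustration.
    static Map<String, String> attributes(int subtaskIndex) {
        Map<String, String> attributes = new HashMap<>();
        attributes.put("mime.type", "text/plain");
        attributes.put("flink.subtask.index", Integer.toString(subtaskIndex));
        return attributes;
    }
}
```

Inside createNiFiDataPacket(T t, RuntimeContext ctx), the two results could be passed to the StandardNiFiDataPacket constructor, with ctx.getIndexOfThisSubtask() supplying the subtask index.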

Configuration

Site-to-Site Client Configuration

The connector uses NiFi's SiteToSiteClientConfig for connection settings:

SiteToSiteClientConfig clientConfig = new SiteToSiteClient.Builder()
    .url("http://nifi-host:8080/nifi")           // Base URL of the NiFi instance
    .portName("Data Port Name")                   // Name of the NiFi input or output port
    .requestBatchCount(10)                        // Preferred number of FlowFiles per transaction when pulling
    .timeout(30, TimeUnit.SECONDS)                // Communications timeout
    .buildConfig();

Source Configuration Options

  • clientConfig: Required SiteToSiteClientConfig for NiFi connection
  • waitTimeMs: Optional time in milliseconds to wait before polling again when no data is available from NiFi (default: 1000 ms)
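
To show what waitTimeMs controls, here is a minimal, JDK-only sketch of a wait-and-retry polling loop. It is not the connector's implementation: `PollingLoopSketch`, the `fetch` supplier (which returns null when nothing is available), and the `emit` consumer are hypothetical stand-ins for the Site-to-Site transaction and the Flink source context.

```java
import java.util.function.Consumer;
import java.util.function.Supplier;

// Hypothetical stand-in for a source's polling loop; not the connector's code.
final class PollingLoopSketch {
    private final long waitTimeMs;
    private volatile boolean running = true;
    private int emptyPolls;

    PollingLoopSketch(long waitTimeMs) {
        this.waitTimeMs = waitTimeMs;
    }

    // Polls until cancel() is called. A null result from fetch means
    // "no data available", so the loop sleeps for waitTimeMs and retries.
    void run(Supplier<String> fetch, Consumer<String> emit) {
        while (running) {
            String item = fetch.get();
            if (item == null) {
                emptyPolls++;
                try {
                    Thread.sleep(waitTimeMs);
                } catch (InterruptedException e) {
                    // Restore the interrupt flag and stop polling.
                    Thread.currentThread().interrupt();
                    running = false;
                }
                continue;
            }
            emit.accept(item);
        }
    }

    // Requests the loop to exit after the current iteration.
    void cancel() {
        running = false;
    }

    int emptyPolls() {
        return emptyPolls;
    }
}
```

A larger waitTimeMs lowers the request rate against an idle NiFi port at the cost of higher latency once data arrives again.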

Sink Configuration Options

  • clientConfig: Required SiteToSiteClientConfig for NiFi connection
  • builder: Required NiFiDataPacketBuilder for data transformation

Error Handling

  • Transaction Failures: When no transaction can be created (a null transaction), the source waits for waitTimeMs and then retries
  • Connection Issues: Both source and sink throw exceptions for connection failures
  • Data Validation: Sink validates data packet creation before sending to NiFi
  • Resource Cleanup: Both components properly close Site-to-Site clients in close() methods
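
The resource-cleanup point can be illustrated with a null-safe, idempotent close. This is a sketch under assumptions, not the connector's close() method: `ClientLifecycleSketch` is hypothetical, and a plain java.io.Closeable stands in for the Site-to-Site client.

```java
import java.io.Closeable;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical lifecycle holder; a java.io.Closeable stands in for the
// Site-to-Site client that open() would normally build from the client config.
final class ClientLifecycleSketch {
    private Closeable client;

    void open(Closeable newClient) {
        this.client = newClient;
    }

    boolean isOpen() {
        return client != null;
    }

    // Closes the client at most once; calling close() before open() or a
    // second time is a no-op.
    void close() {
        if (client == null) {
            return;
        }
        try {
            client.close();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } finally {
            // Drop the reference even if close() failed, so it is not retried.
            client = null;
        }
    }
}
```

Clearing the reference in the finally block keeps a failed close from being attempted again when the operator is torn down.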