tessl/maven-org-apache-flink--flink-connector-nifi-2-10

Flink NiFi Connector

Apache Flink connector for NiFi that enables streaming data integration between Flink and Apache NiFi using the Site-to-Site protocol. This connector provides both source and sink components for bidirectional data transfer between Flink streaming jobs and NiFi dataflows.

Package Information

  • Package Name: flink-connector-nifi_2.10
  • Group ID: org.apache.flink
  • Language: Java
  • Maven Dependency:
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-nifi_2.10</artifactId>
      <version>1.3.3</version>
    </dependency>

Core Imports

import org.apache.flink.streaming.connectors.nifi.NiFiSource;
import org.apache.flink.streaming.connectors.nifi.NiFiSink;
import org.apache.flink.streaming.connectors.nifi.NiFiDataPacket;
import org.apache.flink.streaming.connectors.nifi.NiFiDataPacketBuilder;
import org.apache.flink.streaming.connectors.nifi.StandardNiFiDataPacket;
import org.apache.nifi.remote.client.SiteToSiteClient;
import org.apache.nifi.remote.client.SiteToSiteClientConfig;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.functions.StoppableFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.configuration.ConfigConstants;
import java.nio.charset.Charset;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;

Basic Usage

NiFi Source Example

import java.nio.charset.Charset;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.nifi.NiFiSource;
import org.apache.flink.streaming.connectors.nifi.NiFiDataPacket;
import org.apache.nifi.remote.client.SiteToSiteClient;
import org.apache.nifi.remote.client.SiteToSiteClientConfig;

// Create Flink streaming environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Configure the NiFi Site-to-Site client to pull from the output port "Data for Flink"
SiteToSiteClientConfig clientConfig = new SiteToSiteClient.Builder()
    .url("http://localhost:8080/nifi")
    .portName("Data for Flink")
    .requestBatchCount(5)
    .buildConfig();

// Create the NiFi source and run it with two parallel subtasks
NiFiSource nifiSource = new NiFiSource(clientConfig);
DataStream<NiFiDataPacket> sourceStream = env.addSource(nifiSource).setParallelism(2);

// Decode each FlowFile's content into a String
DataStream<String> processedStream = sourceStream.map(
    packet -> new String(packet.getContent(), Charset.defaultCharset()));

// Print the results and start the job (execute() throws Exception)
processedStream.print();
env.execute("NiFi Source Example");
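The map step above decodes content with `Charset.defaultCharset()`, which depends on the JVM's platform settings. Below is a minimal sketch, assuming the upstream NiFi flow writes UTF-8 text, that decodes explicitly and also reads a FlowFile attribute with a fallback. `Packet` and `SimplePacket` are hypothetical stand-ins for `NiFiDataPacket` and `StandardNiFiDataPacket` so the example runs without Flink or NiFi on the classpath. NiFi normally sets a `filename` attribute on FlowFiles; the fallback covers flows where it is missing.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for NiFiDataPacket: raw FlowFile bytes plus attributes.
interface Packet {
    byte[] getContent();
    Map<String, String> getAttributes();
}

// Hypothetical stand-in for StandardNiFiDataPacket.
class SimplePacket implements Packet {
    private final byte[] content;
    private final Map<String, String> attributes;

    SimplePacket(byte[] content, Map<String, String> attributes) {
        this.content = content;
        this.attributes = attributes;
    }

    public byte[] getContent() { return content; }
    public Map<String, String> getAttributes() { return attributes; }
}

class PacketDecoding {
    // Decodes the payload as UTF-8 (an assumption about the upstream flow) and prefixes it
    // with the "filename" attribute, falling back to "unknown" when the attribute is absent.
    static String describe(Packet packet) {
        String body = new String(packet.getContent(), StandardCharsets.UTF_8);
        String name = packet.getAttributes().getOrDefault("filename", "unknown");
        return name + ": " + body;
    }

    public static void main(String[] args) {
        Map<String, String> attrs = new HashMap<>();
        attrs.put("filename", "orders.txt");
        Packet packet = new SimplePacket("42 units".getBytes(StandardCharsets.UTF_8), attrs);
        System.out.println(describe(packet)); // prints orders.txt: 42 units
    }
}
```

In a Flink job, the body of `describe` would sit inside the `map` lambda that receives each `NiFiDataPacket`.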

NiFi Sink Example

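import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.nifi.NiFiDataPacket;
import org.apache.flink.streaming.connectors.nifi.NiFiDataPacketBuilder;
import org.apache.flink.streaming.connectors.nifi.NiFiSink;
import org.apache.flink.streaming.connectors.nifi.StandardNiFiDataPacket;
import org.apache.nifi.remote.client.SiteToSiteClient;
import org.apache.nifi.remote.client.SiteToSiteClientConfig;
import java.util.HashMap;

// Create Flink streaming environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Configure the NiFi Site-to-Site client to push to the input port "Data from Flink"
SiteToSiteClientConfig clientConfig = new SiteToSiteClient.Builder()
    .url("http://localhost:8080/nifi")
    .portName("Data from Flink")
    .buildConfig();

// Builder that turns each String into a FlowFile: content encoded with Flink's
// default charset, no attributes
NiFiDataPacketBuilder<String> builder = new NiFiDataPacketBuilder<String>() {
    @Override
    public NiFiDataPacket createNiFiDataPacket(String data, RuntimeContext ctx) {
        return new StandardNiFiDataPacket(
            data.getBytes(ConfigConstants.DEFAULT_CHARSET),
            new HashMap<String, String>()
        );
    }
};

// Attach the sink to a data stream and start the job (execute() throws Exception)
DataStream<String> dataStream = env.fromElements("one", "two", "three");
dataStream.addSink(new NiFiSink<>(clientConfig, builder));
env.execute("NiFi Sink Example");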

Architecture

The connector follows Flink's standard source/sink pattern and integrates with NiFi's Site-to-Site protocol:

  • NiFiSource: Pulls data from NiFi output ports using a Site-to-Site client
  • NiFiSink: Sends data to NiFi input ports via the Site-to-Site protocol
  • NiFiDataPacket: Encapsulates NiFi FlowFile content and attributes
  • NiFiDataPacketBuilder: Converts each Flink record into a NiFiDataPacket
  • Site-to-Site Integration: Uses NiFi's native remote protocol; data is exchanged in Site-to-Site transactions, which the connector confirms and completes after each transfer

Capabilities

NiFi Data Source

Streams data from Apache NiFi into Flink using the Site-to-Site protocol. The source function connects to NiFi output ports and pulls data packets containing FlowFile content and attributes.

public class NiFiSource extends RichParallelSourceFunction<NiFiDataPacket> implements StoppableFunction {
    
    private static final long serialVersionUID = 1L;
    private static final long DEFAULT_WAIT_TIME_MS = 1000;
    private final SiteToSiteClientConfig clientConfig;
    private final long waitTimeMs;
    private transient SiteToSiteClient client;
    private volatile boolean isRunning = true;
    
    /**
     * Constructs a new NiFiSource using the given client config and the default wait time of 1000 ms.
     *
     * @param clientConfig the configuration for building a NiFi SiteToSiteClient
     */
    public NiFiSource(SiteToSiteClientConfig clientConfig);
    
    /**
     * Constructs a new NiFiSource using the given client config and wait time.
     *
     * @param clientConfig the configuration for building a NiFi SiteToSiteClient
     * @param waitTimeMs the amount of time to wait (in milliseconds) if no data is available to pull from NiFi
     */
    public NiFiSource(SiteToSiteClientConfig clientConfig, long waitTimeMs);
    
    @Override
    public void open(Configuration parameters) throws Exception;
    
    @Override
    public void run(SourceContext<NiFiDataPacket> ctx) throws Exception;
    
    @Override
    public void cancel();
    
    @Override
    public void close() throws Exception;
    
    @Override
    public void stop();
}
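The polling behavior that `waitTimeMs` controls can be sketched as a loop. This is a simplified, self-contained illustration, not the connector's source: `Transaction` and `Channel` are hypothetical stand-ins for the NiFi Site-to-Site transaction and client, and a `List` replaces Flink's `SourceContext`. The sketch assumes the loop waits `waitTimeMs` whenever no transaction can be created or a transaction carries no data, and otherwise drains every packet in the transaction before confirming, emitting, and completing it; treat the exact ordering of those last three steps as illustrative.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

// Hypothetical stand-in for a Site-to-Site transaction in the RECEIVE direction.
interface Transaction {
    String receive(); // next payload, or null when the transaction holds no more data
    void confirm();
    void complete();
}
// Hypothetical stand-in for SiteToSiteClient; returns null when no transaction is available.
interface Channel {
    Transaction createTransaction();
}
class PollingSourceSketch {
    private final Channel channel;
    private final long waitTimeMs;
    private volatile boolean isRunning = true;

    PollingSourceSketch(Channel channel, long waitTimeMs) {
        this.channel = channel;
        this.waitTimeMs = waitTimeMs;
    }

    // Keeps polling until cancel() is called or the thread is interrupted.
    void run(List<String> out) {
        while (isRunning && !Thread.currentThread().isInterrupted()) {
            pollOnce(out);
        }
    }

    void cancel() { isRunning = false; }

    // One pass of the loop; returns how many packets were emitted to `out`.
    int pollOnce(List<String> out) {
        Transaction tx = channel.createTransaction();
        if (tx == null) {          // no transaction: wait, then let the loop retry
            backOff();
            return 0;
        }
        String packet = tx.receive();
        if (packet == null) {      // no data: close the empty transaction, then wait
            tx.confirm();
            tx.complete();
            backOff();
            return 0;
        }
        List<String> batch = new ArrayList<>();
        do {                       // drain every packet offered in this transaction
            batch.add(packet);
            packet = tx.receive();
        } while (packet != null);
        tx.confirm();
        out.addAll(batch);         // stands in for SourceContext.collect(...)
        tx.complete();
        return batch.size();
    }

    private void backOff() {
        try {
            Thread.sleep(waitTimeMs);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // keep the interrupt visible to run()
        }
    }
}
// Demo channel: a null entry means "no transaction", a list is one transaction's packets.
class ScriptedChannel implements Channel {
    private final List<List<String>> script;
    private int next = 0;
    ScriptedChannel(List<List<String>> script) { this.script = script; }

    public Transaction createTransaction() {
        List<String> batch = next < script.size() ? script.get(next++) : null;
        if (batch == null) return null;
        Deque<String> pending = new ArrayDeque<>(batch);
        return new Transaction() {
            public String receive() { return pending.poll(); }
            public void confirm() { }
            public void complete() { }
        };
    }
    public static void main(String[] args) {
        List<List<String>> script = new ArrayList<>();
        script.add(null);                    // no transaction available
        script.add(Arrays.asList("a", "b")); // one transaction carrying two packets
        script.add(new ArrayList<>());       // a transaction with no data
        PollingSourceSketch source = new PollingSourceSketch(new ScriptedChannel(script), 10);
        List<String> out = new ArrayList<>();
        for (int i = 0; i < script.size(); i++) source.pollOnce(out);
        System.out.println(out); // prints [a, b]
    }
}
```

Each call to `pollOnce` corresponds to one iteration of `run`; `cancel()` clears a volatile flag in the same way the `isRunning` field above suggests.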

NiFi Data Sink

Sends data from Flink to Apache NiFi using the Site-to-Site protocol. The sink function connects to a NiFi input port, converts each incoming record into a data packet with the supplied NiFiDataPacketBuilder, and sends that packet in its own Site-to-Site transaction.

public class NiFiSink<T> extends RichSinkFunction<T> {
    
    private SiteToSiteClient client;
    private SiteToSiteClientConfig clientConfig;
    private NiFiDataPacketBuilder<T> builder;
    
    /**
     * Construct a new NiFiSink with the given client config and NiFiDataPacketBuilder.
     *
     * @param clientConfig the configuration for building a NiFi SiteToSiteClient
     * @param builder a builder to produce NiFiDataPackets from incoming data
     */
    public NiFiSink(SiteToSiteClientConfig clientConfig, NiFiDataPacketBuilder<T> builder);
    
    @Override
    public void open(Configuration parameters) throws Exception;
    
    @Override
    public void invoke(T value) throws Exception;
    
    @Override
    public void close() throws Exception;
}
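A matching sketch for the sink, again built from hypothetical stand-ins (`SendTransaction`, `SendChannel`, and a `java.util.function.Function` in place of `NiFiDataPacketBuilder`). It assumes the sink sends each record in a transaction of its own and throws `IllegalStateException` when no transaction can be created, rather than waiting and retrying as the source does.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical stand-in for a Site-to-Site transaction in the SEND direction.
interface SendTransaction {
    void send(byte[] content, Map<String, String> attributes);
    void confirm();
    void complete();
}

// Hypothetical stand-in for SiteToSiteClient; returns null when no transaction is available.
interface SendChannel {
    SendTransaction createTransaction();
}

class SinkSketch<T> {
    private final SendChannel channel;
    private final Function<T, byte[]> builder; // simplified stand-in for NiFiDataPacketBuilder

    SinkSketch(SendChannel channel, Function<T, byte[]> builder) {
        this.channel = channel;
        this.builder = builder;
    }

    // Mirrors invoke(T): build the payload, then send it in a transaction of its own.
    void invoke(T value) {
        byte[] content = builder.apply(value);
        SendTransaction tx = channel.createTransaction();
        if (tx == null) { // fail fast instead of waiting and retrying
            throw new IllegalStateException("Unable to create a transaction to send data");
        }
        tx.send(content, Collections.emptyMap());
        tx.confirm();
        tx.complete();
    }
}

// Demo channel: payloads appear in `delivered` only once their transaction completes.
class RecordingChannel implements SendChannel {
    final List<String> delivered = new ArrayList<>();
    boolean available = true;

    public SendTransaction createTransaction() {
        if (!available) return null;
        List<String> staged = new ArrayList<>();
        return new SendTransaction() {
            public void send(byte[] content, Map<String, String> attributes) {
                staged.add(new String(content, StandardCharsets.UTF_8));
            }
            public void confirm() { }
            public void complete() { delivered.addAll(staged); }
        };
    }

    public static void main(String[] args) {
        RecordingChannel channel = new RecordingChannel();
        SinkSketch<String> sink = new SinkSketch<>(channel, s -> s.getBytes(StandardCharsets.UTF_8));
        sink.invoke("one");
        sink.invoke("two");
        System.out.println(channel.delivered); // prints [one, two]
        channel.available = false;
        try {
            sink.invoke("three");
        } catch (IllegalStateException e) {
            System.out.println("failed fast: " + e.getMessage());
        }
    }
}
```

One transaction per record keeps the sink simple, at the cost of a protocol exchange for every element; if throughput matters, measure it against your NiFi setup.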

Data Packet Interface

Represents a NiFi FlowFile with content and attributes, providing access to both the data payload and metadata associated with the FlowFile.

public interface NiFiDataPacket {
    
    /**
     * @return the contents of a NiFi FlowFile
     */
    byte[] getContent();
    
    /**
     * @return a Map of attributes that are associated with the NiFi FlowFile
     */
    Map<String, String> getAttributes();
}

Standard Data Packet Implementation

Concrete implementation of NiFiDataPacket that stores FlowFile content as a byte array and attributes as a string map.

public class StandardNiFiDataPacket implements NiFiDataPacket, Serializable {
    
    private static final long serialVersionUID = 6364005260220243322L;
    private final byte[] content;
    private final Map<String, String> attributes;
    
    public StandardNiFiDataPacket(final byte[] content, final Map<String, String> attributes);
    
    @Override
    public byte[] getContent();
    
    @Override
    public Map<String, String> getAttributes();
}
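StandardNiFiDataPacket holds a `byte[]` and a `Map<String, String>` and implements `Serializable`. The minimal sketch below shows what that marker permits, using a hypothetical look-alike class `PacketCopy`: a Java-serialization round trip preserves both content and attributes, provided the attribute map's own class is serializable. It illustrates the Java serialization contract only, not how Flink serializes records internally.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in with the same shape as StandardNiFiDataPacket.
class PacketCopy implements Serializable {
    private static final long serialVersionUID = 1L;
    private final byte[] content;
    // Serialization succeeds only if the Map implementation is itself Serializable (HashMap is).
    private final Map<String, String> attributes;

    PacketCopy(byte[] content, Map<String, String> attributes) {
        this.content = content;
        this.attributes = attributes;
    }

    byte[] getContent() { return content; }
    Map<String, String> getAttributes() { return attributes; }
}

class RoundTrip {
    // Writes the packet with Java serialization and reads back an independent copy.
    static PacketCopy roundTrip(PacketCopy packet) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(packet);
            }
            try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (PacketCopy) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("round trip failed", e);
        }
    }

    public static void main(String[] args) {
        Map<String, String> attrs = new HashMap<>();
        attrs.put("mime.type", "text/plain");
        PacketCopy copy = roundTrip(new PacketCopy("payload".getBytes(StandardCharsets.UTF_8), attrs));
        // prints payload {mime.type=text/plain}
        System.out.println(new String(copy.getContent(), StandardCharsets.UTF_8) + " " + copy.getAttributes());
    }
}
```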

Data Packet Builder Interface

Functional interface for transforming Flink data into NiFiDataPacket objects. Custom implementations define how to convert stream elements into NiFi-compatible data packets.

public interface NiFiDataPacketBuilder<T> extends Function, Serializable {
    
    NiFiDataPacket createNiFiDataPacket(T t, RuntimeContext ctx);
}
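Because NiFiDataPacketBuilder declares a single abstract method and extends `Serializable` (Flink's `Function` is an empty marker interface that also extends `Serializable`), a lambda such as `(data, ctx) -> new StandardNiFiDataPacket(data.getBytes(ConfigConstants.DEFAULT_CHARSET), new HashMap<>())` can replace the anonymous class in the sink example. The self-contained sketch below checks the two properties this relies on, using hypothetical stand-ins (`PacketBuilder`, `SubtaskInfo`, `BuiltPacket`) rather than the Flink types: the lambda implements the builder, and because its target type is `Serializable` it survives Java serialization as long as it captures nothing non-serializable.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for RuntimeContext; only the subtask index is modeled.
interface SubtaskInfo {
    int getIndexOfThisSubtask();
}

// Hypothetical stand-in for the packet returned by the builder.
class BuiltPacket {
    final byte[] content;
    final Map<String, String> attributes;

    BuiltPacket(byte[] content, Map<String, String> attributes) {
        this.content = content;
        this.attributes = attributes;
    }
}

// Same shape as NiFiDataPacketBuilder: a single abstract method on a Serializable interface.
interface PacketBuilder<T> extends Serializable {
    BuiltPacket create(T value, SubtaskInfo ctx);
}

class LambdaBuilderDemo {
    // The target type extends Serializable, so javac compiles this lambda as serializable.
    static final PacketBuilder<String> BUILDER = (value, ctx) -> {
        Map<String, String> attrs = new HashMap<>();
        // "flink.subtask" is a hypothetical attribute name chosen for this example.
        attrs.put("flink.subtask", String.valueOf(ctx.getIndexOfThisSubtask()));
        return new BuiltPacket(value.getBytes(StandardCharsets.UTF_8), attrs);
    };

    // Returns true if Java serialization of the object succeeds.
    static boolean isSerializable(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        BuiltPacket packet = BUILDER.create("three", () -> 1);
        // prints {flink.subtask=1} serializable=true
        System.out.println(packet.attributes + " serializable=" + isSerializable(BUILDER));
    }
}
```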

Configuration

Site-to-Site Client Configuration

The connector uses NiFi's SiteToSiteClientConfig for connection settings:

SiteToSiteClientConfig clientConfig = new SiteToSiteClient.Builder()
    .url("http://nifi-host:8080/nifi")           // Base URL of the NiFi instance
    .portName("Data Port Name")                   // Name of the NiFi input or output port
    .requestBatchCount(10)                        // Preferred FlowFiles per transaction when pulling
    .timeout(30, TimeUnit.SECONDS)                // Communications timeout
    .buildConfig();

Source Configuration Options

  • clientConfig: Required SiteToSiteClientConfig for NiFi connection
  • waitTimeMs: Optional time in milliseconds to wait before retrying when no transaction can be created or no data is available (default: 1000 ms)

Sink Configuration Options

  • clientConfig: Required SiteToSiteClientConfig for NiFi connection
  • builder: Required NiFiDataPacketBuilder for data transformation

Error Handling

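  • Transaction Failures: If the source cannot create a Site-to-Site transaction, it logs a warning, waits waitTimeMs, and retries; a transaction that yields no data is confirmed and completed before the same wait
  • Connection Issues: The sink does not retry; if it cannot create a transaction it throws an IllegalStateException, failing the Flink task. Exceptions raised by the Site-to-Site client propagate from both source and sink
  • Packet Creation: The sink builds each packet with the NiFiDataPacketBuilder before opening a transaction and performs no further validation, so exceptions thrown by the builder propagate to Flink
  • Resource Cleanup: Both components close their Site-to-Site client in close()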

Workspace: tessl
Visibility: Public
Describes: mavenpkg:maven/org.apache.flink/flink-connector-nifi_2.10@1.3.x