
tessl/maven-org-apache-flink--flink-connector-nifi

A deprecated Apache Flink connector that enables integration with Apache NiFi through the NiFi Site-to-Site protocol for bidirectional streaming data exchange.


Apache Flink NiFi Connector

Apache Flink NiFi Connector provides integration between Apache Flink and Apache NiFi through the NiFi Site-to-Site protocol. This connector enables bidirectional streaming data exchange between Flink applications and NiFi clusters, supporting both data ingestion (source) and data publishing (sink) operations.

Status: This connector is deprecated and will be removed in a future Flink release.

Package Information

  • Package Name: flink-connector-nifi
  • Package Type: maven
  • Language: Java
  • Installation:
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-nifi</artifactId>
      <version>1.15.4</version>
    </dependency>

Core Imports

import org.apache.flink.streaming.connectors.nifi.NiFiSource;
import org.apache.flink.streaming.connectors.nifi.NiFiSink;
import org.apache.flink.streaming.connectors.nifi.NiFiDataPacket;
import org.apache.flink.streaming.connectors.nifi.StandardNiFiDataPacket;
import org.apache.flink.streaming.connectors.nifi.NiFiDataPacketBuilder;

// NiFi Site-to-Site client imports
import org.apache.nifi.remote.client.SiteToSiteClient;
import org.apache.nifi.remote.client.SiteToSiteClientConfig;

// Additional required imports for examples
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.concurrent.TimeUnit;
import org.apache.flink.api.common.functions.RuntimeContext;

Basic Usage

Source Example - Reading from NiFi

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.nifi.NiFiDataPacket;
import org.apache.flink.streaming.connectors.nifi.NiFiSource;
import org.apache.nifi.remote.client.SiteToSiteClient;
import org.apache.nifi.remote.client.SiteToSiteClientConfig;

import java.nio.charset.StandardCharsets;

public class NiFiSourceJob {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Configure the Site-to-Site client; "Data for Flink" must be an output port on the NiFi instance
        SiteToSiteClientConfig clientConfig = new SiteToSiteClient.Builder()
            .url("http://localhost:8080/nifi")
            .portName("Data for Flink")
            .requestBatchCount(5)
            .buildConfig();

        // Create the source; each parallel instance opens its own Site-to-Site client
        NiFiSource nifiSource = new NiFiSource(clientConfig);
        DataStream<NiFiDataPacket> stream = env.addSource(nifiSource).setParallelism(2);

        // Decode each FlowFile's content as UTF-8 text
        DataStream<String> content = stream.map(packet ->
            new String(packet.getContent(), StandardCharsets.UTF_8));

        // Attach a sink so the decoded records are visible
        content.print();

        env.execute("NiFi Source Job");
    }
}

Sink Example - Writing to NiFi

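import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.nifi.NiFiDataPacketBuilder;
import org.apache.flink.streaming.connectors.nifi.NiFiSink;
import org.apache.flink.streaming.connectors.nifi.StandardNiFiDataPacket;
import org.apache.nifi.remote.client.SiteToSiteClient;
import org.apache.nifi.remote.client.SiteToSiteClientConfig;

import java.nio.charset.StandardCharsets;
import java.util.HashMap;

public class NiFiSinkJob {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Configure the Site-to-Site client; "Data from Flink" must be an input port on the NiFi instance
        SiteToSiteClientConfig clientConfig = new SiteToSiteClient.Builder()
            .url("http://localhost:8080/nifi")
            .portName("Data from Flink")
            .buildConfig();

        // Convert each String into a FlowFile: UTF-8 content, no attributes
        NiFiDataPacketBuilder<String> builder = (data, ctx) ->
            new StandardNiFiDataPacket(
                data.getBytes(StandardCharsets.UTF_8),
                new HashMap<String, String>());

        // Create the sink and attach it to a stream
        NiFiSink<String> nifiSink = new NiFiSink<>(clientConfig, builder);
        DataStream<String> dataStream = env.fromElements("data1", "data2", "data3");
        dataStream.addSink(nifiSink);

        env.execute("NiFi Sink Job");
    }
}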

Architecture

The NiFi connector is built around several key components:

  • NiFi Site-to-Site Protocol: Uses NiFi's native Site-to-Site client to pull from NiFi output ports (source) and push to NiFi input ports (sink)
  • Transaction Management: Follows the Site-to-Site transaction pattern (receive or send, then confirm, then complete) for every source batch and every sink record
  • Data Packet Abstraction: Wraps the content and attributes of each NiFi FlowFile in a NiFiDataPacket for Flink processing
  • Parallel Processing: Supports Flink's parallel execution; each parallel source or sink instance opens its own Site-to-Site client
  • Builder Pattern: Uses a NiFiDataPacketBuilder to convert arbitrary stream elements into NiFiDataPackets in sink operations
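The receive side of the confirm/complete cycle can be illustrated without any NiFi or Flink dependency. The following is a simplified JDK-only model, not the connector's code: ReceiveTransaction, InMemoryTransaction, and ReceiveCycle are hypothetical stand-ins, and the call order (receive every available packet, confirm, emit, complete) reflects our reading of how NiFiSource handles a single transaction.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in for a NiFi Site-to-Site receive transaction.
interface ReceiveTransaction {
    byte[] receive();   // next packet's content, or null when nothing is left
    void confirm();     // verify what was received
    void complete();    // finish the transaction
}

// Hypothetical in-memory transaction that records every call made on it.
class InMemoryTransaction implements ReceiveTransaction {
    private final List<byte[]> pending;
    final List<String> calls = new ArrayList<>();

    InMemoryTransaction(List<byte[]> pending) {
        this.pending = new ArrayList<>(pending);
    }

    public byte[] receive() {
        calls.add("receive");
        return pending.isEmpty() ? null : pending.remove(0);
    }

    public void confirm() { calls.add("confirm"); }

    public void complete() { calls.add("complete"); }
}

class ReceiveCycle {
    // Drains one transaction: buffer all packets, confirm, emit them, then complete.
    // Returns the number of packets emitted (0 for an empty transaction).
    static int drain(ReceiveTransaction tx, Consumer<byte[]> emit) {
        List<byte[]> buffered = new ArrayList<>();
        for (byte[] packet = tx.receive(); packet != null; packet = tx.receive()) {
            buffered.add(packet);
        }
        tx.confirm();
        buffered.forEach(emit);
        tx.complete();
        return buffered.size();
    }
}
```

An empty transaction still goes through confirm and complete in this model, which matches the "no data available" case described under Error Handling.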

Capabilities

Data Source Operations

Pull data from NiFi output ports using the Site-to-Site protocol.

/**
 * A source that pulls data from Apache NiFi using the NiFi Site-to-Site client.
 * Produces NiFiDataPackets which encapsulate NiFi FlowFile content and attributes.
 * 
 * @deprecated The NiFi Source has been deprecated and will be removed in a future Flink release.
 */
@Deprecated
public class NiFiSource extends RichParallelSourceFunction<NiFiDataPacket> {
    
    /**
     * Constructs a new NiFiSource using the given client config and default wait time of 1000ms.
     * 
     * @param clientConfig the configuration for building a NiFi SiteToSiteClient
     */
    public NiFiSource(SiteToSiteClientConfig clientConfig);
    
    /**
     * Constructs a new NiFiSource using the given client config and wait time.
     * 
     * @param clientConfig the configuration for building a NiFi SiteToSiteClient
     * @param waitTimeMs the amount of time to wait (in milliseconds) if no data is available
     */
    public NiFiSource(SiteToSiteClientConfig clientConfig, long waitTimeMs);
    
    /**
     * Opens the source and creates the SiteToSiteClient.
     * 
     * @param parameters the configuration parameters from Flink
     * @throws Exception if the SiteToSiteClient cannot be created
     */
    @Override
    public void open(Configuration parameters) throws Exception;
    
    /**
     * Main execution loop for pulling data from NiFi.
     * Continuously pulls data packets and emits them to the Flink stream.
     * 
     * @param ctx the source context for collecting data packets
     * @throws Exception if errors occur during data transfer
     */
    @Override
    public void run(SourceContext<NiFiDataPacket> ctx) throws Exception;
    
    /**
     * Cancels the running source.
     */
    @Override
    public void cancel();
    
    /**
     * Closes the source and client connection.
     */
    @Override
    public void close() throws Exception;
}
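The run/cancel contract and the role of waitTimeMs can be sketched as a small polling loop. PollingSource, its Supplier-based pull hook, and the injectable sleeper are hypothetical and exist only for illustration: the loop runs while a volatile flag is set, cancel() clears the flag, and an empty pull triggers a wait instead of a busy spin.

```java
import java.util.List;
import java.util.function.Consumer;
import java.util.function.LongConsumer;
import java.util.function.Supplier;

// Hypothetical model of a polling source with a wait time and a cancel flag.
class PollingSource {
    private final Supplier<List<String>> pull;   // returns an empty list when no data is available
    private final long waitTimeMs;
    private final LongConsumer sleeper;          // injectable so the model can be tested without sleeping
    private volatile boolean isRunning = true;   // written by cancel(), read by run()

    PollingSource(Supplier<List<String>> pull, long waitTimeMs, LongConsumer sleeper) {
        this.pull = pull;
        this.waitTimeMs = waitTimeMs;
        this.sleeper = sleeper;
    }

    // Mirrors the shape of SourceFunction.run: keep pulling until cancelled.
    void run(Consumer<String> collect) {
        while (isRunning) {
            List<String> batch = pull.get();
            if (batch.isEmpty()) {
                sleeper.accept(waitTimeMs);      // back off instead of spinning
                continue;
            }
            batch.forEach(collect);
        }
    }

    // Mirrors SourceFunction.cancel: run() exits after its current iteration.
    void cancel() {
        isRunning = false;
    }

    // Real sleeper for non-test use; restores the interrupt flag instead of swallowing it.
    static void sleepQuietly(long ms) {
        try {
            Thread.sleep(ms);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

Outside a test, the sleeper would be PollingSource::sleepQuietly; restoring the interrupt flag is a choice made in this sketch and is not necessarily what NiFiSource does.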

Data Sink Operations

Send data to NiFi input ports using the Site-to-Site protocol.

/**
 * A sink that delivers data to Apache NiFi using the NiFi Site-to-Site client.
 * Requires a NiFiDataPacketBuilder to create NiFiDataPacket instances from incoming data.
 * 
 * @param <T> the type of input data
 * @deprecated The NiFi Sink has been deprecated and will be removed in a future Flink release.
 */
@Deprecated
public class NiFiSink<T> extends RichSinkFunction<T> {
    
    /**
     * Construct a new NiFiSink with the given client config and NiFiDataPacketBuilder.
     * 
     * @param clientConfig the configuration for building a NiFi SiteToSiteClient
     * @param builder a builder to produce NiFiDataPackets from incoming data
     */
    public NiFiSink(SiteToSiteClientConfig clientConfig, NiFiDataPacketBuilder<T> builder);
    
    /**
     * Opens the sink and creates the SiteToSiteClient.
     * 
     * @param parameters the configuration parameters from Flink
     * @throws Exception if the SiteToSiteClient cannot be created
     */
    @Override
    public void open(Configuration parameters) throws Exception;
    
    /**
     * Sends a single record to NiFi.
     * Converts the record to a NiFiDataPacket using the builder and sends it in its own
     * Site-to-Site transaction (send, confirm, complete).
     * 
     * @param value the record to send to NiFi
     * @throws Exception if data cannot be sent or transaction fails
     */
    @Override
    public void invoke(T value) throws Exception;
    
    /**
     * Closes the sink and client connection.
     */
    @Override
    public void close() throws Exception;
}
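The per-record behavior of invoke can be modeled in plain Java. SendTransaction, TransactionFactory, PerRecordSender, and RecordingTransaction are hypothetical names for this sketch: the model opens one transaction per record, runs send, confirm, and complete in order, and throws IllegalStateException when no transaction is available, matching the sink behavior described under Error Handling.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a NiFi Site-to-Site send transaction.
interface SendTransaction {
    void send(byte[] content, Map<String, String> attributes);
    void confirm();
    void complete();
}

// Hypothetical factory; returns null when no transaction can be created.
interface TransactionFactory {
    SendTransaction create();
}

class PerRecordSender {
    private final TransactionFactory factory;

    PerRecordSender(TransactionFactory factory) {
        this.factory = factory;
    }

    // One transaction per record: send, confirm, complete.
    void invoke(byte[] content, Map<String, String> attributes) {
        SendTransaction tx = factory.create();
        if (tx == null) {
            throw new IllegalStateException("Unable to create a transaction to send data");
        }
        tx.send(content, attributes);
        tx.confirm();
        tx.complete();
    }
}

// Hypothetical transaction that records its calls so the pattern is observable.
class RecordingTransaction implements SendTransaction {
    final List<String> calls = new ArrayList<>();

    public void send(byte[] content, Map<String, String> attributes) {
        calls.add("send:" + new String(content));
    }

    public void confirm() { calls.add("confirm"); }

    public void complete() { calls.add("complete"); }
}
```

One transaction per record keeps the sink simple, but every record pays for a full transaction, so throughput-sensitive jobs may want to aggregate records upstream before they reach the sink.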

Data Packet Interface

Core interface for wrapping NiFi FlowFile data and attributes.

/**
 * The NiFiDataPacket provides a packaging around a NiFi FlowFile.
 * It wraps both a FlowFile's content and its attributes for processing by Flink.
 */
public interface NiFiDataPacket {
    
    /**
     * @return the contents of a NiFi FlowFile as byte array
     */
    byte[] getContent();
    
    /**
     * @return a Map of attributes that are associated with the NiFi FlowFile
     */
    Map<String, String> getAttributes();
}

Standard Data Packet Implementation

Standard implementation of the NiFiDataPacket interface.

/**
 * An implementation of NiFiDataPacket that stores content and attributes.
 */
public class StandardNiFiDataPacket implements NiFiDataPacket, Serializable {
    
    /**
     * Constructs a StandardNiFiDataPacket with content and attributes.
     * 
     * @param content the FlowFile content as byte array
     * @param attributes the FlowFile attributes as string map
     */
    public StandardNiFiDataPacket(byte[] content, Map<String, String> attributes);
    
    /**
     * @return the contents of the FlowFile
     */
    @Override
    public byte[] getContent();
    
    /**
     * @return the attributes of the FlowFile
     */
    @Override
    public Map<String, String> getAttributes();
}

Data Packet Builder

Builder interface for creating NiFiDataPackets from input data in sink operations.

/**
 * A function that can create a NiFiDataPacket from an incoming instance of the given type.
 * Used by NiFiSink to convert Flink stream data to NiFi FlowFile format.
 * 
 * @param <T> the type of input data to convert
 */
public interface NiFiDataPacketBuilder<T> extends Function, Serializable {
    
    /**
     * Creates a NiFiDataPacket from input data.
     * 
     * @param t the input data to convert
     * @param ctx the Flink runtime context
     * @return NiFiDataPacket containing the converted data
     */
    NiFiDataPacket createNiFiDataPacket(T t, RuntimeContext ctx);
}
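NiFiDataPacketBuilder extends Serializable because Flink serializes user functions, including the sink and its builder, when it ships a job to the cluster. A lambda whose target type is a Serializable functional interface is itself serializable, which the JDK-only sketch below demonstrates. PacketBuilder is a hypothetical stand-in for NiFiDataPacketBuilder, and the "source" attribute is an arbitrary example value.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.util.Map;

// Hypothetical stand-in for NiFiDataPacketBuilder: a functional interface extending Serializable.
// The result pairs FlowFile-style content with attributes.
interface PacketBuilder<T> extends Serializable {
    Map.Entry<byte[], Map<String, String>> build(T input);
}

class BuilderSerialization {
    // Serializable because its target type (PacketBuilder) extends Serializable.
    static final PacketBuilder<String> UTF8_BUILDER =
        s -> Map.entry(s.getBytes(StandardCharsets.UTF_8), Map.of("source", "flink"));

    // Serializes and deserializes a value with Java serialization.
    @SuppressWarnings("unchecked")
    static <T> T roundTrip(T value) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        }
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (T) in.readObject();
        }
    }
}
```

If a builder lambda captures a variable, that captured value must be serializable too, otherwise serialization fails when the job is submitted.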

Configuration Requirements

Required Dependencies

<dependency>
    <groupId>org.apache.nifi</groupId>
    <artifactId>nifi-site-to-site-client</artifactId>
    <version>1.14.0</version>
</dependency>

Site-to-Site Client Configuration

/**
 * Configuration for NiFi Site-to-Site client connection.
 * Built using SiteToSiteClient.Builder fluent interface.
 */
SiteToSiteClientConfig clientConfig = new SiteToSiteClient.Builder()
    .url("http://localhost:8080/nifi")          // NiFi instance URL
    .portName("Port Name")                       // Input/Output port name
    .requestBatchCount(5)                        // Optional: preferred number of FlowFiles per transaction when pulling
    .timeout(30, TimeUnit.SECONDS)               // Optional: timeout for communication with NiFi
    .buildConfig();

/**
 * Selected methods of SiteToSiteClient.Builder, the builder for SiteToSiteClient
 * configurations. It is a nested class of the SiteToSiteClient interface in the
 * Apache NiFi site-to-site-client library (method bodies omitted).
 */
public static class Builder {
    public Builder url(String url);
    public Builder portName(String portName);
    public Builder requestBatchCount(int count);
    public Builder timeout(long timeout, TimeUnit unit);
    public Builder keystoreFilename(String keystoreFilename);
    public Builder keystorePass(String keystorePass);
    public Builder truststoreFilename(String truststoreFilename);
    public Builder truststorePass(String truststorePass);
    public Builder fromConfig(SiteToSiteClientConfig config);
    public SiteToSiteClientConfig buildConfig();
    public SiteToSiteClient build();
}

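/**
 * Selected methods of the configuration interface for the NiFi Site-to-Site client.
 * It holds connection parameters and SSL settings and is Serializable, which allows
 * NiFiSource and NiFiSink to store it and ship it with the Flink job.
 */
public interface SiteToSiteClientConfig extends Serializable {
    String getUrl();
    String getPortName();
    int getPreferredBatchCount();          // set via Builder.requestBatchCount(...)
    long getTimeout(TimeUnit timeUnit);    // set via Builder.timeout(...)
    String getKeystoreFilename();
    String getKeystorePassword();
    String getTruststoreFilename();
    String getTruststorePassword();
}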

Common Configuration Options

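  • url: Base URL of the NiFi instance, e.g. http://localhost:8080/nifi (required)
  • portName: Name of the NiFi port to connect to: an output port for NiFiSource, an input port for NiFiSink (required)
  • requestBatchCount: Preferred number of FlowFiles per transaction when pulling data (optional); larger batches can reduce per-transaction overhead
  • timeout: Timeout for communication with the NiFi instance (optional)
  • keystoreFilename/keystorePass and truststoreFilename/truststorePass: SSL settings for secure (https) connections (optional)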

Error Handling

Source Error Handling

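  • Transaction Creation Failures: If the client cannot create a transaction, the source logs a warning, waits for the configured wait time, and tries again
  • No Data Available: The source confirms and completes the empty transaction, waits for the configured wait time, and tries again; no exception is thrown
  • Connection Issues: Exceptions raised by the Site-to-Site client (for example an IOException) propagate out of run() and fail the task, which Flink then recovers according to the job's restart strategy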

Sink Error Handling

  • Transaction Creation Failures: Throws IllegalStateException if transaction cannot be created
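  • Send Failures: Exceptions thrown while sending, confirming, or completing a transaction propagate out of invoke() and fail the task, triggering the job's restart strategy
  • Connection Issues: Surface as exceptions from the Site-to-Site client and are handled the same way as send failures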

Common Exceptions

// Sink operation failures:
// - java.lang.IllegalStateException: unable to create a NiFi transaction for sending data

// General connection and I/O issues surface as:
// - org.apache.nifi.remote.exception.TransmissionException
// - java.io.IOException for network connectivity issues

Processing Guarantees

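  • Delivery Guarantees: As the class declarations above show, neither NiFiSource nor NiFiSink implements a checkpointing interface such as CheckpointedFunction, so the connector does not provide exactly-once processing. The source completes each NiFi transaction once its packets are emitted, and the sink commits every record in its own transaction, so records can be lost or duplicated after a failure
  • Transaction Safety: Uses the NiFi Site-to-Site confirm/complete pattern so that transferred data is verified before a transaction is finished
  • Parallel Processing: Supports Flink parallel execution; each parallel task uses its own client and independent transactions
  • Fault Tolerance: After a failure, Flink restarts the tasks according to the job's restart strategy, and the restarted source resumes pulling from NiFi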
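Because NiFiSink sends and completes a transaction for every record, it behaves like a sink that commits immediately. The JDK-only model below (ReplayModel is a hypothetical name, and a replayable upstream source with checkpointing is assumed) shows what such a sink sees when a task fails and the upstream restarts from its last checkpoint: records between the checkpoint and the failure are committed twice, which is at-least-once rather than exactly-once delivery.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model: a sink that commits each record immediately, fed by an
// upstream that replays from the last checkpointed offset after a failure.
class ReplayModel {
    final List<String> committed = new ArrayList<>();   // what the downstream system has accepted

    // Commits record by record starting at `from`; fails after `failAfter` commits.
    void process(List<String> input, int from, int failAfter) {
        int commits = 0;
        for (int i = from; i < input.size(); i++) {
            if (commits == failAfter) {
                throw new IllegalStateException("simulated failure at offset " + i);
            }
            committed.add(input.get(i));   // committed immediately, like one transaction per record
            commits++;
        }
    }

    // Runs once with a failure, then replays from the checkpointed offset to the end.
    static List<String> runWithOneFailure(List<String> input, int checkpointOffset, int failAfter) {
        ReplayModel sink = new ReplayModel();
        try {
            sink.process(input, 0, failAfter);
        } catch (IllegalStateException expected) {
            // Recovery restarts from the last completed checkpoint, not from the failure point.
        }
        sink.process(input, checkpointOffset, Integer.MAX_VALUE);
        return sink.committed;
    }
}
```

With a checkpoint at offset 1 and a failure after three commits, records r2 and r3 reach the sink twice.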

Migration Notes

Since this connector is deprecated, consider these alternatives:

  • Custom Flink Connectors: Implement custom source/sink using NiFi REST API or MiNiFi
  • Message Queue Integration: Use Kafka, Pulsar, or other message systems as intermediary
  • File-based Integration: Use shared storage (HDFS, S3) for data exchange
  • Direct API Integration: Use NiFi's REST API for control plane operations

Limitations

  • Deprecated Status: No new features or active development
  • NiFi Version Compatibility: Tied to specific NiFi site-to-site-client version (1.14.0)
  • Configuration Complexity: Requires proper NiFi port configuration and network connectivity
  • Limited Error Visibility: Some errors may be hidden in NiFi Site-to-Site client logging

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-flink--flink-connector-nifi
Workspace: tessl
Visibility: Public
Describes: mavenpkg:maven/org.apache.flink/flink-connector-nifi@1.15.x