A deprecated Apache Flink connector that enables integration with Apache NiFi through the NiFi Site-to-Site protocol for bidirectional streaming data exchange.
npx @tessl/cli install tessl/maven-org-apache-flink--flink-connector-nifi@1.15.0

Apache Flink NiFi Connector provides integration between Apache Flink and Apache NiFi through the NiFi Site-to-Site protocol. This connector enables bidirectional streaming data exchange between Flink applications and NiFi clusters, supporting both data ingestion (source) and data publishing (sink) operations.
Status: This connector is deprecated and will be removed in a future Flink release.
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-nifi</artifactId>
<version>1.15.4</version>
</dependency>

import org.apache.flink.streaming.connectors.nifi.NiFiSource;
import org.apache.flink.streaming.connectors.nifi.NiFiSink;
import org.apache.flink.streaming.connectors.nifi.NiFiDataPacket;
import org.apache.flink.streaming.connectors.nifi.StandardNiFiDataPacket;
import org.apache.flink.streaming.connectors.nifi.NiFiDataPacketBuilder;
// NiFi Site-to-Site client imports
import org.apache.nifi.remote.client.SiteToSiteClient;
import org.apache.nifi.remote.client.SiteToSiteClientConfig;
// Additional required imports for examples
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.concurrent.TimeUnit;
import org.apache.flink.api.common.functions.RuntimeContext;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.connectors.nifi.NiFiSource;
import org.apache.flink.streaming.connectors.nifi.NiFiDataPacket;
import org.apache.nifi.remote.client.SiteToSiteClient;
import java.nio.charset.StandardCharsets;
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Configure NiFi client
SiteToSiteClientConfig clientConfig = new SiteToSiteClient.Builder()
.url("http://localhost:8080/nifi")
.portName("Data for Flink")
.requestBatchCount(5)
.buildConfig();
// Create source
NiFiSource nifiSource = new NiFiSource(clientConfig);
DataStream<NiFiDataPacket> stream = env.addSource(nifiSource).setParallelism(2);
// Process data packets
DataStream<String> content = stream.map(packet ->
new String(packet.getContent(), StandardCharsets.UTF_8)
);
env.execute("NiFi Source Job");

import org.apache.flink.streaming.connectors.nifi.NiFiSink;
import org.apache.flink.streaming.connectors.nifi.NiFiDataPacketBuilder;
import org.apache.flink.streaming.connectors.nifi.StandardNiFiDataPacket;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
// Configure NiFi client for sink
SiteToSiteClientConfig clientConfig = new SiteToSiteClient.Builder()
.url("http://localhost:8080/nifi")
.portName("Data from Flink")
.buildConfig();
// Create data packet builder
NiFiDataPacketBuilder<String> builder = (data, ctx) ->
new StandardNiFiDataPacket(
data.getBytes(StandardCharsets.UTF_8),
new HashMap<String, String>()
);
// Create sink
NiFiSink<String> nifiSink = new NiFiSink<>(clientConfig, builder);
// Add to stream
DataStream<String> dataStream = env.fromElements("data1", "data2", "data3");
dataStream.addSink(nifiSink);
env.execute("NiFi Sink Job");

The NiFi connector is built around several key components:
Pull data from NiFi output ports using the Site-to-Site protocol.
/**
* A source that pulls data from Apache NiFi using the NiFi Site-to-Site client.
* Produces NiFiDataPackets which encapsulate NiFi FlowFile content and attributes.
*
* @deprecated The NiFi Source has been deprecated and will be removed in a future Flink release.
*/
@Deprecated
public class NiFiSource extends RichParallelSourceFunction<NiFiDataPacket> {
/**
* Constructs a new NiFiSource using the given client config and default wait time of 1000ms.
*
* @param clientConfig the configuration for building a NiFi SiteToSiteClient
*/
public NiFiSource(SiteToSiteClientConfig clientConfig);
/**
* Constructs a new NiFiSource using the given client config and wait time.
*
* @param clientConfig the configuration for building a NiFi SiteToSiteClient
* @param waitTimeMs the amount of time to wait (in milliseconds) if no data is available
*/
public NiFiSource(SiteToSiteClientConfig clientConfig, long waitTimeMs);
/**
* Opens the source and creates the SiteToSiteClient.
*
* @param parameters the configuration parameters from Flink
* @throws Exception if the SiteToSiteClient cannot be created
*/
@Override
public void open(Configuration parameters) throws Exception;
/**
* Main execution loop for pulling data from NiFi.
* Continuously pulls data packets and emits them to the Flink stream.
*
* @param ctx the source context for collecting data packets
* @throws Exception if errors occur during data transfer
*/
@Override
public void run(SourceContext<NiFiDataPacket> ctx) throws Exception;
/**
* Cancels the running source.
*/
@Override
public void cancel();
/**
* Closes the source and client connection.
*/
@Override
public void close() throws Exception;
}

Send data to NiFi input ports using the Site-to-Site protocol.
/**
* A sink that delivers data to Apache NiFi using the NiFi Site-to-Site client.
* Requires a NiFiDataPacketBuilder to create NiFiDataPacket instances from incoming data.
*
* @param <T> the type of input data
* @deprecated The NiFi Sink has been deprecated and will be removed in a future Flink release.
*/
@Deprecated
public class NiFiSink<T> extends RichSinkFunction<T> {
/**
* Construct a new NiFiSink with the given client config and NiFiDataPacketBuilder.
*
* @param clientConfig the configuration for building a NiFi SiteToSiteClient
* @param builder a builder to produce NiFiDataPackets from incoming data
*/
public NiFiSink(SiteToSiteClientConfig clientConfig, NiFiDataPacketBuilder<T> builder);
/**
* Opens the sink and creates the SiteToSiteClient.
*
* @param parameters the configuration parameters from Flink
* @throws Exception if the SiteToSiteClient cannot be created
*/
@Override
public void open(Configuration parameters) throws Exception;
/**
* Processes and sends individual records to NiFi.
* Converts input data to NiFiDataPacket using the builder and sends to NiFi.
*
* @param value the record to send to NiFi
* @throws Exception if data cannot be sent or transaction fails
*/
@Override
public void invoke(T value) throws Exception;
/**
* Closes the sink and client connection.
*/
@Override
public void close() throws Exception;
}

Core interface for wrapping NiFi FlowFile data and attributes.
/**
* The NiFiDataPacket provides a packaging around a NiFi FlowFile.
* It wraps both a FlowFile's content and its attributes for processing by Flink.
*/
public interface NiFiDataPacket {
/**
* @return the contents of a NiFi FlowFile as byte array
*/
byte[] getContent();
/**
* @return a Map of attributes that are associated with the NiFi FlowFile
*/
Map<String, String> getAttributes();
}

Standard implementation of the NiFiDataPacket interface.
/**
* An implementation of NiFiDataPacket that stores content and attributes.
*/
public class StandardNiFiDataPacket implements NiFiDataPacket, Serializable {
/**
* Constructs a StandardNiFiDataPacket with content and attributes.
*
* @param content the FlowFile content as byte array
* @param attributes the FlowFile attributes as string map
*/
public StandardNiFiDataPacket(byte[] content, Map<String, String> attributes);
/**
* @return the contents of the FlowFile
*/
@Override
public byte[] getContent();
/**
* @return the attributes of the FlowFile
*/
@Override
public Map<String, String> getAttributes();
}

Builder interface for creating NiFiDataPackets from input data in sink operations.
/**
* A function that can create a NiFiDataPacket from an incoming instance of the given type.
* Used by NiFiSink to convert Flink stream data to NiFi FlowFile format.
*
* @param <T> the type of input data to convert
*/
public interface NiFiDataPacketBuilder<T> extends Function, Serializable {
/**
* Creates a NiFiDataPacket from input data.
*
* @param t the input data to convert
* @param ctx the Flink runtime context
* @return NiFiDataPacket containing the converted data
*/
NiFiDataPacket createNiFiDataPacket(T t, RuntimeContext ctx);
}

<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-site-to-site-client</artifactId>
<version>1.14.0</version>
</dependency>

/**
* Configuration for NiFi Site-to-Site client connection.
* Built using SiteToSiteClient.Builder fluent interface.
*/
SiteToSiteClientConfig clientConfig = new SiteToSiteClient.Builder()
.url("http://localhost:8080/nifi") // NiFi instance URL
.portName("Port Name") // Input/Output port name
.requestBatchCount(5) // Optional: batch size for requests
.timeout(30, TimeUnit.SECONDS) // Optional: connection timeout
.buildConfig();
/**
* Builder interface for creating SiteToSiteClient configurations.
* Part of the Apache NiFi site-to-site-client library.
*
* This is an abbreviated API listing, not compilable Java: the header uses the
* qualified name to show that Builder is nested inside SiteToSiteClient, which is
* why the examples instantiate it as {@code new SiteToSiteClient.Builder()}.
* Each setter returns the Builder so calls can be chained.
*/
public static class SiteToSiteClient.Builder {
// NiFi instance URL, e.g. "http://localhost:8080/nifi".
public Builder url(String url);
// Name of the NiFi input/output port to exchange data with.
public Builder portName(String portName);
// Optional: batch size for requests.
public Builder requestBatchCount(int count);
// Optional: connection timeout, expressed as an amount plus its unit.
public Builder timeout(long time, TimeUnit unit);
// Keystore/truststore settings below are presumably the SSL settings of the
// connection (the config is described as holding "SSL settings") — confirm
// exact semantics against the NiFi Javadoc.
public Builder keystoreFilename(String keystoreFilename);
public Builder keystorePassword(String keystorePassword);
public Builder truststoreFilename(String truststoreFilename);
public Builder truststorePassword(String truststorePassword);
// Returns a SiteToSiteClient directly, rather than a configuration object.
public SiteToSiteClient build();
// Returns the SiteToSiteClientConfig that the NiFiSource and NiFiSink
// constructors accept.
public SiteToSiteClientConfig buildConfig();
// NOTE(review): presumably initializes this builder from an existing
// configuration — confirm against the NiFi Javadoc.
public Builder fromConfig(SiteToSiteClientConfig config);
}
/**
* Configuration interface for NiFi Site-to-Site client.
* Contains connection parameters and SSL settings.
*/
public interface SiteToSiteClientConfig {
String getUrl();
String getPortName();
Integer getRequestBatchCount();
Long getTimeoutMillis();
String getKeystoreFilename();
String getKeystorePassword();
String getTruststoreFilename();
String getTruststorePassword();
}

The sink throws an IllegalStateException if a transaction cannot be created.

// Sink operation failures
IllegalStateException - Unable to create NiFi transaction for sending data
// General connection and I/O issues handled by:
// - org.apache.nifi.remote.exception.TransmissionException
// - java.io.IOException for network connectivity issues

Since this connector is deprecated, consider these alternatives: