Apache Flink connector for NiFi that enables streaming data integration between Flink and Apache NiFi using the Site-to-Site protocol
npx @tessl/cli install tessl/maven-org-apache-flink--flink-connector-nifi-2-10@1.3.0

Apache Flink connector for NiFi that enables streaming data integration between Flink and Apache NiFi using the Site-to-Site protocol. This connector provides both source and sink components for bidirectional data transfer between Flink streaming jobs and NiFi dataflows.
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-nifi_2.10</artifactId>
<version>1.3.3</version>
</dependency>

import org.apache.flink.streaming.connectors.nifi.NiFiSource;
import org.apache.flink.streaming.connectors.nifi.NiFiSink;
import org.apache.flink.streaming.connectors.nifi.NiFiDataPacket;
import org.apache.flink.streaming.connectors.nifi.NiFiDataPacketBuilder;
import org.apache.flink.streaming.connectors.nifi.StandardNiFiDataPacket;
import org.apache.nifi.remote.client.SiteToSiteClient;
import org.apache.nifi.remote.client.SiteToSiteClientConfig;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.functions.StoppableFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.configuration.ConfigConstants;
import java.nio.charset.Charset;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.nifi.NiFiSource;
import org.apache.flink.streaming.connectors.nifi.NiFiDataPacket;
import org.apache.nifi.remote.client.SiteToSiteClient;
import org.apache.nifi.remote.client.SiteToSiteClientConfig;
// Create the Flink streaming environment that the NiFi source is attached to below
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Configure the NiFi Site-to-Site client: the NiFi URL, the name of the NiFi port
// to pull from, and the batch size for requests
SiteToSiteClientConfig clientConfig = new SiteToSiteClient.Builder()
.url("http://localhost:8080/nifi")
.portName("Data for Flink")
.requestBatchCount(5)
.buildConfig();
// Create the NiFi source from the client config; this single-argument constructor
// uses the default wait time of 1000 ms when no data is available to pull
NiFiSource nifiSource = new NiFiSource(clientConfig);
// Register the source with the environment and run it with a parallelism of 2
DataStream<NiFiDataPacket> sourceStream = env.addSource(nifiSource).setParallelism(2);
// Process NiFi data packets: turn each packet's FlowFile content into a String
DataStream<String> processedStream = sourceStream.map(packet -> {
    // Decode with the same charset constant the sink example uses for encoding,
    // rather than the JVM's platform-default charset, so the decoded text does
    // not depend on the machine the job happens to run on
    return new String(packet.getContent(), ConfigConstants.DEFAULT_CHARSET);
});

import org.apache.flink.streaming.connectors.nifi.NiFiSink;
import org.apache.flink.streaming.connectors.nifi.NiFiDataPacketBuilder;
import org.apache.flink.streaming.connectors.nifi.StandardNiFiDataPacket;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.configuration.ConfigConstants;
import java.util.HashMap;
// Configure the NiFi Site-to-Site client: the NiFi URL and the name of the NiFi
// port that the sink sends data to
SiteToSiteClientConfig clientConfig = new SiteToSiteClient.Builder()
.url("http://localhost:8080/nifi")
.portName("Data from Flink")
.buildConfig();
// Create a custom data packet builder that turns each String stream element
// into a NiFiDataPacket
NiFiDataPacketBuilder<String> builder = new NiFiDataPacketBuilder<String>() {
@Override
public NiFiDataPacket createNiFiDataPacket(String data, RuntimeContext ctx) {
// The packet content is the string's bytes in ConfigConstants.DEFAULT_CHARSET and
// the attribute map is left empty; the RuntimeContext argument is not used here
return new StandardNiFiDataPacket(
data.getBytes(ConfigConstants.DEFAULT_CHARSET),
new HashMap<String, String>()
);
}
};
// Add sink to data stream. The stream here is a fixed three-element example.
// NOTE: `env` is not declared in this snippet; it is assumed to be the
// StreamExecutionEnvironment created in the source example.
DataStream<String> dataStream = env.fromElements("one", "two", "three");
dataStream.addSink(new NiFiSink<>(clientConfig, builder));

The connector follows Flink's standard source/sink pattern and integrates with NiFi's Site-to-Site protocol.
Streams data from Apache NiFi into Flink using the Site-to-Site protocol. The source function connects to NiFi output ports and pulls data packets containing FlowFile content and attributes.
public class NiFiSource extends RichParallelSourceFunction<NiFiDataPacket> implements StoppableFunction {
private static final long serialVersionUID = 1L;
private static final long DEFAULT_WAIT_TIME_MS = 1000;
private final SiteToSiteClientConfig clientConfig;
private final long waitTimeMs;
private transient SiteToSiteClient client;
private volatile boolean isRunning = true;
/**
* Constructs a new NiFiSource using the given client config and the default wait time of 1000 ms.
*
* @param clientConfig the configuration for building a NiFi SiteToSiteClient
*/
public NiFiSource(SiteToSiteClientConfig clientConfig);
/**
* Constructs a new NiFiSource using the given client config and wait time.
*
* @param clientConfig the configuration for building a NiFi SiteToSiteClient
* @param waitTimeMs the amount of time to wait (in milliseconds) if no data is available to pull from NiFi
*/
public NiFiSource(SiteToSiteClientConfig clientConfig, long waitTimeMs);
@Override
public void open(Configuration parameters) throws Exception;
@Override
public void run(SourceContext<NiFiDataPacket> ctx) throws Exception;
@Override
public void cancel();
@Override
public void close() throws Exception;
@Override
public void stop();
}

Sends data from Flink to Apache NiFi using the Site-to-Site protocol. The sink function connects to NiFi input ports and sends data packets created by a custom builder.
public class NiFiSink<T> extends RichSinkFunction<T> {
private SiteToSiteClient client;
private SiteToSiteClientConfig clientConfig;
private NiFiDataPacketBuilder<T> builder;
/**
* Construct a new NiFiSink with the given client config and NiFiDataPacketBuilder.
*
* @param clientConfig the configuration for building a NiFi SiteToSiteClient
* @param builder a builder to produce NiFiDataPackets from incoming data
*/
public NiFiSink(SiteToSiteClientConfig clientConfig, NiFiDataPacketBuilder<T> builder);
@Override
public void open(Configuration parameters) throws Exception;
@Override
public void invoke(T value) throws Exception;
@Override
public void close() throws Exception;
}

Represents a NiFi FlowFile with content and attributes, providing access to both the data payload and metadata associated with the FlowFile.
public interface NiFiDataPacket {
/**
* @return the contents of a NiFi FlowFile
*/
byte[] getContent();
/**
* @return a Map of attributes that are associated with the NiFi FlowFile
*/
Map<String, String> getAttributes();
}

Concrete implementation of NiFiDataPacket that stores FlowFile content as a byte array and attributes as a string map.
public class StandardNiFiDataPacket implements NiFiDataPacket, Serializable {
private static final long serialVersionUID = 6364005260220243322L;
private final byte[] content;
private final Map<String, String> attributes;
public StandardNiFiDataPacket(final byte[] content, final Map<String, String> attributes);
@Override
public byte[] getContent();
@Override
public Map<String, String> getAttributes();
}

Functional interface for transforming Flink data into NiFiDataPacket objects. Custom implementations define how to convert stream elements into NiFi-compatible data packets.
public interface NiFiDataPacketBuilder<T> extends Function, Serializable {
NiFiDataPacket createNiFiDataPacket(T t, RuntimeContext ctx);
}

The connector uses NiFi's SiteToSiteClientConfig for connection settings:
// Build the Site-to-Site client configuration that is passed to the NiFiSource
// and NiFiSink constructors
SiteToSiteClientConfig clientConfig = new SiteToSiteClient.Builder()
.url("http://nifi-host:8080/nifi") // NiFi URL
.portName("Data Port Name") // Name of the NiFi port to connect to (an output port for the source, an input port for the sink)
.requestBatchCount(10) // Batch size for requests
.timeout(30, TimeUnit.SECONDS) // Connection timeout (30 seconds here)
.buildConfig();