Apache Flink connector for NiFi that enables streaming data integration between Flink and Apache NiFi using the Site-to-Site protocol
—
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Pending
The risk profile of this skill
Apache Flink connector for NiFi that enables streaming data integration between Flink and Apache NiFi using the Site-to-Site protocol. This connector provides both source and sink components for bidirectional data transfer between Flink streaming jobs and NiFi dataflows.
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-nifi_2.10</artifactId>
<version>1.3.3</version>
</dependency>
import org.apache.flink.streaming.connectors.nifi.NiFiSource;
import org.apache.flink.streaming.connectors.nifi.NiFiSink;
import org.apache.flink.streaming.connectors.nifi.NiFiDataPacket;
import org.apache.flink.streaming.connectors.nifi.NiFiDataPacketBuilder;
import org.apache.flink.streaming.connectors.nifi.StandardNiFiDataPacket;
import org.apache.nifi.remote.client.SiteToSiteClient;
import org.apache.nifi.remote.client.SiteToSiteClientConfig;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.functions.StoppableFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.configuration.ConfigConstants;
import java.nio.charset.Charset;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.nifi.NiFiSource;
import org.apache.flink.streaming.connectors.nifi.NiFiDataPacket;
import org.apache.nifi.remote.client.SiteToSiteClient;
import org.apache.nifi.remote.client.SiteToSiteClientConfig;
// Create the Flink streaming environment that will host the job.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Configure the NiFi Site-to-Site client: connect to the NiFi instance at the
// given URL, use the port named "Data for Flink", and request batches of 5.
SiteToSiteClientConfig clientConfig = new SiteToSiteClient.Builder()
    .url("http://localhost:8080/nifi")
    .portName("Data for Flink")
    .requestBatchCount(5)
    .buildConfig();
// Create the NiFi source and attach it to the environment with a parallelism of 2.
NiFiSource nifiSource = new NiFiSource(clientConfig);
DataStream<NiFiDataPacket> sourceStream = env.addSource(nifiSource).setParallelism(2);
// Decode each packet's content into a String. A fixed charset is used instead of
// Charset.defaultCharset() so the result does not depend on the JVM's platform
// settings; this is the same charset the sink example uses when encoding.
// NOTE(review): assumes the FlowFile content is text in that charset - adjust if
// the NiFi flow produces a different encoding.
DataStream<String> processedStream = sourceStream.map(
    packet -> new String(packet.getContent(), ConfigConstants.DEFAULT_CHARSET));

import org.apache.flink.streaming.connectors.nifi.NiFiSink;
import org.apache.flink.streaming.connectors.nifi.NiFiDataPacketBuilder;
import org.apache.flink.streaming.connectors.nifi.StandardNiFiDataPacket;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.configuration.ConfigConstants;
import java.util.HashMap;
// Site-to-Site client settings: NiFi instance URL plus the name of the port
// ("Data from Flink") that will receive the data.
SiteToSiteClientConfig clientConfig =
    new SiteToSiteClient.Builder()
        .url("http://localhost:8080/nifi")
        .portName("Data from Flink")
        .buildConfig();
// Packet builder written as a lambda implementing
// NiFiDataPacketBuilder#createNiFiDataPacket: each String element becomes a
// packet holding the element's bytes and an empty attribute map.
NiFiDataPacketBuilder<String> builder = (element, runtimeContext) ->
    new StandardNiFiDataPacket(
        element.getBytes(ConfigConstants.DEFAULT_CHARSET),
        new HashMap<String, String>());
// Small fixed stream of elements to hand to the sink.
DataStream<String> dataStream = env.fromElements("one", "two", "three");
dataStream.addSink(new NiFiSink<>(clientConfig, builder));
The connector follows Flink's standard source/sink pattern and integrates with NiFi's Site-to-Site protocol:
Streams data from Apache NiFi into Flink using the Site-to-Site protocol. The source function connects to NiFi output ports and pulls data packets containing FlowFile content and attributes.
public class NiFiSource extends RichParallelSourceFunction<NiFiDataPacket> implements StoppableFunction {
private static final long serialVersionUID = 1L;
private static final long DEFAULT_WAIT_TIME_MS = 1000;
private final SiteToSiteClientConfig clientConfig;
private final long waitTimeMs;
private transient SiteToSiteClient client;
private volatile boolean isRunning = true;
/**
* Constructs a new NiFiSource using the given client config and the default wait time of 1000 ms.
*
* @param clientConfig the configuration for building a NiFi SiteToSiteClient
*/
public NiFiSource(SiteToSiteClientConfig clientConfig);
/**
* Constructs a new NiFiSource using the given client config and wait time.
*
* @param clientConfig the configuration for building a NiFi SiteToSiteClient
* @param waitTimeMs the amount of time to wait (in milliseconds) if no data is available to pull from NiFi
*/
public NiFiSource(SiteToSiteClientConfig clientConfig, long waitTimeMs);
@Override
public void open(Configuration parameters) throws Exception;
@Override
public void run(SourceContext<NiFiDataPacket> ctx) throws Exception;
@Override
public void cancel();
@Override
public void close() throws Exception;
@Override
public void stop();
}
Sends data from Flink to Apache NiFi using the Site-to-Site protocol. The sink function connects to NiFi input ports and sends data packets created by a custom builder.
public class NiFiSink<T> extends RichSinkFunction<T> {
private SiteToSiteClient client;
private SiteToSiteClientConfig clientConfig;
private NiFiDataPacketBuilder<T> builder;
/**
* Construct a new NiFiSink with the given client config and NiFiDataPacketBuilder.
*
* @param clientConfig the configuration for building a NiFi SiteToSiteClient
* @param builder a builder to produce NiFiDataPackets from incoming data
*/
public NiFiSink(SiteToSiteClientConfig clientConfig, NiFiDataPacketBuilder<T> builder);
@Override
public void open(Configuration parameters) throws Exception;
@Override
public void invoke(T value) throws Exception;
@Override
public void close() throws Exception;
}
Represents a NiFi FlowFile with content and attributes, providing access to both the data payload and metadata associated with the FlowFile.
public interface NiFiDataPacket {
/**
* @return the contents of a NiFi FlowFile
*/
byte[] getContent();
/**
* @return a Map of attributes that are associated with the NiFi FlowFile
*/
Map<String, String> getAttributes();
}
Concrete implementation of NiFiDataPacket that stores FlowFile content as a byte array and attributes as a string map.
public class StandardNiFiDataPacket implements NiFiDataPacket, Serializable {
private static final long serialVersionUID = 6364005260220243322L;
private final byte[] content;
private final Map<String, String> attributes;
public StandardNiFiDataPacket(final byte[] content, final Map<String, String> attributes);
@Override
public byte[] getContent();
@Override
public Map<String, String> getAttributes();
}
Functional interface for transforming Flink data into NiFiDataPacket objects. Custom implementations define how to convert stream elements into NiFi-compatible data packets.
public interface NiFiDataPacketBuilder<T> extends Function, Serializable {
NiFiDataPacket createNiFiDataPacket(T t, RuntimeContext ctx);
}
The connector uses NiFi's SiteToSiteClientConfig for connection settings:
// Build the Site-to-Site client configuration. The resulting SiteToSiteClientConfig
// is the object accepted by the NiFiSource and NiFiSink constructors.
SiteToSiteClientConfig clientConfig = new SiteToSiteClient.Builder()
.url("http://nifi-host:8080/nifi") // URL of the NiFi instance
.portName("Data Port Name") // Name of the NiFi port to connect to (an output port for a source, an input port for a sink)
.requestBatchCount(10) // Batch size for requests
.timeout(30, TimeUnit.SECONDS) // 30-second timeout. NOTE(review): NiFi describes this as the communications timeout rather than only connection setup - confirm
.buildConfig();