# Flink NiFi Connector

An Apache Flink connector that enables streaming data integration between Flink and Apache NiFi using NiFi's Site-to-Site protocol. It provides both a source and a sink, supporting bidirectional data transfer between Flink streaming jobs and NiFi dataflows.

## Package Information

- **Package Name**: flink-connector-nifi_2.10
- **Group ID**: org.apache.flink
- **Language**: Java
- **Maven Dependency**:

```xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-nifi_2.10</artifactId>
    <version>1.3.3</version>
</dependency>
```

## Core Imports

```java
import org.apache.flink.streaming.connectors.nifi.NiFiSource;
import org.apache.flink.streaming.connectors.nifi.NiFiSink;
import org.apache.flink.streaming.connectors.nifi.NiFiDataPacket;
import org.apache.flink.streaming.connectors.nifi.NiFiDataPacketBuilder;
import org.apache.flink.streaming.connectors.nifi.StandardNiFiDataPacket;
import org.apache.nifi.remote.client.SiteToSiteClient;
import org.apache.nifi.remote.client.SiteToSiteClientConfig;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.functions.StoppableFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.configuration.ConfigConstants;
import java.nio.charset.Charset;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
```

## Basic Usage

### NiFi Source Example

```java
import java.nio.charset.StandardCharsets;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.nifi.NiFiDataPacket;
import org.apache.flink.streaming.connectors.nifi.NiFiSource;
import org.apache.nifi.remote.client.SiteToSiteClient;
import org.apache.nifi.remote.client.SiteToSiteClientConfig;

// Create the Flink streaming environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Configure the NiFi Site-to-Site client to pull from the output port "Data for Flink"
SiteToSiteClientConfig clientConfig = new SiteToSiteClient.Builder()
        .url("http://localhost:8080/nifi")
        .portName("Data for Flink")
        .requestBatchCount(5)
        .buildConfig();

// Create the NiFi source and run two parallel instances of it
NiFiSource nifiSource = new NiFiSource(clientConfig);
DataStream<NiFiDataPacket> sourceStream = env.addSource(nifiSource).setParallelism(2);

// Decode each FlowFile's content (assumes the content is UTF-8 encoded text)
DataStream<String> processedStream = sourceStream.map(packet ->
        new String(packet.getContent(), StandardCharsets.UTF_8));

processedStream.print();
env.execute("NiFi Source Example");
```

### NiFi Sink Example
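
```java
import java.util.HashMap;

import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.nifi.NiFiDataPacket;
import org.apache.flink.streaming.connectors.nifi.NiFiDataPacketBuilder;
import org.apache.flink.streaming.connectors.nifi.NiFiSink;
import org.apache.flink.streaming.connectors.nifi.StandardNiFiDataPacket;
import org.apache.nifi.remote.client.SiteToSiteClient;
import org.apache.nifi.remote.client.SiteToSiteClientConfig;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Configure the NiFi Site-to-Site client to push to the input port "Data from Flink"
SiteToSiteClientConfig clientConfig = new SiteToSiteClient.Builder()
        .url("http://localhost:8080/nifi")
        .portName("Data from Flink")
        .buildConfig();

// Convert each String element into a NiFi data packet (UTF-8 content, no attributes).
// Inside an instance method, this anonymous class would also capture the enclosing
// object, which then has to be serializable; a static context (e.g. main) avoids that.
NiFiDataPacketBuilder<String> builder = new NiFiDataPacketBuilder<String>() {
    @Override
    public NiFiDataPacket createNiFiDataPacket(String data, RuntimeContext ctx) {
        return new StandardNiFiDataPacket(
                data.getBytes(ConfigConstants.DEFAULT_CHARSET),
                new HashMap<String, String>());
    }
};

// Attach the sink to a data stream and run the job
DataStream<String> dataStream = env.fromElements("one", "two", "three");
dataStream.addSink(new NiFiSink<>(clientConfig, builder));
env.execute("NiFi Sink Example");
```
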
## Architecture

The connector follows Flink's standard source/sink pattern and integrates with NiFi's Site-to-Site protocol:

- **NiFiSource**: Pulls data from NiFi output ports using a Site-to-Site client
- **NiFiSink**: Sends data to NiFi input ports via the Site-to-Site protocol
- **NiFiDataPacket**: Encapsulates a NiFi FlowFile's content and attributes
- **NiFiDataPacketBuilder**: Converts Flink stream elements into `NiFiDataPacket`s
- **Site-to-Site Integration**: Uses NiFi's native remote transfer protocol, in which each transfer is confirmed and completed as a transaction

## Capabilities

### NiFi Data Source

Streams data from Apache NiFi into Flink using the Site-to-Site protocol. The source function connects to NiFi output ports and pulls data packets containing FlowFile content and attributes.

```java { .api }
public class NiFiSource extends RichParallelSourceFunction<NiFiDataPacket> implements StoppableFunction {

    private static final long serialVersionUID = 1L;
    private static final long DEFAULT_WAIT_TIME_MS = 1000;
    private final SiteToSiteClientConfig clientConfig;
    private final long waitTimeMs;
    private transient SiteToSiteClient client;
    private volatile boolean isRunning = true;

    /**
     * Constructs a new NiFiSource using the given client config and the default wait time of 1000 ms.
     *
     * @param clientConfig the configuration for building a NiFi SiteToSiteClient
     */
    public NiFiSource(SiteToSiteClientConfig clientConfig);

    /**
     * Constructs a new NiFiSource using the given client config and wait time.
     *
     * @param clientConfig the configuration for building a NiFi SiteToSiteClient
     * @param waitTimeMs the amount of time to wait (in milliseconds) if no data is available to pull from NiFi
     */
    public NiFiSource(SiteToSiteClientConfig clientConfig, long waitTimeMs);

    @Override
    public void open(Configuration parameters) throws Exception;

    @Override
    public void run(SourceContext<NiFiDataPacket> ctx) throws Exception;

    @Override
    public void cancel();

    @Override
    public void close() throws Exception;

    @Override
    public void stop();
}
```
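The pull behavior described above can be modeled without a running NiFi instance. The following is a simplified, self-contained sketch: `ReceiveTransaction`, `ScriptedTransaction`, and `PullLoopSketch` are hypothetical stand-ins rather than NiFi or Flink classes, packets are plain strings, and the `maxIterations` bound exists only so the sketch terminates. It assumes the source drains every packet in a transaction, confirms it, emits the packets, and then completes it, and that it sleeps for `waitTimeMs` whenever no transaction can be created or a transaction carries no data.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Hypothetical stand-in for a Site-to-Site receive transaction (not the NiFi API).
interface ReceiveTransaction {
    String receive(); // next packet, or null once the transaction is drained
    void confirm();
    void complete();
}

// In-memory transaction that hands out a fixed list of packets.
final class ScriptedTransaction implements ReceiveTransaction {
    private final Deque<String> packets;
    boolean completed;

    ScriptedTransaction(List<String> packets) {
        this.packets = new ArrayDeque<>(packets);
    }

    public String receive() { return packets.poll(); }
    public void confirm() { }
    public void complete() { completed = true; }
}

final class PullLoopSketch {
    private final Supplier<ReceiveTransaction> client; // returns null if no transaction could be created
    private final long waitTimeMs;
    private volatile boolean isRunning = true;
    int waits; // number of back-off sleeps, tracked for illustration

    PullLoopSketch(Supplier<ReceiveTransaction> client, long waitTimeMs) {
        this.client = client;
        this.waitTimeMs = waitTimeMs;
    }

    void cancel() { isRunning = false; }

    void run(Consumer<String> collector, int maxIterations) {
        for (int i = 0; i < maxIterations && isRunning; i++) {
            ReceiveTransaction tx = client.get();
            if (tx == null) { // no transaction available: wait, then retry
                backOff();
                continue;
            }
            String packet = tx.receive();
            if (packet == null) { // nothing to pull: close the empty transaction, then wait
                tx.confirm();
                tx.complete();
                backOff();
                continue;
            }
            List<String> batch = new ArrayList<>();
            do { // drain every packet in this transaction
                batch.add(packet);
                packet = tx.receive();
            } while (packet != null);
            tx.confirm();             // acknowledge receipt to the sender
            batch.forEach(collector); // emit downstream
            tx.complete();            // finish the transaction
        }
    }

    private void backOff() {
        waits++;
        try {
            Thread.sleep(waitTimeMs);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt and stop the loop
            isRunning = false;
        }
    }

    public static void main(String[] args) {
        Iterator<ReceiveTransaction> script = Arrays.<ReceiveTransaction>asList(
                null, // first attempt: no transaction could be created
                new ScriptedTransaction(Arrays.asList("a", "b")),
                new ScriptedTransaction(new ArrayList<>())).iterator();
        List<String> out = new ArrayList<>();
        PullLoopSketch loop = new PullLoopSketch(() -> script.hasNext() ? script.next() : null, 10);
        loop.run(out::add, 3);
        System.out.println(out + " after " + loop.waits + " waits"); // [a, b] after 2 waits
    }
}
```

The `waits` counter and the `maxIterations` bound are illustration aids only; the connector's source keeps looping until it is cancelled or stopped.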
### NiFi Data Sink

Sends data from Flink to Apache NiFi using the Site-to-Site protocol. The sink function connects to NiFi input ports and sends data packets created by a custom builder.

```java { .api }
public class NiFiSink<T> extends RichSinkFunction<T> {

    private SiteToSiteClient client;
    private SiteToSiteClientConfig clientConfig;
    private NiFiDataPacketBuilder<T> builder;

    /**
     * Construct a new NiFiSink with the given client config and NiFiDataPacketBuilder.
     *
     * @param clientConfig the configuration for building a NiFi SiteToSiteClient
     * @param builder a builder to produce NiFiDataPackets from incoming data
     */
    public NiFiSink(SiteToSiteClientConfig clientConfig, NiFiDataPacketBuilder<T> builder);

    @Override
    public void open(Configuration parameters) throws Exception;

    @Override
    public void invoke(T value) throws Exception;

    @Override
    public void close() throws Exception;
}
```
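The per-element `invoke` path can be modeled the same way. This self-contained sketch assumes each element gets its own Site-to-Site transaction (build the packet, create a transaction, send, confirm, complete) and that the sink throws an `IllegalStateException` when no transaction can be created. `Packet`, `SendTransaction`, `RecordingTransaction`, and `SinkSketch` are hypothetical stand-ins, and `java.util.function.Function` takes the place of `NiFiDataPacketBuilder`.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.function.Supplier;

// Hypothetical packet type mirroring NiFiDataPacket: content bytes plus string attributes.
final class Packet {
    final byte[] content;
    final Map<String, String> attributes;

    Packet(byte[] content, Map<String, String> attributes) {
        this.content = content;
        this.attributes = attributes;
    }
}

// Hypothetical stand-in for a Site-to-Site send transaction (not the NiFi API).
interface SendTransaction {
    void send(byte[] content, Map<String, String> attributes);
    void confirm();
    void complete();
}

// In-memory transaction: sent packets become visible in `delivered` only on complete().
final class RecordingTransaction implements SendTransaction {
    private final List<String> delivered;
    private final List<String> pending = new ArrayList<>();

    RecordingTransaction(List<String> delivered) { this.delivered = delivered; }

    public void send(byte[] content, Map<String, String> attributes) {
        pending.add(new String(content, StandardCharsets.UTF_8));
    }

    public void confirm() { }

    public void complete() {
        delivered.addAll(pending);
        pending.clear();
    }
}

final class SinkSketch<T> {
    private final Supplier<SendTransaction> client; // returns null if no transaction could be created
    private final Function<T, Packet> builder;      // plays the role of NiFiDataPacketBuilder

    SinkSketch(Supplier<SendTransaction> client, Function<T, Packet> builder) {
        this.client = client;
        this.builder = builder;
    }

    // One transaction per element: build the packet, send it, confirm, complete.
    void invoke(T value) {
        Packet packet = builder.apply(value);
        SendTransaction tx = client.get();
        if (tx == null) {
            throw new IllegalStateException("Unable to create a transaction to send data");
        }
        tx.send(packet.content, packet.attributes);
        tx.confirm();
        tx.complete();
    }

    public static void main(String[] args) {
        List<String> delivered = new ArrayList<>();
        SinkSketch<String> sink = new SinkSketch<>(
                () -> new RecordingTransaction(delivered),
                s -> new Packet(s.getBytes(StandardCharsets.UTF_8), new HashMap<>()));
        for (String s : new String[] {"one", "two", "three"}) {
            sink.invoke(s);
        }
        System.out.println(delivered); // [one, two, three]
    }
}
```

If the sink does work this way, every element costs a full transaction round trip to NiFi, which is worth measuring before relying on the sink for high-volume streams.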
### Data Packet Interface

Represents a NiFi FlowFile with content and attributes, providing access to both the data payload and metadata associated with the FlowFile.

```java { .api }
public interface NiFiDataPacket {

    /**
     * @return the contents of a NiFi FlowFile
     */
    byte[] getContent();

    /**
     * @return a Map of attributes that are associated with the NiFi FlowFile
     */
    Map<String, String> getAttributes();
}
```

### Standard Data Packet Implementation

Concrete implementation of NiFiDataPacket that stores FlowFile content as a byte array and attributes as a string map.

```java { .api }
public class StandardNiFiDataPacket implements NiFiDataPacket, Serializable {

    private static final long serialVersionUID = 6364005260220243322L;
    private final byte[] content;
    private final Map<String, String> attributes;

    public StandardNiFiDataPacket(final byte[] content, final Map<String, String> attributes);

    @Override
    public byte[] getContent();

    @Override
    public Map<String, String> getAttributes();
}
```
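FlowFile content arrives as raw bytes, so consumers have to pick a character set themselves. The sketch below mirrors the shape of `NiFiDataPacket` and `StandardNiFiDataPacket` with hypothetical, stdlib-only types (`DataPacket`, `SimplePacket`, `PacketDescriber`). It decodes content explicitly as UTF-8 rather than with the platform default, reads NiFi's core `filename` attribute, and uses an illustrative `<unnamed>` fallback when that attribute is missing.

```java
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical mirror of NiFiDataPacket (content bytes + FlowFile attributes).
interface DataPacket {
    byte[] getContent();
    Map<String, String> getAttributes();
}

// Hypothetical mirror of StandardNiFiDataPacket.
final class SimplePacket implements DataPacket {
    private final byte[] content;
    private final Map<String, String> attributes;

    SimplePacket(byte[] content, Map<String, String> attributes) {
        this.content = content;
        this.attributes = attributes;
    }

    @Override public byte[] getContent() { return content; }
    @Override public Map<String, String> getAttributes() { return attributes; }
}

final class PacketDescriber {
    // Decodes the content as UTF-8 (an assumption about the producer) and labels it
    // with the "filename" attribute, falling back when the attribute is absent.
    static String describe(DataPacket packet) {
        String name = packet.getAttributes().getOrDefault("filename", "<unnamed>");
        String text = new String(packet.getContent(), StandardCharsets.UTF_8);
        return name + ": " + text;
    }

    public static void main(String[] args) {
        Map<String, String> attrs = new HashMap<>();
        attrs.put("filename", "greeting.txt");
        DataPacket named = new SimplePacket("hello".getBytes(StandardCharsets.UTF_8), attrs);
        DataPacket unnamed = new SimplePacket("hi".getBytes(StandardCharsets.UTF_8), Collections.emptyMap());
        System.out.println(describe(named));   // greeting.txt: hello
        System.out.println(describe(unnamed)); // <unnamed>: hi
    }
}
```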

### Data Packet Builder Interface

Functional interface for transforming Flink data into NiFiDataPacket objects. Custom implementations define how to convert stream elements into NiFi-compatible data packets.

```java { .api }
public interface NiFiDataPacketBuilder<T> extends Function, Serializable {

    NiFiDataPacket createNiFiDataPacket(T t, RuntimeContext ctx);
}
```
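A builder is also where stream elements can pick up FlowFile attributes, which NiFi processors such as `RouteOnAttribute` can act on without parsing the content. The sketch below is self-contained and uses hypothetical types: `PacketBuilder` mirrors `NiFiDataPacketBuilder` with an `int` subtask index standing in for `RuntimeContext`, while `Reading`, `OutPacket`, and the `sensor.id` and `flink.subtask` attribute names are illustrative; `mime.type` is a conventional NiFi attribute.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

// Hypothetical mirror of NiFiDataPacketBuilder: an int subtask index stands in for RuntimeContext.
@FunctionalInterface
interface PacketBuilder<T> {
    OutPacket create(T element, int subtaskIndex);
}

// Hypothetical output packet: content bytes plus FlowFile attributes.
final class OutPacket {
    final byte[] content;
    final Map<String, String> attributes;

    OutPacket(byte[] content, Map<String, String> attributes) {
        this.content = content;
        this.attributes = attributes;
    }
}

// Hypothetical stream element.
final class Reading {
    final String sensorId;
    final double value;

    Reading(String sensorId, double value) {
        this.sensorId = sensorId;
        this.value = value;
    }
}

final class ReadingBuilders {
    // Encodes a reading as one CSV line and exposes routing metadata as attributes,
    // so a NiFi flow could branch on them without parsing the content.
    static final PacketBuilder<Reading> CSV = (reading, subtask) -> {
        Map<String, String> attributes = new HashMap<>();
        attributes.put("mime.type", "text/csv");
        attributes.put("sensor.id", reading.sensorId);            // hypothetical attribute name
        attributes.put("flink.subtask", String.valueOf(subtask)); // hypothetical attribute name
        byte[] content = (reading.sensorId + "," + reading.value).getBytes(StandardCharsets.UTF_8);
        return new OutPacket(content, attributes);
    };

    public static void main(String[] args) {
        OutPacket packet = CSV.create(new Reading("s1", 21.5), 0);
        System.out.println(new String(packet.content, StandardCharsets.UTF_8)); // s1,21.5
        System.out.println(packet.attributes.get("sensor.id"));                 // s1
    }
}
```

With the real interface, the subtask index would come from `ctx.getIndexOfThisSubtask()`. Because `NiFiDataPacketBuilder` declares a single abstract method and is `Serializable`, a lambda should also work in place of an anonymous class, though this is worth verifying against the Flink version in use.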

## Configuration

### Site-to-Site Client Configuration

The connector uses NiFi's `SiteToSiteClientConfig` for connection settings:

```java
import java.util.concurrent.TimeUnit;

import org.apache.nifi.remote.client.SiteToSiteClient;
import org.apache.nifi.remote.client.SiteToSiteClientConfig;

SiteToSiteClientConfig clientConfig = new SiteToSiteClient.Builder()
        .url("http://nifi-host:8080/nifi")   // URL of the NiFi instance
        .portName("Data Port Name")          // name of the NiFi input or output port
        .requestBatchCount(10)               // preferred number of FlowFiles per pull transaction
        .timeout(30, TimeUnit.SECONDS)       // communications timeout
        .buildConfig();
```

### Source Configuration Options

- **clientConfig**: Required `SiteToSiteClientConfig` for the NiFi connection
- **waitTimeMs**: Optional time in milliseconds to wait before retrying when no data is available to pull from NiFi (default: 1000 ms)

### Sink Configuration Options

- **clientConfig**: Required `SiteToSiteClientConfig` for the NiFi connection
- **builder**: Required `NiFiDataPacketBuilder` that converts each stream element into a `NiFiDataPacket`

## Error Handling

- **Transaction Failures**: The source waits `waitTimeMs` and retries when no Site-to-Site transaction can be created or a transaction carries no data
- **Connection Issues**: Both the source and the sink propagate exceptions from the Site-to-Site client on connection failures
- **Sink Transactions**: The sink throws an `IllegalStateException` when it cannot create a Site-to-Site transaction for an element
- **Resource Cleanup**: Both components close their Site-to-Site clients in their `close()` methods