A deprecated Apache Flink connector that enables integration with Apache NiFi through the NiFi Site-to-Site protocol for bidirectional streaming data exchange.
npx @tessl/cli install tessl/maven-org-apache-flink--flink-connector-nifi@1.15.00
# Apache Flink NiFi Connector

Apache Flink NiFi Connector provides integration between Apache Flink and Apache NiFi through the NiFi Site-to-Site protocol. This connector enables bidirectional streaming data exchange between Flink applications and NiFi clusters, supporting both data ingestion (source) and data publishing (sink) operations.

**Status**: This connector is deprecated and will be removed in a future Flink release.

## Package Information

- **Package Name**: flink-connector-nifi
- **Package Type**: maven
- **Language**: Java
- **Installation**:
```xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-nifi</artifactId>
    <version>1.15.4</version>
</dependency>
```

## Core Imports

```java
import org.apache.flink.streaming.connectors.nifi.NiFiSource;
import org.apache.flink.streaming.connectors.nifi.NiFiSink;
import org.apache.flink.streaming.connectors.nifi.NiFiDataPacket;
import org.apache.flink.streaming.connectors.nifi.StandardNiFiDataPacket;
import org.apache.flink.streaming.connectors.nifi.NiFiDataPacketBuilder;

// NiFi Site-to-Site client imports
import org.apache.nifi.remote.client.SiteToSiteClient;
import org.apache.nifi.remote.client.SiteToSiteClientConfig;

// Additional imports used by the examples
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.concurrent.TimeUnit;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
```

## Basic Usage

### Source Example - Reading from NiFi

```java
import java.nio.charset.StandardCharsets;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.nifi.NiFiDataPacket;
import org.apache.flink.streaming.connectors.nifi.NiFiSource;
import org.apache.nifi.remote.client.SiteToSiteClient;
import org.apache.nifi.remote.client.SiteToSiteClientConfig;

public class NiFiSourceJob {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Configure the Site-to-Site client to pull from the NiFi output port "Data for Flink"
        SiteToSiteClientConfig clientConfig = new SiteToSiteClient.Builder()
                .url("http://localhost:8080/nifi")
                .portName("Data for Flink")
                .requestBatchCount(5)
                .buildConfig();

        // Create the source; each parallel instance builds its own SiteToSiteClient in open()
        NiFiSource nifiSource = new NiFiSource(clientConfig);
        DataStream<NiFiDataPacket> stream = env.addSource(nifiSource).setParallelism(2);

        // Decode each FlowFile's content as UTF-8 text
        DataStream<String> content = stream.map(packet ->
                new String(packet.getContent(), StandardCharsets.UTF_8));

        // A job needs at least one sink before execute(); print to stdout here
        content.print();

        env.execute("NiFi Source Job");
    }
}
```

### Sink Example - Writing to NiFi

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.nifi.NiFiDataPacketBuilder;
import org.apache.flink.streaming.connectors.nifi.NiFiSink;
import org.apache.flink.streaming.connectors.nifi.StandardNiFiDataPacket;
import org.apache.nifi.remote.client.SiteToSiteClient;
import org.apache.nifi.remote.client.SiteToSiteClientConfig;

public class NiFiSinkJob {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Configure the Site-to-Site client to push to the NiFi input port "Data from Flink"
        SiteToSiteClientConfig clientConfig = new SiteToSiteClient.Builder()
                .url("http://localhost:8080/nifi")
                .portName("Data from Flink")
                .buildConfig();

        // Convert each String into a FlowFile: UTF-8 content, no attributes
        NiFiDataPacketBuilder<String> builder = (data, ctx) ->
                new StandardNiFiDataPacket(
                        data.getBytes(StandardCharsets.UTF_8),
                        new HashMap<String, String>());

        // Create the sink and attach it to a stream
        NiFiSink<String> nifiSink = new NiFiSink<>(clientConfig, builder);

        DataStream<String> dataStream = env.fromElements("data1", "data2", "data3");
        dataStream.addSink(nifiSink);

        env.execute("NiFi Sink Job");
    }
}
```

## Architecture

The NiFi connector is built around several key components:

- **NiFi Site-to-Site Protocol**: Uses NiFi's native Site-to-Site client for reliable data transfer
- **Transaction Management**: Runs every transfer inside a Site-to-Site transaction that is confirmed and then completed
- **Data Packet Abstraction**: Exposes each FlowFile's content and attributes through the `NiFiDataPacket` interface
- **Parallel Processing**: Each parallel source or sink instance creates its own Site-to-Site client, so parallelism is set with the usual Flink settings
- **Packet Conversion**: The sink turns each record into a FlowFile with a user-supplied `NiFiDataPacketBuilder`

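
The confirm/complete cycle can be illustrated without a NiFi instance. The sketch below is a JDK-only model, not the connector's code. `PullTransaction` and `RecordingTransaction` are hypothetical stand-ins for NiFi's `Transaction`, whose real `receive()` returns a `DataPacket` rather than a `String`. The sketch assumes the order we understand `NiFiSource` to follow: drain the transaction, confirm it, emit the buffered packets, then complete it.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in for NiFi's Transaction; packets are modeled as Strings.
interface PullTransaction {
    String receive();   // next packet, or null when the transaction holds no more data
    void confirm();     // verify the received data with the remote NiFi instance
    void complete();    // finish the transaction so NiFi can release the FlowFiles
}

final class PullCycle {
    /** Drains one transaction, confirms it, emits the batch, then completes it. */
    static int pullOnce(PullTransaction tx, Consumer<String> emit) {
        List<String> batch = new ArrayList<>();
        for (String p = tx.receive(); p != null; p = tx.receive()) {
            batch.add(p);          // buffer everything this transaction delivers
        }
        tx.confirm();              // confirm before handing data to Flink
        batch.forEach(emit);       // emit downstream (ctx.collect in a real source)
        tx.complete();             // complete only after emitting
        return batch.size();
    }
}

// In-memory transaction that records the order of protocol calls, for testing.
final class RecordingTransaction implements PullTransaction {
    final Deque<String> pending;
    final List<String> calls = new ArrayList<>();

    RecordingTransaction(List<String> packets) {
        this.pending = new ArrayDeque<>(packets);
    }

    public String receive() { calls.add("receive"); return pending.poll(); }
    public void confirm() { calls.add("confirm"); }
    public void complete() { calls.add("complete"); }
}
```

Completing only after emission means NiFi keeps the FlowFiles until the whole batch has been handed to the downstream operators.
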
## Capabilities

### Data Source Operations

Pull data from NiFi output ports using the Site-to-Site protocol.

```java { .api }
/**
 * A source that pulls data from Apache NiFi using the NiFi Site-to-Site client.
 * Produces NiFiDataPackets which encapsulate NiFi FlowFile content and attributes.
 *
 * @deprecated The NiFi Source has been deprecated and will be removed in a future Flink release.
 */
@Deprecated
public class NiFiSource extends RichParallelSourceFunction<NiFiDataPacket> {

    /**
     * Constructs a new NiFiSource using the given client config and default wait time of 1000ms.
     *
     * @param clientConfig the configuration for building a NiFi SiteToSiteClient
     */
    public NiFiSource(SiteToSiteClientConfig clientConfig);

    /**
     * Constructs a new NiFiSource using the given client config and wait time.
     *
     * @param clientConfig the configuration for building a NiFi SiteToSiteClient
     * @param waitTimeMs the amount of time to wait (in milliseconds) if no data is available
     */
    public NiFiSource(SiteToSiteClientConfig clientConfig, long waitTimeMs);

    /**
     * Opens the source and creates the SiteToSiteClient.
     *
     * @param parameters the configuration parameters from Flink
     * @throws Exception if the SiteToSiteClient cannot be created
     */
    @Override
    public void open(Configuration parameters) throws Exception;

    /**
     * Main execution loop for pulling data from NiFi.
     * Continuously pulls data packets and emits them to the Flink stream.
     *
     * @param ctx the source context for collecting data packets
     * @throws Exception if errors occur during data transfer
     */
    @Override
    public void run(SourceContext<NiFiDataPacket> ctx) throws Exception;

    /**
     * Cancels the running source.
     */
    @Override
    public void cancel();

    /**
     * Closes the source and client connection.
     */
    @Override
    public void close() throws Exception;
}
```

### Data Sink Operations

Send data to NiFi input ports using the Site-to-Site protocol.

```java { .api }
/**
 * A sink that delivers data to Apache NiFi using the NiFi Site-to-Site client.
 * Requires a NiFiDataPacketBuilder to create NiFiDataPacket instances from incoming data.
 *
 * @param <T> the type of input data
 * @deprecated The NiFi Sink has been deprecated and will be removed in a future Flink release.
 */
@Deprecated
public class NiFiSink<T> extends RichSinkFunction<T> {

    /**
     * Construct a new NiFiSink with the given client config and NiFiDataPacketBuilder.
     *
     * @param clientConfig the configuration for building a NiFi SiteToSiteClient
     * @param builder a builder to produce NiFiDataPackets from incoming data
     */
    public NiFiSink(SiteToSiteClientConfig clientConfig, NiFiDataPacketBuilder<T> builder);

    /**
     * Opens the sink and creates the SiteToSiteClient.
     *
     * @param parameters the configuration parameters from Flink
     * @throws Exception if the SiteToSiteClient cannot be created
     */
    @Override
    public void open(Configuration parameters) throws Exception;

    /**
     * Processes and sends individual records to NiFi.
     * Converts input data to NiFiDataPacket using the builder and sends to NiFi.
     *
     * @param value the record to send to NiFi
     * @throws Exception if data cannot be sent or transaction fails
     */
    @Override
    public void invoke(T value) throws Exception;

    /**
     * Closes the sink and client connection.
     */
    @Override
    public void close() throws Exception;
}
```

### Data Packet Interface

Core interface for wrapping NiFi FlowFile data and attributes.

```java { .api }
/**
 * The NiFiDataPacket provides a packaging around a NiFi FlowFile.
 * It wraps both a FlowFile's content and its attributes for processing by Flink.
 */
public interface NiFiDataPacket {

    /**
     * @return the contents of a NiFi FlowFile as byte array
     */
    byte[] getContent();

    /**
     * @return a Map of attributes that are associated with the NiFi FlowFile
     */
    Map<String, String> getAttributes();
}
```

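
Because `getAttributes()` exposes FlowFile attributes as a plain `Map<String, String>`, keys and routing decisions can be derived from attributes without decoding the content. The following is a minimal sketch. It assumes the core `filename` attribute is a useful key for your flow, and `AttributeKeys` and the `"unknown"` fallback are hypothetical choices made for this example.

```java
import java.util.Map;

final class AttributeKeys {
    // "filename" is a core NiFi FlowFile attribute; "unknown" is an arbitrary fallback for this sketch.
    static final String FILENAME = "filename";
    static final String FALLBACK = "unknown";

    /** Derives a key from FlowFile attributes, falling back when the attribute is absent or empty. */
    static String keyOf(Map<String, String> attributes) {
        if (attributes == null) {
            return FALLBACK;
        }
        String name = attributes.get(FILENAME);
        return (name == null || name.isEmpty()) ? FALLBACK : name;
    }
}
```

With the source example, this could be applied as `stream.keyBy(packet -> AttributeKeys.keyOf(packet.getAttributes()))`.
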
### Standard Data Packet Implementation

Standard implementation of the NiFiDataPacket interface.

```java { .api }
/**
 * An implementation of NiFiDataPacket that stores content and attributes.
 */
public class StandardNiFiDataPacket implements NiFiDataPacket, Serializable {

    /**
     * Constructs a StandardNiFiDataPacket with content and attributes.
     *
     * @param content the FlowFile content as byte array
     * @param attributes the FlowFile attributes as string map
     */
    public StandardNiFiDataPacket(byte[] content, Map<String, String> attributes);

    /**
     * @return the contents of the FlowFile
     */
    @Override
    public byte[] getContent();

    /**
     * @return the attributes of the FlowFile
     */
    @Override
    public Map<String, String> getAttributes();
}
```

### Data Packet Builder

Builder interface for creating NiFiDataPackets from input data in sink operations.

```java { .api }
/**
 * A function that can create a NiFiDataPacket from an incoming instance of the given type.
 * Used by NiFiSink to convert Flink stream data to NiFi FlowFile format.
 *
 * @param <T> the type of input data to convert
 */
public interface NiFiDataPacketBuilder<T> extends Function, Serializable {

    /**
     * Creates a NiFiDataPacket from input data.
     *
     * @param t the input data to convert
     * @param ctx the Flink runtime context
     * @return NiFiDataPacket containing the converted data
     */
    NiFiDataPacket createNiFiDataPacket(T t, RuntimeContext ctx);
}
```

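
A builder usually does two things. It serializes the record into the FlowFile content, and it chooses which fields to expose as attributes so NiFi can route on them. The sketch below keeps that logic in JDK-only helpers so it can be tested without a cluster. `SensorReading`, the CSV content layout, and the `sensor.id` and `flink.subtask` attribute names are hypothetical. The comment shows how the helpers would be wired into a `NiFiDataPacketBuilder` lambda.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

// Hypothetical record type used only for this example.
final class SensorReading {
    final String sensorId;
    final double value;

    SensorReading(String sensorId, double value) {
        this.sensorId = sensorId;
        this.value = value;
    }
}

// Conversion helpers for a NiFiDataPacketBuilder<SensorReading>. Wiring with the connector API:
//   NiFiDataPacketBuilder<SensorReading> builder = (r, ctx) -> new StandardNiFiDataPacket(
//       SensorPackets.content(r), SensorPackets.attributes(r, ctx.getIndexOfThisSubtask()));
final class SensorPackets {
    /** FlowFile content: one CSV line, UTF-8 encoded. */
    static byte[] content(SensorReading r) {
        return (r.sensorId + "," + r.value + "\n").getBytes(StandardCharsets.UTF_8);
    }

    /** FlowFile attributes: small routing fields only; the payload stays in the content. */
    static Map<String, String> attributes(SensorReading r, int subtaskIndex) {
        Map<String, String> attrs = new HashMap<>();
        attrs.put("mime.type", "text/csv");
        attrs.put("sensor.id", r.sensorId);
        attrs.put("flink.subtask", Integer.toString(subtaskIndex));
        return attrs;
    }
}
```

Keeping the attributes small matters because NiFi holds attributes in memory, while content is stored separately in its content repository.
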
## Configuration Requirements

### Required Dependencies

```xml
<dependency>
    <groupId>org.apache.nifi</groupId>
    <artifactId>nifi-site-to-site-client</artifactId>
    <version>1.14.0</version>
</dependency>
```

### Site-to-Site Client Configuration
```java { .api }
/**
 * Configuration for NiFi Site-to-Site client connection.
 * Built using the SiteToSiteClient.Builder fluent interface.
 */
SiteToSiteClientConfig clientConfig = new SiteToSiteClient.Builder()
    .url("http://localhost:8080/nifi")   // NiFi instance URL
    .portName("Port Name")              // Input/Output port name
    .requestBatchCount(5)               // Optional: preferred number of FlowFiles per transaction
    .timeout(30, TimeUnit.SECONDS)      // Optional: timeout for communication with NiFi
    .buildConfig();

/**
 * Fluent builder nested in org.apache.nifi.remote.client.SiteToSiteClient
 * (Apache NiFi site-to-site-client library). Commonly used methods only.
 */
public static class Builder {
    public Builder url(String url);
    public Builder portName(String portName);
    public Builder requestBatchCount(int count);
    public Builder timeout(long timeout, TimeUnit unit);
    public Builder keystoreFilename(String keystoreFilename);
    public Builder keystorePass(String keystorePass);
    public Builder truststoreFilename(String truststoreFilename);
    public Builder truststorePass(String truststorePass);
    public Builder fromConfig(SiteToSiteClientConfig config);
    public SiteToSiteClient build();
    public SiteToSiteClientConfig buildConfig();
}

/**
 * Configuration interface for NiFi Site-to-Site client.
 * Contains connection parameters and SSL settings (subset shown).
 */
public interface SiteToSiteClientConfig extends Serializable {
    String getUrl();
    String getPortName();
    int getPreferredBatchCount();
    long getTimeout(TimeUnit timeUnit);
    String getKeystoreFilename();
    String getKeystorePassword();
    String getTruststoreFilename();
    String getTruststorePassword();
}
```

### Common Configuration Options

- **url**: Base URL of the NiFi instance (required)
- **portName**: Name of the NiFi output port (source) or input port (sink) to connect to (required)
- **requestBatchCount**: Preferred number of FlowFiles per transaction (optional; can improve throughput)
- **timeout**: Timeout for communication with NiFi (optional)
- **keystoreFilename/truststoreFilename** (plus the matching password settings): TLS configuration for secure connections (optional)

## Error Handling

### Source Error Handling

- **Transaction Creation Failures**: If no transaction can be created, the source logs a warning, waits for the configured wait time, and retries
- **No Data Available**: The source waits for the configured wait time and retries; no exception is thrown
- **Connection Issues**: Errors the Site-to-Site client cannot recover from itself are thrown out of `run()` and fail the task, which Flink restarts according to the job's restart strategy

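
The wait-and-retry behavior amounts to a fixed-delay polling loop that stops when `cancel()` clears a running flag. Below is a minimal JDK-only sketch of that loop, not the connector's code. `poll` stands in for "create a transaction and receive," and returns `null` when no transaction or no data is available. The `Sleeper` hook is a hypothetical seam added so the loop can be tested without real sleeps.

```java
import java.util.function.Consumer;
import java.util.function.Supplier;

final class FixedWaitPoller<T> {
    // Hypothetical seam so tests can observe waits instead of sleeping.
    interface Sleeper { void sleep(long millis) throws InterruptedException; }

    private final long waitTimeMs;              // NiFiSource defaults to 1000 ms
    private final Sleeper sleeper;
    private volatile boolean running = true;    // cleared by cancel(), read by the loop

    FixedWaitPoller(long waitTimeMs, Sleeper sleeper) {
        this.waitTimeMs = waitTimeMs;
        this.sleeper = sleeper;
    }

    void cancel() { running = false; }

    /** Polls until cancelled; waits a fixed time whenever poll() yields nothing. */
    void run(Supplier<T> poll, Consumer<T> emit) {
        while (running) {
            T item = poll.get();
            if (item == null) {
                try {
                    sleeper.sleep(waitTimeMs);  // nothing available: wait, then retry
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();  // this sketch restores the flag and exits
                    return;
                }
                continue;
            }
            emit.accept(item);
        }
    }
}
```

A fixed wait keeps the source simple. The trade-off is that an idle port is still polled once per `waitTimeMs` by every parallel instance.
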
### Sink Error Handling

- **Transaction Creation Failures**: `invoke()` throws `IllegalStateException` if no transaction can be created
- **Send Failures**: Exceptions from sending, confirming, or completing a transaction propagate to Flink's fault-tolerance mechanisms
- **Connection Issues**: Handled by the underlying NiFi Site-to-Site client; unrecoverable errors surface as exceptions from `invoke()`

### Common Exceptions

```java
// Sink: thrown by NiFiSink.invoke() when no NiFi transaction can be created for sending data
// java.lang.IllegalStateException

// Connection and I/O problems surface from the Site-to-Site client as:
// - java.io.IOException for network connectivity and transfer failures
// - exceptions from org.apache.nifi.remote.exception (e.g. HandshakeException, UnknownPortException)
```

## Processing Guarantees
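
- **Delivery Semantics**: The connector does not provide exactly-once delivery by itself. Neither `NiFiSource` nor `NiFiSink` stores checkpointed state. The source completes each Site-to-Site transaction right after emitting its packets, so records in flight at the time of a failure are not re-read from NiFi. The sink commits one transaction per record, so records replayed after recovery can reach NiFi more than once
- **Transaction Safety**: Uses the NiFi Site-to-Site confirm/complete pattern, which makes each individual transfer reliable
- **Parallel Processing**: Supports Flink parallel execution with independent transactions per task
- **Fault Tolerance**: Failures surface as task exceptions and are recovered through Flink's restart strategy; the connector does not deduplicate replayed data
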
## Migration Notes

Since this connector is deprecated, consider these alternatives:

- **Custom Flink Connectors**: Implement custom source/sink using NiFi REST API or MiNiFi
- **Message Queue Integration**: Use Kafka, Pulsar, or other message systems as intermediary
- **File-based Integration**: Use shared storage (HDFS, S3) for data exchange
- **Direct API Integration**: Use NiFi's REST API for control plane operations

## Limitations

- **Deprecated Status**: No new features or active development
- **NiFi Version Compatibility**: Tied to a specific NiFi site-to-site-client version (1.14.0)
- **Configuration Complexity**: Requires proper NiFi port configuration and network connectivity
- **Limited Error Visibility**: Some errors may be hidden in NiFi Site-to-Site client logging