tessl/maven-org-apache-flink--flink-connector-nifi-2-10

Apache Flink connector for NiFi that enables streaming data integration between Flink and Apache NiFi using the Site-to-Site protocol

- **Workspace**: tessl
- **Visibility**: Public
- **Describes**: mavenpkg:maven/org.apache.flink/flink-connector-nifi_2.10@1.3.x

To install, run

npx @tessl/cli install tessl/maven-org-apache-flink--flink-connector-nifi-2-10@1.3.0

# Flink NiFi Connector

Apache Flink connector for NiFi that enables streaming data integration between Flink and Apache NiFi using the Site-to-Site protocol. This connector provides both source and sink components for bidirectional data transfer between Flink streaming jobs and NiFi dataflows.

## Package Information

- **Package Name**: flink-connector-nifi_2.10
- **Group ID**: org.apache.flink
- **Language**: Java
- **Maven Dependency**:

```xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-nifi_2.10</artifactId>
    <version>1.3.3</version>
</dependency>
```

## Core Imports

```java
import org.apache.flink.streaming.connectors.nifi.NiFiSource;
import org.apache.flink.streaming.connectors.nifi.NiFiSink;
import org.apache.flink.streaming.connectors.nifi.NiFiDataPacket;
import org.apache.flink.streaming.connectors.nifi.NiFiDataPacketBuilder;
import org.apache.flink.streaming.connectors.nifi.StandardNiFiDataPacket;
import org.apache.nifi.remote.client.SiteToSiteClient;
import org.apache.nifi.remote.client.SiteToSiteClientConfig;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.functions.StoppableFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.configuration.ConfigConstants;
import java.nio.charset.Charset;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
```

## Basic Usage

### NiFi Source Example

```java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.nifi.NiFiSource;
import org.apache.flink.streaming.connectors.nifi.NiFiDataPacket;
import org.apache.nifi.remote.client.SiteToSiteClient;
import org.apache.nifi.remote.client.SiteToSiteClientConfig;
import java.nio.charset.Charset;

// Create Flink streaming environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Configure NiFi Site-to-Site client; the port name must match an output port in the NiFi flow
SiteToSiteClientConfig clientConfig = new SiteToSiteClient.Builder()
    .url("http://localhost:8080/nifi")
    .portName("Data for Flink")
    .requestBatchCount(5)
    .buildConfig();

// Create NiFi source
NiFiSource nifiSource = new NiFiSource(clientConfig);
DataStream<NiFiDataPacket> sourceStream = env.addSource(nifiSource).setParallelism(2);

// Process NiFi data packets: decode each FlowFile's content into a String
DataStream<String> processedStream = sourceStream.map(packet -> {
    return new String(packet.getContent(), Charset.defaultCharset());
});
```
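
The map step above decodes with the platform default charset, which can differ between the machines running the job. A minimal sketch of a more explicit decoder follows, assuming the FlowFile content is UTF-8 text and that the packet may carry a `filename` attribute. The `NiFiDataPacket` interface is redeclared locally (mirroring the two methods documented for it) only so the snippet compiles without the connector on the classpath, and `PacketDecoder` is a hypothetical helper name.

```java
import java.nio.charset.StandardCharsets;
import java.util.Map;

// Local mirror of the connector's NiFiDataPacket interface (same two methods),
// declared here only so the sketch compiles on its own.
interface NiFiDataPacket {
    byte[] getContent();
    Map<String, String> getAttributes();
}

// Hypothetical helper, not part of the connector.
final class PacketDecoder {
    private PacketDecoder() {}

    // Decodes the content as UTF-8 (an assumption about the payload) and
    // prefixes it with the "filename" attribute when one is present.
    static String describe(NiFiDataPacket packet) {
        String body = new String(packet.getContent(), StandardCharsets.UTF_8);
        String name = packet.getAttributes().get("filename");
        return name == null ? body : name + ": " + body;
    }
}
```

In a job, the body of the `map` lambda could delegate to a helper like this instead of calling `Charset.defaultCharset()`.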

### NiFi Sink Example

```java
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.connectors.nifi.NiFiDataPacket;
import org.apache.flink.streaming.connectors.nifi.NiFiDataPacketBuilder;
import org.apache.flink.streaming.connectors.nifi.NiFiSink;
import org.apache.flink.streaming.connectors.nifi.StandardNiFiDataPacket;
import org.apache.nifi.remote.client.SiteToSiteClient;
import org.apache.nifi.remote.client.SiteToSiteClientConfig;
import java.util.HashMap;

// Configure NiFi Site-to-Site client; the port name must match an input port in the NiFi flow
SiteToSiteClientConfig clientConfig = new SiteToSiteClient.Builder()
    .url("http://localhost:8080/nifi")
    .portName("Data from Flink")
    .buildConfig();

// Create custom data packet builder: each String becomes the content of one FlowFile
NiFiDataPacketBuilder<String> builder = new NiFiDataPacketBuilder<String>() {
    @Override
    public NiFiDataPacket createNiFiDataPacket(String data, RuntimeContext ctx) {
        return new StandardNiFiDataPacket(
            data.getBytes(ConfigConstants.DEFAULT_CHARSET),
            new HashMap<String, String>()
        );
    }
};

// Add sink to data stream (env is the StreamExecutionEnvironment created in the source example)
DataStream<String> dataStream = env.fromElements("one", "two", "three");
dataStream.addSink(new NiFiSink<>(clientConfig, builder));
```
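
The builder above sends every record with an empty attribute map. Because NiFi flows commonly route on FlowFile attributes, a builder may want to populate some. The following is a minimal sketch under stated assumptions: the payload is encoded as UTF-8, `mime.type` is used as an example attribute, and `flink.subtask` is a made-up attribute name chosen for illustration. `PacketParts` is a hypothetical helper, not part of the connector; it only prepares the two arguments that the `StandardNiFiDataPacket` constructor takes.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper, not part of the connector: prepares the content bytes
// and attribute map that would be handed to StandardNiFiDataPacket.
final class PacketParts {
    final byte[] content;
    final Map<String, String> attributes;

    private PacketParts(byte[] content, Map<String, String> attributes) {
        this.content = content;
        this.attributes = attributes;
    }

    // Encodes the record as UTF-8 and attaches illustrative attributes.
    static PacketParts fromText(String record, int subtaskIndex) {
        Map<String, String> attrs = new HashMap<>();
        attrs.put("mime.type", "text/plain");
        attrs.put("flink.subtask", Integer.toString(subtaskIndex)); // made-up attribute name
        return new PacketParts(record.getBytes(StandardCharsets.UTF_8), attrs);
    }
}
```

Inside `createNiFiDataPacket`, the result could be passed on as `new StandardNiFiDataPacket(parts.content, parts.attributes)`, with the subtask index taken from `ctx.getIndexOfThisSubtask()`.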

## Architecture

The connector follows Flink's standard source/sink pattern and integrates with NiFi's Site-to-Site protocol:

- **NiFiSource**: Pulls data from NiFi output ports using a Site-to-Site client
- **NiFiSink**: Sends data to NiFi input ports via the Site-to-Site protocol
- **NiFiDataPacket**: Encapsulates NiFi FlowFile content and attributes
- **NiFiDataPacketBuilder**: Transforms Flink data into a NiFi-compatible format
- **Site-to-Site Integration**: Uses NiFi's native remote protocol for reliable data transfer

## Capabilities

### NiFi Data Source

Streams data from Apache NiFi into Flink using the Site-to-Site protocol. The source function connects to NiFi output ports and pulls data packets containing FlowFile content and attributes.

```java { .api }
public class NiFiSource extends RichParallelSourceFunction<NiFiDataPacket> implements StoppableFunction {

    private static final long serialVersionUID = 1L;
    private static final long DEFAULT_WAIT_TIME_MS = 1000;
    private final SiteToSiteClientConfig clientConfig;
    private final long waitTimeMs;
    private transient SiteToSiteClient client;
    private volatile boolean isRunning = true;

    /**
     * Constructs a new NiFiSource using the given client config and the default wait time of 1000 ms.
     *
     * @param clientConfig the configuration for building a NiFi SiteToSiteClient
     */
    public NiFiSource(SiteToSiteClientConfig clientConfig);

    /**
     * Constructs a new NiFiSource using the given client config and wait time.
     *
     * @param clientConfig the configuration for building a NiFi SiteToSiteClient
     * @param waitTimeMs the amount of time to wait (in milliseconds) if no data is available to pull from NiFi
     */
    public NiFiSource(SiteToSiteClientConfig clientConfig, long waitTimeMs);

    @Override
    public void open(Configuration parameters) throws Exception;

    @Override
    public void run(SourceContext<NiFiDataPacket> ctx) throws Exception;

    @Override
    public void cancel();

    @Override
    public void close() throws Exception;

    @Override
    public void stop();
}
```
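
The `waitTimeMs` parameter and the `isRunning` flag in the listing above suggest a poll-and-wait loop: pull a batch, emit it, and sleep when nothing is available. The sketch below illustrates that general pattern in plain Java; it is not the connector's source code. `PollingLoop`, `fetch`, and `emit` are hypothetical names, and a `null` batch stands in for "no data available from NiFi".

```java
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Hypothetical illustration of a poll-and-wait loop; not the connector's code.
final class PollingLoop {
    private final long waitTimeMs;
    private volatile boolean running = true;

    PollingLoop(long waitTimeMs) {
        this.waitTimeMs = waitTimeMs;
    }

    // Repeatedly fetches a batch. A null batch means "nothing available",
    // in which case the loop sleeps for waitTimeMs before trying again.
    void run(Supplier<List<String>> fetch, Consumer<String> emit) {
        while (running) {
            List<String> batch = fetch.get();
            if (batch == null) {
                try {
                    Thread.sleep(waitTimeMs);
                } catch (InterruptedException e) {
                    // Restore the interrupt flag and leave the loop.
                    Thread.currentThread().interrupt();
                    return;
                }
                continue;
            }
            batch.forEach(emit);
        }
    }

    // Ends the loop after the current iteration, as cancel() or stop() would.
    void cancel() {
        running = false;
    }
}
```

The `volatile` flag matters because cancellation typically arrives from a different thread than the one executing the loop.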

### NiFi Data Sink

Sends data from Flink to Apache NiFi using the Site-to-Site protocol. The sink function connects to NiFi input ports and sends data packets created by a custom builder.

```java { .api }
public class NiFiSink<T> extends RichSinkFunction<T> {

    private SiteToSiteClient client;
    private SiteToSiteClientConfig clientConfig;
    private NiFiDataPacketBuilder<T> builder;

    /**
     * Construct a new NiFiSink with the given client config and NiFiDataPacketBuilder.
     *
     * @param clientConfig the configuration for building a NiFi SiteToSiteClient
     * @param builder a builder to produce NiFiDataPackets from incoming data
     */
    public NiFiSink(SiteToSiteClientConfig clientConfig, NiFiDataPacketBuilder<T> builder);

    @Override
    public void open(Configuration parameters) throws Exception;

    @Override
    public void invoke(T value) throws Exception;

    @Override
    public void close() throws Exception;
}
```

### Data Packet Interface

Represents a NiFi FlowFile with content and attributes, providing access to both the data payload and metadata associated with the FlowFile.

```java { .api }
public interface NiFiDataPacket {

    /**
     * @return the contents of a NiFi FlowFile
     */
    byte[] getContent();

    /**
     * @return a Map of attributes that are associated with the NiFi FlowFile
     */
    Map<String, String> getAttributes();
}
```

### Standard Data Packet Implementation

Concrete implementation of NiFiDataPacket that stores FlowFile content as a byte array and attributes as a string map.

```java { .api }
public class StandardNiFiDataPacket implements NiFiDataPacket, Serializable {

    private static final long serialVersionUID = 6364005260220243322L;
    private final byte[] content;
    private final Map<String, String> attributes;

    public StandardNiFiDataPacket(final byte[] content, final Map<String, String> attributes);

    @Override
    public byte[] getContent();

    @Override
    public Map<String, String> getAttributes();
}
```

### Data Packet Builder Interface

Functional interface for transforming Flink data into NiFiDataPacket objects. Custom implementations define how to convert stream elements into NiFi-compatible data packets.

```java { .api }
public interface NiFiDataPacketBuilder<T> extends Function, Serializable {

    NiFiDataPacket createNiFiDataPacket(T t, RuntimeContext ctx);
}
```

## Configuration

### Site-to-Site Client Configuration

The connector uses NiFi's SiteToSiteClientConfig for connection settings:

```java
SiteToSiteClientConfig clientConfig = new SiteToSiteClient.Builder()
    .url("http://nifi-host:8080/nifi")   // NiFi URL
    .portName("Data Port Name")          // Port name in NiFi
    .requestBatchCount(10)               // Batch size for requests
    .timeout(30, TimeUnit.SECONDS)       // Connection timeout
    .buildConfig();
```

### Source Configuration Options

- **clientConfig**: Required SiteToSiteClientConfig for NiFi connection
- **waitTimeMs**: Optional wait time between polling attempts (default: 1000ms)

### Sink Configuration Options

- **clientConfig**: Required SiteToSiteClientConfig for NiFi connection
- **builder**: Required NiFiDataPacketBuilder for data transformation

## Error Handling

- **Transaction Failures**: Source handles null transactions by waiting and retrying
- **Connection Issues**: Both source and sink throw exceptions for connection failures
- **Data Validation**: Sink validates data packet creation before sending to NiFi
- **Resource Cleanup**: Both components properly close Site-to-Site clients in close() methods