
# Apache Flink NiFi Connector

Apache Flink NiFi Connector provides integration between Apache Flink and Apache NiFi through the NiFi Site-to-Site protocol. This connector enables bidirectional streaming data exchange between Flink applications and NiFi clusters, supporting both data ingestion (source) and data publishing (sink) operations.

**Status**: This connector is deprecated and will be removed in a future Flink release.

## Package Information

- **Package Name**: flink-connector-nifi
- **Package Type**: maven
- **Language**: Java
- **Installation**:

```xml
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-nifi</artifactId>
  <version>1.15.4</version>
</dependency>
```

## Core Imports

```java
import org.apache.flink.streaming.connectors.nifi.NiFiSource;
import org.apache.flink.streaming.connectors.nifi.NiFiSink;
import org.apache.flink.streaming.connectors.nifi.NiFiDataPacket;
import org.apache.flink.streaming.connectors.nifi.StandardNiFiDataPacket;
import org.apache.flink.streaming.connectors.nifi.NiFiDataPacketBuilder;

// NiFi Site-to-Site client imports
import org.apache.nifi.remote.client.SiteToSiteClient;
import org.apache.nifi.remote.client.SiteToSiteClientConfig;

// Additional required imports for examples
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.concurrent.TimeUnit;
import org.apache.flink.api.common.functions.RuntimeContext;
```

## Basic Usage

### Source Example - Reading from NiFi

```java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.nifi.NiFiDataPacket;
import org.apache.flink.streaming.connectors.nifi.NiFiSource;
import org.apache.nifi.remote.client.SiteToSiteClient;
import org.apache.nifi.remote.client.SiteToSiteClientConfig;

import java.nio.charset.StandardCharsets;

public class NiFiSourceJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Configure the Site-to-Site client to pull from the NiFi output port "Data for Flink"
        SiteToSiteClientConfig clientConfig = new SiteToSiteClient.Builder()
                .url("http://localhost:8080/nifi")
                .portName("Data for Flink")
                .requestBatchCount(5)
                .buildConfig();

        // Create the source; each parallel instance opens its own client in open()
        NiFiSource nifiSource = new NiFiSource(clientConfig);
        DataStream<NiFiDataPacket> stream = env.addSource(nifiSource).setParallelism(2);

        // Decode each packet's content as UTF-8 text
        DataStream<String> content = stream.map(packet ->
                new String(packet.getContent(), StandardCharsets.UTF_8));

        // Attach a sink so the pipeline has an output
        content.print();

        env.execute("NiFi Source Job");
    }
}
```

### Sink Example - Writing to NiFi

```java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.nifi.NiFiDataPacketBuilder;
import org.apache.flink.streaming.connectors.nifi.NiFiSink;
import org.apache.flink.streaming.connectors.nifi.StandardNiFiDataPacket;
import org.apache.nifi.remote.client.SiteToSiteClient;
import org.apache.nifi.remote.client.SiteToSiteClientConfig;

import java.nio.charset.StandardCharsets;
import java.util.HashMap;

public class NiFiSinkJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Configure the Site-to-Site client to push to the NiFi input port "Data from Flink"
        SiteToSiteClientConfig clientConfig = new SiteToSiteClient.Builder()
                .url("http://localhost:8080/nifi")
                .portName("Data from Flink")
                .buildConfig();

        // Convert each String record into a packet: UTF-8 content, no attributes
        NiFiDataPacketBuilder<String> builder = (data, ctx) ->
                new StandardNiFiDataPacket(
                        data.getBytes(StandardCharsets.UTF_8),
                        new HashMap<String, String>());

        // Create the sink
        NiFiSink<String> nifiSink = new NiFiSink<>(clientConfig, builder);

        // Send a few sample records to NiFi
        DataStream<String> dataStream = env.fromElements("data1", "data2", "data3");
        dataStream.addSink(nifiSink);

        env.execute("NiFi Sink Job");
    }
}
```

## Architecture

The NiFi connector is built around several key components:

- **NiFi Site-to-Site Protocol**: Uses NiFi's native Site-to-Site client for data transfer
- **Transaction Management**: Moves data inside Site-to-Site transactions that are confirmed and then completed, so a batch is finalized only after both sides have verified it
- **Data Packet Abstraction**: Exposes each FlowFile's content and attributes to Flink through the `NiFiDataPacket` interface
- **Parallel Processing**: Each parallel source or sink instance creates its own Site-to-Site client in `open()`, so both operators run at whatever parallelism the job assigns
- **Builder Pattern**: The sink uses a `NiFiDataPacketBuilder` to convert each incoming record into a `NiFiDataPacket`
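
The ordering behind the confirm/complete cycle can be shown with a small, dependency-free simulation. It is a sketch, not the connector's code: `InMemoryTransaction` is a hypothetical stand-in for a NiFi receive transaction, and the order used here (drain the transaction, confirm it, emit the packets, then complete it) is an assumption modeled on the transaction pattern described above.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical in-memory stand-in for a Site-to-Site receive transaction (not the NiFi API). */
class InMemoryTransaction {
    private final Deque<byte[]> pending;
    private final List<String> log = new ArrayList<>();

    InMemoryTransaction(List<byte[]> packets) {
        this.pending = new ArrayDeque<>(packets);
    }

    /** Returns the next packet, or null once the transaction has been drained. */
    byte[] receive() {
        return pending.poll();
    }

    /** Records the confirm step (in NiFi this is where checksums of the transfer are compared). */
    void confirm() {
        log.add("confirm");
    }

    /** Records the complete step (in NiFi this finalizes the transaction). */
    void complete() {
        log.add("complete");
    }

    /** Mutable event log, so callers can record their own steps alongside confirm/complete. */
    List<String> log() {
        return log;
    }
}

final class ConfirmCompleteCycle {
    private ConfirmCompleteCycle() {}

    /** Drains one transaction, confirms it, emits every packet, then completes it. */
    static int pullOnce(InMemoryTransaction tx, Consumer<byte[]> emit) {
        List<byte[]> batch = new ArrayList<>();
        for (byte[] packet = tx.receive(); packet != null; packet = tx.receive()) {
            batch.add(packet);
        }
        tx.confirm();          // verify the batch before anything is emitted
        batch.forEach(emit);   // hand every packet downstream
        tx.complete();         // finalize only after all packets were emitted
        return batch.size();
    }
}
```

Calling `complete()` last is the design point: in the Site-to-Site protocol the sending side is not expected to discard data until the transaction completes, so a transfer that never completes should leave the data with the sender.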

## Capabilities

### Data Source Operations

Pull data from NiFi output ports using the Site-to-Site protocol.

```java { .api }
/**
 * A source that pulls data from Apache NiFi using the NiFi Site-to-Site client.
 * Produces NiFiDataPackets which encapsulate NiFi FlowFile content and attributes.
 *
 * @deprecated The NiFi Source has been deprecated and will be removed in a future Flink release.
 */
@Deprecated
public class NiFiSource extends RichParallelSourceFunction<NiFiDataPacket> {

    /**
     * Constructs a new NiFiSource using the given client config and default wait time of 1000ms.
     *
     * @param clientConfig the configuration for building a NiFi SiteToSiteClient
     */
    public NiFiSource(SiteToSiteClientConfig clientConfig);

    /**
     * Constructs a new NiFiSource using the given client config and wait time.
     *
     * @param clientConfig the configuration for building a NiFi SiteToSiteClient
     * @param waitTimeMs the amount of time to wait (in milliseconds) if no data is available
     */
    public NiFiSource(SiteToSiteClientConfig clientConfig, long waitTimeMs);

    /**
     * Opens the source and creates the SiteToSiteClient.
     *
     * @param parameters the configuration parameters from Flink
     * @throws Exception if the SiteToSiteClient cannot be created
     */
    @Override
    public void open(Configuration parameters) throws Exception;

    /**
     * Main execution loop for pulling data from NiFi.
     * Continuously pulls data packets and emits them to the Flink stream.
     *
     * @param ctx the source context for collecting data packets
     * @throws Exception if errors occur during data transfer
     */
    @Override
    public void run(SourceContext<NiFiDataPacket> ctx) throws Exception;

    /**
     * Cancels the running source.
     */
    @Override
    public void cancel();

    /**
     * Closes the source and client connection.
     */
    @Override
    public void close() throws Exception;
}
```

### Data Sink Operations

Send data to NiFi input ports using the Site-to-Site protocol.

```java { .api }
/**
 * A sink that delivers data to Apache NiFi using the NiFi Site-to-Site client.
 * Requires a NiFiDataPacketBuilder to create NiFiDataPacket instances from incoming data.
 *
 * @param <T> the type of input data
 * @deprecated The NiFi Sink has been deprecated and will be removed in a future Flink release.
 */
@Deprecated
public class NiFiSink<T> extends RichSinkFunction<T> {

    /**
     * Construct a new NiFiSink with the given client config and NiFiDataPacketBuilder.
     *
     * @param clientConfig the configuration for building a NiFi SiteToSiteClient
     * @param builder a builder to produce NiFiDataPackets from incoming data
     */
    public NiFiSink(SiteToSiteClientConfig clientConfig, NiFiDataPacketBuilder<T> builder);

    /**
     * Opens the sink and creates the SiteToSiteClient.
     *
     * @param parameters the configuration parameters from Flink
     * @throws Exception if the SiteToSiteClient cannot be created
     */
    @Override
    public void open(Configuration parameters) throws Exception;

    /**
     * Processes and sends individual records to NiFi.
     * Converts input data to NiFiDataPacket using the builder and sends to NiFi.
     *
     * @param value the record to send to NiFi
     * @throws Exception if data cannot be sent or transaction fails
     */
    @Override
    public void invoke(T value) throws Exception;

    /**
     * Closes the sink and client connection.
     */
    @Override
    public void close() throws Exception;
}
```

### Data Packet Interface

Core interface for wrapping NiFi FlowFile data and attributes.

```java { .api }
/**
 * The NiFiDataPacket provides a packaging around a NiFi FlowFile.
 * It wraps both a FlowFile's content and its attributes for processing by Flink.
 */
public interface NiFiDataPacket {

    /**
     * @return the contents of a NiFi FlowFile as byte array
     */
    byte[] getContent();

    /**
     * @return a Map of attributes that are associated with the NiFi FlowFile
     */
    Map<String, String> getAttributes();
}
```
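
Because `getContent()` returns the whole FlowFile content as one byte array, a FlowFile that carries many newline-delimited records reaches Flink as a single packet. The helper below is a sketch of splitting such content into individual records, assuming UTF-8 text with `\n` or `\r\n` line endings (both are assumptions about the upstream NiFi flow). `FlowFileRecords` is a hypothetical name; in a job you might call it from a `flatMap` over `packet.getContent()`.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper for splitting newline-delimited FlowFile content into records. */
final class FlowFileRecords {
    private FlowFileRecords() {}

    /**
     * Decodes FlowFile content as UTF-8 and returns one record per non-blank line.
     * A trailing '\r' is stripped so that CRLF input behaves like LF input.
     */
    static List<String> splitLines(byte[] content) {
        String text = new String(content, StandardCharsets.UTF_8);
        List<String> records = new ArrayList<>();
        for (String line : text.split("\n")) {
            String record = line.endsWith("\r") ? line.substring(0, line.length() - 1) : line;
            if (!record.trim().isEmpty()) {
                records.add(record);
            }
        }
        return records;
    }
}
```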

### Standard Data Packet Implementation

Standard implementation of the NiFiDataPacket interface.

```java { .api }
/**
 * An implementation of NiFiDataPacket that stores content and attributes.
 */
public class StandardNiFiDataPacket implements NiFiDataPacket, Serializable {

    /**
     * Constructs a StandardNiFiDataPacket with content and attributes.
     *
     * @param content the FlowFile content as byte array
     * @param attributes the FlowFile attributes as string map
     */
    public StandardNiFiDataPacket(byte[] content, Map<String, String> attributes);

    /**
     * @return the contents of the FlowFile
     */
    @Override
    public byte[] getContent();

    /**
     * @return the attributes of the FlowFile
     */
    @Override
    public Map<String, String> getAttributes();
}
```

### Data Packet Builder

Builder interface for creating NiFiDataPackets from input data in sink operations.

```java { .api }
/**
 * A function that can create a NiFiDataPacket from an incoming instance of the given type.
 * Used by NiFiSink to convert Flink stream data to NiFi FlowFile format.
 *
 * @param <T> the type of input data to convert
 */
public interface NiFiDataPacketBuilder<T> extends Function, Serializable {

    /**
     * Creates a NiFiDataPacket from input data.
     *
     * @param t the input data to convert
     * @param ctx the Flink runtime context
     * @return NiFiDataPacket containing the converted data
     */
    NiFiDataPacket createNiFiDataPacket(T t, RuntimeContext ctx);
}
```
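
The sink example passes an empty attribute map, but attributes are often what downstream NiFi processors such as `RouteOnAttribute` act on. The following dependency-free sketch shows the kind of logic a `NiFiDataPacketBuilder<String>` might contain. To keep it runnable without Flink on the classpath, `Packet` is a hypothetical stand-in for `StandardNiFiDataPacket` and the subtask index is passed in directly; in a real builder you would return `new StandardNiFiDataPacket(content, attributes)` and could read the index from the `RuntimeContext` (for example `ctx.getIndexOfThisSubtask()`). Apart from `mime.type`, a standard NiFi attribute, the attribute names are made up for illustration.

```java
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for StandardNiFiDataPacket: content bytes plus string attributes. */
final class Packet {
    final byte[] content;
    final Map<String, String> attributes;

    Packet(byte[] content, Map<String, String> attributes) {
        this.content = content;
        // Defensive, read-only copy so callers cannot change attributes afterwards
        this.attributes = Collections.unmodifiableMap(new HashMap<>(attributes));
    }
}

/** Hypothetical builder logic that turns one CSV line into a packet with descriptive attributes. */
final class CsvLinePacketBuilder {
    private CsvLinePacketBuilder() {}

    static Packet create(String line, int subtaskIndex) {
        Map<String, String> attributes = new HashMap<>();
        attributes.put("mime.type", "text/csv");                         // standard NiFi attribute
        attributes.put("flink.subtask", Integer.toString(subtaskIndex));  // hypothetical attribute
        // Hypothetical attribute; naive count that does not handle quoted commas
        attributes.put("csv.columns", Integer.toString(line.split(",", -1).length));
        return new Packet(line.getBytes(StandardCharsets.UTF_8), attributes);
    }
}
```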

## Configuration Requirements

### Required Dependencies

```xml
<dependency>
  <groupId>org.apache.nifi</groupId>
  <artifactId>nifi-site-to-site-client</artifactId>
  <version>1.14.0</version>
</dependency>
```

### Site-to-Site Client Configuration

```java { .api }
/**
 * Configuration for NiFi Site-to-Site client connection.
 * Built using SiteToSiteClient.Builder fluent interface.
 */
SiteToSiteClientConfig clientConfig = new SiteToSiteClient.Builder()
        .url("http://localhost:8080/nifi")   // NiFi instance URL
        .portName("Port Name")               // Input/Output port name
        .requestBatchCount(5)                // Optional: preferred FlowFiles per transaction when pulling
        .timeout(30, TimeUnit.SECONDS)       // Optional: communication timeout
        .buildConfig();

/**
 * Builder for SiteToSiteClient configurations (selected methods).
 * Nested class of org.apache.nifi.remote.client.SiteToSiteClient,
 * part of the Apache NiFi site-to-site-client library.
 */
public static class Builder {
    public Builder url(String url);
    public Builder portName(String portName);
    public Builder requestBatchCount(int count);
    public Builder timeout(long timeout, TimeUnit unit);
    public Builder keystoreFilename(String keystoreFilename);
    public Builder keystorePass(String keystorePass);
    public Builder truststoreFilename(String truststoreFilename);
    public Builder truststorePass(String truststorePass);
    public SiteToSiteClient build();
    public SiteToSiteClientConfig buildConfig();
    public Builder fromConfig(SiteToSiteClientConfig config);
}

/**
 * Configuration interface for NiFi Site-to-Site client (selected methods).
 * Contains connection parameters and SSL settings.
 */
public interface SiteToSiteClientConfig {
    String getUrl();
    String getPortName();
    int getPreferredBatchCount();
    long getTimeout(TimeUnit timeUnit);
    String getKeystoreFilename();
    String getKeystorePassword();
    String getTruststoreFilename();
    String getTruststorePassword();
}
```

### Common Configuration Options

- **url**: URL of the NiFi instance, for example `http://localhost:8080/nifi` (required)
- **portName**: Name of the NiFi port to connect to: an output port for `NiFiSource`, an input port for `NiFiSink` (required)
- **requestBatchCount**: Preferred number of FlowFiles per transaction when pulling data; NiFi, as the sender, decides the actual batch size (optional)
- **timeout**: Timeout for communication with the remote NiFi instance (optional)
- **keystoreFilename/truststoreFilename**: SSL configuration for secure connections, together with the corresponding passwords (optional)
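
Since `url` and `portName` are required and the client is only created when `open()` runs on the cluster, a typo may not show up until the job starts. Below is a hypothetical, dependency-free sketch of validating these options up front, reading them from a plain `Map` (for example one built from job arguments). The class `SiteToSiteSettings` and the keys `nifi.url`, `nifi.port`, and `nifi.batch` are invented for illustration; the validated values would then be passed to the `url`, `portName`, and (only when set) `requestBatchCount` methods of `SiteToSiteClient.Builder`.

```java
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Map;

/** Hypothetical holder for the Site-to-Site options listed above; not a NiFi or Flink class. */
final class SiteToSiteSettings {
    final String url;
    final String portName;
    final Integer requestBatchCount; // null means "leave the client default"

    private SiteToSiteSettings(String url, String portName, Integer requestBatchCount) {
        this.url = url;
        this.portName = portName;
        this.requestBatchCount = requestBatchCount;
    }

    /** Reads and validates the options from plain key/value properties. */
    static SiteToSiteSettings fromProperties(Map<String, String> props) {
        String url = require(props, "nifi.url");
        String portName = require(props, "nifi.port");

        // Accept only absolute http(s) URLs such as http://localhost:8080/nifi
        try {
            String scheme = new URI(url).getScheme();
            if (!"http".equalsIgnoreCase(scheme) && !"https".equalsIgnoreCase(scheme)) {
                throw new IllegalArgumentException("nifi.url must use http or https: " + url);
            }
        } catch (URISyntaxException e) {
            throw new IllegalArgumentException("nifi.url is not a valid URI: " + url, e);
        }

        Integer batch = null;
        String rawBatch = props.get("nifi.batch");
        if (rawBatch != null) {
            // NumberFormatException (an IllegalArgumentException) if the value is not numeric
            batch = Integer.valueOf(rawBatch.trim());
            if (batch <= 0) {
                throw new IllegalArgumentException("nifi.batch must be positive: " + batch);
            }
        }
        return new SiteToSiteSettings(url, portName, batch);
    }

    private static String require(Map<String, String> props, String key) {
        String value = props.get(key);
        if (value == null || value.trim().isEmpty()) {
            throw new IllegalArgumentException("missing required option: " + key);
        }
        return value.trim();
    }
}
```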

## Error Handling

### Source Error Handling

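- **Transaction Creation Failures**: If no transaction can be created, the source logs a warning, waits for the configured wait time (`waitTimeMs`), and tries again
- **No Data Available**: The source waits for the configured wait time and polls again; it does not throw an exception
- **Connection Issues**: Handled by the underlying NiFi Site-to-Site client; errors it cannot recover from surface as exceptions thrown from `run()`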
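
The wait-and-retry behavior described above, together with the `run()`/`cancel()` contract from the API listing, can be sketched as a plain polling loop. This is a hypothetical, dependency-free model rather than the connector's code: `PollingLoop` is an invented name, a `Supplier` returning `null` stands in for "no transaction or no data", and `waitTimeMs` plays the role of the source's wait time (1000 ms by default).

```java
import java.util.function.Consumer;
import java.util.function.Supplier;

/** Hypothetical polling loop modeled on the source's wait-and-retry behavior (not the connector's code). */
final class PollingLoop<T> {
    private final Supplier<T> poll;          // returns null when nothing is available
    private final long waitTimeMs;           // back-off between empty polls
    private volatile boolean running = true; // set by cancel(), read by run()
    private int idleWaits = 0;

    PollingLoop(Supplier<T> poll, long waitTimeMs) {
        this.poll = poll;
        this.waitTimeMs = waitTimeMs;
    }

    /** Emits items until cancelled; sleeps for waitTimeMs whenever a poll comes back empty. */
    void run(Consumer<T> emit) {
        while (running) {
            T item = poll.get();
            if (item == null) {
                idleWaits++;
                try {
                    Thread.sleep(waitTimeMs);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // keep the interrupt flag and stop
                    return;
                }
                continue; // retry instead of failing
            }
            emit.accept(item);
        }
    }

    void cancel() {
        running = false;
    }

    int idleWaits() {
        return idleWaits;
    }
}
```

Declaring `running` as `volatile` follows the pattern recommended in the Javadoc of Flink's `SourceFunction.cancel()`: a volatile flag, checked in the loop condition, that `cancel()` sets to false so the loop exits.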

### Sink Error Handling

- **Transaction Creation Failures**: `invoke()` throws an `IllegalStateException` if a transaction cannot be created
- **Send Failures**: Exceptions raised while sending propagate out of `invoke()` and are handled by Flink's failure and restart mechanisms
- **Connection Issues**: Handled by the underlying NiFi Site-to-Site client
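
The sink-side rules above can be modeled in a few lines. In this dependency-free sketch, `SendTransaction` and `PerRecordSender` are hypothetical stand-ins (not NiFi or Flink types), and sending each record in its own transaction is an assumption based on `invoke()` processing records individually. A `null` transaction is turned into an `IllegalStateException`, matching the failure listed above; any exception thrown by `send`, `confirm`, or `complete` simply propagates, as it would out of `invoke()`.

```java
import java.util.function.Supplier;

/** Hypothetical send-side transaction; mirrors only the send/confirm/complete steps. */
interface SendTransaction {
    void send(byte[] content);

    void confirm();

    void complete();
}

/** Hypothetical per-record sender modeled on the sink's invoke() behavior. */
final class PerRecordSender {
    private final Supplier<SendTransaction> transactions; // returns null when none can be created

    PerRecordSender(Supplier<SendTransaction> transactions) {
        this.transactions = transactions;
    }

    /** Sends one record in its own transaction; exceptions from the transaction propagate. */
    void invoke(byte[] content) {
        SendTransaction tx = transactions.get();
        if (tx == null) {
            throw new IllegalStateException("Unable to create a NiFi transaction to send data");
        }
        tx.send(content);
        tx.confirm();
        tx.complete();
    }
}
```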

### Common Exceptions

```java
// Sink operation failures:
// - IllegalStateException: unable to create a NiFi transaction for sending data

// Connection and I/O failures raised by the NiFi Site-to-Site client:
// - java.io.IOException (and its subclasses) for network connectivity and protocol errors
```

## Processing Guarantees

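- **Delivery Guarantees**: Neither `NiFiSource` nor `NiFiSink` implements a Flink checkpointing interface (their API listings above extend only `RichParallelSourceFunction` and `RichSinkFunction`), so NiFi transactions are confirmed and completed independently of Flink checkpoints. Do not assume exactly-once delivery: records replayed after a restart can be sent to NiFi a second time, and data whose NiFi transaction the source had already completed before a failure is not fetched again.
- **Transaction Safety**: Uses the NiFi Site-to-Site confirm/complete transaction pattern to verify each transfer
- **Parallel Processing**: Supports Flink parallel execution with independent transactions per task
- **Fault Tolerance**: Failed tasks are restarted through Flink's recovery mechanisms, but the connector itself keeps no checkpointed state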

## Migration Notes

Since this connector is deprecated, consider these alternatives:

- **Custom Flink Connectors**: Implement a custom source/sink using the NiFi REST API or MiNiFi
- **Message Queue Integration**: Use Kafka, Pulsar, or another message system as an intermediary between NiFi and Flink
- **File-based Integration**: Use shared storage (HDFS, S3) for data exchange
- **Direct API Integration**: Use NiFi's REST API for control-plane operations

## Limitations

- **Deprecated Status**: No new features or active development
- **NiFi Version Compatibility**: Tied to a specific NiFi site-to-site-client version (1.14.0)
- **Configuration Complexity**: Requires correctly configured NiFi ports and network connectivity between Flink and NiFi
- **Limited Error Visibility**: Some errors may only appear in the NiFi Site-to-Site client's logging