or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

configuration.md, connectors.md, event-time-watermarks.md, execution-jobs.md, functions-and-operators.md, index.md, state-management.md, type-system-serialization.md, utilities.md

docs/index.md

0

# Apache Flink Core

1

2

Apache Flink Core is the foundational module that provides essential APIs for building distributed data processing applications. It offers a comprehensive set of interfaces and classes for job execution, data transformation, state management, and system configuration.

3

4

## Core Capabilities

5

6

### Job Execution and Runtime Context

7

8

```java { .api }

9

import org.apache.flink.api.common.JobExecutionResult;

10

import org.apache.flink.api.common.ExecutionConfig;

11

import org.apache.flink.api.common.functions.RuntimeContext;

12

13

// Configure execution parameters

14

ExecutionConfig config = new ExecutionConfig();

15

config.enableClosureCleaner();

16

config.setParallelism(4);

17

18

// Access runtime context in functions

19

public class MyMapFunction implements MapFunction<String, String> {

20

@Override

21

public String map(String value) throws Exception {

22

RuntimeContext ctx = getRuntimeContext();

23

int parallelism = ctx.getNumberOfParallelSubtasks();

24

return value + "_" + parallelism;

25

}

26

}

27

```

28

29

### User-Defined Functions

30

31

```java { .api }

32

import org.apache.flink.api.common.functions.*;

33

34

// Map function for 1-to-1 transformations

35

public class MyMapFunction implements MapFunction<String, Integer> {

36

@Override

37

public Integer map(String value) throws Exception {

38

return value.length();

39

}

40

}

41

42

// FlatMap function for 1-to-many transformations

43

public class TokenizerFunction implements FlatMapFunction<String, String> {

44

@Override

45

public void flatMap(String value, Collector<String> out) throws Exception {

46

for (String word : value.split(" ")) {

47

out.collect(word);

48

}

49

}

50

}

51

52

// Filter function for predicate-based filtering

53

public class LengthFilter implements FilterFunction<String> {

54

@Override

55

public boolean filter(String value) throws Exception {

56

return value.length() > 3;

57

}

58

}

59

60

// Reduce function for aggregations

61

public class SumReduceFunction implements ReduceFunction<Integer> {

62

@Override

63

public Integer reduce(Integer value1, Integer value2) throws Exception {

64

return value1 + value2;

65

}

66

}

67

```

68

69

### Type System and Serialization

70

71

```java { .api }

72

import org.apache.flink.api.common.typeinfo.TypeInformation;

73

import org.apache.flink.api.common.typeinfo.BasicTypeInfo;

74

import org.apache.flink.api.java.typeutils.TupleTypeInfo;

75

import org.apache.flink.api.common.serialization.SimpleStringSchema;

76

77

// Basic type information

78

TypeInformation<String> stringType = BasicTypeInfo.STRING_TYPE_INFO;

79

TypeInformation<Integer> intType = BasicTypeInfo.INT_TYPE_INFO;

80

81

// Tuple type information

82

TupleTypeInfo<Tuple2<String, Integer>> tupleType =

83

new TupleTypeInfo<>(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO);

84

85

// Custom serialization schema

86

public class MySerializationSchema implements SerializationSchema<MyClass> {

87

@Override

88

public byte[] serialize(MyClass element) {

89

// Custom serialization logic

90

return element.toString().getBytes();

91

}

92

}

93

```

94

95

### State Management

96

97

```java { .api }

98

import org.apache.flink.api.common.state.*;

99

import org.apache.flink.api.common.functions.RichMapFunction;

100

import org.apache.flink.api.common.functions.OpenContext;

101

102

public class StatefulMapFunction extends RichMapFunction<String, String> {

103

private ValueState<Integer> countState;

104

105

@Override

106

public void open(OpenContext openContext) throws Exception {

107

ValueStateDescriptor<Integer> descriptor =

108

new ValueStateDescriptor<>("count", Integer.class, 0);

109

countState = getRuntimeContext().getState(descriptor);

110

}

111

112

@Override

113

public String map(String value) throws Exception {

114

Integer currentCount = countState.value();

115

currentCount++;

116

countState.update(currentCount);

117

return value + "_" + currentCount;

118

}

119

}

120

```

121

122

### Event Time and Watermarks

123

124

```java { .api }

125

import org.apache.flink.api.common.eventtime.*;

126

127

// Custom watermark strategy

128

WatermarkStrategy<MyEvent> watermarkStrategy =

129

WatermarkStrategy.<MyEvent>forBoundedOutOfOrderness(Duration.ofSeconds(5))

130

.withTimestampAssigner((event, timestamp) -> event.getTimestamp());

131

132

// Custom watermark generator

133

public class MyWatermarkGenerator implements WatermarkGenerator<MyEvent> {

134

private long maxTimestamp = Long.MIN_VALUE;

135

private final long maxOutOfOrderness = 5000; // 5 seconds

136

137

@Override

138

public void onEvent(MyEvent event, long eventTimestamp, WatermarkOutput output) {

139

maxTimestamp = Math.max(maxTimestamp, eventTimestamp);

140

}

141

142

@Override

143

public void onPeriodicEmit(WatermarkOutput output) {

144

output.emitWatermark(new Watermark(maxTimestamp - maxOutOfOrderness - 1));

145

}

146

}

147

```

148

149

### Data Sources and Sinks

150

151

```java { .api }

152

import org.apache.flink.api.connector.source.*;

153

import org.apache.flink.api.connector.sink2.*;

154

155

// Custom source implementation

156

public class MySource implements Source<String, MySplit, MySourceEnumState> {

157

@Override

158

public Boundedness getBoundedness() {

159

return Boundedness.CONTINUOUS_UNBOUNDED;

160

}

161

162

@Override

163

public SourceReader<String, MySplit> createReader(SourceReaderContext readerContext) {

164

return new MySourceReader();

165

}

166

167

@Override

168

public SplitEnumerator<MySplit, MySourceEnumState> createEnumerator(

169

SplitEnumeratorContext<MySplit> enumContext) {

170

return new MySplitEnumerator();

171

}

172

}

173

174

// Custom sink implementation

175

public class MySink implements Sink<String> {

176

@Override

177

public SinkWriter<String> createWriter(InitContext context) throws IOException {

178

return new MySinkWriter();

179

}

180

}

181

```

182

183

### Configuration Management

184

185

```java { .api }

186

import org.apache.flink.configuration.*;

// Reading configuration values with the typed accessor get(ConfigOption<T>)
Configuration config = new Configuration();
int parallelism = config.get(CoreOptions.DEFAULT_PARALLELISM);
String tmpDir = config.get(CoreOptions.TMP_DIRS);

// Setting configuration options with the typed set(ConfigOption<T>, T)
config.set(CoreOptions.DEFAULT_PARALLELISM, 8);
config.set(CoreOptions.TMP_DIRS, "/tmp/flink");

// Custom configuration options are declared as constants in a holder class
public final class MyOptions {
    public static final ConfigOption<String> MY_OPTION =
        ConfigOptions.key("my.custom.option")
            .stringType()
            .defaultValue("default-value")
            .withDescription("Description of my custom option");

    private MyOptions() {}
}

203

```

204

205

## Package Organization

206

207

Apache Flink Core is organized into several key packages:

208

209

- **`org.apache.flink.api.common.*`** - Core APIs for execution, functions, types, and state

210

- **`org.apache.flink.api.connector.*`** - Source and sink connector interfaces

211

- **`org.apache.flink.configuration.*`** - Configuration system and options

212

- **`org.apache.flink.core.*`** - Core execution, filesystem, I/O, and memory management

213

- **`org.apache.flink.types.*`** - Basic data types and utilities

214

- **`org.apache.flink.util.*`** - Common utility classes and functions

215

216

## Detailed Documentation

217

218

### [Functions and Operators](./functions-and-operators.md)

219

User-defined functions, transformation operators, and function interfaces for data processing pipelines.

220

221

### [Type System and Serialization](./type-system-serialization.md)

222

Type information system, serializers, and type utilities for handling data types in Flink applications.

223

224

### [State Management](./state-management.md)

225

Stateful computation APIs, state descriptors, and state backends for managing application state.

226

227

### [Event Time and Watermarks](./event-time-watermarks.md)

228

Time-based processing, watermark generation, and timestamp assignment for event-time computations.

229

230

### [Connectors](./connectors.md)

231

Source and sink APIs for data ingestion and output, including connector interfaces and utilities.

232

233

### [Configuration System](./configuration.md)

234

Configuration management, options, and system settings for Flink applications and clusters.

235

236

### [Execution and Jobs](./execution-jobs.md)

237

Job execution, task management, runtime contexts, and execution environments.

238

239

### [Core Utilities](./utilities.md)

240

Common utility classes, I/O operations, memory management, and filesystem abstractions.

241

242

## Getting Started

243

244

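To use Apache Flink Core in your project, add the `flink-core` dependency. The basic usage example below also needs `flink-streaming-java`, which provides `StreamExecutionEnvironment` and `DataStream`.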

245

246

```xml { .api }

247

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-core</artifactId>
    <!-- 1.19 or later: the state example uses RichFunction#open(OpenContext), introduced in Flink 1.19 -->
    <version>1.19.0</version>
</dependency>

252

```

253

254

### Basic Usage Example

255

256

```java { .api }

257

import org.apache.flink.api.common.functions.MapFunction;

258

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

259

import org.apache.flink.streaming.api.datastream.DataStream;

260

261

public class BasicFlinkApp {

262

public static void main(String[] args) throws Exception {

263

// Create execution environment

264

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

265

266

// Create data stream

267

DataStream<String> text = env.fromElements("hello", "world", "flink");

268

269

// Apply transformations using Flink Core APIs

270

DataStream<Integer> lengths = text.map(new MapFunction<String, Integer>() {

271

@Override

272

public Integer map(String value) throws Exception {

273

return value.length();

274

}

275

});

276

277

// Output results

278

lengths.print();

279

280

// Execute the job

281

env.execute("Basic Flink Application");

282

}

283

}

284

```

285

286

## Key Concepts

287

288

- **Functions**: User-defined transformation logic implemented via function interfaces

289

- **Type Information**: System for managing data types and serialization

290

- **State**: Managed state for stateful computations and fault tolerance

291

- **Event Time**: Processing based on the timestamps carried by the events themselves, rather than the wall-clock time at which they are processed

292

- **Watermarks**: Mechanism for handling out-of-order events in event-time processing

293

- **Sources/Sinks**: Interfaces for data ingestion and output

294

- **Configuration**: System for managing application and cluster settings

295

- **Execution**: Runtime system for distributed job execution

296

297

Apache Flink Core provides the foundation for building robust, scalable, and fault-tolerant stream and batch processing applications. The modular design allows developers to use only the components they need while maintaining full compatibility with the broader Flink ecosystem.