0
# Input and Output Operations
1
2
Reading data from various sources and writing results to different sinks. These operations handle data ingestion and result persistence.
3
4
## Capabilities
5
6
### File Input Operations
7
8
Read data from various file formats and file systems.
9
10
```scala { .api }
11
class ExecutionEnvironment {
12
/**
13
* Reads a text file as DataSet of strings
14
* @param filePath Path to the text file
15
* @param charsetName Character encoding (default: UTF-8)
16
* @return DataSet of text lines
17
*/
18
def readTextFile(filePath: String, charsetName: String = "UTF-8"): DataSet[String]
19
20
/**
21
* Reads a text file as DataSet of StringValue objects
22
* @param filePath Path to the text file
23
* @param charsetName Character encoding (default: UTF-8)
24
* @return DataSet of StringValue objects
25
*/
26
def readTextFileWithValue(filePath: String, charsetName: String = "UTF-8"): DataSet[StringValue]
27
28
/**
29
* Reads a CSV file into typed DataSet
30
* @param filePath Path to the CSV file
31
* @return DataSet of parsed CSV records
32
*/
33
def readCsvFile[T: ClassTag: TypeInformation](filePath: String): DataSet[T]
34
35
/**
36
* Reads primitive values from a file
37
* @param filePath Path to the file
38
* @param delimiter Value delimiter (default: newline)
39
* @return DataSet of primitive values
40
*/
41
def readFileOfPrimitives[T: ClassTag: TypeInformation](
42
filePath: String,
43
delimiter: String = "\n"
44
): DataSet[T]
45
46
/**
47
* Reads file using custom input format
48
* @param inputFormat Custom file input format
49
* @param filePath Path to the file
50
* @return DataSet with custom format parsing
51
*/
52
def readFile[T: ClassTag: TypeInformation](
53
inputFormat: FileInputFormat[T],
54
filePath: String
55
): DataSet[T]
56
}
57
```
58
59
**Usage Examples:**
60
61
```scala
62
import org.apache.flink.api.scala._
63
64
val env = ExecutionEnvironment.getExecutionEnvironment
65
66
// Read text file
67
val textLines = env.readTextFile("hdfs://path/to/textfile.txt")
68
69
// Read CSV file with automatic parsing
70
case class Person(name: String, age: Int, city: String)
71
val people = env.readCsvFile[Person]("hdfs://path/to/people.csv")
72
73
// Read primitive values
74
val numbers = env.readFileOfPrimitives[Int]("hdfs://path/to/numbers.txt")
75
76
// Read with different encoding
77
val utf16Text = env.readTextFile("hdfs://path/to/file.txt", "UTF-16")
78
```
79
80
### Custom Input Formats
81
82
Create DataSets from custom input sources and formats.
83
84
```scala { .api }
85
class ExecutionEnvironment {
86
/**
87
* Creates DataSet from custom input format
88
* @param inputFormat Custom input format implementation
89
* @return DataSet using custom input
90
*/
91
def createInput[T: ClassTag: TypeInformation](inputFormat: InputFormat[T, _]): DataSet[T]
92
}
93
```
94
95
**Usage Examples:**
96
97
```scala
98
import org.apache.flink.api.common.io.InputFormat
99
100
// Custom input format for reading JSON files
101
class JsonInputFormat[T: ClassTag] extends InputFormat[T, FileInputSplit] {
102
// Implementation for reading JSON data
103
}
104
105
val jsonData = env.createInput(new JsonInputFormat[MyDataClass])
106
```
107
108
### Collection Input Sources
109
110
Create DataSets from in-memory collections and sequences.
111
112
```scala { .api }
113
class ExecutionEnvironment {
114
/**
115
* Creates DataSet from an iterable collection
116
* @param data Iterable collection of elements
117
* @return DataSet containing the collection elements
118
*/
119
def fromCollection[T: TypeInformation: ClassTag](data: Iterable[T]): DataSet[T]
120
121
/**
122
* Creates DataSet from an iterator
123
* @param data Iterator of elements
124
* @return DataSet containing the iterator elements
125
*/
126
def fromCollection[T: TypeInformation: ClassTag](data: Iterator[T]): DataSet[T]
127
128
/**
129
* Creates DataSet from individual elements
130
* @param data Variable arguments of elements
131
* @return DataSet containing the elements
132
*/
133
def fromElements[T: TypeInformation: ClassTag](data: T*): DataSet[T]
134
135
/**
136
* Creates DataSet from a parallel collection
137
* @param iterator Splittable iterator for parallel processing
138
* @return DataSet from parallel collection
139
*/
140
def fromParallelCollection[T: TypeInformation: ClassTag](iterator: SplittableIterator[T]): DataSet[T]
141
142
/**
143
* Generates a sequence of numbers
144
* @param from Starting number (inclusive)
145
* @param to Ending number (inclusive)
146
* @return DataSet containing the number sequence
147
*/
148
def generateSequence(from: Long, to: Long): DataSet[Long]
149
}
150
```
151
152
**Usage Examples:**
153
154
```scala
155
// From Scala collections
156
val listData = env.fromCollection(List(1, 2, 3, 4, 5))
157
val arrayData = env.fromCollection(Array("a", "b", "c"))
158
159
// From individual elements
160
val elementData = env.fromElements("apple", "banana", "cherry")
161
162
// Generate number sequence
163
val numbers = env.generateSequence(1, 1000000)
164
```
165
166
### File Output Operations
167
168
Write DataSets to various file formats and destinations.
169
170
```scala { .api }
171
class DataSet[T] {
172
/**
173
* Writes DataSet as text file
174
* @param filePath Output file path
175
* @param writeMode Write mode (default: NO_OVERWRITE)
176
* @return DataSink for the write operation
177
*/
178
def writeAsText(filePath: String, writeMode: FileSystem.WriteMode = null): DataSink[T]
179
180
/**
181
* Writes DataSet as CSV file
182
* @param filePath Output file path
183
* @param rowDelimiter Row delimiter (default: newline)
184
* @param fieldDelimiter Field delimiter (default: comma)
185
* @param writeMode Write mode (default: NO_OVERWRITE)
186
* @return DataSink for the write operation
187
*/
188
def writeAsCsv(
189
filePath: String,
190
rowDelimiter: String = "\n",
191
fieldDelimiter: String = ",",
192
writeMode: FileSystem.WriteMode = null
193
): DataSink[T]
194
195
/**
196
* Writes using custom file output format
197
* @param outputFormat Custom file output format
198
* @param filePath Output file path
199
* @param writeMode Write mode (default: NO_OVERWRITE)
200
* @return DataSink for the write operation
201
*/
202
def write(
203
outputFormat: FileOutputFormat[T],
204
filePath: String,
205
writeMode: FileSystem.WriteMode = null
206
): DataSink[T]
207
208
/**
209
* Writes using custom output format
210
* @param outputFormat Custom output format implementation
211
* @return DataSink for the write operation
212
*/
213
def output(outputFormat: OutputFormat[T]): DataSink[T]
214
}
215
```
216
217
**Usage Examples:**
218
219
```scala
220
// Write as text file
221
data.writeAsText("hdfs://path/to/output.txt")
222
223
// Write as CSV with custom delimiters
224
people.writeAsCsv("hdfs://path/to/people.csv", "\n", ";")
225
226
// Overwrite existing files
227
results.writeAsText("hdfs://path/to/results.txt", FileSystem.WriteMode.OVERWRITE)
228
```
229
230
### Console Output Operations
231
232
Output DataSets to console for debugging and monitoring.
233
234
```scala { .api }
235
class DataSet[T] {
236
/**
237
* Prints all elements to standard output
238
*/
239
def print(): Unit
240
241
/**
242
* Prints all elements to standard error
243
*/
244
def printToErr(): Unit
245
246
/**
247
* Prints elements on task managers with identifier
248
* @param sinkIdentifier Identifier for the print sink
249
* @return DataSink for the print operation
250
*/
251
def print(sinkIdentifier: String): DataSink[T]
252
253
/**
254
* Prints elements to standard error on task managers with identifier
255
* @param sinkIdentifier Identifier for the print sink
256
* @return DataSink for the print operation
257
*/
258
def printToErr(sinkIdentifier: String): DataSink[T]
259
260
/**
261
* Prints elements on task managers with prefix
262
* @param prefix Prefix for each printed line
263
* @return DataSink for the print operation
264
*/
265
def printOnTaskManager(prefix: String): DataSink[T]
266
}
267
```
268
269
**Usage Examples:**
270
271
```scala
272
// Print to console (for small datasets)
273
smallData.print()
274
275
// Print with identifier in distributed environment
276
data.print("MyDataStream")
277
278
// Print with prefix for identification
279
results.printOnTaskManager("RESULT> ")
280
```
281
282
### Data Collection
283
284
Collect DataSet elements to the driver program for inspection.
285
286
```scala { .api }
287
class DataSet[T] {
288
/**
289
* Collects all elements to the driver program
290
* @return Sequence containing all elements
291
*/
292
def collect(): Seq[T]
293
294
/**
295
* Counts the number of elements in the DataSet
296
* @return Number of elements
297
*/
298
def count(): Long
299
}
300
```
301
302
**Usage Examples:**
303
304
```scala
305
// Collect results (use carefully with large datasets)
306
val results = processedData.collect()
307
results.foreach(println)
308
309
// Count elements
310
val elementCount = largeDataset.count()
311
println(s"Dataset contains $elementCount elements")
312
```
313
314
### Write Modes
315
316
Control behavior when output files already exist.
317
318
```scala { .api }
319
object FileSystem {
320
sealed trait WriteMode
321
case object OVERWRITE extends WriteMode // Overwrite existing files
322
case object NO_OVERWRITE extends WriteMode // Fail if files exist (default)
323
}
324
```
325
326
### Custom Output Formats
327
328
Create custom sinks for specialized output requirements.
329
330
```scala { .api }
331
// Example: Custom output format for writing to databases
332
abstract class OutputFormat[T] {
333
/**
334
* Configures the output format
335
* @param parameters Configuration parameters
336
*/
337
def configure(parameters: Configuration): Unit
338
339
/**
340
* Opens the output format
341
* @param taskNumber Task number
342
* @param numTasks Total number of tasks
343
*/
344
def open(taskNumber: Int, numTasks: Int): Unit
345
346
/**
347
* Writes a record
348
* @param record Record to write
349
*/
350
def writeRecord(record: T): Unit
351
352
/**
353
* Closes the output format
354
*/
355
def close(): Unit
356
}
357
```
358
359
**Usage Examples:**
360
361
```scala
362
// Custom database output format
363
class DatabaseOutputFormat[T] extends OutputFormat[T] {
364
private var connection: Connection = _
365
366
override def configure(parameters: Configuration): Unit = {
367
// Setup database connection parameters
368
}
369
370
override def open(taskNumber: Int, numTasks: Int): Unit = {
371
// Open database connection
372
}
373
374
override def writeRecord(record: T): Unit = {
375
// Write record to database
376
}
377
378
override def close(): Unit = {
379
// Close database connection
380
}
381
}
382
383
// Use custom output format
384
data.output(new DatabaseOutputFormat[Person])
385
```
386
387
### Broadcast Variables
388
389
Access side inputs in operations through broadcast variables.
390
391
```scala { .api }
392
class DataSet[T] {
393
/**
394
* Adds a broadcast DataSet that can be accessed in transformations
395
* @param data DataSet to broadcast
396
* @param name Name for accessing the broadcast data
397
* @return DataSet with broadcast variable configured
398
*/
399
def withBroadcastSet(data: DataSet[_], name: String): DataSet[T]
400
}
401
402
// In function implementations, access broadcast data:
403
abstract class RichMapFunction[T, O] extends MapFunction[T, O] {
404
/**
405
* Gets broadcast variable by name
406
* @param name Broadcast variable name
407
* @return List of broadcast elements
408
*/
409
def getBroadcastVariable[X](name: String): java.util.List[X]
410
411
/**
412
   * Gets a broadcast variable, transformed by an initializer
413
* @param name Broadcast variable name
414
   * @param hint Initializer that transforms the raw broadcast elements into the result type
415
   * @return The broadcast variable after transformation by the initializer
416
*/
417
def getBroadcastVariableWithInitializer[X, Y](
418
name: String,
419
hint: BroadcastVariableInitializer[X, Y]
420
): Y
421
}
422
```
423
424
**Usage Examples:**
425
426
```scala
427
import org.apache.flink.api.common.functions.RichMapFunction
428
import org.apache.flink.configuration.Configuration
import scala.jdk.CollectionConverters._ // needed for .asScala on the java.util.List returned by getBroadcastVariable
429
430
// Broadcast lookup data
431
val lookupData = env.fromElements(("key1", "value1"), ("key2", "value2"))
432
433
// Use broadcast data in transformation
434
val enrichedData = data
435
.map(new RichMapFunction[String, String] {
436
var lookup: Map[String, String] = _
437
438
override def open(parameters: Configuration): Unit = {
439
val broadcastData = getBroadcastVariable[(String, String)]("lookup")
440
lookup = broadcastData.asScala.toMap
441
}
442
443
override def map(value: String): String = {
444
lookup.getOrElse(value, "unknown")
445
}
446
})
447
.withBroadcastSet(lookupData, "lookup")
448
```
449
450
## Types
451
452
```scala { .api }
453
trait InputFormat[T, InputSplit] {
454
/**
455
* Configures the input format
456
* @param parameters Configuration parameters
457
*/
458
def configure(parameters: Configuration): Unit
459
460
/**
461
* Creates input splits for parallel reading
462
* @param minNumSplits Minimum number of splits
463
* @return Array of input splits
464
*/
465
def createInputSplits(minNumSplits: Int): Array[InputSplit]
466
467
/**
468
* Opens an input split for reading
469
* @param split Input split to open
470
*/
471
def open(split: InputSplit): Unit
472
473
/**
474
   * Checks whether the end of the input has been reached
475
   * @return True if the end of input was reached and no more records are available
476
*/
477
def reachedEnd(): Boolean
478
479
/**
480
* Reads the next record
481
* @param reuse Object to reuse for the record
482
* @return Next record or null if end reached
483
*/
484
def nextRecord(reuse: T): T
485
486
/**
487
* Closes the input format
488
*/
489
def close(): Unit
490
}
491
492
abstract class FileInputFormat[T] extends InputFormat[T, FileInputSplit] {
493
/**
494
* Sets the path to read from
495
* @param filePath File path
496
*/
497
def setFilePath(filePath: Path): Unit
498
499
/**
500
* Sets file paths to read from
501
* @param filePaths Array of file paths
502
*/
503
def setFilePaths(filePaths: Path*): Unit
504
}
505
506
class DataSink[T] {
507
/**
508
* Sets the parallelism for this sink
509
* @param parallelism Degree of parallelism
510
* @return DataSink with specified parallelism
511
*/
512
def setParallelism(parallelism: Int): DataSink[T]
513
514
/**
515
* Gets the parallelism of this sink
516
* @return Current parallelism setting
517
*/
518
def getParallelism: Int
519
520
/**
521
* Sets the name for this sink
522
* @param name Sink name
523
* @return DataSink with specified name
524
*/
525
def name(name: String): DataSink[T]
526
}
527
528
trait SplittableIterator[T] extends Iterator[T] {
529
/**
530
* Splits the iterator into multiple iterators
531
* @param numPartitions Number of partitions
532
* @return Array of split iterators
533
*/
534
def split(numPartitions: Int): Array[Iterator[T]]
535
536
/**
537
* Gets the maximum number of splits
538
* @return Maximum number of splits
539
*/
540
def getMaximumNumberOfSplits: Int
541
}
542
543
class StringValue extends Comparable[StringValue] {
544
/**
545
* Creates StringValue from string
546
* @param value String value
547
*/
548
def this(value: String)
549
550
/**
551
* Gets the string value
552
* @return String value
553
*/
554
def getValue: String
555
556
/**
557
* Sets the string value
558
* @param value String value
559
*/
560
def setValue(value: String): Unit
561
}
562
```