# Data Sources and Sinks

Apache Flink's Scala API provides comprehensive support for reading data from various sources and writing results to different output formats with type-safe operations.

## Data Sources

### Collection Sources

Create DataSets from in-memory collections, useful for testing and small datasets.

```scala { .api }
class ExecutionEnvironment {
  // Create DataSet from Scala collections
  def fromCollection[T: ClassTag : TypeInformation](data: Iterable[T]): DataSet[T]
  def fromCollection[T: ClassTag : TypeInformation](data: Iterator[T]): DataSet[T]

  // Create DataSet from individual elements
  def fromElements[T: ClassTag : TypeInformation](data: T*): DataSet[T]

  // Create DataSet from a parallel, splittable iterator
  def fromParallelCollection[T: ClassTag : TypeInformation](data: SplittableIterator[T]): DataSet[T]

  // Generate a sequence of numbers
  def generateSequence(from: Long, to: Long): DataSet[Long]
}
```

### File Sources

Read data from various file formats with configurable encoding and parsing options.

```scala { .api }
class ExecutionEnvironment {
  // Read text files
  def readTextFile(filePath: String, charsetName: String = "UTF-8"): DataSet[String]
  def readTextFileWithValue(filePath: String, charsetName: String = "UTF-8"): DataSet[StringValue]

  // Read CSV files with type-safe parsing
  def readCsvFile[T: ClassTag : TypeInformation](filePath: String): CsvReader[T]

  // Read files containing primitive values
  def readFileOfPrimitives[T: ClassTag : TypeInformation](filePath: String, delimiter: String): DataSet[T]
  def readFileOfPrimitives[T: ClassTag : TypeInformation](filePath: String, typeClass: Class[T]): DataSet[T]
}
```

### Custom Input Formats

```scala { .api }
class ExecutionEnvironment {
  // Use a custom input format
  def createInput[T: ClassTag : TypeInformation](inputFormat: InputFormat[T, _]): DataSet[T]
}
```

## Data Sinks

### Basic Output Operations

```scala { .api }
class DataSet[T] {
  // Write to text files
  def writeAsText(filePath: String, writeMode: FileSystem.WriteMode = NO_OVERWRITE): DataSink[T]

  // Write with custom text formatting
  def writeAsFormattedText(
    filePath: String,
    writeMode: FileSystem.WriteMode = NO_OVERWRITE,
    format: TextFormatter[T]
  ): DataSink[T]

  // Write as CSV
  def writeAsCsv(
    filePath: String,
    rowDelimiter: String = ScalaCsvOutputFormat.DEFAULT_LINE_DELIMITER,
    fieldDelimiter: String = ScalaCsvOutputFormat.DEFAULT_FIELD_DELIMITER,
    writeMode: FileSystem.WriteMode = NO_OVERWRITE
  ): DataSink[T]
}
```

### Custom Output Formats

```scala { .api }
class DataSet[T] {
  // Use a custom output format
  def write(
    outputFormat: OutputFormat[T],
    filePath: String,
    writeMode: FileSystem.WriteMode = NO_OVERWRITE
  ): DataSink[T]

  def output(outputFormat: OutputFormat[T]): DataSink[T]
}
```

### Console Output

```scala { .api }
class DataSet[T] {
  // Print to standard output
  def print(): DataSink[T]

  // Print to standard error
  def printToErr(): DataSink[T]

  // Print on the task manager (for debugging)
  def printOnTaskManager(sinkIdentifier: String): DataSink[T]
}
```

### Collect Results

```scala { .api }
class DataSet[T] {
  // Collect all elements to the driver program
  def collect(): Seq[T]

  // Count elements
  def count(): Long
}
```

## CSV Reader Configuration

```scala { .api }
class CsvReader[T] {
  // Configure field parsing
  def fieldDelimiter(delimiter: String): CsvReader[T]
  def lineDelimiter(delimiter: String): CsvReader[T]

  // Configure data types
  def types(types: Class[_]*): CsvReader[T]
  def pojoType[P](pojoType: Class[P], fields: String*): DataSet[P]
  def tupleType[U](targetType: Class[U]): DataSet[U]

  // Configure parsing options
  def includeFields(mask: String): CsvReader[T]
  def includeFields(includeMask: Boolean*): CsvReader[T]
  def parseQuotedStrings(delimiter: Char): CsvReader[T]
  def ignoreComments(commentPrefix: String): CsvReader[T]
  def ignoreInvalidLines(): CsvReader[T]
  def ignoreFirstLine(): CsvReader[T]
}
```

## Usage Examples

### Reading from Collections

```scala
import org.apache.flink.api.scala._

val env = ExecutionEnvironment.getExecutionEnvironment

// From a collection
val data1 = env.fromCollection(List(1, 2, 3, 4, 5))

// From individual elements
val data2 = env.fromElements("hello", "world", "flink")

// Generate a sequence
val sequence = env.generateSequence(1, 1000)
```

### Reading Text Files

```scala
import org.apache.flink.api.scala._

val env = ExecutionEnvironment.getExecutionEnvironment

// Read a text file
val lines = env.readTextFile("/path/to/file.txt")

// Process the lines
val words = lines.flatMap(_.split(" "))
```

### Reading CSV Files

```scala
import org.apache.flink.api.scala._

val env = ExecutionEnvironment.getExecutionEnvironment

// Define a case class for the CSV data
case class Person(name: String, age: Int, city: String)

// Read CSV into a case class
val people = env.readCsvFile[Person]("/path/to/people.csv")
  .fieldDelimiter(",")
  .includeFields("111") // name, age, city
  .ignoreFirstLine()

// Read CSV as tuples
val tuples = env.readCsvFile[(String, Int, String)]("/path/to/people.csv")
  .fieldDelimiter(",")
  .types(classOf[String], classOf[Int], classOf[String])
```

### Writing Results

```scala
import org.apache.flink.api.scala._
import org.apache.flink.core.fs.FileSystem.WriteMode

val env = ExecutionEnvironment.getExecutionEnvironment
val result = env.fromElements(1, 2, 3, 4, 5).map(_ * 2)

// Write to a text file
result.writeAsText("/path/to/output.txt", WriteMode.OVERWRITE)

// Write as CSV
case class Result(id: Int, value: Int)
val resultData = env.fromElements(Result(1, 10), Result(2, 20))
resultData.writeAsCsv("/path/to/results.csv", "\n", ",", WriteMode.OVERWRITE)

// Print to the console
result.print()

// Collect to the driver
val collected: Seq[Int] = result.collect()
println(s"Results: ${collected.mkString(", ")}")
```

### Custom Input/Output Formats

```scala
import org.apache.flink.api.scala._
import org.apache.flink.api.common.io.InputFormat
import org.apache.flink.api.java.io.TextOutputFormat
import org.apache.flink.core.fs.Path
import org.apache.flink.core.io.InputSplit

val env = ExecutionEnvironment.getExecutionEnvironment

// Custom input format. Note: a wildcard `_` is not a valid type argument
// in an extends clause, so the split type must be named explicitly.
class MyInputFormat extends InputFormat[String, InputSplit] {
  // Implementation details...
}

val customData = env.createInput(new MyInputFormat())

// Custom output format
val data = env.fromElements("a", "b", "c")
data.output(new TextOutputFormat[String](new Path("/path/to/output")))
```