or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

data-sources-sinks.md · execution-environment.md · grouping-aggregation.md · hadoop-integration.md · index.md · iterations.md · joins-cogroups.md · transformations.md · type-system.md

docs/data-sources-sinks.md

0

# Data Sources and Sinks

1

2

Apache Flink Scala API provides comprehensive support for reading data from various sources and writing results to different output formats with type-safe operations.

3

4

## Data Sources

5

6

### Collection Sources

7

8

Create DataSets from in-memory collections, useful for testing and small datasets.

9

10

```scala { .api }

11

class ExecutionEnvironment {

12

// Create DataSet from Scala collections

13

def fromCollection[T: ClassTag : TypeInformation](data: Iterable[T]): DataSet[T]

14

def fromCollection[T: ClassTag : TypeInformation](data: Iterator[T]): DataSet[T]

15

16

// Create DataSet from individual elements

17

def fromElements[T: ClassTag : TypeInformation](data: T*): DataSet[T]

18

19

// Create DataSet from parallel splittable iterator

20

def fromParallelCollection[T: ClassTag : TypeInformation](data: SplittableIterator[T]): DataSet[T]

21

22

// Generate sequence of numbers

23

def generateSequence(from: Long, to: Long): DataSet[Long]

24

}

25

```

26

27

### File Sources

28

29

Read data from various file formats with configurable encoding and parsing options.

30

31

```scala { .api }

32

class ExecutionEnvironment {

33

// Read text files

34

def readTextFile(filePath: String, charsetName: String = "UTF-8"): DataSet[String]

35

def readTextFileWithValue(filePath: String, charsetName: String = "UTF-8"): DataSet[StringValue]

36

37

// Read CSV files with type-safe parsing

38

def readCsvFile[T: ClassTag : TypeInformation](filePath: String): CsvReader[T]

39

40

// Read files containing primitive values

41

def readFileOfPrimitives[T: ClassTag : TypeInformation](filePath: String, delimiter: String): DataSet[T]

42

def readFileOfPrimitives[T: ClassTag : TypeInformation](filePath: String, typeClass: Class[T]): DataSet[T]

43

}

44

```

45

46

### Custom Input Formats

47

48

```scala { .api }

49

class ExecutionEnvironment {

50

// Use custom input format

51

def createInput[T: ClassTag : TypeInformation](inputFormat: InputFormat[T, _]): DataSet[T]

52

}

53

```

54

55

## Data Sinks

56

57

### Basic Output Operations

58

59

```scala { .api }

60

class DataSet[T] {

61

// Write to text files

62

def writeAsText(filePath: String, writeMode: FileSystem.WriteMode = NO_OVERWRITE): DataSink[T]

63

64

// Write with custom text formatting

65

def writeAsFormattedText(filePath: String, format: TextFormatter[T]): DataSink[T]

def writeAsFormattedText(
  filePath: String,
  writeMode: FileSystem.WriteMode,
  format: TextFormatter[T]
): DataSink[T]

70

71

// Write as CSV

72

def writeAsCsv(

73

filePath: String,

74

rowDelimiter: String = ScalaCsvOutputFormat.DEFAULT_LINE_DELIMITER,

75

fieldDelimiter: String = ScalaCsvOutputFormat.DEFAULT_FIELD_DELIMITER,

76

writeMode: FileSystem.WriteMode = NO_OVERWRITE

77

): DataSink[T]

78

}

79

```

80

81

### Custom Output Formats

82

83

```scala { .api }

84

class DataSet[T] {

85

// Use custom output format

86

def write(

87

outputFormat: OutputFormat[T],

88

filePath: String,

89

writeMode: FileSystem.WriteMode = NO_OVERWRITE

90

): DataSink[T]

91

92

def output(outputFormat: OutputFormat[T]): DataSink[T]

93

}

94

```

95

96

### Console Output

97

98

```scala { .api }

99

class DataSet[T] {

100

// Print to standard output

101

def print(): DataSink[T]

102

103

// Print to standard error

104

def printToErr(): DataSink[T]

105

106

// Print on task manager (for debugging)

107

def printOnTaskManager(sinkIdentifier: String): DataSink[T]

108

}

109

```

110

111

### Collect Results

112

113

```scala { .api }

114

class DataSet[T] {

115

// Collect all elements to driver program

116

def collect(): Seq[T]

117

118

// Count elements

119

def count(): Long

120

}

121

```

122

123

## CSV Reader Configuration

124

125

```scala { .api }

126

class CsvReader[T] {

127

// Configure field parsing

128

def fieldDelimiter(delimiter: String): CsvReader[T]

129

def lineDelimiter(delimiter: String): CsvReader[T]

130

131

// Configure data types

132

def types(types: Class[_]*): CsvReader[T]

133

def pojoType[P](pojoType: Class[P], fields: String*): DataSet[P]

134

def tupleType[TT](targetType: Class[TT]): DataSet[TT]

135

136

// Configure parsing options

137

def includeFields(mask: String): CsvReader[T]

138

def includeFields(includeMask: Boolean*): CsvReader[T]

139

def parseQuotedStrings(delimiter: Char): CsvReader[T]

140

def ignoreComments(commentPrefix: String): CsvReader[T]

141

def ignoreInvalidLines(): CsvReader[T]

142

def ignoreFirstLine(): CsvReader[T]

143

}

144

```

145

146

## Usage Examples

147

148

### Reading from Collections

149

150

```scala

151

import org.apache.flink.api.scala._

152

153

val env = ExecutionEnvironment.getExecutionEnvironment

154

155

// From collection

156

val data1 = env.fromCollection(List(1, 2, 3, 4, 5))

157

158

// From elements

159

val data2 = env.fromElements("hello", "world", "flink")

160

161

// Generate sequence

162

val sequence = env.generateSequence(1, 1000)

163

```

164

165

### Reading Text Files

166

167

```scala

168

import org.apache.flink.api.scala._

169

170

val env = ExecutionEnvironment.getExecutionEnvironment

171

172

// Read text file

173

val lines = env.readTextFile("/path/to/file.txt")

174

175

// Process lines

176

val words = lines.flatMap(_.split(" "))

177

```

178

179

### Reading CSV Files

180

181

```scala

182

import org.apache.flink.api.scala._

183

184

val env = ExecutionEnvironment.getExecutionEnvironment

185

186

// Define case class for CSV data

187

case class Person(name: String, age: Int, city: String)

188

189

// Read CSV with case class

190

val people = env.readCsvFile[Person]("/path/to/people.csv")

191

.fieldDelimiter(",")

192

.includeFields("111") // name, age, city

193

.ignoreFirstLine()

194

195

// Read CSV as tuples

196

val tuples = env.readCsvFile[(String, Int, String)]("/path/to/people.csv")

197

.fieldDelimiter(",")

198

.types(classOf[String], classOf[Int], classOf[String])

199

```

200

201

### Writing Results

202

203

```scala

204

import org.apache.flink.api.scala._

205

import org.apache.flink.core.fs.FileSystem.WriteMode

206

207

val env = ExecutionEnvironment.getExecutionEnvironment

208

val result = env.fromElements(1, 2, 3, 4, 5).map(_ * 2)

209

210

// Write to text file

211

result.writeAsText("/path/to/output.txt", WriteMode.OVERWRITE)

212

213

// Write as CSV

214

case class Result(id: Int, value: Int)

215

val resultData = env.fromElements(Result(1, 10), Result(2, 20))

216

resultData.writeAsCsv("/path/to/results.csv", "\n", ",", WriteMode.OVERWRITE)

217

218

// Print to console

219

result.print()

220

221

// Collect to driver

222

val collected: Seq[Int] = result.collect()

223

println(s"Results: ${collected.mkString(", ")}")

224

```

225

226

### Custom Input/Output Formats

227

228

```scala
import org.apache.flink.api.scala._
import org.apache.flink.api.common.io.InputFormat
import org.apache.flink.api.java.io.TextOutputFormat
import org.apache.flink.core.fs.Path
import org.apache.flink.core.io.InputSplit

val env = ExecutionEnvironment.getExecutionEnvironment

// Custom input format. The second type argument must be a concrete split type
// (e.g. InputSplit) — `_` is not valid as a type argument in an extends clause.
class MyInputFormat extends InputFormat[String, InputSplit] {
  // Implementation details...
}

val customData = env.createInput(new MyInputFormat())

// Custom output format
val data = env.fromElements("a", "b", "c")
data.output(new TextOutputFormat[String](new Path("/path/to/output")))
```