or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

configuration.md, index.md, input-formats.md, mapreduce-functions.md, output-formats.md, scala-api.md, type-system.md

docs/scala-api.md

# Scala API

The Scala API provides native Scala bindings for Hadoop integration within Flink, offering idiomatic Scala interfaces with implicit type information, tuple syntax, and functional programming patterns.

## Overview

The Scala API mirrors the Java API functionality but provides Scala-friendly interfaces using native Scala tuples instead of Flink's Tuple2 class, implicit TypeInformation parameters, and object-oriented design patterns consistent with Scala conventions.

## HadoopInputs Object

The primary entry point for creating Hadoop InputFormat wrappers in Scala.

```scala { .api }
object HadoopInputs {

  // MapRed API methods
  def readHadoopFile[K, V](
      mapredInputFormat: MapredFileInputFormat[K, V],
      key: Class[K],
      value: Class[V],
      inputPath: String,
      job: JobConf)(implicit tpe: TypeInformation[(K, V)]): mapred.HadoopInputFormat[K, V];

  def readHadoopFile[K, V](
      mapredInputFormat: MapredFileInputFormat[K, V],
      key: Class[K],
      value: Class[V],
      inputPath: String)(implicit tpe: TypeInformation[(K, V)]): mapred.HadoopInputFormat[K, V];

  def readSequenceFile[K, V](
      key: Class[K],
      value: Class[V],
      inputPath: String)(implicit tpe: TypeInformation[(K, V)]): mapred.HadoopInputFormat[K, V];

  def createHadoopInput[K, V](
      mapredInputFormat: MapredInputFormat[K, V],
      key: Class[K],
      value: Class[V],
      job: JobConf)(implicit tpe: TypeInformation[(K, V)]): mapred.HadoopInputFormat[K, V];

  // MapReduce API methods
  def readHadoopFile[K, V](
      mapreduceInputFormat: MapreduceFileInputFormat[K, V],
      key: Class[K],
      value: Class[V],
      inputPath: String,
      job: Job)(implicit tpe: TypeInformation[(K, V)]): mapreduce.HadoopInputFormat[K, V];

  def readHadoopFile[K, V](
      mapreduceInputFormat: MapreduceFileInputFormat[K, V],
      key: Class[K],
      value: Class[V],
      inputPath: String)(implicit tpe: TypeInformation[(K, V)]): mapreduce.HadoopInputFormat[K, V];

  def createHadoopInput[K, V](
      mapreduceInputFormat: MapreduceInputFormat[K, V],
      key: Class[K],
      value: Class[V],
      job: Job)(implicit tpe: TypeInformation[(K, V)]): mapreduce.HadoopInputFormat[K, V];
}
```

## Scala HadoopInputFormat Classes

### MapRed HadoopInputFormat

```scala { .api }
@Public
class HadoopInputFormat[K, V] extends HadoopInputFormatBase[K, V, (K, V)] {

  // Constructor with JobConf
  def this(mapredInputFormat: InputFormat[K, V], keyClass: Class[K], valueClass: Class[V], job: JobConf);

  // Constructor with default JobConf
  def this(mapredInputFormat: InputFormat[K, V], keyClass: Class[K], valueClass: Class[V]);

  // Read next record as Scala tuple
  def nextRecord(reuse: (K, V)): (K, V);
}
```

### MapReduce HadoopInputFormat

```scala { .api }
@Public
class HadoopInputFormat[K, V] extends HadoopInputFormatBase[K, V, (K, V)] {

  // Constructor with Job
  def this(mapreduceInputFormat: InputFormat[K, V], keyClass: Class[K], valueClass: Class[V], job: Job);

  // Constructor with default Job
  def this(mapreduceInputFormat: InputFormat[K, V], keyClass: Class[K], valueClass: Class[V]);

  // Read next record as Scala tuple
  def nextRecord(reuse: (K, V)): (K, V);
}
```

## Scala HadoopOutputFormat Classes

### MapRed HadoopOutputFormat

```scala { .api }
@Public
class HadoopOutputFormat[K, V] extends HadoopOutputFormatBase[K, V, (K, V)] {

  // Constructor with JobConf
  def this(mapredOutputFormat: OutputFormat[K, V], job: JobConf);

  // Constructor with OutputCommitter and JobConf
  def this(
      mapredOutputFormat: OutputFormat[K, V],
      outputCommitterClass: Class[OutputCommitter],
      job: JobConf);

  // Write a record from Scala tuple
  def writeRecord(record: (K, V)): Unit;
}
```

## Usage Examples

### Basic Input/Output with Scala

```scala
import org.apache.flink.api.scala._
import org.apache.flink.hadoopcompatibility.scala.HadoopInputs
import org.apache.hadoop.mapred.{TextInputFormat, JobConf}
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.fs.Path

val env = ExecutionEnvironment.getExecutionEnvironment

// Read text files using Scala API
val input: DataSet[(LongWritable, Text)] = env.createInput(
  HadoopInputs.readHadoopFile(
    new TextInputFormat(),
    classOf[LongWritable],
    classOf[Text],
    "hdfs://namenode:port/input/path"
  )
)

// Process data with Scala operations
val lines: DataSet[String] = input.map(_._2.toString)
val words: DataSet[String] = lines.flatMap(_.split("\\s+"))
val wordCounts: DataSet[(String, Int)] = words
  .map((_, 1))
  .groupBy(0)
  .sum(1)
```

### Working with Sequence Files

```scala
import org.apache.hadoop.io.{IntWritable, Text}

// Read sequence files
val sequenceData: DataSet[(IntWritable, Text)] = env.createInput(
  HadoopInputs.readSequenceFile(
    classOf[IntWritable],
    classOf[Text],
    "hdfs://namenode:port/sequence/files"
  )
)

// Convert to native Scala types
val nativeData: DataSet[(Int, String)] = sequenceData.map {
  case (key, value) => (key.get(), value.toString)
}
```

### Custom Configuration

```scala
import org.apache.hadoop.mapred.JobConf

// Configure Hadoop job
val jobConf = new JobConf()
jobConf.set("mapreduce.input.fileinputformat.inputdir", "/custom/input/path")
jobConf.set("custom.property", "custom-value")
jobConf.setBoolean("custom.flag", true)

// Use custom configuration
val customInput: DataSet[(LongWritable, Text)] = env.createInput(
  HadoopInputs.readHadoopFile(
    new TextInputFormat(),
    classOf[LongWritable],
    classOf[Text],
    "/input/path",
    jobConf
  )
)
```

### Advanced Data Processing

```scala
import org.apache.hadoop.mapred.SequenceFileInputFormat
import org.apache.hadoop.io.{BytesWritable, Text}

// Process binary data
val binaryData: DataSet[(BytesWritable, Text)] = env.createInput(
  HadoopInputs.createHadoopInput(
    new SequenceFileInputFormat[BytesWritable, Text](),
    classOf[BytesWritable],
    classOf[Text],
    jobConf
  )
)

// Complex processing pipeline
val processedData = binaryData
  .filter(_._2.toString.nonEmpty)
  .map { case (bytes, text) =>
    (text.toString, bytes.getLength)
  }
  .groupBy(0)
  .reduce { (a, b) =>
    (a._1, a._2 + b._2)
  }
```

### Writing Output with Scala

```scala
import org.apache.flink.api.scala.hadoop.mapred.HadoopOutputFormat
import org.apache.hadoop.mapred.{TextOutputFormat, FileOutputFormat, JobConf}
import org.apache.hadoop.io.{NullWritable, Text}
import org.apache.hadoop.fs.Path

// Configure output
val outputConf = new JobConf()
outputConf.setOutputFormat(classOf[TextOutputFormat[NullWritable, Text]])
outputConf.setOutputKeyClass(classOf[NullWritable])
outputConf.setOutputValueClass(classOf[Text])
// Note: setOutputPath is a Java static declared on FileOutputFormat; Scala
// cannot call inherited Java statics through the TextOutputFormat subclass.
FileOutputFormat.setOutputPath(outputConf, new Path("hdfs://namenode:port/output"))

// Create output format
val hadoopOutput = new HadoopOutputFormat(
  new TextOutputFormat[NullWritable, Text](),
  outputConf
)

// Prepare data for output
val outputData: DataSet[(NullWritable, Text)] = processedData.map {
  case (word, count) => (NullWritable.get(), new Text(s"$word: $count"))
}

// Write to Hadoop output
outputData.output(hadoopOutput)
env.execute("Scala Hadoop Integration")
```

### Working with Custom Writable Types

```scala
// Define a custom Writable type
class CustomRecord extends Writable {
  var id: Int = 0
  var name: String = ""
  var value: Double = 0.0

  def this(id: Int, name: String, value: Double) = {
    this()
    this.id = id
    this.name = name
    this.value = value
  }

  override def write(out: DataOutput): Unit = {
    out.writeInt(id)
    out.writeUTF(name)
    out.writeDouble(value)
  }

  override def readFields(in: DataInput): Unit = {
    id = in.readInt()
    name = in.readUTF()
    value = in.readDouble()
  }

  override def toString: String = s"CustomRecord($id, $name, $value)"
}

// Use custom Writable in Scala
val customData: DataSet[(LongWritable, CustomRecord)] = env.createInput(
  HadoopInputs.createHadoopInput(
    new SequenceFileInputFormat[LongWritable, CustomRecord](),
    classOf[LongWritable],
    classOf[CustomRecord],
    jobConf
  )
)

// Process custom records
val summary = customData
  .map(_._2) // Extract CustomRecord
  .groupBy(_.name)
  .reduce { (a, b) =>
    new CustomRecord(
      math.min(a.id, b.id),
      a.name,
      a.value + b.value
    )
  }
```

### Functional Programming Patterns

```scala
// Use Scala's functional programming features
// (assumes stopWords: Set[String] and threshold: Int are in scope)
val result = input
  .map(_._2.toString.toLowerCase.trim)
  .filter(_.nonEmpty)
  .flatMap(_.split("\\W+"))
  .filter(word => word.length > 2 && !stopWords.contains(word))
  .map((_, 1))
  .groupBy(0)
  .sum(1)
  .filter(_._2 > threshold)
  .sortPartition(1, Order.DESCENDING)

// Use pattern matching
val categorized = input.map {
  case (offset, text) if text.toString.startsWith("ERROR") =>
    ("error", text.toString)
  case (offset, text) if text.toString.startsWith("WARN") =>
    ("warning", text.toString)
  case (offset, text) =>
    ("info", text.toString)
}
```

### Type-Safe Configuration

```scala
// Type-safe configuration helpers
object HadoopConfig {
  def textInput(path: String): JobConf = {
    val conf = new JobConf()
    conf.setInputFormat(classOf[TextInputFormat])
    // addInputPath is a Java static declared on FileInputFormat; Scala cannot
    // call inherited Java statics through the TextInputFormat subclass.
    FileInputFormat.addInputPath(conf, new Path(path))
    conf
  }

  def sequenceOutput(path: String): JobConf = {
    val conf = new JobConf()
    conf.setOutputFormat(classOf[SequenceFileOutputFormat[_, _]])
    // setOutputPath is declared on FileOutputFormat (see note above).
    FileOutputFormat.setOutputPath(conf, new Path(path))
    conf
  }
}

// Use type-safe configuration
val input = env.createInput(
  HadoopInputs.createHadoopInput(
    new TextInputFormat(),
    classOf[LongWritable],
    classOf[Text],
    HadoopConfig.textInput("/input/path")
  )
)
```

### Error Handling in Scala

```scala
import scala.util.{Try, Success, Failure}

// Safe input creation
def createSafeInput(path: String): Option[DataSet[(LongWritable, Text)]] = {
  Try {
    env.createInput(
      HadoopInputs.readHadoopFile(
        new TextInputFormat(),
        classOf[LongWritable],
        classOf[Text],
        path
      )
    )
  } match {
    case Success(input) => Some(input)
    case Failure(exception) =>
      println(s"Failed to create input for path $path: ${exception.getMessage}")
      None
  }
}

// Use safe input creation
createSafeInput("/input/path") match {
  case Some(input) =>
    // Process input
    val result = input.map(_._2.toString).collect()
    println(s"Processed ${result.length} records")

  case None =>
    println("Failed to create input, using alternative processing")
    // Handle error case
}
```

## Integration with Flink Scala API Features

### DataSet Transformations

```scala
// Use Flink's rich transformation API with Hadoop inputs
val processedData = input
  .map(_._2.toString)                          // Extract text
  .flatMap(_.split("\\s+"))                    // Split into words
  .map(_.toLowerCase.replaceAll("[^a-z]", "")) // Clean words
  .filter(_.length > 2)                        // Filter short words
  .map((_, 1))                                 // Create word count pairs
  .groupBy(0)                                  // Group by word
  .sum(1)                                      // Sum counts
  .filter(_._2 > 5)                            // Filter rare words
  .sortPartition(1, Order.DESCENDING)          // Sort by count
```

### Broadcast Variables

```scala
// Collect the stop-word list to the client and capture it in the closure.
// (For Flink's native broadcast variables, use withBroadcastSet on an operator.)
val stopWords = env.fromElements("the", "a", "an", "and", "or", "but")
val broadcastStopWords = stopWords.collect().toSet

val filteredWords = input
  .map(_._2.toString.toLowerCase)
  .flatMap(_.split("\\s+"))
  .filter(word => !broadcastStopWords.contains(word))
```

### Rich Functions

```scala
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.configuration.Configuration

class WordProcessor extends RichMapFunction[String, (String, Int)] {
  var stopWords: Set[String] = _

  override def open(parameters: Configuration): Unit = {
    // Initialize from broadcast variable or configuration
    stopWords = Set("the", "a", "an", "and", "or", "but")
  }

  override def map(word: String): (String, Int) = {
    val cleaned = word.toLowerCase.replaceAll("[^a-z]", "")
    if (cleaned.length > 2 && !stopWords.contains(cleaned)) {
      (cleaned, 1)
    } else {
      ("", 0) // Will be filtered out later
    }
  }
}

// Use rich function with Hadoop input
val wordCounts = input
  .map(_._2.toString)
  .flatMap(_.split("\\s+"))
  .map(new WordProcessor())
  .filter(_._1.nonEmpty)
  .groupBy(0)
  .sum(1)
```