or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

avro.md · hadoop.md · hbase.md · hcatalog.md · index.md · jdbc.md

docs/avro.md

0

# Avro Connectors

1

2

Apache Avro serialization support for Flink batch processing, providing efficient binary serialization with schema evolution support.

3

4

## Capabilities

5

6

### AvroInputFormat

7

8

Reads Apache Avro files into Flink DataSets with full type safety and schema support.

9

10

```java { .api }

11

/**

12

* Input format for reading Avro files in Flink batch jobs

13

* @param <E> The type of records to read

14

*/

15

public class AvroInputFormat<E> extends FileInputFormat<E>

16

implements ResultTypeQueryable<E>, CheckpointableInputFormat<FileInputSplit, Tuple2<Long, Long>> {

17

18

/**

19

* Creates an AvroInputFormat for reading Avro files

20

* @param filePath Path to the Avro file or directory

21

* @param type Class representing the record type to read

22

*/

23

public AvroInputFormat(Path filePath, Class<E> type);

24

25

/**

26

* Sets whether to reuse Avro value instances for better performance

27

* @param reuseAvroValue true to reuse instances, false to create new ones

28

*/

29

public void setReuseAvroValue(boolean reuseAvroValue);

30

31

/**

32

* Sets whether files should be read as a whole (non-splittable)

33

* @param unsplittable true to read each file as a whole, false to allow splitting

34

*/

35

public void setUnsplittable(boolean unsplittable);

36

37

/**

38

* Returns the type information for the records produced by this format

39

* @return TypeInformation describing the output type

40

*/

41

public TypeInformation<E> getProducedType();

42

43

/**

44

* Opens the input split for reading

45

* @param split The file input split to read from

46

*/

47

public void open(FileInputSplit split) throws IOException;

48

49

/**

50

* Checks if the end of the input has been reached

51

* @return true if no more records are available

52

*/

53

public boolean reachedEnd() throws IOException;

54

55

/**

56

* Reads the next record from the input

57

* @param reuseValue Record instance to reuse (may be null)

58

* @return The next record

59

*/

60

public E nextRecord(E reuseValue) throws IOException;

61

62

/**

63

* Returns the number of records read from the current block

64

* @return Number of records read

65

*/

66

public long getRecordsReadFromBlock();

67

68

/**

69

* Gets the current checkpoint state for fault tolerance

70

* @return Checkpoint state as a (last sync position, records read since that sync point) tuple

71

*/

72

public Tuple2<Long, Long> getCurrentState();

73

74

/**

75

* Reopens the format with a previous checkpoint state

76

* @param split The file input split to read

77

* @param state The checkpoint state to restore

78

*/

79

public void reopen(FileInputSplit split, Tuple2<Long, Long> state) throws IOException;

80

}

81

```

82

83

**Usage Example:**

84

85

```java

86

import org.apache.flink.api.java.ExecutionEnvironment;

87

import org.apache.flink.api.java.DataSet;

88

import org.apache.flink.api.java.io.AvroInputFormat;

89

import org.apache.flink.core.fs.Path;

90

91

// Define your Avro record class

92

public class User {

93

public String name;

94

public int age;

95

public String email;

96

}

97

98

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

99

100

// Create AvroInputFormat

101

AvroInputFormat<User> avroInput = new AvroInputFormat<>(

102

new Path("hdfs://path/to/users.avro"),

103

User.class

104

);

105

106

// Configure for better performance

107

avroInput.setReuseAvroValue(true);

108

109

// Read the data

110

DataSet<User> users = env.createInput(avroInput);

111

users.print();

112

```

113

114

### AvroOutputFormat

115

116

Writes Flink DataSets to Apache Avro files with automatic schema generation or custom schema support.

117

118

```java { .api }

119

/**

120

* Output format for writing Avro files in Flink batch jobs

121

* @param <E> The type of records to write

122

*/

123

public class AvroOutputFormat<E> extends FileOutputFormat<E> implements Serializable {

124

125

/**

126

* Creates an AvroOutputFormat with file path and record type

127

* @param filePath Path where the Avro file will be written

128

* @param type Class representing the record type to write

129

*/

130

public AvroOutputFormat(Path filePath, Class<E> type);

131

132

/**

133

* Creates an AvroOutputFormat with only record type (path set later)

134

* @param type Class representing the record type to write

135

*/

136

public AvroOutputFormat(Class<E> type);

137

138

/**

139

* Sets a custom Avro schema to use for writing

140

* @param schema The Avro schema to use

141

*/

142

public void setSchema(Schema schema);

143

144

/**

145

* Writes a record to the output

146

* @param record The record to write

147

*/

148

public void writeRecord(E record) throws IOException;

149

150

/**

151

* Opens the output format for writing

152

* @param taskNumber The number of the parallel task

153

* @param numTasks The total number of parallel tasks

154

*/

155

public void open(int taskNumber, int numTasks) throws IOException;

156

157

/**

158

* Closes the output format and flushes any remaining data

159

*/

160

public void close() throws IOException;

161

}

162

```

163

164

**Usage Example:**

165

166

```java

167

import org.apache.flink.api.java.io.AvroOutputFormat;

168

import org.apache.avro.Schema;

169

import org.apache.avro.SchemaBuilder;

170

171

// Create output format

172

AvroOutputFormat<User> avroOutput = new AvroOutputFormat<>(

173

new Path("hdfs://path/to/output.avro"),

174

User.class

175

);

176

177

// Optional: Set custom schema

178

Schema customSchema = SchemaBuilder.record("User")

179

.fields()

180

.name("name").type().stringType().noDefault()

181

.name("age").type().intType().noDefault()

182

.name("email").type().stringType().noDefault()

183

.endRecord();

184

avroOutput.setSchema(customSchema);

185

186

// Write data

187

DataSet<User> users = // ... your data

188

users.output(avroOutput);

189

```

190

191

### DataInputDecoder

192

193

Low-level Avro decoder for reading from Java DataInput streams.

194

195

```java { .api }

196

/**

197

* Avro decoder that reads from Java DataInput streams

198

*/

199

public class DataInputDecoder extends org.apache.avro.io.Decoder {

200

201

// Note: Uses package-level constructor in actual implementation

202

203

/**

204

* Sets the input data source

205

* @param in DataInput stream to read from

206

*/

207

public void setIn(DataInput in);

208

209

/**

210

* Reads null value

211

*/

212

public void readNull() throws IOException;

213

214

/**

215

* Reads a boolean value

216

* @return The boolean value

217

*/

218

public boolean readBoolean() throws IOException;

219

220

/**

221

* Reads an integer value

222

* @return The integer value

223

*/

224

public int readInt() throws IOException;

225

226

/**

227

* Reads a long value

228

* @return The long value

229

*/

230

public long readLong() throws IOException;

231

232

/**

233

* Reads a float value

234

* @return The float value

235

*/

236

public float readFloat() throws IOException;

237

238

/**

239

* Reads a double value

240

* @return The double value

241

*/

242

public double readDouble() throws IOException;

243

244

/**

245

* Reads variable-length bytes

246

* @param old ByteBuffer to reuse (may be null)

247

* @return ByteBuffer containing the bytes

248

*/

249

public ByteBuffer readBytes(ByteBuffer old) throws IOException;

250

251

/**

252

* Reads an enum value

253

* @return The enum ordinal

254

*/

255

public int readEnum() throws IOException;

256

257

/**

258

* Reads fixed-length bytes

259

* @param bytes Destination byte array

260

* @param start Starting offset in the array

261

* @param length Number of bytes to read

262

*/

263

public void readFixed(byte[] bytes, int start, int length) throws IOException;

264

265

/**

266

* Skips fixed-length bytes

267

* @param length Number of bytes to skip

268

*/

269

public void skipFixed(int length) throws IOException;

270

271

/**

272

* Skips variable-length bytes

273

*/

274

public void skipBytes() throws IOException;

275

276

/**

277

* Reads UTF-8 string

278

* @param old Utf8 object to reuse (may be null)

279

* @return UTF-8 string

280

*/

281

public Utf8 readString(Utf8 old) throws IOException;

282

283

/**

284

* Reads a string value

285

* @return The string value

286

*/

287

public String readString() throws IOException;

288

289

/**

290

* Skips string value

291

*/

292

public void skipString() throws IOException;

293

294

/**

295

* Starts reading an array

296

* @return Number of elements in the first block of the array (0 for an empty array)

297

*/

298

public long readArrayStart() throws IOException;

299

300

/**

301

* Reads next array element count

302

* @return Number of elements in the next block, or 0 when the array has no more elements

303

*/

304

public long arrayNext() throws IOException;

305

306

/**

307

* Skips entire array

308

* @return Number of elements the caller must still skip individually, or 0 once the end of the array is reached

309

*/

310

public long skipArray() throws IOException;

311

312

/**

313

* Starts reading a map

314

* @return Number of entries in the first block of the map (0 for an empty map)

315

*/

316

public long readMapStart() throws IOException;

317

318

/**

319

* Reads next map element count

320

* @return Number of entries in the next block, or 0 when the map has no more entries

321

*/

322

public long mapNext() throws IOException;

323

324

/**

325

* Skips entire map

326

* @return Number of entries the caller must still skip individually, or 0 once the end of the map is reached

327

*/

328

public long skipMap() throws IOException;

329

330

/**

331

* Reads union index

332

* @return Union branch index

333

*/

334

public int readIndex() throws IOException;

335

336

/**

337

* Utility method for reading variable-length long count

338

* @param in DataInput stream to read from

339

* @return Variable-length long value

340

*/

341

public static long readVarLongCount(DataInput in) throws IOException;

342

}

343

```

344

345

### DataOutputEncoder

346

347

Low-level Avro encoder for writing to Java DataOutput streams.

348

349

```java { .api }

350

/**

351

* Avro encoder that writes to Java DataOutput streams

352

*/

353

public class DataOutputEncoder extends org.apache.avro.io.Encoder implements Serializable {

354

355

// Note: Uses package-level constructor in actual implementation

356

357

/**

358

* Sets the output data destination

359

* @param out DataOutput stream to write to

360

*/

361

public void setOut(DataOutput out);

362

363

/**

364

* Writes a boolean value

365

* @param b The boolean value to write

366

*/

367

public void writeBoolean(boolean b) throws IOException;

368

369

/**

370

* Writes an integer value

371

* @param n The integer value to write

372

*/

373

public void writeInt(int n) throws IOException;

374

375

/**

376

* Writes a long value

377

* @param n The long value to write

378

*/

379

public void writeLong(long n) throws IOException;

380

381

/**

382

* Flushes the output (no-op implementation)

383

*/

384

public void flush() throws IOException;

385

386

/**

387

* Writes null value

388

*/

389

public void writeNull() throws IOException;

390

391

/**

392

* Writes a float value

393

* @param f The float value to write

394

*/

395

public void writeFloat(float f) throws IOException;

396

397

/**

398

* Writes a double value

399

* @param d The double value to write

400

*/

401

public void writeDouble(double d) throws IOException;

402

403

/**

404

* Writes an enum value

405

* @param e The enum ordinal to write

406

*/

407

public void writeEnum(int e) throws IOException;

408

409

/**

410

* Writes fixed-length bytes

411

* @param bytes Byte array containing data

412

* @param start Starting offset in the array

413

* @param len Number of bytes to write

414

*/

415

public void writeFixed(byte[] bytes, int start, int len) throws IOException;

416

417

/**

418

* Writes variable-length bytes from ByteBuffer

419

* @param bytes ByteBuffer containing data

420

*/

421

public void writeBytes(ByteBuffer bytes) throws IOException;

422

423

/**

424

* Writes UTF-8 string

425

* @param utf8 The UTF-8 string to write

426

*/

427

public void writeString(Utf8 utf8) throws IOException;

428

429

/**

430

* Writes a string value

431

* @param str The string to write

432

*/

433

public void writeString(String str) throws IOException;

434

435

/**

436

* Writes variable-length bytes

437

* @param bytes Byte array containing data

438

* @param start Starting offset in the array

439

* @param len Number of bytes to write

440

*/

441

public void writeBytes(byte[] bytes, int start, int len) throws IOException;

442

443

/**

444

* Starts writing an array

445

*/

446

public void writeArrayStart() throws IOException;

447

448

/**

449

* Sets the item count for arrays and maps

450

* @param itemCount Number of items to write

451

*/

452

public void setItemCount(long itemCount) throws IOException;

453

454

/**

455

* Starts writing an individual item

456

*/

457

public void startItem() throws IOException;

458

459

/**

460

* Ends writing an array

461

*/

462

public void writeArrayEnd() throws IOException;

463

464

/**

465

* Starts writing a map

466

*/

467

public void writeMapStart() throws IOException;

468

469

/**

470

* Ends writing a map

471

*/

472

public void writeMapEnd() throws IOException;

473

474

/**

475

* Writes union index

476

* @param unionIndex The union branch index

477

*/

478

public void writeIndex(int unionIndex) throws IOException;

479

480

/**

481

* Utility method for writing variable-length long count

482

* @param out DataOutput stream to write to

483

* @param val Variable-length long value to write

484

*/

485

public static void writeVarLongCount(DataOutput out, long val) throws IOException;

486

}

487

```

488

489

### FSDataInputStreamWrapper

490

491

Wrapper to make Flink's FSDataInputStream compatible with Avro's SeekableInput interface.

492

493

```java { .api }

494

/**

495

* Wrapper for Flink FSDataInputStream to make it compatible with Avro SeekableInput

496

*/

497

public class FSDataInputStreamWrapper implements Closeable, SeekableInput {

498

499

/**

500

* Creates a wrapper around a Flink FSDataInputStream

501

* @param stream The FSDataInputStream to wrap

502

* @param len Length of the stream in bytes

503

*/

504

public FSDataInputStreamWrapper(FSDataInputStream stream, long len);

505

506

/**

507

* Returns the length of the stream

508

* @return Stream length in bytes

509

*/

510

public long length();

511

512

/**

513

* Reads data into a byte array

514

* @param b Destination byte array

515

* @param off Offset in the destination array

516

* @param len Maximum number of bytes to read

517

* @return Number of bytes actually read

518

*/

519

public int read(byte[] b, int off, int len) throws IOException;

520

521

/**

522

* Seeks to a specific position in the stream

523

* @param p Position to seek to

524

*/

525

public void seek(long p) throws IOException;

526

527

/**

528

* Returns the current position in the stream

529

* @return Current position

530

*/

531

public long tell() throws IOException;

532

533

/**

534

* Closes the underlying stream

535

*/

536

public void close() throws IOException;

537

}

538

```

539

540

## Common Types

541

542

```java { .api }

543

// Avro types

544

import org.apache.avro.Schema;

545

import org.apache.avro.io.Decoder;

546

import org.apache.avro.io.Encoder;

547

import org.apache.avro.file.SeekableInput;

548

import org.apache.avro.util.Utf8;

549

550

// Flink types

551

import org.apache.flink.api.common.io.FileInputFormat;

552

import org.apache.flink.api.common.io.FileOutputFormat;

553

import org.apache.flink.api.common.io.CheckpointableInputFormat;

554

import org.apache.flink.api.java.typeutils.ResultTypeQueryable;

555

import org.apache.flink.api.common.typeinfo.TypeInformation;

556

import org.apache.flink.api.java.tuple.Tuple2;

557

import org.apache.flink.core.io.InputSplit;

558

import org.apache.flink.core.fs.FSDataInputStream;

559

import org.apache.flink.core.fs.Path;

560

561

// Java types

562

import java.io.DataInput;

563

import java.io.DataOutput;

564

import java.io.Closeable;

565

import java.io.Serializable;

566

import java.io.IOException;

567

import java.nio.ByteBuffer;

568

```