
# DataStream Integration

## Overview

The DataStream integration capabilities enable conversion between Flink's DataStream API and the Table API, allowing developers to combine both stream processing paradigms within the same application.

## Core API

### DataStream to Table Conversion

```scala { .api }
trait StreamTableEnvironment {
  def fromDataStream[T](dataStream: DataStream[T]): Table
  def fromDataStream[T](dataStream: DataStream[T], schema: Schema): Table
}
```

### Table to DataStream Conversion

```scala { .api }
trait StreamTableEnvironment {
  def toDataStream(table: Table): DataStream[Row]
  def toDataStream[T](table: Table, targetClass: Class[T]): DataStream[T]
  def toDataStream[T](table: Table, targetDataType: AbstractDataType[_]): DataStream[T]
}
```

### Conversion Utility Classes

```scala { .api }
class DataStreamConversions[T](dataStream: DataStream[T]) {
  def toTable(tableEnv: StreamTableEnvironment): Table
  def toTable(tableEnv: StreamTableEnvironment, schema: Schema): Table
}

class TableConversions(table: Table) {
  def toDataStream: DataStream[Row]
  def toDataStream[T](targetClass: Class[T]): DataStream[T]
  def toDataStream[T](targetDataType: AbstractDataType[_]): DataStream[T]
}
```
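
A minimal round-trip exercising these signatures might look as follows; this is a sketch, assuming a fresh local environment (the `Order` case class and its values are illustrative):

```scala
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.Table
import org.apache.flink.table.api.bridge.scala._
import org.apache.flink.types.Row

case class Order(id: Long, amount: Double)

val env = StreamExecutionEnvironment.getExecutionEnvironment
val tableEnv = StreamTableEnvironment.create(env)

// DataStream -> Table
val orders: DataStream[Order] = env.fromElements(Order(1L, 9.99), Order(2L, 20.00))
val orderTable: Table = tableEnv.fromDataStream(orders)

// Table -> DataStream
val rows: DataStream[Row] = tableEnv.toDataStream(orderTable)
```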

## DataStream to Table Conversion

### Automatic Schema Derivation

```scala
case class User(name: String, age: Int, email: String)

val userStream: DataStream[User] = env.fromCollection(Seq(
  User("Alice", 25, "alice@example.com"),
  User("Bob", 30, "bob@example.com")
))

// Automatic schema derivation from case class
val userTable: Table = tableEnv.fromDataStream(userStream)
```

### Custom Schema Definition

```scala
import org.apache.flink.table.api.{DataTypes, Schema}

val schema = Schema.newBuilder()
  .column("name", DataTypes.STRING())
  .column("age", DataTypes.INT())
  .column("email", DataTypes.STRING())
  .columnByExpression("process_time", "PROCTIME()")
  .build()

val userTable: Table = tableEnv.fromDataStream(userStream, schema)
```

Note that the physical columns must match the field names of `User`, and the additional `process_time` column is declared once, as a computed expression.

### Event-Time Processing

```scala
val schema = Schema.newBuilder()
  .column("name", DataTypes.STRING())
  .column("age", DataTypes.INT())
  .column("timestamp", DataTypes.TIMESTAMP_LTZ(3))
  .columnByMetadata("rowtime", DataTypes.TIMESTAMP_LTZ(3))
  .watermark("rowtime", "SOURCE_WATERMARK()")
  .build()

val eventTimeTable: Table = tableEnv.fromDataStream(timestampedStream, schema)
```

### Using Implicit Conversions

```scala
import org.apache.flink.table.api.bridge.scala._

// Implicit conversion from DataStream to DataStreamConversions
val userTable: Table = userStream.toTable(tableEnv)

// With custom schema
val userTableWithSchema: Table = userStream.toTable(tableEnv, schema)
```

## Table to DataStream Conversion

### Basic Conversion to Row

```scala
val resultTable: Table = tableEnv.sqlQuery("SELECT name, age FROM users WHERE age > 25")

// Convert to DataStream[Row]
val resultStream: DataStream[Row] = tableEnv.toDataStream(resultTable)

// Using implicit conversion
val resultStream2: DataStream[Row] = resultTable.toDataStream
```

### Conversion to Typed DataStream

```scala
case class UserResult(name: String, age: Int)

// Convert to a specific type
val typedStream: DataStream[UserResult] = tableEnv.toDataStream(resultTable, classOf[UserResult])

// Using TableConversions
val typedStream2: DataStream[UserResult] = resultTable.toDataStream(classOf[UserResult])
```

### Custom Data Type Conversion

```scala
import org.apache.flink.table.api.DataTypes

val targetDataType = DataTypes.ROW(
  DataTypes.FIELD("name", DataTypes.STRING()),
  DataTypes.FIELD("age", DataTypes.INT())
)

val customStream: DataStream[Row] = tableEnv.toDataStream(resultTable, targetDataType)
```

The target row type must line up with the table's columns; here it matches the `name` and `age` columns of `resultTable`.

## Advanced Schema Handling

### Complex Types

```scala
case class Address(street: String, city: String)
case class UserWithAddress(name: String, age: Int, address: Address)

val complexStream: DataStream[UserWithAddress] = // ... source

// Nested case classes are mapped to structured ROW columns
val complexTable: Table = tableEnv.fromDataStream(complexStream)
// Results in columns: name, age, address ROW<street STRING, city STRING>
```
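
The nested fields remain addressable in SQL via dot notation; a sketch, assuming `complexTable` is registered under an illustrative view name:

```scala
tableEnv.createTemporaryView("users_with_address", complexTable)

// Access fields of the nested ROW column with dot notation
val streets = tableEnv.sqlQuery(
  "SELECT u.name, u.address.street AS street FROM users_with_address AS u"
)
```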

### Array and Map Types

```scala
case class UserWithTags(name: String, tags: Array[String], metadata: Map[String, String])

val userWithTagsStream: DataStream[UserWithTags] = // ... source
val tagsTable: Table = tableEnv.fromDataStream(userWithTagsStream)
tableEnv.createTemporaryView("user_tags", tagsTable)

// Query array elements (Flink SQL arrays are 1-indexed)
val queryResult = tableEnv.sqlQuery("""
  SELECT name, tags[1] AS first_tag, metadata['category'] AS category
  FROM user_tags
""")
```

### Timestamp and Watermark Handling

```scala
import java.time.Duration
import org.apache.flink.api.common.eventtime.WatermarkStrategy

// DataStream with timestamps and watermarks
val timestampedStream: DataStream[User] = userStream
  .assignTimestampsAndWatermarks(
    WatermarkStrategy.forBoundedOutOfOrderness[User](Duration.ofSeconds(5))
      .withTimestampAssigner((user, _) => user.timestamp)
  )

// Schema that propagates the stream's timestamps and watermarks
val schema = Schema.newBuilder()
  .columnByMetadata("rowtime", DataTypes.TIMESTAMP_LTZ(3))
  .watermark("rowtime", "SOURCE_WATERMARK()")
  .build()

val eventTimeTable: Table = tableEnv.fromDataStream(timestampedStream, schema)
```

## Type System Integration

### Supported Scala Types

The bridge supports conversion of these Scala types:

- **Primitive types**: Int, Long, Float, Double, Boolean, String
- **Case classes**: Automatically mapped to Row types
- **Collections**: Array, List, Map (converted to Flink array/map types)
- **Option types**: Handled as nullable fields
- **Timestamps**: java.time.LocalDateTime, java.sql.Timestamp
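
As a quick illustration of the `Option` mapping, a sketch assuming the usual `env`/`tableEnv` pair (the `Contact` case class and its values are illustrative):

```scala
import org.apache.flink.streaming.api.scala._

case class Contact(name: String, email: Option[String])

val contacts: DataStream[Contact] = env.fromElements(
  Contact("Alice", Some("alice@example.com")),
  Contact("Bob", None) // surfaces as SQL NULL
)

// Per the mapping above, email becomes a nullable STRING column
val contactTable: Table = tableEnv.fromDataStream(contacts)
```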

### Type Information Requirements

```scala
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.scala._

case class CustomType(id: Long, data: String)

// Ensure TypeInformation is available (provided implicitly by the import above)
val customStream: DataStream[CustomType] = env.fromCollection(data)
val customTable: Table = tableEnv.fromDataStream(customStream)
```

## Error Handling

Common exceptions and their causes:

```scala
import org.apache.flink.table.api.{TableException, ValidationException}

try {
  val table = tableEnv.fromDataStream(dataStream)
} catch {
  case e: ValidationException =>
    // Schema validation failed or unsupported type
  case e: TableException =>
    // Table creation or conversion error
}
```

## Performance Considerations

1. **Schema Caching**: Reuse `Schema` objects when possible
2. **Type Conversion**: Direct type conversions are more efficient than Row conversions
3. **Memory Usage**: Large case classes may impact memory usage
4. **Serialization**: Ensure custom types are serializable
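
To illustrate point 2, the same table can be consumed either generically or with a direct target class; a sketch, assuming the `tableEnv`, `resultTable`, and `UserResult` definitions from the conversion sections above:

```scala
// Generic: every record is boxed into a Row and fields are accessed by position/name
val generic: DataStream[Row] = tableEnv.toDataStream(resultTable)

// Direct: records are materialized straight into the case class, skipping the Row indirection
val direct: DataStream[UserResult] = tableEnv.toDataStream(resultTable, classOf[UserResult])
```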

## Best Practices

1. **Use Case Classes**: Leverage Scala case classes for type safety
2. **Define Schemas Explicitly**: For production use, explicitly define schemas
3. **Handle Time Correctly**: Use proper time attributes for event-time processing
4. **Validate Types**: Ensure all types are supported by Flink's type system
5. **Consider Performance**: Choose appropriate conversion methods based on use case