# Graph Transformations


Type-safe operations for transforming vertex and edge values, filtering graph elements, and modifying graph structure while preserving the distributed processing benefits of Flink.


## Capabilities


### Vertex Transformations


Apply functions to vertex values while preserving graph topology.


```scala { .api }
/**
 * Apply a function to the attribute of each vertex in the graph.
 * @param mapper the map function to apply.
 * @return a new graph
 */
def mapVertices[NV](mapper: MapFunction[Vertex[K, VV], NV]): Graph[K, NV, EV]

/**
 * Apply a function to the attribute of each vertex in the graph.
 * @param fun the map function to apply.
 * @return a new graph
 */
def mapVertices[NV](fun: Vertex[K, VV] => NV): Graph[K, NV, EV]
```
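The first overload accepts a Flink `MapFunction` rather than a Scala lambda, which can be convenient when the mapper is a reusable named class. The following is a minimal sketch, assuming a graph with `Long` IDs, `String` vertex values, and `Double` edge values; `MapVerticesSketch` and `NameLength` are hypothetical names introduced only for illustration.

```scala
import org.apache.flink.api.common.functions.MapFunction
import org.apache.flink.api.scala._
import org.apache.flink.graph.scala._
import org.apache.flink.graph.{Edge, Vertex}

object MapVerticesSketch {

  // Hypothetical reusable mapper: replaces each vertex's name with its length.
  class NameLength extends MapFunction[Vertex[Long, String], Int] {
    override def map(v: Vertex[Long, String]): Int = v.getValue.length
  }

  // Builds a two-vertex graph, maps it, and returns the (id, value) pairs.
  def run(): Set[(Long, Int)] = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val vertices = env.fromCollection(Seq(
      new Vertex[Long, String](1L, "Alice"),
      new Vertex[Long, String](2L, "Bob")
    ))
    val edges = env.fromCollection(Seq(new Edge[Long, Double](1L, 2L, 0.5)))
    val graph = Graph.fromDataSet(vertices, edges, env)

    // The vertex value type changes from String to Int; IDs and edges are untouched.
    val lengths: Graph[Long, Int, Double] = graph.mapVertices(new NameLength)

    lengths.getVertices.collect().map(v => (v.getId, v.getValue)).toSet
  }

  def main(args: Array[String]): Unit = run().foreach(println)
}
```

Passing an instance of a named class also sidesteps any ambiguity between the two overloads, since the argument can only match the `MapFunction` variant.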


### Edge Transformations


Apply functions to edge values while preserving graph connectivity.


```scala { .api }
/**
 * Apply a function to the attribute of each edge in the graph.
 * @param mapper the map function to apply.
 * @return a new graph
 */
def mapEdges[NV](mapper: MapFunction[Edge[K, EV], NV]): Graph[K, VV, NV]

/**
 * Apply a function to the attribute of each edge in the graph.
 * @param fun the map function to apply.
 * @return a new graph
 */
def mapEdges[NV](fun: Edge[K, EV] => NV): Graph[K, VV, NV]
```


### ID and Value Translation


Transform vertex and edge identifiers or values using translation functions.


```scala { .api }
/**
 * Translate vertex and edge IDs using the given TranslateFunction.
 * @param translator implements conversion from K to NEW
 * @return graph with translated vertex and edge IDs
 */
def translateGraphIds[NEW](translator: TranslateFunction[K, NEW]): Graph[NEW, VV, EV]

/**
 * Translate vertex and edge IDs using the given function.
 * @param fun implements conversion from K to NEW
 * @return graph with translated vertex and edge IDs
 */
def translateGraphIds[NEW](fun: (K, NEW) => NEW): Graph[NEW, VV, EV]

/**
 * Translate vertex values using the given TranslateFunction.
 * @param translator implements conversion from VV to NEW
 * @return graph with translated vertex values
 */
def translateVertexValues[NEW](translator: TranslateFunction[VV, NEW]): Graph[K, NEW, EV]

/**
 * Translate vertex values using the given function.
 * @param fun implements conversion from VV to NEW
 * @return graph with translated vertex values
 */
def translateVertexValues[NEW](fun: (VV, NEW) => NEW): Graph[K, NEW, EV]

/**
 * Translate edge values using the given TranslateFunction.
 * @param translator implements conversion from EV to NEW
 * @return graph with translated edge values
 */
def translateEdgeValues[NEW](translator: TranslateFunction[EV, NEW]): Graph[K, VV, NEW]

/**
 * Translate edge values using the given function.
 * @param fun implements conversion from EV to NEW
 * @return graph with translated edge values
 */
def translateEdgeValues[NEW](fun: (EV, NEW) => NEW): Graph[K, VV, NEW]
```


### Graph Filtering


Create subgraphs by applying filter predicates to vertices and edges.


```scala { .api }
/**
 * Apply filtering functions to the graph and return a sub-graph that
 * satisfies the predicates for both vertices and edges.
 * @param vertexFilter the filter function for vertices.
 * @param edgeFilter the filter function for edges.
 * @return the resulting sub-graph.
 */
def subgraph(vertexFilter: FilterFunction[Vertex[K, VV]],
             edgeFilter: FilterFunction[Edge[K, EV]]): Graph[K, VV, EV]

/**
 * Apply filtering functions to the graph and return a sub-graph that
 * satisfies the predicates for both vertices and edges.
 * @param vertexFilterFun the filter function for vertices.
 * @param edgeFilterFun the filter function for edges.
 * @return the resulting sub-graph.
 */
def subgraph(vertexFilterFun: Vertex[K, VV] => Boolean,
             edgeFilterFun: Edge[K, EV] => Boolean): Graph[K, VV, EV]

/**
 * Apply a filtering function to the graph and return a sub-graph that
 * satisfies the predicates only for the vertices.
 * @param vertexFilter the filter function for vertices.
 * @return the resulting sub-graph.
 */
def filterOnVertices(vertexFilter: FilterFunction[Vertex[K, VV]]): Graph[K, VV, EV]

/**
 * Apply a filtering function to the graph and return a sub-graph that
 * satisfies the predicates only for the vertices.
 * @param vertexFilterFun the filter function for vertices.
 * @return the resulting sub-graph.
 */
def filterOnVertices(vertexFilterFun: Vertex[K, VV] => Boolean): Graph[K, VV, EV]

/**
 * Apply a filtering function to the graph and return a sub-graph that
 * satisfies the predicates only for the edges.
 * @param edgeFilter the filter function for edges.
 * @return the resulting sub-graph.
 */
def filterOnEdges(edgeFilter: FilterFunction[Edge[K, EV]]): Graph[K, VV, EV]

/**
 * Apply a filtering function to the graph and return a sub-graph that
 * satisfies the predicates only for the edges.
 * @param edgeFilterFun the filter function for edges.
 * @return the resulting sub-graph.
 */
def filterOnEdges(edgeFilterFun: Edge[K, EV] => Boolean): Graph[K, VV, EV]
```
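The two single-predicate filters differ in what they leave behind. The sketch below assumes Gelly's usual behaviour: a vertex filter also drops edges that lose an endpoint, while an edge filter keeps every vertex. It is worth confirming this against the Flink version in use. `FilterSemanticsSketch` and `NotBob` are hypothetical names; the data mirrors the sample graph used elsewhere in this document.

```scala
import org.apache.flink.api.common.functions.FilterFunction
import org.apache.flink.api.scala._
import org.apache.flink.graph.scala._
import org.apache.flink.graph.{Edge, Vertex}

object FilterSemanticsSketch {

  // Hypothetical named filter: keeps every vertex except the one valued "Bob".
  class NotBob extends FilterFunction[Vertex[Long, String]] {
    override def filter(v: Vertex[Long, String]): Boolean = v.getValue != "Bob"
  }

  // Returns (vertexCount, edgeCount) for the vertex-filtered and edge-filtered graphs.
  def run(): ((Long, Long), (Long, Long)) = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val vertices = env.fromCollection(Seq(
      new Vertex[Long, String](1L, "Alice"),
      new Vertex[Long, String](2L, "Bob"),
      new Vertex[Long, String](3L, "Charlie_longname")
    ))
    val edges = env.fromCollection(Seq(
      new Edge[Long, Double](1L, 2L, 0.5),
      new Edge[Long, Double](2L, 3L, 0.3),
      new Edge[Long, Double](1L, 3L, 0.8)
    ))
    val graph = Graph.fromDataSet(vertices, edges, env)

    // Vertex filter (FilterFunction overload): removing vertex 2 is assumed
    // to remove edges 1->2 and 2->3 as well, since they lose an endpoint.
    val noBob = graph.filterOnVertices(new NotBob)

    // Edge filter (lambda overload): drops edge 2->3 (0.3) but is assumed
    // to keep all three vertices.
    val strong = graph.filterOnEdges((e: Edge[Long, Double]) => e.getValue > 0.4)

    ((noBob.numberOfVertices(), noBob.numberOfEdges()),
     (strong.numberOfVertices(), strong.numberOfEdges()))
  }

  def main(args: Array[String]): Unit = println(run())
}
```

Each `numberOfVertices()` or `numberOfEdges()` call runs a separate job, so in real pipelines it is usually cheaper to collect or write the filtered graph once.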


### Graph Structure Modifications


Transform the structural properties of the graph.


```scala { .api }
/**
 * This operation adds all inverse-direction edges to the graph.
 * @return the undirected graph.
 */
def getUndirected(): Graph[K, VV, EV]

/**
 * Reverse the direction of the edges in the graph
 * @return a new graph with all edges reversed
 */
def reverse(): Graph[K, VV, EV]
```
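To make the effect of these two operations concrete, the sketch below collects the edge endpoints of a small two-edge graph after `reverse()` and after `getUndirected()`. `StructureSketch` and its `endpoints` helper are hypothetical names introduced for illustration; the graph data is an assumption chosen to keep the result small.

```scala
import org.apache.flink.api.scala._
import org.apache.flink.graph.scala._
import org.apache.flink.graph.{Edge, Vertex}

object StructureSketch {

  // Hypothetical helper: collects the (source, target) pair of every edge.
  def endpoints(g: Graph[Long, String, Double]): Set[(Long, Long)] =
    g.getEdges.collect().map(e => (e.getSource, e.getTarget)).toSet

  // Returns the endpoint sets of the reversed and the undirected graph.
  def run(): (Set[(Long, Long)], Set[(Long, Long)]) = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val vertices = env.fromCollection(Seq(
      new Vertex[Long, String](1L, "Alice"),
      new Vertex[Long, String](2L, "Bob"),
      new Vertex[Long, String](3L, "Charlie_longname")
    ))
    val edges = env.fromCollection(Seq(
      new Edge[Long, Double](1L, 2L, 0.5),
      new Edge[Long, Double](2L, 3L, 0.3)
    ))
    val graph = Graph.fromDataSet(vertices, edges, env)

    // reverse(): every edge a->b becomes b->a.
    val reversed = endpoints(graph.reverse())

    // getUndirected(): keeps each edge and adds its inverse.
    val undirected = endpoints(graph.getUndirected())

    (reversed, undirected)
  }

  def main(args: Array[String]): Unit = println(run())
}
```

Because `getUndirected()` doubles the edge set, downstream algorithms that sum edge values per vertex will see each original edge from both sides.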


**Usage Examples:**


```scala
import org.apache.flink.graph.scala._
import org.apache.flink.graph.{Edge, Vertex}
import org.apache.flink.api.scala._

val env = ExecutionEnvironment.getExecutionEnvironment

// Create sample graph
val vertices = env.fromCollection(Seq(
  new Vertex(1L, "Alice"),
  new Vertex(2L, "Bob"),
  new Vertex(3L, "Charlie_longname")
))

val edges = env.fromCollection(Seq(
  new Edge(1L, 2L, 0.5),
  new Edge(2L, 3L, 0.3),
  new Edge(1L, 3L, 0.8)
))

val graph = Graph.fromDataSet(vertices, edges, env)

// Vertex transformations
val upperCaseGraph = graph.mapVertices(v => v.getValue.toUpperCase)
val lengthGraph = graph.mapVertices(_.getValue.length)

// Edge transformations
val doubledWeights = graph.mapEdges(e => e.getValue * 2.0)
val binaryEdges = graph.mapEdges(e => if (e.getValue > 0.5) 1 else 0)

// Filtering operations
val longNameVertices = graph.filterOnVertices(_.getValue.length > 5)
val strongEdges = graph.filterOnEdges(_.getValue > 0.4)
val filteredSubgraph = graph.subgraph(
  vertexFilterFun = _.getValue.length <= 10,
  edgeFilterFun = _.getValue >= 0.3
)

// Structure modifications
val undirectedGraph = graph.getUndirected()
val reversedGraph = graph.reverse()

// ID translation (e.g., Long to String)
val stringIdGraph = graph.translateGraphIds[String]((longId, reuse) => longId.toString)

// Value translation
val intValueGraph = graph.translateVertexValues[Int]((stringValue, reuse) => stringValue.length)
```


### Advanced Filtering Patterns


More sophisticated filtering examples for complex graph analysis scenarios.


**Usage Examples:**


```scala
// Multi-criteria vertex filtering
val complexVertexFilter = graph.filterOnVertices { vertex =>
  val name = vertex.getValue
  name.length > 3 && name.startsWith("A")
}

// Edge filtering: keep edges above a weight threshold and drop self-loops
val edgeFilterWithThreshold = graph.filterOnEdges { edge =>
  edge.getValue > 0.5 && edge.getSource != edge.getTarget
}

// Combined subgraph filtering
val analyticsSubgraph = graph.subgraph(
  vertexFilterFun = vertex => {
    val name = vertex.getValue
    name.contains("a") || name.contains("A")
  },
  edgeFilterFun = edge => {
    edge.getValue >= 0.4
  }
)

// Chaining transformations: normalize names, drop empty ones,
// round weights to two decimals, then drop non-positive weights
val processedGraph = graph
  .mapVertices(_.getValue.toLowerCase.trim)
  .filterOnVertices(_.getValue.nonEmpty)
  .mapEdges(e => math.round(e.getValue * 100) / 100.0)
  .filterOnEdges(_.getValue > 0.0)
```


### Type Safety and Performance


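The transformation operations share the following properties:

- **Type preservation**: Each method's return type records exactly what changed: `mapVertices` and `translateVertexValues` replace only `VV`, `mapEdges` and `translateEdgeValues` replace only `EV`, `translateGraphIds` replaces only `K`, and the filtering and structure operations leave all three unchanged
- **Lazy evaluation**: Transformations only extend Flink's dataflow plan; nothing runs until an action such as `collect()` or `print()`, or an explicit `env.execute()`, triggers the job
- **Distributed processing**: Vertices and edges are held in Flink `DataSet`s, so every operation runs in parallel across the cluster
- **Functional composition**: Because each operation returns a `Graph`, calls can be chained into longer processing pipelines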


All transformation methods return new Graph instances, preserving immutability principles while enabling efficient distributed processing.
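As an illustration of how the type parameters evolve through a chain and where execution is triggered, here is a minimal sketch. `PipelineSketch` is a hypothetical name, the explicit type annotations are only there to make the compiler-checked types visible, and the lambda parameter types are spelled out as a precaution against overload ambiguity.

```scala
import org.apache.flink.api.scala._
import org.apache.flink.graph.scala._
import org.apache.flink.graph.{Edge, Vertex}

object PipelineSketch {

  // Builds a three-step pipeline and returns the resulting (id, value) pairs.
  def run(): Seq[(Long, Int)] = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val vertices = env.fromCollection(Seq(
      new Vertex[Long, String](1L, "Alice"),
      new Vertex[Long, String](2L, "Bob"),
      new Vertex[Long, String](3L, "Charlie_longname")
    ))
    val edges = env.fromCollection(Seq(
      new Edge[Long, Double](1L, 2L, 0.5),
      new Edge[Long, Double](2L, 3L, 0.3),
      new Edge[Long, Double](1L, 3L, 0.8)
    ))
    val graph: Graph[Long, String, Double] = Graph.fromDataSet(vertices, edges, env)

    // Each step returns a new Graph and only extends the dataflow plan.
    // VV changes from String to Int.
    val lengths: Graph[Long, Int, Double] =
      graph.mapVertices((v: Vertex[Long, String]) => v.getValue.length)
    // EV changes from Double to Boolean.
    val flagged: Graph[Long, Int, Boolean] =
      lengths.mapEdges((e: Edge[Long, Double]) => e.getValue > 0.4)
    // Filtering leaves all three type parameters unchanged.
    val kept: Graph[Long, Int, Boolean] =
      flagged.filterOnEdges((e: Edge[Long, Boolean]) => e.getValue)

    // collect() is the action that actually runs the job.
    kept.getVertices.collect().map(v => (v.getId, v.getValue)).sortBy(_._1)
  }

  def main(args: Array[String]): Unit = run().foreach(println)
}
```

The original `graph` value remains usable after the chain, since none of the steps modify it.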