# User-Defined Functions

Abstract base classes for implementing custom graph processing functions with access to vertex values, edges, and neighborhoods. These classes provide Scala-friendly interfaces for the underlying Java Gelly function types.
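
The kind of adaptation described here can be pictured with a small, Flink-free sketch. It assumes the Java side hands over a `java.lang.Iterable` of pair objects and that a wrapper converts it into a Scala `Iterable` of tuples before user code runs. `AdapterSketch`, `JavaPair`, and `toScalaPairs` are hypothetical names for illustration only, not part of Gelly, and the sketch relies on `scala.jdk.CollectionConverters`, which requires Scala 2.13 or later.

```scala
import scala.jdk.CollectionConverters._

object AdapterSketch {
  // Hypothetical stand-in for a Java-style pair with public fields.
  final class JavaPair[A, B](val f0: A, val f1: B)

  // Convert a Java iterable of pairs into a Scala Iterable of tuples,
  // so user code can rely on map, filter, and for-comprehensions.
  def toScalaPairs[A, B](pairs: java.lang.Iterable[JavaPair[A, B]]): Iterable[(A, B)] =
    pairs.asScala.map(p => (p.f0, p.f1))
}
```

With a conversion of this shape in place, an implementation only ever sees Scala tuples such as `(vertexId, edge)`, which is the form the signatures in this document use.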

## Capabilities

### EdgesFunction

Base class for processing edges of vertices without access to vertex values.

```scala { .api }
/**
 * Abstract class for processing edges of a vertex.
 * Provides Scala collections interface for edge iteration.
 */
abstract class EdgesFunction[K, EV, T] extends org.apache.flink.graph.EdgesFunction[K, EV, T] {
  /**
   * Process the edges of a vertex and emit results.
   * @param edges iterable of (vertexId, edge) pairs representing the edges of the vertex
   * @param out collector for emitting results
   */
  def iterateEdges(edges: Iterable[(K, Edge[K, EV])], out: Collector[T]): Unit
}
```

### EdgesFunctionWithVertexValue

Base class for processing edges of vertices with access to the vertex value.

```scala { .api }
/**
 * Abstract class for processing edges of a vertex with access to the vertex value.
 * Provides Scala collections interface for edge iteration.
 */
abstract class EdgesFunctionWithVertexValue[K, VV, EV, T] extends
    org.apache.flink.graph.EdgesFunctionWithVertexValue[K, VV, EV, T] {
  /**
   * Process the edges of a vertex with access to vertex value and emit results.
   * @param v the vertex whose edges are being processed
   * @param edges iterable of edges connected to the vertex
   * @param out collector for emitting results
   */
  @throws(classOf[Exception])
  def iterateEdges(v: Vertex[K, VV], edges: Iterable[Edge[K, EV]], out: Collector[T]): Unit
}
```

### NeighborsFunction

Base class for processing neighbors (edges and adjacent vertices) of vertices.

```scala { .api }
/**
 * Abstract class for processing neighbors of a vertex.
 * Provides access to both edges and adjacent vertex information.
 * Provides Scala collections interface for neighbor iteration.
 */
abstract class NeighborsFunction[K, VV, EV, T] extends
    org.apache.flink.graph.NeighborsFunction[K, VV, EV, T] {
  /**
   * Process the neighbors of a vertex and emit results.
   * @param neighbors iterable of (vertexId, edge, neighborVertex) tuples
   * @param out collector for emitting results
   */
  def iterateNeighbors(neighbors: Iterable[(K, Edge[K, EV], Vertex[K, VV])],
                       out: Collector[T]): Unit
}
```

### NeighborsFunctionWithVertexValue

Base class for processing neighbors with access to the central vertex value.

```scala { .api }
/**
 * Abstract class for processing neighbors of a vertex with access to the vertex value.
 * Provides access to the central vertex, edges, and adjacent vertex information.
 * Provides Scala collections interface for neighbor iteration.
 */
abstract class NeighborsFunctionWithVertexValue[K, VV, EV, T] extends
    org.apache.flink.graph.NeighborsFunctionWithVertexValue[K, VV, EV, T] {
  /**
   * Process the neighbors of a vertex with access to vertex value and emit results.
   * @param vertex the central vertex whose neighbors are being processed
   * @param neighbors iterable of (edge, neighborVertex) pairs
   * @param out collector for emitting results
   */
  def iterateNeighbors(vertex: Vertex[K, VV],
                       neighbors: Iterable[(Edge[K, EV], Vertex[K, VV])],
                       out: Collector[T]): Unit
}
```

## Usage Patterns

### Simple Edge Processing

Process edges without considering vertex values:

```scala
import org.apache.flink.api.scala._  // brings implicit TypeInformation into scope
import org.apache.flink.graph.scala._
import org.apache.flink.graph.{Edge, EdgeDirection, Vertex}
import org.apache.flink.util.Collector

// Count outgoing edges per vertex
class EdgeCounter extends EdgesFunction[Long, Double, (Long, Int)] {
  override def iterateEdges(edges: Iterable[(Long, Edge[Long, Double])],
                            out: Collector[(Long, Int)]): Unit = {
    val edgeList = edges.toList
    if (edgeList.nonEmpty) {
      val vertexId = edgeList.head._1
      out.collect((vertexId, edgeList.size))
    }
  }
}

// Apply the function (assumes `graph` is a Graph[Long, Double, Double] already in scope)
val edgeCounts = graph.groupReduceOnEdges(new EdgeCounter(), EdgeDirection.OUT)
```

### Edge Processing with Vertex Context

Process edges while considering the vertex value:

```scala
// Keep edges whose weight is at least half of the vertex value
class EdgeFilter extends EdgesFunctionWithVertexValue[Long, Double, Double, Edge[Long, Double]] {
  override def iterateEdges(v: Vertex[Long, Double],
                            edges: Iterable[Edge[Long, Double]],
                            out: Collector[Edge[Long, Double]]): Unit = {
    val threshold = v.getValue
    for (edge <- edges) {
      if (edge.getValue >= threshold * 0.5) {
        out.collect(edge)
      }
    }
  }
}

// Apply the function
val filteredEdges = graph.groupReduceOnEdges(new EdgeFilter(), EdgeDirection.OUT)
```

### Neighbor Analysis

Process neighbors to compute local graph properties:

```scala
// Compute the average value of each vertex's neighbors
class NeighborDegreeAnalysis extends NeighborsFunction[Long, Double, Double, (Long, Double)] {
  override def iterateNeighbors(neighbors: Iterable[(Long, Edge[Long, Double], Vertex[Long, Double])],
                                out: Collector[(Long, Double)]): Unit = {
    val neighborList = neighbors.toList
    if (neighborList.nonEmpty) {
      val vertexId = neighborList.head._1
      val neighborValues = neighborList.map(_._3.getValue)
      val avgNeighborValue = neighborValues.sum / neighborValues.size

      out.collect((vertexId, avgNeighborValue))
    }
  }
}

// Apply the function
val neighborAnalysis = graph.groupReduceOnNeighbors(new NeighborDegreeAnalysis(), EdgeDirection.ALL)
```

### Advanced Neighbor Processing with Vertex Context

Combine the central vertex with its neighbors in a single analysis:

```scala
// Summarize each neighborhood as (vertexId, vertexValue, neighborCount, avgEdgeWeight)
class LocalStructureAnalysis extends NeighborsFunctionWithVertexValue[Long, Double, Double, (Long, Double, Int, Double)] {
  override def iterateNeighbors(vertex: Vertex[Long, Double],
                                neighbors: Iterable[(Edge[Long, Double], Vertex[Long, Double])],
                                out: Collector[(Long, Double, Int, Double)]): Unit = {
    val neighborList = neighbors.toList
    val vertexValue = vertex.getValue
    val neighborCount = neighborList.size

    if (neighborCount > 0) {
      val avgEdgeWeight = neighborList.map(_._1.getValue).sum / neighborCount
      val result = (vertex.getId, vertexValue, neighborCount, avgEdgeWeight)
      out.collect(result)
    }
  }
}

// Apply the function
val structureAnalysis = graph.groupReduceOnNeighbors(
  new LocalStructureAnalysis(),
  EdgeDirection.ALL
)
```

## Advanced Usage Examples

### Multi-Output Functions

Functions can emit multiple results per vertex:

```scala
// Emit statistics for each neighbor relationship
class DetailedNeighborStats extends NeighborsFunctionWithVertexValue[Long, String, Double, (Long, String, String, Double)] {
  override def iterateNeighbors(vertex: Vertex[Long, String],
                                neighbors: Iterable[(Edge[Long, Double], Vertex[Long, String])],
                                out: Collector[(Long, String, String, Double)]): Unit = {
    for ((edge, neighbor) <- neighbors) {
      val result = (vertex.getId, vertex.getValue, neighbor.getValue, edge.getValue)
      out.collect(result)
    }
  }
}
```

### Conditional Processing

Implement conditional logic based on graph structure:

```scala
// Process high-degree vertices differently
class AdaptiveEdgeProcessor extends EdgesFunctionWithVertexValue[Long, Double, Double, (Long, String, Double)] {
  override def iterateEdges(v: Vertex[Long, Double],
                            edges: Iterable[Edge[Long, Double]],
                            out: Collector[(Long, String, Double)]): Unit = {
    val edgeList = edges.toList
    val edgeCount = edgeList.size

    val analysis = if (edgeCount > 10) {
      // High-degree vertex: compute average
      val avgWeight = edgeList.map(_.getValue).sum / edgeCount
      (v.getId, "high-degree", avgWeight)
    } else {
      // Low-degree vertex: compute max
      val maxWeight = if (edgeList.nonEmpty) edgeList.map(_.getValue).max else 0.0
      (v.getId, "low-degree", maxWeight)
    }

    out.collect(analysis)
  }
}
```

### Stateful Processing

Maintain local state while processing a single neighborhood (the state lives only for the duration of one function call):

```scala
// Track edge patterns within neighborhood
class EdgePatternAnalyzer extends NeighborsFunction[Long, Double, Double, (Long, Map[String, Int])] {
  override def iterateNeighbors(neighbors: Iterable[(Long, Edge[Long, Double], Vertex[Long, Double])],
                                out: Collector[(Long, Map[String, Int])]): Unit = {
    val neighborList = neighbors.toList
    if (neighborList.nonEmpty) {
      val vertexId = neighborList.head._1

      // Categorize edges by weight ranges
      val patterns = scala.collection.mutable.Map[String, Int]()

      for ((_, edge, _) <- neighborList) {
        val category = edge.getValue match {
          case w if w < 0.3 => "weak"
          case w if w < 0.7 => "medium"
          case _ => "strong"
        }
        patterns(category) = patterns.getOrElse(category, 0) + 1
      }

      out.collect((vertexId, patterns.toMap))
    }
  }
}
```
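
The same bucketing can also be written without a mutable map. The sketch below is deliberately Flink-free so it can run on its own: it works on plain `Double` weights, and `WeightBuckets`, `categorize`, and `countByCategory` are hypothetical helper names rather than Gelly APIs.

```scala
object WeightBuckets {
  // Same thresholds as EdgePatternAnalyzer: < 0.3 weak, < 0.7 medium, else strong.
  def categorize(weight: Double): String =
    if (weight < 0.3) "weak"
    else if (weight < 0.7) "medium"
    else "strong"

  // Count weights per category using only immutable collections.
  def countByCategory(weights: Iterable[Double]): Map[String, Int] =
    weights.foldLeft(Map.empty[String, Int]) { (counts, w) =>
      val category = categorize(w)
      counts.updated(category, counts.getOrElse(category, 0) + 1)
    }
}
```

Inside `iterateNeighbors`, a call such as `WeightBuckets.countByCategory(neighborList.map(_._2.getValue))` would then replace the loop over the mutable map.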

## Function Design Best Practices

### Performance Optimization

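- **Lazy Evaluation**: Use lazy views or iterators (for example `edges.view` or `edges.iterator`) when a chain of transformations would otherwise build intermediate collections
- **Memory Management**: Avoid collecting large neighborhoods into memory (for example with `toList`) unless the data must be traversed more than once
- **Type Specialization**: Use primitive types when possible to avoid boxing overhead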
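
As one reading of these points: several examples in this document call `toList` just to obtain a count and a sum, which copies the whole neighborhood. A minimal, Flink-free sketch of a single-pass alternative follows. `NeighborhoodStats`, `averageOf`, and `sumAbove` are hypothetical names, and the `Iterable[Double]` input stands in for edge weights taken from the `edges` iterable.

```scala
object NeighborhoodStats {
  // Compute the mean in one pass, keeping only two primitive accumulators
  // instead of materializing the neighborhood with toList.
  def averageOf(values: Iterable[Double]): Option[Double] = {
    var sum = 0.0
    var count = 0L
    val it = values.iterator
    while (it.hasNext) {
      sum += it.next()
      count += 1
    }
    if (count == 0) None else Some(sum / count)
  }

  // Filter lazily through a view so no intermediate collection is built.
  def sumAbove(values: Iterable[Double], threshold: Double): Double =
    values.view.filter(_ > threshold).sum
}
```

Returning `Option[Double]` from `averageOf` also makes the empty-neighborhood case explicit at the call site.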

### Error Handling

- **Null Safety**: Check for null values in vertex and edge data
- **Empty Collections**: Handle cases where vertices have no edges or neighbors
- **Exception Management**: Use appropriate exception handling within functions
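
A hedged sketch of these three points, kept Flink-free so it runs on its own: possibly-null values are wrapped in `Option`, an empty input yields `None` rather than an exception, and a risky computation is wrapped in `Try`. `SafeProcessing`, `safeMax`, and `parseWeight` are hypothetical helpers, and the boxed `java.lang.Double` inputs stand in for edge values that could be null.

```scala
import scala.util.Try

object SafeProcessing {
  // Drop null values, then take the maximum.
  // Returns None for an empty (or all-null) input instead of throwing.
  def safeMax(values: Iterable[java.lang.Double]): Option[Double] =
    values.iterator
      .flatMap(v => Option(v)) // Option(null) is None
      .map(_.doubleValue)
      .reduceOption(_ max _)

  // Parse a weight from text, turning any failure into None.
  def parseWeight(raw: String): Option[Double] =
    Try(raw.trim.toDouble).toOption
}
```

In an `iterateEdges` or `iterateNeighbors` implementation, the resulting `Option` would decide whether `out.collect` is called at all.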

### Scala Idioms

- **Pattern Matching**: Use Scala's pattern matching for elegant conditional logic
- **Collection Operations**: Leverage Scala's rich collection API (map, filter, reduce, etc.)
- **Functional Style**: Prefer immutable data structures and functional transformations

```scala
// Example using Scala idioms
class ScalaIdiomsExample extends NeighborsFunctionWithVertexValue[Long, String, Double, (Long, Double)] {
  override def iterateNeighbors(vertex: Vertex[Long, String],
                                neighbors: Iterable[(Edge[Long, Double], Vertex[Long, String])],
                                out: Collector[(Long, Double)]): Unit = {
    val result = neighbors
      .filter(_._1.getValue > 0.5)         // Filter strong edges
      .map(_._2.getValue.length.toDouble)  // Map to vertex name lengths
      .reduceOption(_ + _)                 // Sum with safe reduction
      .getOrElse(0.0)                      // Default value

    out.collect((vertex.getId, result))
  }
}
```

These user-defined functions provide the foundation for implementing custom graph algorithms and analytics while maintaining the performance and scalability benefits of Flink's distributed processing engine.