# User-Defined Functions

Abstract base classes for implementing custom graph processing functions with access to vertex values, edges, and neighborhoods. Each class extends the corresponding Java Gelly function type and converts its Java arguments into Scala `Iterable`s and tuples, so implementations can use the Scala collections API directly.

## Capabilities

### EdgesFunction

Base class for processing the edges of each vertex when the vertex value is not needed.

```scala { .api }
/**
 * Abstract class for processing the edges of a vertex.
 * Provides a Scala collections interface for edge iteration.
 */
abstract class EdgesFunction[K, EV, T] extends org.apache.flink.graph.EdgesFunction[K, EV, T] {
  /**
   * Process the edges of a vertex and emit results.
   * @param edges iterable of (vertexId, edge) pairs, where vertexId is the ID of the vertex whose edges are being processed
   * @param out collector for emitting results
   */
  def iterateEdges(edges: Iterable[(K, Edge[K, EV])], out: Collector[T]): Unit
}
```
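To make the `(vertexId, edge)` pairs concrete, here is a plain-Scala model (no Flink dependency) of how, as we understand Gelly's implementation, `groupReduceOnEdges` groups edges before calling `iterateEdges`: under `OUT` each edge is keyed by its source, under `IN` by its target, and under `ALL` it appears once under each endpoint. `EdgeGrouping`, `SimpleEdge`, and `edgeGroups` are hypothetical names for this sketch, and the direction strings stand in for Gelly's `EdgeDirection` values.

```scala
// Plain-Scala model of the groups handed to EdgesFunction.iterateEdges.
// SimpleEdge stands in for Gelly's Edge; "OUT"/"IN"/"ALL" stand in for EdgeDirection.
object EdgeGrouping {
  final case class SimpleEdge(src: Long, trg: Long, value: Double)

  // Returns, per vertex ID, the (vertexId, edge) pairs that vertex would receive
  def edgeGroups(edges: Seq[SimpleEdge], direction: String): Map[Long, Seq[(Long, SimpleEdge)]] = {
    val keyed: Seq[(Long, SimpleEdge)] = direction match {
      case "OUT" => edges.map(e => (e.src, e))                      // keyed by source
      case "IN"  => edges.map(e => (e.trg, e))                      // keyed by target
      case "ALL" => edges.flatMap(e => Seq((e.src, e), (e.trg, e))) // once per endpoint
      case other => throw new IllegalArgumentException(s"unknown direction: $other")
    }
    keyed.groupBy(_._1)
  }
}
```

One consequence of this model: a function that emits edges (such as a filter) emits each edge at most once under `OUT` or `IN`, but possibly twice under `ALL`.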
### EdgesFunctionWithVertexValue

Base class for processing the edges of each vertex with access to that vertex's value.

```scala { .api }
/**
 * Abstract class for processing the edges of a vertex with access to the vertex value.
 * Provides a Scala collections interface for edge iteration.
 */
abstract class EdgesFunctionWithVertexValue[K, VV, EV, T] extends
    org.apache.flink.graph.EdgesFunctionWithVertexValue[K, VV, EV, T] {
  /**
   * Process the edges of a vertex with access to the vertex value and emit results.
   * @param v the vertex whose edges are being processed
   * @param edges iterable of edges connected to the vertex
   * @param out collector for emitting results
   */
  @throws(classOf[Exception])
  def iterateEdges(v: Vertex[K, VV], edges: Iterable[Edge[K, EV]], out: Collector[T]): Unit
}
```
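Because this variant receives the vertex itself, Gelly pairs each vertex with its edges on the vertex ID (a co-group). As far as we can tell from Gelly's implementation, a vertex with no edges in the chosen direction is therefore still passed to `iterateEdges`, just with an empty `edges` iterable. The plain-Scala model below illustrates this for the `OUT` direction; `VertexEdgePairing`, `SimpleVertex`, `SimpleEdge`, and `outEdgeGroups` are hypothetical names.

```scala
// Plain-Scala model of how EdgesFunctionWithVertexValue is fed for EdgeDirection.OUT:
// every vertex is paired with its (possibly empty) sequence of outgoing edges.
object VertexEdgePairing {
  final case class SimpleVertex(id: Long, value: Double)
  final case class SimpleEdge(src: Long, trg: Long, value: Double)

  def outEdgeGroups(vertices: Seq[SimpleVertex],
                    edges: Seq[SimpleEdge]): Seq[(SimpleVertex, Seq[SimpleEdge])] = {
    val bySource = edges.groupBy(_.src)
    // Vertices without outgoing edges get an empty Seq instead of being skipped
    vertices.map(v => (v, bySource.getOrElse(v.id, Seq.empty[SimpleEdge])))
  }
}
```

This is why implementations of this class should handle an empty `edges` iterable.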
### NeighborsFunction

Base class for processing the neighborhood of each vertex, that is, its edges together with the adjacent vertices.

```scala { .api }
/**
 * Abstract class for processing the neighbors of a vertex.
 * Provides access to both the edges and the adjacent vertices,
 * through a Scala collections interface.
 */
abstract class NeighborsFunction[K, VV, EV, T] extends
    org.apache.flink.graph.NeighborsFunction[K, VV, EV, T] {
  /**
   * Process the neighbors of a vertex and emit results.
   * @param neighbors iterable of (vertexId, edge, neighborVertex) tuples, where vertexId is the ID of the vertex whose neighborhood is being processed
   * @param out collector for emitting results
   */
  def iterateNeighbors(neighbors: Iterable[(K, Edge[K, EV], Vertex[K, VV])],
                       out: Collector[T]): Unit
}
```
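The `(vertexId, edge, neighborVertex)` triples can be modeled the same way. In the plain-Scala sketch below (hypothetical names throughout, no Flink types), each edge contributes a triple to its source under `OUT` (the neighbor is the target), to its target under `IN` (the neighbor is the source), and to both endpoints under `ALL`, which is our reading of Gelly's behavior.

```scala
// Plain-Scala model of the triples handed to NeighborsFunction.iterateNeighbors.
object NeighborTriples {
  final case class SimpleVertex(id: Long, value: Double)
  final case class SimpleEdge(src: Long, trg: Long, value: Double)

  // Assumes every edge endpoint is a known vertex (byId throws otherwise)
  def neighborGroups(vertices: Seq[SimpleVertex], edges: Seq[SimpleEdge],
                     direction: String): Map[Long, Seq[(Long, SimpleEdge, SimpleVertex)]] = {
    val byId = vertices.map(v => v.id -> v).toMap
    val triples = edges.flatMap { e =>
      val fromSource = (e.src, e, byId(e.trg)) // source's view: neighbor is the target
      val fromTarget = (e.trg, e, byId(e.src)) // target's view: neighbor is the source
      direction match {
        case "OUT" => Seq(fromSource)
        case "IN"  => Seq(fromTarget)
        case "ALL" => Seq(fromSource, fromTarget)
        case other => throw new IllegalArgumentException(s"unknown direction: $other")
      }
    }
    triples.groupBy(_._1)
  }
}
```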
### NeighborsFunctionWithVertexValue

Base class for processing neighbors with access to the central vertex and its value.

```scala { .api }
/**
 * Abstract class for processing the neighbors of a vertex with access to the vertex value.
 * Provides access to the central vertex, the edges, and the adjacent vertices,
 * through a Scala collections interface.
 */
abstract class NeighborsFunctionWithVertexValue[K, VV, EV, T] extends
    org.apache.flink.graph.NeighborsFunctionWithVertexValue[K, VV, EV, T] {
  /**
   * Process the neighbors of a vertex with access to the vertex value and emit results.
   * @param vertex the central vertex whose neighbors are being processed
   * @param neighbors iterable of (edge, neighborVertex) pairs
   * @param out collector for emitting results
   */
  def iterateNeighbors(vertex: Vertex[K, VV],
                       neighbors: Iterable[(Edge[K, EV], Vertex[K, VV])],
                       out: Collector[T]): Unit
}
```
## Usage Patterns

### Simple Edge Processing

Process edges without considering vertex values. The snippets below assume the imports in this first block and a `graph: Graph[Long, Double, Double]` already in scope:

```scala
import org.apache.flink.api.scala._
import org.apache.flink.graph.EdgeDirection
import org.apache.flink.graph.scala._
import org.apache.flink.graph.{Edge, Vertex}
import org.apache.flink.util.Collector

// Count outgoing edges per vertex
class EdgeCounter extends EdgesFunction[Long, Double, (Long, Int)] {
  override def iterateEdges(edges: Iterable[(Long, Edge[Long, Double])],
                            out: Collector[(Long, Int)]): Unit = {
    val edgeList = edges.toList
    if (edgeList.nonEmpty) {
      // Every pair carries the ID of the vertex being processed
      val vertexId = edgeList.head._1
      out.collect((vertexId, edgeList.size))
    }
  }
}

// Apply the function; vertices without outgoing edges produce no record
val edgeCounts = graph.groupReduceOnEdges(new EdgeCounter(), EdgeDirection.OUT)
```
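The counting logic does not need the whole group in memory. Below is a sketch of a single-pass alternative, written as a plain function over the same `(vertexId, edge)` shape so it can be unit-tested without a Flink cluster; `EdgeCounting` and `countGroup` are hypothetical names, and the edge type is left generic so no Gelly types are required.

```scala
// Single-pass alternative to edges.toList: fold over the pairs once,
// remembering the vertex ID from the first pair and counting the rest.
object EdgeCounting {
  def countGroup[E](edges: Iterable[(Long, E)]): Option[(Long, Int)] =
    edges.foldLeft(Option.empty[(Long, Int)]) {
      case (None, (id, _))    => Some((id, 1))
      case (Some((id, n)), _) => Some((id, n + 1))
    }
}
```

Inside `EdgeCounter`, the body could then shrink to `countGroup(edges).foreach(out.collect)`.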
### Edge Processing with Vertex Context

Process edges while taking the vertex value into account:

```scala
// Keep each outgoing edge whose weight is at least half of its source vertex's value
class EdgeFilter extends EdgesFunctionWithVertexValue[Long, Double, Double, Edge[Long, Double]] {
  override def iterateEdges(v: Vertex[Long, Double],
                            edges: Iterable[Edge[Long, Double]],
                            out: Collector[Edge[Long, Double]]): Unit = {
    val threshold = v.getValue * 0.5
    for (edge <- edges) {
      if (edge.getValue >= threshold) {
        out.collect(edge)
      }
    }
  }
}

// Apply the function; with EdgeDirection.OUT each edge is checked once, against its source vertex
val filteredEdges = graph.groupReduceOnEdges(new EdgeFilter(), EdgeDirection.OUT)
```
142
143
### Neighbor Analysis
144
145
Process neighbors to compute local graph properties:
146
147
```scala
148
// Compute average neighbor degree
149
class NeighborDegreeAnalysis extends NeighborsFunction[Long, Double, Double, (Long, Double)] {
150
override def iterateNeighbors(neighbors: Iterable[(Long, Edge[Long, Double], Vertex[Long, Double])],
151
out: Collector[(Long, Double)]): Unit = {
152
val neighborList = neighbors.toList
153
if (neighborList.nonEmpty) {
154
val vertexId = neighborList.head._1
155
val neighborValues = neighborList.map(_._3.getValue)
156
val avgNeighborValue = neighborValues.sum / neighborValues.size
157
158
out.collect((vertexId, avgNeighborValue))
159
}
160
}
161
}
162
163
// Apply the function
164
val neighborAnalysis = graph.groupReduceOnNeighbors(new NeighborDegreeAnalysis(), EdgeDirection.ALL)
165
```
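A neighbor-averaging function averages whatever the vertex values hold, so it yields the average neighbor degree only if each vertex's value is its degree. The plain-Scala sketch below illustrates that idea on a small edge list: it treats edges as undirected (as under `EdgeDirection.ALL`), computes degrees, and then averages them over each vertex's neighbors. `AverageNeighborDegree` and `compute` are hypothetical names; no Gelly API is used, and in Gelly you would first have to attach the degrees to the vertices as their values.

```scala
// Average neighbor degree on a small edge list, treating edges as undirected (ALL).
object AverageNeighborDegree {
  def compute(edges: Seq[(Long, Long)]): Map[Long, Double] = {
    // Each edge contributes one neighbor to each endpoint
    val neighborPairs = edges.flatMap { case (s, t) => Seq((s, t), (t, s)) }
    val neighbors: Map[Long, Seq[Long]] =
      neighborPairs.groupBy(_._1).map { case (v, ps) => (v, ps.map(_._2)) }
    val degree: Map[Long, Int] = neighbors.map { case (v, ns) => (v, ns.size) }
    // Average the neighbors' degrees for every vertex
    neighbors.map { case (v, ns) => (v, ns.map(degree).sum.toDouble / ns.size) }
  }
}
```

For a star with center 1 and leaves 2, 3, 4, the center's neighbors all have degree 1, while each leaf's single neighbor has degree 3.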
166
167
### Advanced Neighbor Processing with Vertex Context
168
169
Perform complex analysis considering both the central vertex and its neighbors:
170
171
```scala
172
// Compute local clustering-like metric
173
class LocalStructureAnalysis extends NeighborsFunctionWithVertexValue[Long, Double, Double, (Long, Double, Int, Double)] {
174
override def iterateNeighbors(vertex: Vertex[Long, Double],
175
neighbors: Iterable[(Edge[Long, Double], Vertex[Long, Double])],
176
out: Collector[(Long, Double, Int, Double)]): Unit = {
177
val neighborList = neighbors.toList
178
val vertexValue = vertex.getValue
179
val neighborCount = neighborList.size
180
181
if (neighborCount > 0) {
182
val avgEdgeWeight = neighborList.map(_._1.getValue).sum / neighborCount
183
val result = (vertex.getId, vertexValue, neighborCount, avgEdgeWeight)
184
out.collect(result)
185
}
186
}
187
}
188
189
// Apply the function
190
val structureAnalysis = graph.groupReduceOnNeighbors(
191
new LocalStructureAnalysis(),
192
EdgeDirection.ALL
193
)
194
```
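Note that `neighborCount` in `LocalStructureAnalysis` counts incident edges, not distinct neighbors: under `EdgeDirection.ALL`, a pair of reciprocal edges (1 → 2 and 2 → 1) puts vertex 2 into vertex 1's neighborhood twice. If distinct neighbors are what you want, deduplicate on the neighbor ID. A plain-Scala sketch of the difference, with hypothetical names `NeighborCounts` and `counts`:

```scala
// Incident-edge count vs. distinct-neighbor count for one vertex under ALL semantics.
object NeighborCounts {
  // edges are (src, trg) pairs; returns (incidentEdges, distinctNeighbors) for `vertex`.
  // A self-loop is counted once in this sketch.
  def counts(vertex: Long, edges: Seq[(Long, Long)]): (Int, Int) = {
    val neighborIds = edges.collect {
      case (s, t) if s == vertex => t // outgoing edge: neighbor is the target
      case (s, t) if t == vertex => s // incoming edge: neighbor is the source
    }
    (neighborIds.size, neighborIds.distinct.size)
  }
}
```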
## Advanced Usage Examples

### Multi-Output Functions

A function can call `out.collect` any number of times, emitting several records per vertex:

```scala
// Emit one record per (edge, neighbor) pair: (vertex ID, vertex value, neighbor value, edge weight)
class DetailedNeighborStats
    extends NeighborsFunctionWithVertexValue[Long, String, Double, (Long, String, String, Double)] {
  override def iterateNeighbors(vertex: Vertex[Long, String],
                                neighbors: Iterable[(Edge[Long, Double], Vertex[Long, String])],
                                out: Collector[(Long, String, String, Double)]): Unit = {
    for ((edge, neighbor) <- neighbors) {
      val result = (vertex.getId, vertex.getValue, neighbor.getValue, edge.getValue)
      out.collect(result)
    }
  }
}
```
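Since `out.collect` may be called any number of times, a convenient way to check such logic is to run it against a collector that simply buffers records. The sketch below defines a minimal stand-in trait, `SimpleCollector`, instead of Flink's `org.apache.flink.util.Collector` so that it runs without Flink; with Flink on the classpath, `org.apache.flink.api.common.functions.util.ListCollector` is, to our knowledge, a ready-made `Collector` backed by a `java.util.List`. `BufferCollector` and `DetailedStatsLogic` are hypothetical names, and neighbors are modeled as plain `(edgeWeight, neighborValue)` tuples.

```scala
// Minimal stand-in for org.apache.flink.util.Collector (hypothetical, for testing only)
trait SimpleCollector[T] { def collect(record: T): Unit }

// Collector that keeps every emitted record in order
final class BufferCollector[T] extends SimpleCollector[T] {
  private var buffer = Vector.empty[T]
  def records: Vector[T] = buffer
  def collect(record: T): Unit = buffer = buffer :+ record
}

object DetailedStatsLogic {
  // Same shape as DetailedNeighborStats: one output record per (edge, neighbor) pair
  def emit(vertexId: Long, vertexValue: String,
           neighbors: Iterable[(Double, String)],
           out: SimpleCollector[(Long, String, String, Double)]): Unit =
    for ((weight, neighborValue) <- neighbors)
      out.collect((vertexId, vertexValue, neighborValue, weight))
}
```

A vertex with two neighbors therefore produces two records.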
### Conditional Processing

Branch on the structure of a vertex's neighborhood, here its number of edges:

```scala
// Process high-degree vertices differently from low-degree ones
class AdaptiveEdgeProcessor
    extends EdgesFunctionWithVertexValue[Long, Double, Double, (Long, String, Double)] {
  override def iterateEdges(v: Vertex[Long, Double],
                            edges: Iterable[Edge[Long, Double]],
                            out: Collector[(Long, String, Double)]): Unit = {
    val edgeList = edges.toList // materialized because it is traversed more than once
    val edgeCount = edgeList.size

    val analysis = if (edgeCount > 10) {
      // High-degree vertex (more than 10 edges): compute the average weight
      val avgWeight = edgeList.map(_.getValue).sum / edgeCount
      (v.getId, "high-degree", avgWeight)
    } else {
      // Low-degree vertex: compute the maximum weight, or 0.0 if there are no edges
      val maxWeight = if (edgeList.nonEmpty) edgeList.map(_.getValue).max else 0.0
      (v.getId, "low-degree", maxWeight)
    }

    out.collect(analysis)
  }
}
```
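The branch in `AdaptiveEdgeProcessor` is easy to get subtly wrong at the threshold and for vertices without edges. Below is a sketch of the same decision rule as a pure function over the edge weights (`AdaptiveRule` and `classify` are hypothetical names), which makes both cases explicit: exactly 10 edges still counts as low-degree, and an empty input yields `0.0`.

```scala
// Same decision rule as AdaptiveEdgeProcessor, over plain edge weights
object AdaptiveRule {
  def classify(weights: Seq[Double]): (String, Double) =
    if (weights.size > 10) ("high-degree", weights.sum / weights.size)  // average weight
    else ("low-degree", if (weights.nonEmpty) weights.max else 0.0)     // max, or 0.0 if none
}
```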
### Stateful Processing

Keep local, mutable state that exists only for the duration of a single function call (this is not Flink-managed state):

```scala
// Count each vertex's edges per weight category
class EdgePatternAnalyzer extends NeighborsFunction[Long, Double, Double, (Long, Map[String, Int])] {
  override def iterateNeighbors(neighbors: Iterable[(Long, Edge[Long, Double], Vertex[Long, Double])],
                                out: Collector[(Long, Map[String, Int])]): Unit = {
    val neighborList = neighbors.toList
    if (neighborList.nonEmpty) {
      val vertexId = neighborList.head._1

      // Categorize edges by weight range; this map lives only for this call
      val patterns = scala.collection.mutable.Map[String, Int]()

      for ((_, edge, _) <- neighborList) {
        val category = edge.getValue match {
          case w if w < 0.3 => "weak"
          case w if w < 0.7 => "medium"
          case _ => "strong"
        }
        patterns(category) = patterns.getOrElse(category, 0) + 1
      }

      out.collect((vertexId, patterns.toMap))
    }
  }
}
```
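The categorization in `EdgePatternAnalyzer` can also be expressed without a mutable map, which is closer to the functional style recommended under Scala Idioms. A sketch with plain edge weights standing in for the `Edge` values (`EdgePatterns`, `bucket`, and `categorize` are hypothetical names):

```scala
// Immutable alternative to the mutable Map in EdgePatternAnalyzer
object EdgePatterns {
  def bucket(w: Double): String = w match {
    case x if x < 0.3 => "weak"
    case x if x < 0.7 => "medium"
    case _            => "strong"
  }

  // Group weights by category, then count each group; empty categories are omitted
  def categorize(weights: Iterable[Double]): Map[String, Int] =
    weights.groupBy(bucket).map { case (category, ws) => (category, ws.size) }
}
```

Like the mutable version, this omits categories that have no edges.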
## Function Design Best Practices

### Performance Optimization

- **Lazy Evaluation**: Prefer a single traversal of the input `Iterable` (directly, or through `.iterator` or `.view`) over building intermediate collections
- **Memory Management**: Avoid materializing large neighborhoods (for example with `toList`) unless you need several passes; the input iterable may not support a second traversal, so materialize it when you do
- **Type Specialization**: Use simple types such as `Long` and `Double` for IDs and values where possible; Flink has efficient built-in serializers for them

### Error Handling

- **Null Safety**: Check for `null` vertex and edge values, for example by wrapping them in `Option(...)`
- **Empty Collections**: Handle vertices that have no edges or neighbors in the chosen direction, for example with `nonEmpty` checks or `reduceOption`
- **Exception Management**: An exception that escapes `iterateEdges` or `iterateNeighbors` fails the running task, so handle recoverable, per-record problems inside the function

### Scala Idioms

- **Pattern Matching**: Use Scala's pattern matching for concise conditional logic
- **Collection Operations**: Leverage Scala's rich collection API (`map`, `filter`, `reduce`, etc.)
- **Functional Style**: Prefer immutable data structures and functional transformations

```scala
// Example using Scala idioms: sum the name lengths of neighbors reached over strong edges
class ScalaIdiomsExample extends NeighborsFunctionWithVertexValue[Long, String, Double, (Long, Double)] {
  override def iterateNeighbors(vertex: Vertex[Long, String],
                                neighbors: Iterable[(Edge[Long, Double], Vertex[Long, String])],
                                out: Collector[(Long, Double)]): Unit = {
    val result = neighbors
      .filter(_._1.getValue > 0.5)        // Keep strong edges (weight > 0.5)
      .map(_._2.getValue.length.toDouble) // Map to neighbor name lengths
      .reduceOption(_ + _)                // Sum, yielding None for an empty input
      .getOrElse(0.0)                     // Default when there are no strong edges

    out.collect((vertex.getId, result))
  }
}
```
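The `ScalaIdiomsExample` pipeline traverses `neighbors` exactly once. That matters because the iterable a function receives may be backed by a one-shot iterator, in which case a second pass silently sees no elements (or fails). The plain-Scala sketch below uses an `Iterator` to contrast a single-pass aggregate with a naive two-pass one; `SinglePass`, `sumAndCount`, `average`, and `naiveAverage` are hypothetical names, and no Flink types are involved.

```scala
// Why single-pass pipelines matter: an Iterator can be consumed only once
object SinglePass {
  // One traversal: accumulate sum and count together
  def sumAndCount(values: Iterator[Double]): (Double, Int) =
    values.foldLeft((0.0, 0)) { case ((sum, n), v) => (sum + v, n + 1) }

  // Average built on the single-pass helper; 0.0 for an empty input
  def average(values: Iterator[Double]): Double = {
    val (sum, n) = sumAndCount(values)
    if (n == 0) 0.0 else sum / n
  }

  // Naive two-pass version: the second pass sees an already exhausted iterator
  def naiveAverage(values: Iterator[Double]): Double = {
    val sum = values.sum
    val count = values.size // 0 here, because `sum` consumed every element
    sum / count
  }
}
```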
These user-defined functions provide the foundation for implementing custom graph algorithms and analytics while maintaining the performance and scalability benefits of Flink's distributed processing engine.