# Graph Analytics

Complete API for graph analytics, metrics calculations, and structural operations.

## Degree Calculations

### Basic Degree Metrics

```scala { .api }
def inDegrees(): DataSet[(K, LongValue)]
```

Returns the in-degree of all vertices in the graph as a DataSet of `(vertexId, inDegree)` tuples.

```scala { .api }
def outDegrees(): DataSet[(K, LongValue)]
```

Returns the out-degree of all vertices in the graph as a DataSet of `(vertexId, outDegree)` tuples.

```scala { .api }
def getDegrees(): DataSet[(K, LongValue)]
```

Returns the total degree (in-degree + out-degree) of all vertices as a DataSet of `(vertexId, degree)` tuples.
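
To make the three definitions concrete, the following sketch models them on plain Scala collections instead of a Flink `DataSet`. The `DegreeSketch` object and its edge-pair representation are illustrative assumptions, not part of the Gelly API. Vertices without matching edges are reported with a degree of 0 here, which is one reading of "all vertices" in the descriptions above.

```scala
// Plain-Scala model of the degree metrics; hypothetical helper, not the Gelly API.
object DegreeSketch {
  // An edge is modelled as a (source, target) pair of vertex IDs.
  type EdgePair = (Long, Long)

  // Number of edges that end at each vertex.
  def inDegrees(vertices: Seq[Long], edges: Seq[EdgePair]): Map[Long, Long] =
    vertices.map(v => v -> edges.count(_._2 == v).toLong).toMap

  // Number of edges that start at each vertex.
  def outDegrees(vertices: Seq[Long], edges: Seq[EdgePair]): Map[Long, Long] =
    vertices.map(v => v -> edges.count(_._1 == v).toLong).toMap

  // Total degree is the sum of in-degree and out-degree.
  def degrees(vertices: Seq[Long], edges: Seq[EdgePair]): Map[Long, Long] = {
    val in = inDegrees(vertices, edges)
    val out = outDegrees(vertices, edges)
    vertices.map(v => v -> (in(v) + out(v))).toMap
  }
}
```

For the edge list `(1,2), (1,3), (2,3)`, vertex 1 has no incoming edge and two outgoing edges in this model, so its total degree is 2.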

## Structural Transformations

### Graph Structure Modifications

```scala { .api }
def getUndirected(): Graph[K, VV, EV]
```

Creates an undirected version of the graph by adding all inverse-direction edges. Each edge `(u,v)` results in both `(u,v)` and `(v,u)` edges.

```scala { .api }
def reverse(): Graph[K, VV, EV]
```

Reverses the direction of all edges in the graph. Edge `(u,v)` becomes `(v,u)`.
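
The two transformations can be pictured on a bare edge list. This is a minimal sketch on plain Scala collections; `DirectionSketch` and its functions are hypothetical names used for illustration and do not belong to the Gelly API.

```scala
// Plain-Scala model of reverse and getUndirected; not the Gelly API.
object DirectionSketch {
  // An edge is modelled as a (source, target) pair of vertex IDs.
  type EdgePair = (Long, Long)

  // reverse: every edge (u, v) becomes (v, u).
  def reverse(edges: Seq[EdgePair]): Seq[EdgePair] =
    edges.map { case (u, v) => (v, u) }

  // getUndirected: keep every edge and add its inverse.
  def undirected(edges: Seq[EdgePair]): Seq[EdgePair] =
    edges ++ reverse(edges)
}
```

In this model the undirected version always holds twice as many edges as the input.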

### Validation

```scala { .api }
def validate(validator: GraphValidator[K, VV, EV]): Boolean
```

Validates the graph against the rules defined by the provided `GraphValidator`.

**Parameters:**

- `validator` - GraphValidator that defines validation rules

## Set Operations

### Graph Union

```scala { .api }
def union(graph: Graph[K, VV, EV]): Graph[K, VV, EV]
```

Performs a union of the vertex and edge sets of the two graphs. Duplicate vertices are removed, but duplicate edges are kept.

**Parameters:**

- `graph` - The graph to perform union with
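
The asymmetry between vertices and edges is easiest to see on a small model. The sketch below uses plain Scala collections; `UnionSketch` and `SimpleGraph` are hypothetical names, and treating two vertices as duplicates only when both ID and value match is an assumption of this sketch.

```scala
// Plain-Scala model of graph union; not the Gelly API.
object UnionSketch {
  // Vertices are (id, value) pairs; edges are (source, target, value) triples.
  final case class SimpleGraph(
    vertices: Seq[(Long, String)],
    edges: Seq[(Long, Long, Double)]
  )

  // Vertices are de-duplicated, edges are simply concatenated.
  def union(a: SimpleGraph, b: SimpleGraph): SimpleGraph =
    SimpleGraph((a.vertices ++ b.vertices).distinct, a.edges ++ b.edges)
}
```

If both inputs contain the edge `(1, 2, 1.0)`, the result of this model contains it twice, while a vertex shared by both inputs appears once.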

### Graph Difference

```scala { .api }
def difference(graph: Graph[K, VV, EV]): Graph[K, VV, EV]
```

Computes the difference between the vertex and edge sets of this graph and the given graph: vertices and edges common to both are removed. If the source or target vertex of an edge is removed, that edge is removed as well.

**Parameters:**

- `graph` - The graph to perform difference with
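
A small model shows how removing a vertex cascades to its edges. This sketch uses plain Scala collections; `DifferenceSketch` and `SimpleGraph` are hypothetical names. It assumes that vertices are matched by ID and that every edge's endpoints are present in its graph's vertex set, so a common edge always loses its endpoints.

```scala
// Plain-Scala model of graph difference; not the Gelly API.
object DifferenceSketch {
  // Vertices are (id, value) pairs; edges are (source, target, value) triples.
  final case class SimpleGraph(
    vertices: Seq[(Long, String)],
    edges: Seq[(Long, Long, Double)]
  )

  def difference(a: SimpleGraph, b: SimpleGraph): SimpleGraph = {
    // Vertices of `a` that also occur in `b` are removed (matched by ID here).
    val removedIds = b.vertices.map(_._1).toSet
    val keptVertices = a.vertices.filterNot { case (id, _) => removedIds(id) }
    val keptIds = keptVertices.map(_._1).toSet
    // An edge survives only if both of its endpoints survive.
    val keptEdges = a.edges.filter { case (src, dst, _) => keptIds(src) && keptIds(dst) }
    SimpleGraph(keptVertices, keptEdges)
  }
}
```

In this model the edge `(2, 3)` disappears as soon as vertex 2 is removed, even though vertex 3 is kept.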

### Graph Intersection

```scala { .api }
def intersect(graph: Graph[K, VV, EV], distinctEdges: Boolean): Graph[K, NullValue, EV]
```

Performs intersection on the edge sets of the input graphs. Edges are considered equal if they have the same source identifier, target identifier, and edge value.

**Parameters:**

- `graph` - The graph to perform intersection with

- `distinctEdges` - If `true`, exactly one edge represents all pairs of equal edges; if `false`, both edges of each pair are included
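
The effect of `distinctEdges` can be sketched on plain edge lists. `IntersectSketch` is a hypothetical helper, not the Gelly API. How repeated equal edges are paired up (here: as many pairs as the smaller of the two counts) is an assumption of this sketch, not something the description above specifies.

```scala
// Plain-Scala model of edge-set intersection; not the Gelly API.
object IntersectSketch {
  // An edge is a (source, target, value) triple; equality covers all three fields.
  type E = (Long, Long, Double)

  def intersect(left: Seq[E], right: Seq[E], distinctEdges: Boolean): Seq[E] = {
    val rightCounts: Map[E, Int] =
      right.groupBy(identity).map { case (e, es) => e -> es.size }
    if (distinctEdges) {
      // One representative per edge that occurs in both inputs.
      left.distinct.filter(e => rightCounts.contains(e))
    } else {
      // Both edges of every matched pair are emitted (pairing rule is assumed).
      left.groupBy(identity).toSeq.flatMap { case (e, es) =>
        val pairs = math.min(es.size, rightCounts.getOrElse(e, 0))
        Seq.fill(pairs * 2)(e)
      }
    }
  }
}
```

Edges that share source and target but differ in value, such as `(2, 3, 2.0)` and `(2, 3, 9.0)`, are not equal and therefore never appear in the result.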

## Neighborhood Operations

### Edge-based Aggregations

```scala { .api }
def groupReduceOnEdges[T: TypeInformation : ClassTag](edgesFunction: EdgesFunction[K, EV, T], direction: EdgeDirection): DataSet[T]
```

Computes an aggregate over the edges of each vertex without access to the vertex value.

**Parameters:**

- `edgesFunction` - Function to apply to the edges of each vertex

- `direction` - Edge direction (IN, OUT, ALL)

```scala { .api }
def groupReduceOnEdges[T: TypeInformation : ClassTag](edgesFunction: EdgesFunctionWithVertexValue[K, VV, EV, T], direction: EdgeDirection): DataSet[T]
```

Computes an aggregate over the edges of each vertex with access to the vertex value.

**Parameters:**

- `edgesFunction` - Function that has access to both vertex value and edges

- `direction` - Edge direction (IN, OUT, ALL)

### Neighbor-based Aggregations

```scala { .api }
def groupReduceOnNeighbors[T: TypeInformation : ClassTag](neighborsFunction: NeighborsFunction[K, VV, EV, T], direction: EdgeDirection): DataSet[T]
```

Computes an aggregate over the neighbors (edges and vertices) of each vertex.

**Parameters:**

- `neighborsFunction` - Function to apply to the neighborhood

- `direction` - Edge direction (IN, OUT, ALL)

```scala { .api }
def groupReduceOnNeighbors[T: TypeInformation : ClassTag](neighborsFunction: NeighborsFunctionWithVertexValue[K, VV, EV, T], direction: EdgeDirection): DataSet[T]
```

Computes an aggregate over the neighbors with access to the source vertex value.

**Parameters:**

- `neighborsFunction` - Function that has access to source vertex and neighbors

- `direction` - Edge direction (IN, OUT, ALL)

## Reduction Operations

### Neighbor Value Reduction

```scala { .api }
def reduceOnNeighbors(reduceNeighborsFunction: ReduceNeighborsFunction[VV], direction: EdgeDirection): DataSet[(K, VV)]
```

Computes a reduce transformation over the neighbors' vertex values of each vertex. The function consecutively combines pairs of neighbor vertex values until only a single value remains.

**Parameters:**

- `reduceNeighborsFunction` - Reduce function to apply to neighbor values

- `direction` - Edge direction (IN, OUT, ALL)

**Returns:** DataSet of `(vertexId, aggregatedValue)` tuples

### Edge Value Reduction

```scala { .api }
def reduceOnEdges(reduceEdgesFunction: ReduceEdgesFunction[EV], direction: EdgeDirection): DataSet[(K, EV)]
```

Computes a reduce transformation over the edge values of each vertex. The function consecutively combines pairs of edge values until only a single value remains.

**Parameters:**

- `reduceEdgesFunction` - Reduce function to apply to edge values

- `direction` - Edge direction (IN, OUT, ALL)

**Returns:** DataSet of `(vertexId, aggregatedValue)` tuples
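
The pairwise combination described above can be modelled on an in-memory edge list. This is a sketch on plain Scala collections for the outgoing direction only; `ReduceSketch` and `reduceOnOutEdges` are hypothetical names, not Gelly API. Since the order in which pairs are combined is not specified here, an associative and commutative function is the safe choice.

```scala
// Plain-Scala model of reduceOnEdges for outgoing edges; not the Gelly API.
object ReduceSketch {
  // An edge is a (source, target, value) triple.
  type E = (Long, Long, Double)

  // Group edges by source vertex, then combine their values pairwise with `f`
  // until a single value per vertex remains.
  def reduceOnOutEdges(edges: Seq[E])(f: (Double, Double) => Double): Map[Long, Double] =
    edges.groupBy(_._1).map { case (src, es) => src -> es.map(_._3).reduce(f) }
}
```

A vertex without outgoing edges has nothing to reduce, so it does not appear in this model's result.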

## Custom Function Types

### EdgesFunction

```scala { .api }
abstract class EdgesFunction[K, EV, T] {
  def iterateEdges(edges: Iterable[(K, Edge[K, EV])], out: Collector[T]): Unit
}
```

Abstract base class for functions that operate on the edges of a vertex.

### EdgesFunctionWithVertexValue

```scala { .api }
abstract class EdgesFunctionWithVertexValue[K, VV, EV, T] {
  def iterateEdges(v: Vertex[K, VV], edges: Iterable[Edge[K, EV]], out: Collector[T]): Unit
}
```

Abstract base class for functions that operate on edges with access to the vertex value.

### NeighborsFunction

```scala { .api }
abstract class NeighborsFunction[K, VV, EV, T] {
  def iterateNeighbors(neighbors: Iterable[(K, Edge[K, EV], Vertex[K, VV])], out: Collector[T]): Unit
}
```

Abstract base class for functions that operate on vertex neighbors (edges and adjacent vertices).

### NeighborsFunctionWithVertexValue

```scala { .api }
abstract class NeighborsFunctionWithVertexValue[K, VV, EV, T] {
  def iterateNeighbors(vertex: Vertex[K, VV], neighbors: Iterable[(Edge[K, EV], Vertex[K, VV])], out: Collector[T]): Unit
}
```

Abstract base class for functions that operate on neighbors with access to the source vertex value.

## Edge Direction Enum

```scala { .api }
object EdgeDirection extends Enumeration {
  val IN, OUT, ALL = Value
}
```

Enumeration for specifying edge directions in neighborhood operations:

- `IN` - Consider only incoming edges

- `OUT` - Consider only outgoing edges

- `ALL` - Consider both incoming and outgoing edges
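
As an illustration of what each direction selects, the sketch below filters a plain edge list for a single vertex. `DirectionFilterSketch` and its `Direction` type are hypothetical stand-ins defined only for this example; they are not the `EdgeDirection` type used by the API.

```scala
// Plain-Scala model of edge selection by direction; not the Gelly API.
object DirectionFilterSketch {
  sealed trait Direction
  case object In extends Direction
  case object Out extends Direction
  case object All extends Direction

  // An edge is modelled as a (source, target) pair of vertex IDs.
  type EdgePair = (Long, Long)

  // Edges that a neighborhood operation would see for `vertex`.
  def edgesOf(vertex: Long, edges: Seq[EdgePair], direction: Direction): Seq[EdgePair] =
    direction match {
      case In  => edges.filter(_._2 == vertex)                        // edges ending at vertex
      case Out => edges.filter(_._1 == vertex)                        // edges starting at vertex
      case All => edges.filter(e => e._1 == vertex || e._2 == vertex) // both kinds
    }
}
```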

## Usage Examples

### Degree Analysis

```scala
import org.apache.flink.graph.EdgeDirection

// Calculate all degree metrics
val inDegrees = graph.inDegrees()
val outDegrees = graph.outDegrees()
val totalDegrees = graph.getDegrees()

// Find vertices with high out-degree
val highOutDegree = outDegrees.filter(_._2.getValue > 10)
```

### Structural Operations

```scala
// Create undirected version
val undirectedGraph = graph.getUndirected()

// Reverse all edges
val reversedGraph = graph.reverse()

// Combine with another graph
val combinedGraph = graph.union(otherGraph)

// Find intersection with another graph
val intersection = graph.intersect(otherGraph, distinctEdges = true)
```

### Neighborhood Aggregations

```scala
// Imports assume the standard flink-gelly and flink-gelly-scala package layout
import org.apache.flink.api.scala._
import org.apache.flink.graph.{Edge, EdgeDirection, ReduceNeighborsFunction}
import org.apache.flink.graph.scala.EdgesFunction
import org.apache.flink.util.Collector

// Custom edge aggregation function: sums the edge values of each vertex
class SumEdgeValues extends EdgesFunction[Long, Double, Double] {
  override def iterateEdges(edges: Iterable[(Long, Edge[Long, Double])], out: Collector[Double]): Unit = {
    val sum = edges.map(_._2.getValue).sum
    out.collect(sum)
  }
}

// Apply edge aggregation over the outgoing edges
val edgeSums = graph.groupReduceOnEdges(new SumEdgeValues(), EdgeDirection.OUT)

// Reduce neighbor values by concatenating them (requires String vertex values)
val neighborSums = graph.reduceOnNeighbors(
  new ReduceNeighborsFunction[String] {
    override def reduceNeighbors(firstNeighborValue: String, secondNeighborValue: String): String = {
      firstNeighborValue + "," + secondNeighborValue
    }
  },
  EdgeDirection.ALL
)
```

### Advanced Analytics

```scala
// Imports assume the standard flink-gelly and flink-gelly-scala package layout
import org.apache.flink.api.scala._
import org.apache.flink.graph.{Edge, EdgeDirection, Vertex}
import org.apache.flink.graph.scala.EdgesFunctionWithVertexValue
import org.apache.flink.util.Collector

// Calculate average edge weight per vertex
class AverageEdgeWeight extends EdgesFunctionWithVertexValue[Long, String, Double, (Long, Double)] {
  override def iterateEdges(vertex: Vertex[Long, String], edges: Iterable[Edge[Long, Double]], out: Collector[(Long, Double)]): Unit = {
    val edgeList = edges.toList
    // Skip vertices without edges, for which no average is defined
    if (edgeList.nonEmpty) {
      val average = edgeList.map(_.getValue).sum / edgeList.size
      out.collect((vertex.getId, average))
    }
  }
}

val avgWeights = graph.groupReduceOnEdges(new AverageEdgeWeight(), EdgeDirection.OUT)
```