# Graph Analytics

Built-in degree calculations, neighborhood reductions, and custom group-reduce functions for analyzing graph structure and computing per-vertex and graph-wide statistics.

## Capabilities

### Degree Calculations

Compute vertex degrees for analyzing graph connectivity patterns.

```scala { .api }
/**
 * Return the in-degree of all vertices in the graph
 * @return A DataSet of Tuple2<vertexId, inDegree>
 */
def inDegrees(): DataSet[(K, LongValue)]

/**
 * Return the out-degree of all vertices in the graph
 * @return A DataSet of Tuple2<vertexId, outDegree>
 */
def outDegrees(): DataSet[(K, LongValue)]

/**
 * Return the degree (in-degree plus out-degree) of all vertices in the graph
 * @return A DataSet of Tuple2<vertexId, degree>
 */
def getDegrees(): DataSet[(K, LongValue)]
```
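
A minimal sketch of the three degree methods, written as plain Scala over an in-memory edge list (no Flink dependency) so their semantics are easy to check. It uses the four directed edges from the usage example in this document. The `SimpleEdge` case class and the `countBy` helper are hypothetical names introduced here, and this is a model of what the methods return, not Gelly's implementation. Following the doc comments above, it reports a degree for every vertex, including 0 where a vertex has no edges in a direction.

```scala
// Plain-Scala model of inDegrees / outDegrees / getDegrees (not the Gelly API).
// Hypothetical minimal edge type: a directed edge src -> trg.
final case class SimpleEdge(src: Long, trg: Long)

val vertexIds: Seq[Long] = Seq(1L, 2L, 3L, 4L)
val edgeList: Seq[SimpleEdge] =
  Seq(SimpleEdge(1L, 2L), SimpleEdge(2L, 3L), SimpleEdge(3L, 4L), SimpleEdge(1L, 4L))

// Count edges per vertex id selected by `key`; vertices with no matching edge get 0.
def countBy(key: SimpleEdge => Long): Map[Long, Long] = {
  val counts: Map[Long, Long] =
    edgeList.groupBy(key).map { case (id, es) => id -> es.size.toLong }
  vertexIds.map(id => id -> counts.getOrElse(id, 0L)).toMap
}

val inDegree: Map[Long, Long]  = countBy(_.trg) // edges arriving at the vertex
val outDegree: Map[Long, Long] = countBy(_.src) // edges leaving the vertex
// Total degree = in-degree + out-degree
val degree: Map[Long, Long] =
  vertexIds.map(id => id -> (inDegree(id) + outDegree(id))).toMap

println(s"in=$inDegree out=$outDegree degree=$degree")
```

In this graph, vertex 1 has no incoming edges and vertex 4 has no outgoing edges, yet every vertex ends up with a total degree of 2.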

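### Neighborhood Reduction Operations

Reduce each vertex's neighborhood to a single value with a pairwise function: `reduceOnNeighbors` combines the neighbors' vertex values, and `reduceOnEdges` combines the values of the vertex's edges.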

```scala { .api }
/**
 * Compute a reduce transformation over the neighbors' vertex values of each vertex.
 * For each vertex, the transformation consecutively calls a
 * ReduceNeighborsFunction until only a single value for each vertex remains.
 * The ReduceNeighborsFunction combines a pair of neighbor vertex values
 * into one new value of the same type.
 * @param reduceNeighborsFunction the reduce function to apply to the neighbors of each vertex.
 * @param direction the edge direction (in-, out-, all-)
 * @return a DataSet of Tuple2, with at most one tuple per vertex.
 * The first field of the Tuple2 is the vertex ID and the second field
 * is the aggregate value computed by the provided ReduceNeighborsFunction.
 */
def reduceOnNeighbors(reduceNeighborsFunction: ReduceNeighborsFunction[VV],
                      direction: EdgeDirection): DataSet[(K, VV)]

/**
 * Compute a reduce transformation over the edge values of each vertex.
 * For each vertex, the transformation consecutively calls a
 * ReduceEdgesFunction until only a single value for each vertex remains.
 * The ReduceEdgesFunction combines a pair of edge values
 * into one new value of the same type.
 * @param reduceEdgesFunction the reduce function to apply to the edges of each vertex.
 * @param direction the edge direction (in-, out-, all-)
 * @return a DataSet of Tuple2, with at most one tuple per vertex.
 * The first field of the Tuple2 is the vertex ID and the second field
 * is the aggregate value computed by the provided ReduceEdgesFunction.
 */
def reduceOnEdges(reduceEdgesFunction: ReduceEdgesFunction[EV],
                  direction: EdgeDirection): DataSet[(K, EV)]
```
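
The plain-Scala sketch below models what `reduceOnNeighbors` and `reduceOnEdges` return for each edge direction, using the vertex values and edge weights from the usage example. It is an illustration under stated assumptions rather than Gelly's implementation: `Dir`, `WEdge`, `incident`, `reduceNeighborsModel`, and `reduceEdgesModel` are hypothetical, and the model assumes that a vertex with no edges in the chosen direction simply gets no output tuple.

```scala
// Plain-Scala model of reduceOnNeighbors / reduceOnEdges (not the Gelly API).
sealed trait Dir
case object In extends Dir
case object Out extends Dir
case object All extends Dir

// Hypothetical weighted directed edge src -> trg.
final case class WEdge(src: Long, trg: Long, weight: Double)

val vertexValue: Map[Long, Double] = Map(1L -> 10.0, 2L -> 20.0, 3L -> 30.0, 4L -> 40.0)
val wEdges: Seq[WEdge] = Seq(
  WEdge(1L, 2L, 1.5), WEdge(2L, 3L, 2.5), WEdge(3L, 4L, 3.5), WEdge(1L, 4L, 4.5))

// (vertex, neighbor, edge) triples contributed in direction `d`.
// All contributes every edge twice: once for each endpoint.
def incident(d: Dir): Seq[(Long, Long, WEdge)] = d match {
  case In  => wEdges.map(e => (e.trg, e.src, e))
  case Out => wEdges.map(e => (e.src, e.trg, e))
  case All => wEdges.flatMap(e => Seq((e.src, e.trg, e), (e.trg, e.src, e)))
}

// Pairwise-reduce the neighbors' vertex values per vertex.
def reduceNeighborsModel(f: (Double, Double) => Double, d: Dir): Map[Long, Double] =
  incident(d).groupBy(_._1).map { case (v, ts) => v -> ts.map(t => vertexValue(t._2)).reduce(f) }

// Pairwise-reduce the edge values per vertex.
def reduceEdgesModel(f: (Double, Double) => Double, d: Dir): Map[Long, Double] =
  incident(d).groupBy(_._1).map { case (v, ts) => v -> ts.map(_._3.weight).reduce(f) }

val maxNeighbor = reduceNeighborsModel((a, b) => math.max(a, b), All)
val outWeightSum = reduceEdgesModel(_ + _, Out)
println(s"maxNeighbor=$maxNeighbor outWeightSum=$outWeightSum")
```

With `All`, each edge contributes a neighbor to both of its endpoints, so vertex 4 sees vertices 1 and 3. With `Out`, vertex 4 has no outgoing edges and therefore no entry in `outWeightSum`.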

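### Group Reduction Operations

Perform more complex aggregations over neighborhoods using user-defined functions. These functions receive a vertex's whole neighborhood and can emit zero, one, or more results per vertex, so they also suit aggregations that are not associative and commutative.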

```scala { .api }
/**
 * Compute an aggregate over the edges of each vertex. The function applied
 * on the edges has access to the vertex value.
 * @param edgesFunction the function to apply to the neighborhood
 * @param direction the edge direction (in-, out-, all-)
 * @tparam T the output type
 * @return a DataSet of T
 */
def groupReduceOnEdges[T](edgesFunction: EdgesFunctionWithVertexValue[K, VV, EV, T],
                          direction: EdgeDirection): DataSet[T]

/**
 * Compute an aggregate over the edges of each vertex. The function applied
 * on the edges has access only to the vertex ID, not the vertex value.
 * @param edgesFunction the function to apply to the neighborhood
 * @param direction the edge direction (in-, out-, all-)
 * @tparam T the output type
 * @return a DataSet of T
 */
def groupReduceOnEdges[T](edgesFunction: EdgesFunction[K, EV, T],
                          direction: EdgeDirection): DataSet[T]

/**
 * Compute an aggregate over the neighbors (edges and vertices) of each
 * vertex. The function applied on the neighbors has access to the vertex
 * value.
 * @param neighborsFunction the function to apply to the neighborhood
 * @param direction the edge direction (in-, out-, all-)
 * @tparam T the output type
 * @return a DataSet of T
 */
def groupReduceOnNeighbors[T](neighborsFunction: NeighborsFunctionWithVertexValue[K, VV, EV, T],
                              direction: EdgeDirection): DataSet[T]

/**
 * Compute an aggregate over the neighbors (edges and vertices) of each
 * vertex. The function applied on the neighbors has access only to the
 * vertex ID, not the vertex value.
 * @param neighborsFunction the function to apply to the neighborhood
 * @param direction the edge direction (in-, out-, all-)
 * @tparam T the output type
 * @return a DataSet of T
 */
def groupReduceOnNeighbors[T](neighborsFunction: NeighborsFunction[K, VV, EV, T],
                              direction: EdgeDirection): DataSet[T]
```
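
To illustrate the group-reduce contract, here is a plain-Scala model of `groupReduceOnEdges` with an `EdgesFunctionWithVertexValue`-style function: it receives the vertex ID, the vertex value, the vertex's edges, and a `collect` callback standing in for Flink's `Collector`, and it may emit any number of records. Everything here is hypothetical and simplified to the OUT direction. The model also assumes the function may be called with an empty edge list for a vertex without outgoing edges, which a well-written function should handle either way.

```scala
// Plain-Scala model of groupReduceOnEdges with a vertex value (not the Gelly API).
import scala.collection.mutable.ListBuffer

final case class WEdge(src: Long, trg: Long, weight: Double)

val vertexValue: Map[Long, Double] = Map(1L -> 10.0, 2L -> 20.0, 3L -> 30.0, 4L -> 40.0)
val wEdges: Seq[WEdge] = Seq(
  WEdge(1L, 2L, 1.5), WEdge(2L, 3L, 2.5), WEdge(3L, 4L, 3.5), WEdge(1L, 4L, 4.5))

// Calls `f` once per vertex with its value and its out-edges (possibly empty).
// The `T => Unit` callback plays the role of Flink's Collector.
def groupReduceOutEdgesModel[T](f: (Long, Double, Seq[WEdge], T => Unit) => Unit): List[T] = {
  val out = ListBuffer.empty[T]
  val byVertex: Map[Long, Seq[WEdge]] = wEdges.groupBy(_.src)
  // Sorted only to make the model's output order deterministic
  for (v <- vertexValue.keys.toSeq.sorted)
    f(v, vertexValue(v), byVertex.getOrElse(v, Seq.empty[WEdge]), (x: T) => { out += x; () })
  out.toList
}

// Example function: one record per out-edge, weight normalized by the source vertex value.
val normalized: List[(Long, Long, Double)] =
  groupReduceOutEdgesModel[(Long, Long, Double)] { (v, value, es, collect) =>
    es.foreach(e => collect((v, e.trg, e.weight / value)))
  }

println(normalized)
```

Vertex 1 emits two records and vertex 4 emits none, something the pairwise `reduceOn*` methods cannot express, since they yield at most one value of the input type per vertex.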

## Function Interfaces

### Reduction Function Types

Key function interfaces for implementing custom neighborhood reductions:

```scala { .api }
// Java interfaces from Gelly (org.apache.flink.graph), shown here in Scala syntax.
// Implementations should be associative and commutative, because values are
// combined pairwise in no guaranteed order.
trait ReduceNeighborsFunction[VV] {
  def reduceNeighbors(firstNeighborValue: VV, secondNeighborValue: VV): VV
}

trait ReduceEdgesFunction[EV] {
  def reduceEdges(firstEdgeValue: EV, secondEdgeValue: EV): EV
}
```
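
Gelly's neighborhood-method documentation states that `reduceOnEdges` and `reduceOnNeighbors` assume associative and commutative aggregations and exploit combiners internally, so values are combined pairwise in no fixed order. The plain-Scala sketch below (illustrative only; `pairAvg` and `addPairs` are hypothetical helpers) shows what goes wrong otherwise: a pairwise average depends on grouping, while `max` does not. For a mean, one common workaround is to reduce (sum, count) pairs and divide once at the end; with `ReduceNeighborsFunction` this requires the vertex values themselves to be such pairs, since the function must return the value type.

```scala
// Plain-Scala illustration of why pairwise reduce functions must be
// associative and commutative.
val values: Seq[Double] = Seq(10.0, 20.0, 40.0)

// NOT associative: pairwise averaging depends on how values are grouped.
def pairAvg(a: Double, b: Double): Double = (a + b) / 2

val leftFirst  = pairAvg(pairAvg(10.0, 20.0), 40.0) // (15 + 40) / 2
val rightFirst = pairAvg(10.0, pairAvg(20.0, 40.0)) // (10 + 30) / 2

// Associative and commutative: max gives the same result for every ordering.
val maxes: Set[Double] =
  values.permutations.map(_.reduce((a, b) => math.max(a, b))).toSet

// Mean via (sum, count) pairs: pair addition is associative and commutative,
// so any grouping yields the same pair; divide once at the end.
def addPairs(a: (Double, Long), b: (Double, Long)): (Double, Long) =
  (a._1 + b._1, a._2 + b._2)

val (total, n) = values.map(v => (v, 1L)).reduce((a, b) => addPairs(a, b))
val mean = total / n

println(s"leftFirst=$leftFirst rightFirst=$rightFirst maxes=$maxes mean=$mean")
```

Floating-point addition is itself only approximately associative, so sums of `Double` values may differ in the last bits depending on combination order.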

### Edge Direction Enumeration

Control which edges to consider in neighborhood operations:

```scala { .api }
// Java enum from Gelly (org.apache.flink.graph.EdgeDirection), used directly from Scala
EdgeDirection.IN  // Consider only incoming edges
EdgeDirection.OUT // Consider only outgoing edges
EdgeDirection.ALL // Consider both incoming and outgoing edges
```

**Usage Examples:**

```scala
import org.apache.flink.api.scala._
import org.apache.flink.graph.scala._
import org.apache.flink.graph.{Edge, EdgeDirection, ReduceEdgesFunction, ReduceNeighborsFunction, Vertex}
import org.apache.flink.types.LongValue

val env = ExecutionEnvironment.getExecutionEnvironment

// Create a sample graph with numeric vertex values and edge weights
val vertices = env.fromCollection(Seq(
  new Vertex(1L, 10.0),
  new Vertex(2L, 20.0),
  new Vertex(3L, 30.0),
  new Vertex(4L, 40.0)
))

val edges = env.fromCollection(Seq(
  new Edge(1L, 2L, 1.5),
  new Edge(2L, 3L, 2.5),
  new Edge(3L, 4L, 3.5),
  new Edge(1L, 4L, 4.5)
))

val graph = Graph.fromDataSet(vertices, edges, env)

// Basic degree calculations
val inDegrees = graph.inDegrees()   // DataSet[(Long, LongValue)]
val outDegrees = graph.outDegrees() // DataSet[(Long, LongValue)]
val allDegrees = graph.getDegrees() // DataSet[(Long, LongValue)]

// Maximum vertex value among each vertex's neighbors (both directions)
val maxNeighborValue = graph.reduceOnNeighbors(
  new ReduceNeighborsFunction[Double] {
    override def reduceNeighbors(first: Double, second: Double): Double = {
      math.max(first, second)
    }
  },
  EdgeDirection.ALL
)

// Sum of outgoing edge weights per vertex
val sumEdgeWeights = graph.reduceOnEdges(
  new ReduceEdgesFunction[Double] {
    override def reduceEdges(first: Double, second: Double): Double = {
      first + second
    }
  },
  EdgeDirection.OUT
)

// print() triggers execution of the DataSet program and prints the results
maxNeighborValue.print()
sumEdgeWeights.print()
```

### Advanced Analytics Examples

```scala
import org.apache.flink.api.scala._
import org.apache.flink.graph.{Edge, EdgeDirection, Vertex}
import org.apache.flink.graph.scala.{EdgesFunction, NeighborsFunctionWithVertexValue}
import org.apache.flink.util.Collector

// Uses `graph: Graph[Long, Double, Double]` from the usage example above

// Custom edge analysis: per vertex, the edge count and the min/max edge weight
class EdgeStatistics extends EdgesFunction[Long, Double, (Long, Int, Double, Double)] {
  override def iterateEdges(edges: Iterable[(Long, Edge[Long, Double])],
                            out: Collector[(Long, Int, Double, Double)]): Unit = {
    val edgeList = edges.toList
    if (edgeList.nonEmpty) {
      val vertexId = edgeList.head._1
      val edgeCount = edgeList.size
      val weights = edgeList.map(_._2.getValue)
      val minWeight = weights.min
      val maxWeight = weights.max

      out.collect((vertexId, edgeCount, minWeight, maxWeight))
    }
  }
}

// Apply custom edge statistics over outgoing edges
val edgeStats = graph.groupReduceOnEdges(new EdgeStatistics(), EdgeDirection.OUT)

// Custom neighbor analysis with access to the vertex itself:
// per vertex, the average neighbor value and the average connecting edge weight
class NeighborAnalysis extends NeighborsFunctionWithVertexValue[Long, Double, Double, (Long, Double, Double)] {
  override def iterateNeighbors(vertex: Vertex[Long, Double],
                                neighbors: Iterable[(Edge[Long, Double], Vertex[Long, Double])],
                                out: Collector[(Long, Double, Double)]): Unit = {
    val neighborList = neighbors.toList
    if (neighborList.nonEmpty) {
      val avgNeighborValue = neighborList.map(_._2.getValue).sum / neighborList.size
      val avgEdgeWeight = neighborList.map(_._1.getValue).sum / neighborList.size

      out.collect((vertex.getId, avgNeighborValue, avgEdgeWeight))
    }
  }
}

// Apply neighbor analysis over both edge directions
val neighborStats = graph.groupReduceOnNeighbors(new NeighborAnalysis(), EdgeDirection.ALL)
```

### Analytical Patterns

#### Local Graph Properties

Calculate properties for individual vertices based on their neighborhoods:

```scala
import org.apache.flink.api.scala._
import org.apache.flink.graph.{Edge, EdgeDirection, Vertex}
import org.apache.flink.graph.scala.NeighborsFunction
import org.apache.flink.util.Collector

// First step toward a vertex clustering coefficient: the number of possible links
// among a vertex's distinct neighbors, k * (k - 1) / 2. Counting the links that
// actually exist between neighbors needs edges outside this neighborhood, so that
// numerator must be computed separately.
class NeighborPairCount extends NeighborsFunction[Long, Double, Double, (Long, Long)] {
  override def iterateNeighbors(neighbors: Iterable[(Long, Edge[Long, Double], Vertex[Long, Double])],
                                out: Collector[(Long, Long)]): Unit = {
    val neighborList = neighbors.toList
    if (neighborList.nonEmpty) {
      val vertexId = neighborList.head._1
      // Distinct neighbor IDs (excluding self-loops), so parallel edges are not double-counted
      val k = neighborList.map(_._3.getId).filter(_ != vertexId).toSet.size.toLong
      out.collect((vertexId, k * (k - 1) / 2))
    }
  }
}

val neighborPairs = graph.groupReduceOnNeighbors(new NeighborPairCount(), EdgeDirection.ALL)
```
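
A single `NeighborsFunction` call only sees the edges incident to one vertex, so it cannot check whether two of that vertex's neighbors are linked to each other. On a small graph held in memory, the full local clustering coefficient C(v) = (links among v's neighbors) / (k(k - 1)/2), with k the number of distinct neighbors, can be modeled directly. The plain-Scala sketch below treats the usage example's edges as undirected and adds a hypothetical extra edge 1-3 so that triangles exist. It illustrates the definition on one machine and is not a distributed Gelly algorithm; `adj` and `localClustering` are hypothetical names.

```scala
// Single-machine model of the local clustering coefficient on an undirected graph.
// Usage-example edges read as undirected, plus a hypothetical extra edge 1-3.
val undirectedEdges: Seq[(Long, Long)] =
  Seq((1L, 2L), (2L, 3L), (3L, 4L), (1L, 4L), (1L, 3L))

// Adjacency sets: each edge is added in both directions; self-loops are ignored.
val adj: Map[Long, Set[Long]] =
  undirectedEdges
    .filter { case (a, b) => a != b }
    .flatMap { case (a, b) => Seq(a -> b, b -> a) }
    .groupBy(_._1)
    .map { case (v, pairs) => v -> pairs.map(_._2).toSet }

// C(v) = links among neighbors / (k * (k - 1) / 2); defined as 0.0 when k < 2.
def localClustering(v: Long): Double = {
  val nbrs = adj.getOrElse(v, Set.empty[Long]).toSeq.sorted
  val k = nbrs.size
  if (k < 2) 0.0
  else {
    // Each unordered neighbor pair (i < j) is checked exactly once.
    val links = (for {
      i <- nbrs.indices
      j <- (i + 1) until k
      if adj(nbrs(i)).contains(nbrs(j))
    } yield 1).size
    links.toDouble / (k * (k - 1) / 2)
  }
}

val coefficients: Map[Long, Double] = adj.keys.map(v => v -> localClustering(v)).toMap
println(coefficients)
```

Here vertices 2 and 4 get 1.0 (their two neighbors, 1 and 3, are linked), while vertices 1 and 3 get 2/3. For large graphs, Gelly's library package `org.apache.flink.graph.library.clustering` includes clustering-coefficient algorithms that are worth checking before writing a custom one.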

#### Aggregated Statistics

Compute graph-wide statistics by combining local measurements:

```scala
// Combine degree calculations into graph-wide metrics.
// collect() runs the job and brings every (vertexId, degree) pair to the client,
// so this approach only suits graphs whose vertex set fits in client memory.
val degreeValues = graph.getDegrees().collect().map(_._2.getValue)
val avgDegree = degreeValues.sum / degreeValues.length.toDouble
val maxDegree = degreeValues.max
val minDegree = degreeValues.min
```
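
Because every edge adds one to the in-degree of its target and one to the out-degree of its source, the values returned by `getDegrees()` always sum to 2|E|. The average degree is therefore 2|E| / |V|, which can be obtained from two counts (Gelly's `Graph` exposes `numberOfVertices()` and `numberOfEdges()`) instead of collecting every degree to the client. The plain-Scala sketch below checks this identity on the usage example's edges; `degreeOf` is a hypothetical helper.

```scala
// Plain-Scala check of the degree-sum identity: sum of degrees = 2 * |E|.
val vertexIds: Seq[Long] = Seq(1L, 2L, 3L, 4L)
val edgeList: Seq[(Long, Long)] = Seq((1L, 2L), (2L, 3L), (3L, 4L), (1L, 4L)) // (src, trg)

// Total degree = out-degree (vertex is the source) + in-degree (vertex is the target).
def degreeOf(v: Long): Long =
  edgeList.count(_._1 == v).toLong + edgeList.count(_._2 == v)

val degrees: Seq[Long] = vertexIds.map(degreeOf)

// Average from all individual degrees (what collecting getDegrees() provides)...
val avgFromDegrees = degrees.sum.toDouble / vertexIds.size
// ...equals 2|E| / |V|, which needs only the two counts.
val avgFromCounts = 2.0 * edgeList.size / vertexIds.size

println(s"avgFromDegrees=$avgFromDegrees avgFromCounts=$avgFromCounts")
```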

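### Performance Considerations

- **Direction Selection**: Use `EdgeDirection.IN` or `EdgeDirection.OUT` when only one direction matters; `ALL` considers every edge from both of its endpoints, so it processes roughly twice as many records.
- **Function Choice**: `reduceOnNeighbors` and `reduceOnEdges` assume associative and commutative functions and can use combiners internally; reserve the group-reduce methods for aggregations that cannot be expressed that way or that emit zero or several results per vertex.
- **Memory Usage**: Group-reduce functions see a vertex's whole neighborhood, and materializing it (for example with `toList`) can be expensive for high-degree vertices.
- **Parallelization**: All of these operations run as Flink DataSet transformations grouped by vertex ID, so they are parallelized across the cluster automatically.
- **Reuse**: When several analyses need the same neighborhood aggregate, compute it once and feed the resulting `DataSet` to all consumers in the same job instead of repeating the reduction.

Together, the built-in degree methods and the neighborhood reduce and group-reduce operations cover both standard metrics and custom per-vertex analysis, all executed as distributed Flink DataSet programs.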