or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

core.md, exceptions.md, graphx.md, index.md, logging.md, mllib.md, sql.md, storage.md, streaming.md, utils.md

docs/graphx.md

# Graph Processing (GraphX)

GraphX is Apache Spark's API for graphs and graph-parallel computation. It combines the data-parallel and graph-parallel models in a single system, so ETL, exploratory analysis, and iterative graph computation can run on the same data without switching frameworks.

```scala { .api }

import scala.reflect.ClassTag

import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel

// Core graph types

// Vertex identifier (declared in the org.apache.spark.graphx package object).
type VertexId = Long

// A directed edge from srcId to dstId carrying a user-defined attribute.
// The fields are mutable vars with defaults, matching Spark's definition.
case class Edge[ED](
    var srcId: VertexId = 0,
    var dstId: VertexId = 0,
    var attr: ED = null.asInstanceOf[ED])

// An edge together with the attributes of both endpoint vertices. This is the element
// type of Graph.triplets and the argument of mapTriplets and subgraph's edge predicate.
class EdgeTriplet[VD, ED] extends Edge[ED] {
  var srcAttr: VD = _ // attribute of the source vertex
  var dstAttr: VD = _ // attribute of the destination vertex
}

// A property graph: vertices carry attributes of type VD, edges carry attributes of type ED.
// Graph is abstract (GraphImpl is the concrete implementation). Several members listed here
// (numVertices, numEdges, degrees, collectNeighbor*, and the algorithms) are defined on GraphOps
// and become callable on any Graph through the implicit conversion Graph.graphToGraphOps.
abstract class Graph[VD: ClassTag, ED: ClassTag] extends Serializable {

  // Graph properties
  def vertices: VertexRDD[VD]
  def edges: EdgeRDD[ED]
  // Each edge joined with the attributes of its source and destination vertices.
  def triplets: RDD[EdgeTriplet[VD, ED]]
  def numVertices: Long // GraphOps
  def numEdges: Long    // GraphOps

  // Degree information (GraphOps). Each RDD contains only vertices whose degree of that kind
  // is non-zero, so use an outer join against the vertex set when every vertex is needed.
  def inDegrees: VertexRDD[Int]
  def outDegrees: VertexRDD[Int]
  def degrees: VertexRDD[Int]

  // Graph transformations
  def mapVertices[VD2: ClassTag](map: (VertexId, VD) => VD2): Graph[VD2, ED]
  def mapEdges[ED2: ClassTag](map: Edge[ED] => ED2): Graph[VD, ED2]
  def mapTriplets[ED2: ClassTag](map: EdgeTriplet[VD, ED] => ED2): Graph[VD, ED2]
  // Reverses the direction of every edge.
  def reverse: Graph[VD, ED]
  // Keeps vertices satisfying vpred, and edges satisfying epred whose endpoints are both kept.
  def subgraph(
      epred: EdgeTriplet[VD, ED] => Boolean = _ => true,
      vpred: (VertexId, VD) => Boolean = (_, _) => true): Graph[VD, ED]
  // Restricts this graph to the vertices and edges also present in `other`,
  // keeping this graph's attributes.
  def mask[VD2: ClassTag, ED2: ClassTag](other: Graph[VD2, ED2]): Graph[VD, ED]
  // Merges parallel edges (same srcId and dstId) using `merge`; the graph must have been
  // partitioned with partitionBy for parallel edges to be co-located and merged correctly.
  def groupEdges(merge: (ED, ED) => ED): Graph[VD, ED]

  // Neighborhood operations (collectNeighborIds / collectNeighbors are GraphOps)
  def collectNeighborIds(edgeDirection: EdgeDirection): VertexRDD[Array[VertexId]]
  def collectNeighbors(edgeDirection: EdgeDirection): VertexRDD[Array[(VertexId, VD)]]
  // sendMsg runs once per edge and may send messages to either endpoint; mergeMsg combines
  // the messages arriving at a vertex. tripletFields declares which vertex attributes sendMsg
  // reads, so attributes it never touches need not be shipped to the edges.
  def aggregateMessages[A: ClassTag](
      sendMsg: EdgeContext[VD, ED, A] => Unit,
      mergeMsg: (A, A) => A,
      tripletFields: TripletFields = TripletFields.All): VertexRDD[A]

  // Caching and persistence
  def cache(): Graph[VD, ED]
  def persist(newLevel: StorageLevel = StorageLevel.MEMORY_ONLY): Graph[VD, ED]
  def unpersist(blocking: Boolean = false): Graph[VD, ED]
  def checkpoint(): Unit
  def isCheckpointed: Boolean

  // Graph algorithms: GraphOps shortcuts for the implementations in org.apache.spark.graphx.lib.
  // Dynamic PageRank, iterated until the ranks converge within tolerance `tol`.
  def pageRank(tol: Double, resetProb: Double = 0.15): Graph[Double, Double]
  // PageRank run for exactly `numIter` iterations.
  def staticPageRank(numIter: Int, resetProb: Double = 0.15): Graph[Double, Double]
  // Labels every vertex with the lowest vertex id in its connected component.
  def connectedComponents(): Graph[VertexId, ED]
  def stronglyConnectedComponents(numIter: Int): Graph[VertexId, ED]
  // Number of triangles each vertex participates in.
  def triangleCount(): Graph[Int, ED]
  // Note: there is no shortestPaths member; use org.apache.spark.graphx.lib.ShortestPaths.run.
}

// Graph construction
object Graph {

  // Builds a graph from vertex and edge RDDs. Vertices referenced by `edges` but missing from
  // `vertices` receive `defaultVertexAttr`; duplicate vertex ids are resolved arbitrarily.
  def apply[VD: ClassTag, ED: ClassTag](
      vertices: RDD[(VertexId, VD)],
      edges: RDD[Edge[ED]],
      defaultVertexAttr: VD = null.asInstanceOf[VD],
      edgeStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY,
      vertexStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY): Graph[VD, ED]

  // Builds a graph from edges alone; every vertex they reference receives `defaultValue`.
  // Unlike fromEdgeTuples, this builder has no `uniqueEdges` parameter.
  def fromEdges[VD: ClassTag, ED: ClassTag](
      edges: RDD[Edge[ED]],
      defaultValue: VD,
      edgeStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY,
      vertexStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY): Graph[VD, ED]

  // Builds a graph from (srcId, dstId) pairs; each edge attribute is 1. When `uniqueEdges`
  // supplies a PartitionStrategy, duplicate edges are merged and their attributes summed.
  def fromEdgeTuples[VD: ClassTag](
      rawEdges: RDD[(VertexId, VertexId)],
      defaultValue: VD,
      uniqueEdges: Option[PartitionStrategy] = None,
      edgeStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY,
      vertexStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY): Graph[VD, Int]

  // Makes the GraphOps methods (degrees, pageRank, connectedComponents, ...) callable on a Graph.
  implicit def graphToGraphOps[VD: ClassTag, ED: ClassTag](g: Graph[VD, ED]): GraphOps[VD, ED]
}

// Built-in graph algorithms live in package org.apache.spark.graphx.lib
// (import org.apache.spark.graphx.lib._); the algorithm methods on Graph delegate to them.

object PageRank {
  // Dynamic PageRank: iterates until the ranks change by less than `tol`.
  def runUntilConvergence[VD: ClassTag, ED: ClassTag](
      graph: Graph[VD, ED], tol: Double, resetProb: Double = 0.15): Graph[Double, Double]
  // Static PageRank: runs exactly `numIter` iterations.
  def run[VD: ClassTag, ED: ClassTag](
      graph: Graph[VD, ED], numIter: Int, resetProb: Double = 0.15): Graph[Double, Double]
}

object ConnectedComponents {
  def run[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]): Graph[VertexId, ED]
}

object StronglyConnectedComponents {
  def run[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED], numIter: Int): Graph[VertexId, ED]
}

object TriangleCount {
  def run[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]): Graph[Int, ED]
}

object ShortestPaths {
  // Maps each reachable landmark id to its hop distance.
  type SPMap = Map[VertexId, Int]
  def run[VD, ED: ClassTag](graph: Graph[VD, ED], landmarks: Seq[VertexId]): Graph[SPMap, ED]
}

// GraphXUtils (package org.apache.spark.graphx) does not expose the algorithms above;
// its public API is Kryo registration of GraphX's internal classes.
object GraphXUtils {
  def registerKryoClasses(conf: org.apache.spark.SparkConf): Unit
}

```

69

70

**Usage Examples:**

71

72

```scala

import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD

// Assumes an existing SparkSession named `spark` (as provided by spark-shell).
val sc = spark.sparkContext

// Create vertices RDD
val vertices: RDD[(VertexId, String)] = sc.parallelize(Array(
  (1L, "Alice"), (2L, "Bob"), (3L, "Charlie"), (4L, "David"), (5L, "Ed")
))

// Create edges RDD
val edges: RDD[Edge[String]] = sc.parallelize(Array(
  Edge(1L, 2L, "friend"), Edge(2L, 3L, "follow"), Edge(3L, 4L, "friend"),
  Edge(4L, 5L, "follow"), Edge(1L, 3L, "follow"), Edge(2L, 5L, "friend")
))

// Create graph
val graph = Graph(vertices, edges)

// Basic graph statistics
println(s"Number of vertices: ${graph.numVertices}")
println(s"Number of edges: ${graph.numEdges}")

// Graph algorithms
val pageRankGraph = graph.pageRank(0.0001).cache()
val connectedComponentsGraph = graph.connectedComponents().cache()
val triangleCountGraph = graph.triangleCount().cache()

// Analyze results
pageRankGraph.vertices.collect().foreach {
  case (id, rank) => println(s"Vertex $id has PageRank $rank")
}

// Degree RDDs contain only vertices whose degree of that kind is non-zero
val inDegrees = graph.inDegrees
val outDegrees = graph.outDegrees
val degrees = graph.degrees

// Join with vertex data. leftOuterJoin (not join) keeps every vertex: here Alice (1) has no
// in-edges and Ed (5) has no out-edges, so an inner join would silently drop both of them.
val vertexInfo = vertices
  .leftOuterJoin(inDegrees)
  .leftOuterJoin(outDegrees)
  .map { case (id, ((name, inDeg), outDeg)) =>
    (id, (name, inDeg.getOrElse(0), outDeg.getOrElse(0)))
  }

vertexInfo.collect().foreach {
  case (id, (name, inDeg, outDeg)) =>
    println(s"$name (id: $id) has $inDeg in-degree and $outDeg out-degree")
}

```

121

122

## Capabilities

123

124

### Graph Construction and Basic Operations

125

126

Create graphs from vertex and edge RDDs (or from raw edge tuples) and apply basic transformations:

127

128

```scala

// Build a graph straight from (srcId, dstId) pairs; every vertex gets "user" as its attribute
val socialGraph = Graph.fromEdgeTuples(
  rawEdges = sc.parallelize(Seq((1L, 2L), (2L, 3L), (3L, 1L))),
  defaultValue = "user"
)

// Upper-case every vertex name and suffix every edge label
val transformedGraph = graph
  .mapVertices { (_, name) => name.toUpperCase }
  .mapEdges { e => s"${e.attr}_relationship" }

// Keep only "friend" edges and drop the vertex named "Ed" (along with any edge touching it)
val friendsOnly = graph.subgraph(
  epred = _.attr == "friend",
  vpred = (_, name) => name != "Ed"
)

```

146

147

### Graph Algorithms

148

149

Built-in implementations of common graph algorithms:

150

151

```scala

// PageRank until convergence (tolerance 0.0001); take the five highest-ranked vertices
val pageRanks = graph.pageRank(0.0001)
val topUsers = pageRanks.vertices.top(5)(Ordering.by(_._2))

// Connected components: each vertex is labelled with the lowest vertex id in its component,
// so this collects every vertex in the same component as vertex 1
val cc = graph.connectedComponents()
val sameComponent = cc.vertices.filter { case (_, component) =>
  component == 1L
}.collect()

// Triangle counting
val triangles = graph.triangleCount()
val vertexTriangles = triangles.vertices.collect()

// Shortest paths: Graph has no shortestPaths method, so call the lib implementation.
// Each vertex receives a Map from landmark id to hop distance; a landmark that is not
// reachable from the vertex along edge directions has no entry.
import org.apache.spark.graphx.lib.ShortestPaths
val landmark: VertexId = 1L
val shortestPaths = ShortestPaths.run(graph, Seq(landmark))

```

170

171

### Message Passing and Aggregation

172

173

Use `aggregateMessages` to send messages along edges and merge them at each vertex, which is the building block for custom neighborhood computations:

174

175

```scala

// Count neighbors (the total degree): every edge sends 1 to both of its endpoints.
// No vertex attributes are read, so TripletFields.None avoids shipping them to the edges.
val neighborCounts = graph.aggregateMessages[Int](
  ctx => {
    ctx.sendToSrc(1)
    ctx.sendToDst(1)
  },
  (a, b) => a + b,
  TripletFields.None
)

// Collect neighbor information: each vertex gathers the attributes of its neighbors
val neighborAttrs = graph.aggregateMessages[Array[String]](
  ctx => {
    ctx.sendToSrc(Array(ctx.dstAttr))
    ctx.sendToDst(Array(ctx.srcAttr))
  },
  (a, b) => a ++ b
)

// Custom algorithm: for each vertex, count the neighbors whose attribute contains
// "important" (with the sample names above no vertex matches, so the result is empty)
val importantNeighborCounts = graph.aggregateMessages[Int](
  ctx => {
    if (ctx.srcAttr.contains("important")) {
      ctx.sendToDst(1)
    }
    if (ctx.dstAttr.contains("important")) {
      ctx.sendToSrc(1)
    }
  },
  (a, b) => a + b
)

```

207

208

GraphX provides efficient graph processing capabilities that integrate seamlessly with Spark's data processing pipeline, enabling complex graph analytics at scale.