# Graph Creation and Management

Comprehensive functionality for creating graphs from various data sources and managing graph structure through CRUD operations.

## Capabilities

### Graph Factory Methods

Create graphs from different data sources including DataSets, Collections, CSV files, and various tuple formats.

#### Creating from DataSets

```scala { .api }
/**
 * Creates a Graph from a DataSet of vertices and a DataSet of edges.
 */
def fromDataSet[K, VV, EV](vertices: DataSet[Vertex[K, VV]], edges: DataSet[Edge[K, EV]],
                           env: ExecutionEnvironment): Graph[K, VV, EV]

/**
 * Creates a Graph from a DataSet of edges.
 * Vertices are created automatically and their values are set to NullValue.
 */
def fromDataSet[K, EV](edges: DataSet[Edge[K, EV]],
                       env: ExecutionEnvironment): Graph[K, NullValue, EV]

/**
 * Creates a Graph from a DataSet of edges.
 * Vertices are created automatically and their values are set by applying the provided
 * vertexValueInitializer map function to the vertex IDs.
 */
def fromDataSet[K, VV, EV](edges: DataSet[Edge[K, EV]],
                           vertexValueInitializer: MapFunction[K, VV],
                           env: ExecutionEnvironment): Graph[K, VV, EV]
```
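
The third overload can be sketched as follows: vertices are derived from the edge DataSet and each one receives a value computed from its ID. The sample edges and the `vertex-<id>` labels are assumptions made for illustration, and the snippet is written for the Scala shell or the body of a `main` method with `flink-gelly-scala` on the classpath.

```scala
import org.apache.flink.api.common.functions.MapFunction
import org.apache.flink.api.scala._
import org.apache.flink.graph.Edge
import org.apache.flink.graph.scala.Graph

val env = ExecutionEnvironment.getExecutionEnvironment

// Sample edges (assumed data): 1 -> 2 and 2 -> 3
val edges: DataSet[Edge[Long, Double]] = env.fromElements(
  new Edge[Long, Double](1L, 2L, 0.5),
  new Edge[Long, Double](2L, 3L, 1.5)
)

// Vertices 1, 2 and 3 are created from the edge endpoints;
// the initializer maps every vertex ID to its vertex value.
val graph: Graph[Long, String, Double] = Graph.fromDataSet[Long, String, Double](
  edges,
  new MapFunction[Long, String] {
    override def map(id: Long): String = s"vertex-$id"
  },
  env
)
```

Passing the element types explicitly is optional here; it only makes the chosen overload easier to see.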

#### Creating from Collections

```scala { .api }
/**
 * Creates a Graph from a Seq of vertices and a Seq of edges.
 */
def fromCollection[K, VV, EV](vertices: Seq[Vertex[K, VV]], edges: Seq[Edge[K, EV]],
                              env: ExecutionEnvironment): Graph[K, VV, EV]

/**
 * Creates a Graph from a Seq of edges.
 * Vertices are created automatically and their values are set to NullValue.
 */
def fromCollection[K, EV](edges: Seq[Edge[K, EV]],
                          env: ExecutionEnvironment): Graph[K, NullValue, EV]

/**
 * Creates a Graph from a Seq of edges.
 * Vertices are created automatically and their values are set by applying the provided
 * vertexValueInitializer map function to the vertex IDs.
 */
def fromCollection[K, VV, EV](edges: Seq[Edge[K, EV]],
                              vertexValueInitializer: MapFunction[K, VV],
                              env: ExecutionEnvironment): Graph[K, VV, EV]
```
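
A minimal sketch of the edges-only overload, assuming a small in-memory edge list (the data is made up for the example). No vertex values are supplied, so the vertex value type of the result is `NullValue`.

```scala
import org.apache.flink.api.scala._
import org.apache.flink.graph.Edge
import org.apache.flink.graph.scala.Graph
import org.apache.flink.types.NullValue

val env = ExecutionEnvironment.getExecutionEnvironment

// Sample edges (assumed data)
val edgeSeq = Seq(
  new Edge[Long, Double](1L, 2L, 0.5),
  new Edge[Long, Double](2L, 3L, 0.25)
)

// Vertices are derived from the edge endpoints and carry NullValue
val graph: Graph[Long, NullValue, Double] = Graph.fromCollection(edgeSeq, env)

// Collect the generated vertex IDs to the client and sort them
val vertexIds: Seq[Long] = graph.getVertices.collect().map(_.getId).sorted
```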

#### Creating from Tuple DataSets

```scala { .api }
/**
 * Creates a Graph from DataSets of tuples for vertices and for edges.
 * The first field of the Tuple2 vertex object will become the vertex ID
 * and the second field will become the vertex value.
 * The first field of the Tuple3 object for edges will become the source ID,
 * the second field will become the target ID, and the third field will become
 * the edge value.
 */
def fromTupleDataSet[K, VV, EV](vertices: DataSet[(K, VV)], edges: DataSet[(K, K, EV)],
                                env: ExecutionEnvironment): Graph[K, VV, EV]

/**
 * Creates a Graph from a DataSet of Tuples representing the edges.
 * Vertices are created automatically and their values are set to NullValue.
 */
def fromTupleDataSet[K, EV](edges: DataSet[(K, K, EV)],
                            env: ExecutionEnvironment): Graph[K, NullValue, EV]

/**
 * Creates a Graph from a DataSet of Tuples representing the edges.
 * Vertices are created automatically and their values are set by applying the provided
 * vertexValueInitializer map function to the vertex IDs.
 */
def fromTupleDataSet[K, VV, EV](edges: DataSet[(K, K, EV)],
                                vertexValueInitializer: MapFunction[K, VV],
                                env: ExecutionEnvironment): Graph[K, VV, EV]
```
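
To make the field mapping described above concrete, the following sketch builds a graph from tuple DataSets and then reads the resulting `Vertex` and `Edge` objects back. The sample tuples are assumptions for illustration.

```scala
import org.apache.flink.api.scala._
import org.apache.flink.graph.scala.Graph

val env = ExecutionEnvironment.getExecutionEnvironment

// (vertex ID, vertex value)
val vertexTuples: DataSet[(Long, String)] = env.fromElements((1L, "Alice"), (2L, "Bob"))
// (source ID, target ID, edge value)
val edgeTuples: DataSet[(Long, Long, Double)] = env.fromElements((1L, 2L, 0.5))

val graph = Graph.fromTupleDataSet(vertexTuples, edgeTuples, env)

// Tuple fields are now exposed through the Vertex and Edge accessors
val vertexNames: Map[Long, String] =
  graph.getVertices.collect().map(v => v.getId -> v.getValue).toMap
val edge = graph.getEdges.collect().head
val edgeFields = (edge.getSource, edge.getTarget, edge.getValue)
```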

#### Creating from Tuple2 DataSets (Simple Edges)

```scala { .api }
/**
 * Creates a Graph from a DataSet of Tuple2's representing the edges.
 * The first field of the Tuple2 object for edges will become the source ID,
 * the second field will become the target ID. The edge value will be set to NullValue.
 * Vertices are created automatically and their values are set to NullValue.
 */
def fromTuple2DataSet[K](edges: DataSet[(K, K)],
                         env: ExecutionEnvironment): Graph[K, NullValue, NullValue]

/**
 * Creates a Graph from a DataSet of Tuple2's representing the edges.
 * The first field of the Tuple2 object for edges will become the source ID,
 * the second field will become the target ID. The edge value will be set to NullValue.
 * Vertices are created automatically and their values are set by applying the provided
 * vertexValueInitializer map function to the vertex IDs.
 */
def fromTuple2DataSet[K, VV](edges: DataSet[(K, K)],
                             vertexValueInitializer: MapFunction[K, VV],
                             env: ExecutionEnvironment): Graph[K, VV, NullValue]
```
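
For an unweighted edge list, a sketch along these lines may be enough. The three links are sample data assumed for the example; both vertex and edge values end up as `NullValue`.

```scala
import org.apache.flink.api.scala._
import org.apache.flink.graph.scala.Graph
import org.apache.flink.types.NullValue

val env = ExecutionEnvironment.getExecutionEnvironment

// (source ID, target ID) pairs forming a small cycle (assumed data)
val links: DataSet[(Long, Long)] = env.fromElements((1L, 2L), (2L, 3L), (3L, 1L))

// Only the topology is stored
val graph: Graph[Long, NullValue, NullValue] = Graph.fromTuple2DataSet(links, env)
```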

#### Creating from CSV Files

```scala { .api }
/**
 * Creates a Graph from CSV files.
 * Supports extensive configuration for parsing edges and optionally vertices.
 * The edge value is read from the CSV file if EV is not of type NullValue.
 * Otherwise the edge value is set to NullValue.
 * If the vertex value type VV is specified (unequal NullValue), then the vertex values
 * are read from the file specified by pathVertices.
 */
def fromCsvReader[K, VV, EV](
    env: ExecutionEnvironment,
    pathEdges: String,
    pathVertices: String = null,
    lineDelimiterVertices: String = "\n",
    fieldDelimiterVertices: String = ",",
    quoteCharacterVertices: Character = null,
    ignoreFirstLineVertices: Boolean = false,
    ignoreCommentsVertices: String = null,
    lenientVertices: Boolean = false,
    includedFieldsVertices: Array[Int] = null,
    lineDelimiterEdges: String = "\n",
    fieldDelimiterEdges: String = ",",
    quoteCharacterEdges: Character = null,
    ignoreFirstLineEdges: Boolean = false,
    ignoreCommentsEdges: String = null,
    lenientEdges: Boolean = false,
    includedFieldsEdges: Array[Int] = null,
    vertexValueInitializer: MapFunction[K, VV] = null): Graph[K, VV, EV]
```
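
The parsing options can be combined through named arguments. The sketch below is an assumption-laden illustration: it writes a small semicolon-separated edge file with a header line to a temporary location (file name and contents are made up), skips the header, and fills vertex values with `vertexValueInitializer` instead of a vertex file.

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.Files

import org.apache.flink.api.common.functions.MapFunction
import org.apache.flink.api.scala._
import org.apache.flink.graph.scala.Graph

val env = ExecutionEnvironment.getExecutionEnvironment

// Hypothetical input file: header line, then "source;target;weight" rows
val edgesFile = Files.createTempFile("edges", ".csv")
Files.write(edgesFile, "src;dst;weight\n1;2;0.5\n2;3;1.5\n".getBytes(StandardCharsets.UTF_8))

val graph = Graph.fromCsvReader[Long, Double, Double](
  env,
  pathEdges = edgesFile.toUri.toString,
  fieldDelimiterEdges = ";",      // fields are separated by semicolons
  ignoreFirstLineEdges = true,    // skip the header line
  vertexValueInitializer = new MapFunction[Long, Double] {
    // No vertex file is given, so vertex values are derived from the IDs
    override def map(id: Long): Double = id.toDouble
  }
)
```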

**Usage Examples:**

```scala
import org.apache.flink.graph.scala._
import org.apache.flink.graph.{Edge, Vertex}
import org.apache.flink.api.scala._

val env = ExecutionEnvironment.getExecutionEnvironment

// From DataSets
val vertices = env.fromCollection(Seq(
  new Vertex(1L, "Alice"),
  new Vertex(2L, "Bob")
))
val edges = env.fromCollection(Seq(
  new Edge(1L, 2L, 0.5)
))
val graph1 = Graph.fromDataSet(vertices, edges, env)

// From Collections
val vertexSeq = Seq(new Vertex(1L, "Alice"), new Vertex(2L, "Bob"))
val edgeSeq = Seq(new Edge(1L, 2L, 0.5))
val graph2 = Graph.fromCollection(vertexSeq, edgeSeq, env)

// From Tuples
val vertexTuples = env.fromCollection(Seq((1L, "Alice"), (2L, "Bob")))
val edgeTuples = env.fromCollection(Seq((1L, 2L, 0.5)))
val graph3 = Graph.fromTupleDataSet(vertexTuples, edgeTuples, env)

// From CSV
val graphFromCsv = Graph.fromCsvReader[Long, String, Double](
  env,
  pathEdges = "/path/to/edges.csv",
  pathVertices = "/path/to/vertices.csv"
)
```

### Graph Access Methods

Retrieve graph components in various formats for analysis and processing.

```scala { .api }
/**
 * @return the vertex DataSet.
 */
def getVertices(): DataSet[Vertex[K, VV]]

/**
 * @return the edge DataSet.
 */
def getEdges(): DataSet[Edge[K, EV]]

/**
 * @return the vertex DataSet as Tuple2.
 */
def getVerticesAsTuple2(): DataSet[(K, VV)]

/**
 * @return the edge DataSet as Tuple3.
 */
def getEdgesAsTuple3(): DataSet[(K, K, EV)]

/**
 * @return a DataSet of Triplets,
 * consisting of (srcVertexId, trgVertexId, srcVertexValue, trgVertexValue, edgeValue)
 */
def getTriplets(): DataSet[Triplet[K, VV, EV]]

/**
 * @return the IDs of the vertices as a DataSet
 */
def getVertexIds(): DataSet[K]

/**
 * @return the IDs of the edges as a DataSet of (source ID, target ID) pairs
 */
def getEdgeIds(): DataSet[(K, K)]
```
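
As an illustration of how these accessors might be used together, the sketch below reads the vertices as tuples and formats each triplet as a string. The graph contents and the output format are assumptions chosen for the example.

```scala
import org.apache.flink.api.scala._
import org.apache.flink.graph.{Edge, Vertex}
import org.apache.flink.graph.scala.Graph

val env = ExecutionEnvironment.getExecutionEnvironment

// Small sample graph (assumed data)
val graph = Graph.fromCollection(
  Seq(new Vertex[Long, String](1L, "Alice"), new Vertex[Long, String](2L, "Bob")),
  Seq(new Edge[Long, Double](1L, 2L, 0.5)),
  env
)

// Vertices as plain (id, value) tuples
val people: Map[Long, String] = graph.getVerticesAsTuple2.collect().toMap

// A triplet bundles an edge with the values of both of its endpoints
val described: Seq[String] = graph.getTriplets.map { t =>
  s"${t.getSrcVertex.getValue} -> ${t.getTrgVertex.getValue} (${t.getEdge.getValue})"
}.collect()
```

Triplets avoid a manual join between the edge and vertex DataSets when both endpoint values are needed.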

### Graph Modification Operations

Add and remove vertices and edges from existing graphs.

#### Adding Elements

```scala { .api }
/**
 * Adds the input vertex to the graph. If the vertex already
 * exists in the graph, it will not be added again.
 */
def addVertex(vertex: Vertex[K, VV]): Graph[K, VV, EV]

/**
 * Adds the list of vertices, passed as input, to the graph.
 * Vertices that already exist in the graph are not added again.
 */
def addVertices(vertices: List[Vertex[K, VV]]): Graph[K, VV, EV]

/**
 * Adds the given edge to the graph. If the source and target vertices do
 * not exist in the graph, they will also be added.
 */
def addEdge(source: Vertex[K, VV], target: Vertex[K, VV], edgeValue: EV): Graph[K, VV, EV]

/**
 * Adds the given list of edges to the graph.
 * An edge whose source or target vertex does not exist in the graph
 * is considered invalid and is ignored.
 */
def addEdges(edges: List[Edge[K, EV]]): Graph[K, VV, EV]
```
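
The documented differences between these methods can be seen in a small sketch. The sample graph and the vertex named "Carol" are assumptions for illustration; each call returns a new graph and leaves `base` unchanged.

```scala
import org.apache.flink.api.scala._
import org.apache.flink.graph.{Edge, Vertex}
import org.apache.flink.graph.scala.Graph

val env = ExecutionEnvironment.getExecutionEnvironment

val alice = new Vertex[Long, String](1L, "Alice")
val bob = new Vertex[Long, String](2L, "Bob")
val base = Graph.fromCollection(Seq(alice, bob), Seq(new Edge[Long, Double](1L, 2L, 0.5)), env)

// Adding a vertex that is already present leaves the vertex set unchanged
val sameVertices = base.addVertex(alice)

// addEdge also adds missing endpoints (vertex 3 here)
val withCarol = base.addEdge(bob, new Vertex[Long, String](3L, "Carol"), 0.7)

// addEdges keeps only edges whose endpoints already exist
val withEdges = base.addEdges(List(
  new Edge[Long, Double](2L, 1L, 0.3), // valid: both endpoints exist
  new Edge[Long, Double](2L, 9L, 0.9)  // ignored: vertex 9 is not in the graph
))
```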

#### Removing Elements

```scala { .api }
/**
 * Removes the given vertex and its edges from the graph.
 */
def removeVertex(vertex: Vertex[K, VV]): Graph[K, VV, EV]

/**
 * Removes the given vertices and their edges from the graph.
 */
def removeVertices(vertices: List[Vertex[K, VV]]): Graph[K, VV, EV]

/**
 * Removes all edges that match the given edge from the graph.
 */
def removeEdge(edge: Edge[K, EV]): Graph[K, VV, EV]

/**
 * Removes all edges that match the edges in the given list from the graph.
 */
def removeEdges(edges: List[Edge[K, EV]]): Graph[K, VV, EV]
```
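
A short sketch of the removal methods on an assumed three-vertex cycle (all names and weights are sample data). Removing a vertex also drops every edge that touches it.

```scala
import org.apache.flink.api.scala._
import org.apache.flink.graph.{Edge, Vertex}
import org.apache.flink.graph.scala.Graph

val env = ExecutionEnvironment.getExecutionEnvironment

val bob = new Vertex[Long, String](2L, "Bob")
val graph = Graph.fromCollection(
  Seq(new Vertex[Long, String](1L, "Alice"), bob, new Vertex[Long, String](3L, "Carol")),
  Seq(
    new Edge[Long, Double](1L, 2L, 0.5),
    new Edge[Long, Double](2L, 3L, 0.2),
    new Edge[Long, Double](3L, 1L, 0.1)
  ),
  env
)

// Removes vertex 2 together with the edges 1 -> 2 and 2 -> 3
val withoutBob = graph.removeVertex(bob)

// Removes the edge 3 -> 1; the vertices stay in place
val withoutLoopback = graph.removeEdge(new Edge[Long, Double](3L, 1L, 0.1))
```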

### Graph Set Operations

Perform mathematical set operations between graphs.

```scala { .api }
/**
 * Performs union on the vertex and edge sets of the input graphs,
 * removing duplicate vertices but maintaining duplicate edges.
 */
def union(graph: Graph[K, VV, EV]): Graph[K, VV, EV]

/**
 * Performs difference on the vertex and edge sets of the input graphs:
 * removes common vertices and edges. If a source/target vertex is removed,
 * its corresponding edge will also be removed.
 */
def difference(graph: Graph[K, VV, EV]): Graph[K, VV, EV]

/**
 * Performs intersect on the edge sets of the input graphs. Edges are considered equal if they
 * have the same source identifier, target identifier and edge value.
 * The method computes pairs of equal edges from the input graphs. If the same edge occurs
 * multiple times in the input graphs, there will be multiple edge pairs to be considered. Each
 * edge instance can only be part of one pair. If the given parameter `distinctEdges` is set
 * to `true`, there will be exactly one edge in the output graph representing all pairs of
 * equal edges. If the parameter is set to `false`, both edges of each pair will be in the
 * output.
 * Vertices in the output graph will have no vertex values.
 */
def intersect(graph: Graph[K, VV, EV], distinctEdges: Boolean): Graph[K, NullValue, EV]
```
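
The semantics described above can be checked on two small overlapping graphs. This is a sketch with assumed sample data: both graphs share vertices 2 and 3 and the edge 2 -> 3 with the same value.

```scala
import org.apache.flink.api.scala._
import org.apache.flink.graph.scala.Graph

val env = ExecutionEnvironment.getExecutionEnvironment

// Left graph: vertices 1, 2, 3 with edges 1 -> 2 and 2 -> 3
val left = Graph.fromTupleDataSet(
  env.fromElements((1L, "Alice"), (2L, "Bob"), (3L, "Carol")),
  env.fromElements((1L, 2L, 0.5), (2L, 3L, 0.2)),
  env
)

// Right graph: vertices 2, 3, 4 with edges 2 -> 3 and 3 -> 4
val right = Graph.fromTupleDataSet(
  env.fromElements((2L, "Bob"), (3L, "Carol"), (4L, "David")),
  env.fromElements((2L, 3L, 0.2), (3L, 4L, 0.7)),
  env
)

// Duplicate vertices are merged, the duplicate edge 2 -> 3 is kept twice
val unionGraph = left.union(right)

// Only vertex 1 is unique to `left`; its edge loses its target and is dropped
val differenceGraph = left.difference(right)

// The single shared edge 2 -> 3 survives, reported once
val intersectionGraph = left.intersect(right, distinctEdges = true)
```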

### Graph Metrics

Basic graph statistics and properties.

```scala { .api }
/**
 * @return a long integer representing the number of vertices
 */
def numberOfVertices(): Long

/**
 * @return a long integer representing the number of edges
 */
def numberOfEdges(): Long
```

**Usage Examples:**

```scala
// Assumes `env` and the imports from the earlier usage example, plus an
// existing `graph: Graph[Long, String, Double]`.

// Adding elements
val newVertex = new Vertex(4L, "David")
val graphWithVertex = graph.addVertex(newVertex)

// addEdge also adds the source and target vertices if they are missing
val graphWithEdge = graph.addEdge(new Vertex(3L, "Charlie"), newVertex, 0.7)

// Set operations; vertices1/vertices2 and edges1/edges2 are assumed to be
// tuple DataSets of type DataSet[(Long, String)] and DataSet[(Long, Long, Double)]
val graph1 = Graph.fromTupleDataSet(vertices1, edges1, env)
val graph2 = Graph.fromTupleDataSet(vertices2, edges2, env)
val unionGraph = graph1.union(graph2)
val intersectionGraph = graph1.intersect(graph2, distinctEdges = true)

// Basic metrics
val vertexCount = graph.numberOfVertices()
val edgeCount = graph.numberOfEdges()
```