or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

algorithms.md · analytics.md · data-access.md · generators.md · graph-creation.md · index.md · iterative-processing.md

docs/data-access.md

0

# Data Access and Utilities

1

2

Gelly provides comprehensive methods for accessing graph data, computing neighborhoods, performing joins, and integrating with Flink's DataSet API. This includes graph validation, format conversion, and utility functions for common graph operations.

3

4

## Capabilities

5

6

### Basic Data Access

7

8

Access the fundamental graph components as Flink DataSets.

9

10

```java { .api }

11

// Core data access

12

public DataSet<Vertex<K, VV>> getVertices()

13

public DataSet<Edge<K, EV>> getEdges()

14

public DataSet<Triplet<K, VV, EV>> getTriplets()

15

16

// ID extraction

17

public DataSet<K> getVertexIds()

18

public DataSet<Tuple2<K, K>> getEdgeIds()

19

```

20

21

**Usage Example:**

22

23

```java

24

Graph<Long, String, Double> graph = /* ... */;

25

26

// Access vertices and edges

27

DataSet<Vertex<Long, String>> vertices = graph.getVertices();

28

DataSet<Edge<Long, Double>> edges = graph.getEdges();

29

30

// Process vertices

31

DataSet<String> vertexValues = vertices.map(vertex -> vertex.getValue());

32

vertexValues.print();

33

34

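// Process edges: average weight = sum of all weights / number of edges.
// reduce() yields a DataSet, so collect() fetches the single summed value to the client.
DataSet<Double> edgeWeights = edges.map(edge -> edge.getValue());
List<Double> weightSum = edgeWeights.reduce((a, b) -> a + b).collect();
long edgeCount = edgeWeights.count();
if (edgeCount > 0) {
    System.out.println("Average weight: " + weightSum.get(0) / edgeCount);
}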

37

38

// Get triplets (edges with vertex values)

39

DataSet<Triplet<Long, String, Double>> triplets = graph.getTriplets();

40

triplets.print(); // Shows: (srcId, trgId, srcValue, trgValue, edgeValue)

41

```

42

43

### Neighborhood Operations

44

45

Perform computations on vertex neighborhoods with various aggregation patterns.

46

47

#### Edge-Based Neighborhood Operations

48

49

Operate on edges incident to each vertex.

50

51

```java { .api }

52

public <T> DataSet<T> groupReduceOnEdges(

53

EdgesFunction<K, EV, T> edgesFunction,

54

EdgeDirection direction)

55

56

public <T> DataSet<T> groupReduceOnEdges(

57

EdgesFunctionWithVertexValue<K, VV, EV, T> edgesFunction,

58

EdgeDirection direction)

59

60

// Returns one (vertex ID, reduced edge value) pair per vertex
public DataSet<Tuple2<K, EV>> reduceOnEdges(

61

ReduceEdgesFunction<EV> reduceEdgesFunction,

62

EdgeDirection direction)

63

```

64

65

#### Neighbor-Based Operations

66

67

Operate on neighboring vertices.

68

69

```java { .api }

70

public <T> DataSet<T> groupReduceOnNeighbors(

71

NeighborsFunction<K, VV, EV, T> neighborsFunction,

72

EdgeDirection direction)

73

74

public <T> DataSet<T> groupReduceOnNeighbors(

75

NeighborsFunctionWithVertexValue<K, VV, EV, T> neighborsFunction,

76

EdgeDirection direction)

77

78

// Returns one (vertex ID, reduced neighbor value) pair per vertex
public DataSet<Tuple2<K, VV>> reduceOnNeighbors(

79

ReduceNeighborsFunction<VV> reduceNeighborsFunction,

80

EdgeDirection direction)

81

```

82

83

**Usage Example:**

84

85

```java

86

// Count outgoing edges for each vertex (vertices without outgoing edges produce no record)

87

DataSet<Tuple2<Long, Integer>> outDegrees = graph.groupReduceOnEdges(

88

new EdgesFunction<Long, Double, Tuple2<Long, Integer>>() {

89

@Override

90

public void iterateEdges(

91

Iterable<Tuple2<Long, Edge<Long, Double>>> edges,

92

Collector<Tuple2<Long, Integer>> out) {

93

94

int count = 0;

95

Long vertexId = null;

96

97

for (Tuple2<Long, Edge<Long, Double>> edge : edges) {

98

vertexId = edge.f0; // Vertex ID

99

count++;

100

}

101

102

out.collect(new Tuple2<>(vertexId, count));

103

}

104

},

105

EdgeDirection.OUT

106

);

107

108

// Concatenate the values of each vertex's neighbors
DataSet<Tuple2<Long, String>> neighborValues = graph.groupReduceOnNeighbors(
    new NeighborsFunction<Long, String, Double, Tuple2<Long, String>>() {
        @Override
        public void iterateNeighbors(
                Iterable<Tuple3<Long, Edge<Long, Double>, Vertex<Long, String>>> neighbors,
                Collector<Tuple2<Long, String>> out) {

            StringBuilder joinedValues = new StringBuilder();
            Long vertexId = null;

            // Each tuple is (vertex ID, connecting edge, neighbor vertex)
            for (Tuple3<Long, Edge<Long, Double>, Vertex<Long, String>> neighbor : neighbors) {
                vertexId = neighbor.f0;
                joinedValues.append(neighbor.f2.getValue()).append(" ");
            }

            out.collect(new Tuple2<>(vertexId, joinedValues.toString().trim()));
        }
    },
    EdgeDirection.ALL
);
```

### Function Interfaces

Core interfaces for neighborhood operations.

#### EdgesFunction<K, EV, O>

Process edges incident to each vertex. Each tuple holds the vertex ID (`f0`) and an incident edge (`f1`).

```java { .api }
public interface EdgesFunction<K, EV, O> extends Function, Serializable {
    void iterateEdges(
        Iterable<Tuple2<K, Edge<K, EV>>> edges,
        Collector<O> out) throws Exception;
}
```

#### EdgesFunctionWithVertexValue<K, VV, EV, T>

Process edges with access to the vertex value.

```java { .api }
public interface EdgesFunctionWithVertexValue<K, VV, EV, T> extends Function, Serializable {
    void iterateEdges(
        Vertex<K, VV> vertex,
        Iterable<Edge<K, EV>> edges,
        Collector<T> out) throws Exception;
}
```

#### NeighborsFunction<K, VV, EV, T>

Process neighboring vertices and connecting edges. Each tuple holds the vertex ID (`f0`), the connecting edge (`f1`), and the neighbor vertex (`f2`).

```java { .api }
public interface NeighborsFunction<K, VV, EV, T> extends Function, Serializable {
    void iterateNeighbors(
        Iterable<Tuple3<K, Edge<K, EV>, Vertex<K, VV>>> neighbors,
        Collector<T> out) throws Exception;
}
```

171

172

#### NeighborsFunctionWithVertexValue<K, VV, EV, T>

173

174

Process neighbors with access to the value of the vertex being processed. Each neighbor tuple holds the connecting edge (`f0`) and the neighbor vertex (`f1`).

```java { .api }
public interface NeighborsFunctionWithVertexValue<K, VV, EV, T> extends Function, Serializable {
    void iterateNeighbors(
        Vertex<K, VV> vertex,
        Iterable<Tuple2<Edge<K, EV>, Vertex<K, VV>>> neighbors,
        Collector<T> out) throws Exception;
}
```

184

185

#### Reduce Functions

186

187

Simple reduction operations on edges and neighbors.

188

189

```java { .api }

190

public interface ReduceEdgesFunction<EV> extends Function, Serializable {

191

EV reduceEdges(EV firstEdgeValue, EV secondEdgeValue) throws Exception;

192

}

193

194

public interface ReduceNeighborsFunction<VV> extends Function, Serializable {

195

VV reduceNeighbors(VV firstNeighborValue, VV secondNeighborValue) throws Exception;

196

}

197

```

198

199

**Usage Example:**

200

201

```java

202

// Reduce edge weights to find maximum outgoing edge weight per vertex

203

DataSet<Tuple2<Long, Double>> maxWeights = graph.reduceOnEdges(

204

new ReduceEdgesFunction<Double>() {

205

@Override

206

public Double reduceEdges(Double first, Double second) {

207

return Math.max(first, second);

208

}

209

},

210

EdgeDirection.OUT

211

);

212

213

// Reduce neighbor values (concatenate strings)

214

DataSet<Tuple2<Long, String>> concatenatedNeighbors = graph.reduceOnNeighbors(

215

new ReduceNeighborsFunction<String>() {

216

@Override

217

public String reduceNeighbors(String first, String second) {

218

return first + "_" + second;

219

}

220

},

221

EdgeDirection.ALL

222

);

223

```

224

225

### Graph Joins

226

227

Join graph components with external DataSets.

228

229

#### Vertex Joins

Join vertices with external data. Each vertex whose ID appears in the input DataSet gets a new value computed by the `VertexJoinFunction`. Vertices without a match keep their original value. The vertex value type does not change.

```java { .api }
public <T> Graph<K, VV, EV> joinWithVertices(
    DataSet<Tuple2<K, T>> inputDataSet,
    VertexJoinFunction<VV, T> vertexJoinFunction)
```

#### Edge Joins

Join edges with external data. The join can be keyed on (source ID, target ID), on the source ID only, or on the target ID only. Edges without a match keep their original value. The edge value type does not change.

```java { .api }
public <T> Graph<K, VV, EV> joinWithEdges(
    DataSet<Tuple3<K, K, T>> inputDataSet,
    EdgeJoinFunction<EV, T> edgeJoinFunction)

public <T> Graph<K, VV, EV> joinWithEdgesOnSource(
    DataSet<Tuple2<K, T>> inputDataSet,
    EdgeJoinFunction<EV, T> edgeJoinFunction)

public <T> Graph<K, VV, EV> joinWithEdgesOnTarget(
    DataSet<Tuple2<K, T>> inputDataSet,
    EdgeJoinFunction<EV, T> edgeJoinFunction)
```

258

259

#### Join Function Interfaces

260

261

```java { .api }

262

public interface VertexJoinFunction<VV, T> extends Function, Serializable {

263

VV vertexJoin(VV vertexValue, T inputValue) throws Exception;

264

}

265

266

public interface EdgeJoinFunction<EV, T> extends Function, Serializable {

267

EV edgeJoin(EV edgeValue, T inputValue) throws Exception;

268

}

269

```

270

271

**Usage Example:**

272

273

```java

274

// External vertex data

275

DataSet<Tuple2<Long, Integer>> vertexAges = env.fromElements(

276

new Tuple2<>(1L, 25),

277

new Tuple2<>(2L, 30),

278

new Tuple2<>(3L, 35)

279

);

280

281

// Join with vertices to add age information

282

Graph<Long, String, Double> enrichedGraph = graph.joinWithVertices(

283

vertexAges,

284

new VertexJoinFunction<String, Integer>() {

285

@Override

286

public String vertexJoin(String name, Integer age) {

287

return name + "_age" + age;

288

}

289

}

290

);

291

292

// External edge data

293

DataSet<Tuple3<Long, Long, String>> edgeLabels = env.fromElements(

294

new Tuple3<>(1L, 2L, "friendship"),

295

new Tuple3<>(2L, 3L, "colleague")

296

);

297

298

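// Join with edges to add labels. EdgeJoinFunction must return the graph's current edge
// value type, so first convert the Double weights to Strings with mapEdges.
// Edges without a matching label keep their converted weight as their value.
Graph<Long, String, String> labeledGraph = enrichedGraph
    .mapEdges(new MapFunction<Edge<Long, Double>, String>() {
        @Override
        public String map(Edge<Long, Double> edge) {
            return String.valueOf(edge.getValue());
        }
    })
    .joinWithEdges(
        edgeLabels,
        new EdgeJoinFunction<String, String>() {
            @Override
            public String edgeJoin(String weight, String label) {
                return label + "_" + weight;
            }
        }
    );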

308

```

309

310

### Graph Validation

311

312

Check a graph against a user-defined condition. `validate` returns whether the check passed. It does not modify or filter the graph.

```java { .api }
public Boolean validate(GraphValidator<K, VV, EV> validator) throws Exception

public abstract class GraphValidator<K, VV, EV> implements Serializable {
    public abstract boolean validate(Graph<K, VV, EV> graph) throws Exception;
}
```

**Usage Example:**

```java
// Custom validator to check for self-loops
GraphValidator<Long, String, Double> noSelfLoopValidator =
    new GraphValidator<Long, String, Double>() {
        @Override
        public boolean validate(Graph<Long, String, Double> graph) throws Exception {
            DataSet<Edge<Long, Double>> selfLoops = graph.getEdges()
                .filter(edge -> edge.getSource().equals(edge.getTarget()));

            // count() triggers a job execution and returns the number of self-loops
            long selfLoopCount = selfLoops.count();
            return selfLoopCount == 0;
        }
    };

// Validate graph: the result is a flag; the graph itself is unchanged
boolean isValid = graph.validate(noSelfLoopValidator);
```

341

342

### Utility Classes

343

344

Conversion utilities for different data formats.

345

346

#### Tuple Conversion Utilities

347

348

```java { .api }

349

// Convert between tuples and graph types

350

public class Tuple2ToVertexMap<K, VV> implements MapFunction<Tuple2<K, VV>, Vertex<K, VV>>

351

public class Tuple3ToEdgeMap<K, EV> implements MapFunction<Tuple3<K, K, EV>, Edge<K, EV>>

352

public class VertexToTuple2Map<K, VV> implements MapFunction<Vertex<K, VV>, Tuple2<K, VV>>

353

public class EdgeToTuple3Map<K, EV> implements MapFunction<Edge<K, EV>, Tuple3<K, K, EV>>

354

```

355

356

**Usage Example:**

357

358

```java

359

// Convert graph to tuple format for export

360

DataSet<Tuple2<Long, String>> vertexTuples = graph.getVertices()

361

.map(new VertexToTuple2Map<Long, String>());

362

363

DataSet<Tuple3<Long, Long, Double>> edgeTuples = graph.getEdges()

364

.map(new EdgeToTuple3Map<Long, Double>());

365

366

// Export to CSV

367

vertexTuples.writeAsCsv("vertices.csv");

368

edgeTuples.writeAsCsv("edges.csv");

369

370

// Import from tuples

371

DataSet<Vertex<Long, String>> importedVertices = vertexTuples

372

.map(new Tuple2ToVertexMap<Long, String>());

373

374

DataSet<Edge<Long, Double>> importedEdges = edgeTuples

375

.map(new Tuple3ToEdgeMap<Long, Double>());

376

377

Graph<Long, String, Double> importedGraph = Graph.fromDataSet(

378

importedVertices, importedEdges, env);

379

```

380

381

### Integration with Flink DataSet API

382

383

Graph components are ordinary Flink DataSets, so any DataSet transformation can be applied to them. The results can also be turned back into a graph.

384

385

**Usage Example:**

386

387

```java

388

// Use Flink operations on graph components

389

DataSet<Vertex<Long, String>> vertices = graph.getVertices();

390

391

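// Standard DataSet operations. Lambdas whose result is a generic type (Vertex, TupleN)
// need an explicit returns(...) hint, because Java erases the type arguments Flink needs.
DataSet<Vertex<Long, String>> filteredVertices = vertices
    .filter(vertex -> vertex.getValue().length() > 3)
    .map(vertex -> new Vertex<>(vertex.getId(), vertex.getValue().toUpperCase()))
    .returns(new TypeHint<Vertex<Long, String>>() {});

// Group operations (empty values are skipped because substring(0, 1) would fail on them)
DataSet<Tuple2<String, Long>> vertexGroups = vertices
    .filter(vertex -> !vertex.getValue().isEmpty())
    .map(vertex -> new Tuple2<>(vertex.getValue().substring(0, 1), 1L))
    .returns(Types.TUPLE(Types.STRING, Types.LONG))
    .groupBy(0)
    .sum(1);

// Join with other DataSets
DataSet<Tuple2<Long, Integer>> externalData = /* ... */;
DataSet<Tuple3<Long, String, Integer>> joined = vertices
    .join(externalData)
    .where("f0").equalTo("f0")
    .with((vertex, data) -> new Tuple3<>(vertex.getId(), vertex.getValue(), data.f1))
    .returns(Types.TUPLE(Types.LONG, Types.STRING, Types.INT));

// Create new graph from processed data. Filter the edges consistently with the vertices:
// filterOnVertices drops every edge whose source or target was removed, so the new graph
// has no edges pointing at missing vertices.
Graph<Long, String, Double> processedGraph = Graph.fromDataSet(
    filteredVertices,
    graph.filterOnVertices(vertex -> vertex.getValue().length() > 3).getEdges(),
    env);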

412

```

413

414

### Performance Optimization

415

416

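#### Reusing Graph Components

The DataSet API has no `cache()` operator. A DataSet that feeds several operators in the same program is computed once per job execution, so you can simply reuse it:

```java
// Reuse the same component DataSets across several transformations
DataSet<Vertex<Long, String>> vertices = graph.getVertices();
DataSet<Edge<Long, Double>> edges = graph.getEdges();

DataSet<String> values1 = vertices.map(v -> v.getValue());
DataSet<String> values2 = vertices.map(v -> v.getValue().toUpperCase());
```

Each eager call such as `count()`, `collect()`, or `print()` starts a separate job execution, which recomputes its inputs from the sources. To reuse an expensive intermediate result across such calls, write it to a sink (for example with `writeAsCsv`) and read it back.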

427

428

#### Efficient Data Types

429

430

```java

431

// Use value types for better performance

432

Graph<LongValue, StringValue, DoubleValue> efficientGraph = /* ... */;

433

434

// Avoid boxed primitives for large-scale operations

435

Graph<Long, String, Double> lessEfficientGraph = /* ... */;

436

```

437

438

#### Parallel Processing

439

440

```java

441

// Configure parallelism for data access operations

442

env.setParallelism(8);

443

444

// Operations will use configured parallelism

445

DataSet<Vertex<Long, String>> vertices = graph.getVertices(); // Uses parallelism 8

446

```