Flink Gelly


Gelly is Flink’s Graph API. It contains a set of methods and utilities designed to simplify the development of graph analysis applications in Flink. In Gelly, graphs can be transformed and modified using high-level functions similar to those provided by the batch API. Gelly provides methods for creating, transforming and modifying graphs, as well as a library of graph algorithms.
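
For example, transformation methods can be applied directly on a Graph object. Below is a minimal sketch (it assumes a Graph<Long, Long, Long> named graph, such as the one loaded in the next section):

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.Vertex;

// increment every vertex value by one
Graph<Long, Long, Long> incremented = graph.mapVertices(
        new MapFunction<Vertex<Long, Long>, Long>() {
            @Override
            public Long map(Vertex<Long, Long> vertex) {
                return vertex.getValue() + 1L;
            }
        });

// add an opposite-direction edge for every edge, producing an undirected view of the graph
Graph<Long, Long, Long> undirected = incremented.getUndirected();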

Load Graph

In Gelly, a Graph consists of a DataSet of vertices and a DataSet of edges. A Vertex is implemented on top of Tuple2 and an Edge on top of Tuple3. A Vertex is defined by a unique ID and a value. Vertex IDs should implement the Comparable interface. Vertices without a value can be represented by setting the value type to NullValue. An Edge is defined by a source ID (the ID of the source Vertex), a target ID (the ID of the target Vertex) and an optional value. The source and target IDs should be of the same type as the Vertex IDs. Edges with no value have a NullValue value type. (https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/libs/gelly/graph_api/)
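
As a quick illustration, a small graph can be built directly from collections of Vertex and Edge objects (a minimal sketch with made-up values):

import java.util.Arrays;
import java.util.List;

import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.graph.Edge;
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.Vertex;

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// each Vertex carries an ID and a value; each Edge carries a source ID, a target ID and a value
List<Vertex<Long, Long>> vertices = Arrays.asList(
        new Vertex<>(1L, 1L),
        new Vertex<>(2L, 2L),
        new Vertex<>(3L, 3L));

List<Edge<Long, Long>> edges = Arrays.asList(
        new Edge<>(1L, 2L, 0L),
        new Edge<>(1L, 3L, 0L));

Graph<Long, Long, Long> graph = Graph.fromCollection(vertices, edges, env);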

A Graph can be created from a DataSet<Vertex<K, VV>> and a DataSet<Edge<K, EV>>, or from Tuple DataSets with fromTupleDataSet. To load graphs more conveniently, we can also read them from CSV files:

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
Graph<Long, Long, Long> graph = Graph.fromCsvReader("data\\taro.v", "data\\taro.e", env)
        .types(Long.class, Long.class, Long.class);

The graph data format is shown below.

For the vertices file:

1,1
2,2
3,3
4,4
...

For the edges file:

1,2,0
1,3,0
1,4,0
2,3,0
...

Tips

When loading the graph with generic types, for example using flatMap to build the vertex DataSet, you may encounter the following problem:

Caused by: org.apache.flink.api.common.functions.InvalidTypesException: Type of TypeVariable 'K' in 'class org.dkr.graph.io.utils.VertexSplitter' could not be determined. This is most likely a type erasure problem. The type extraction currently supports types with generic variables only in cases where all variables in the return type can be deduced from the input type(s). Otherwise the type has to be specified explicitly using type information.

When this problem is encountered, it can be solved by declaring the return data type. For example:

DataSet<Tuple2<K, VV>> vertices = data
        .flatMap(new VertexSplitter<K, VV>())   // the custom FlatMapFunction, e.g. the VertexSplitter from the exception above
        .returns(Types.TUPLE(TypeInformation.of(dataType), TypeInformation.of(dataType)));

Another odd issue: if you use Edge<K, EV> to load the edge data, the Edge object only carries two type parameters (the vertex ID type and the edge value type), and this cannot be fixed with the returns function. It causes some functions, such as graph.inDegrees(), to be unusable. You can use Tuple3<K, K, EV> instead of Edge<K, EV> to work around this issue, as shown in the sketch below.
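
A minimal sketch of this workaround (the file paths match the example above; the split-based parsing is only illustrative):

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.graph.Graph;

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// load the vertices as Tuple2<K, VV>
DataSet<Tuple2<Long, Long>> vertices = env.readTextFile("data\\taro.v")
        .map(line -> {
            String[] fields = line.split(",");
            return Tuple2.of(Long.parseLong(fields[0]), Long.parseLong(fields[1]));
        })
        .returns(Types.TUPLE(Types.LONG, Types.LONG));

// load the edges as Tuple3<K, K, EV> instead of Edge<K, EV>
DataSet<Tuple3<Long, Long, Long>> edges = env.readTextFile("data\\taro.e")
        .map(line -> {
            String[] fields = line.split(",");
            return Tuple3.of(Long.parseLong(fields[0]), Long.parseLong(fields[1]), Long.parseLong(fields[2]));
        })
        .returns(Types.TUPLE(Types.LONG, Types.LONG, Types.LONG));

// fromTupleDataSet accepts Tuple2 vertices and Tuple3 edges directly
Graph<Long, Long, Long> graph = Graph.fromTupleDataSet(vertices, edges, env);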

Basic Graph Algorithms

BFS

There are two approaches to designing the BFS algorithm in Gelly: we can use runVertexCentricIteration or runScatterGatherIteration.

runVertexCentricIteration Approach:

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.graph.Edge;
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.Vertex;
import org.apache.flink.graph.pregel.ComputeFunction;
import org.apache.flink.graph.pregel.MessageCombiner;
import org.apache.flink.graph.pregel.MessageIterator;

public class BFS {
    private Long src = null;
    private int maxIterations = 0;

    public BFS(int maxIterations, Long src) {
        this.src = src;
        this.maxIterations = maxIterations;
    }

    /**
     * return a new graph whose vertex values hold the BFS traversal order
     */
    public Graph<Long, Long, Long> run(Graph<Long, Long, Long> graph) {

        return graph
                .mapVertices(new InitVertex(this.src))
                .runVertexCentricIteration(new VertexComputeFunction(this.src), new VertexValueCombiner(), this.maxIterations);
    }

    /**
     * init the vertex value: 1 for the source vertex, 0 (unvisited) for all others
     */
    public static final class InitVertex implements MapFunction<Vertex<Long, Long>, Long> {
        private final Long src;

        public InitVertex(Long src) {
            this.src = src;
        }

        @Override
        public Long map(Vertex<Long, Long> value) throws Exception {
            if (value.getId().equals(this.src)) return 1L;
            return 0L;
        }
    }

    /**
     * send messages and update the vertex values
     */
    public static final class VertexComputeFunction extends ComputeFunction<Long, Long, Long, Long> {

        private final Long src;

        public VertexComputeFunction(Long src) {
            this.src = src;
        }

        @Override
        public void compute(Vertex<Long, Long> vertex, MessageIterator<Long> messages) throws Exception {

            if (vertex.getId().equals(this.src)) {
                for (Edge<Long, Long> edge : getEdges()) {
                    sendMessageTo(edge.getTarget(), vertex.getValue() + 1L);
                }
            } else {

                Long minDistance = Long.MAX_VALUE;
                for (Long msg : messages) {
                    if (msg < minDistance) {
                        minDistance = msg;
                    }
                }

                // did not receive any new values
                if (minDistance == Long.MAX_VALUE) return;

                if (vertex.getValue() > minDistance || vertex.getValue() == 0L) {
                    setNewVertexValue(minDistance);

                    for (Edge<Long, Long> edge : getEdges()) {
                        // cannot use the vertex value here: setNewVertexValue only takes effect in the next superstep
                        sendMessageTo(edge.getTarget(), minDistance + 1L);
                    }
                }
            }
        }
    }

    /**
     * combiner for a vertex's messages: keep only the smallest one
     */
    public static final class VertexValueCombiner extends MessageCombiner<Long, Long> {

        @Override
        public void combineMessages(MessageIterator<Long> messages) throws Exception {
            Long minDistance = Long.MAX_VALUE;
            for (Long msg : messages) {
                if (msg < minDistance) {
                    minDistance = msg;
                }
            }

            sendCombinedMessage(minDistance);
        }
    }
}

runScatterGatherIteration Approach:

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.graph.Edge;
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.Vertex;
import org.apache.flink.graph.spargel.GatherFunction;
import org.apache.flink.graph.spargel.MessageIterator;
import org.apache.flink.graph.spargel.ScatterFunction;

public class BFS {
    private Long src = null;
    private int maxIterations = 0;

    public BFS(int maxIterations, Long src) {
        this.src = src;
        this.maxIterations = maxIterations;
    }

    /**
     * return a new graph whose vertex values hold the BFS traversal order
     * @param graph input data graph
     * @return a new graph
     */
    public Graph<Long, Long, Long> run(Graph<Long, Long, Long> graph) {

        return graph
                .mapVertices(new InitVertex(this.src))
                .runScatterGatherIteration(new MapOrderMessenger(), new VertexOrderUpdater(), this.maxIterations);
    }

    /**
     * init the vertex value; the vertex value represents the traversal order
     */
    public static final class InitVertex implements MapFunction<Vertex<Long, Long>, Long> {
        private final Long src;

        public InitVertex(Long src) {
            this.src = src;
        }

        @Override
        public Long map(Vertex<Long, Long> value) throws Exception {
            if (value.getId().equals(this.src)) return 1L;
            return 0L;
        }
    }

    /**
     * scatter phase: if a vertex has been visited, send its traversal order (+1) to its neighbors
     */
    public static final class MapOrderMessenger extends ScatterFunction<Long, Long, Long, Long> {

        @Override
        public void sendMessages(Vertex<Long, Long> vertex) throws Exception {
            if (vertex.getValue() > 0L) {
                for (Edge<Long, Long> edge : getEdges()) {
                    sendMessageTo(edge.getTarget(), vertex.getValue() + 1L);
                }
            }
        }
    }

    /**
     * gather phase: update the vertex value based on the received messages;
     * since processing is parallel, a vertex may receive several values, so keep the smallest one
     */
    public static final class VertexOrderUpdater extends GatherFunction<Long, Long, Long> {

        @Override
        public void updateVertex(Vertex<Long, Long> vertex, MessageIterator<Long> inMessages) throws Exception {

            Long minDistance = Long.MAX_VALUE;
            for (Long msg : inMessages) {
                if (msg < minDistance) {
                    minDistance = msg;
                }
            }

            if (vertex.getValue() > minDistance || vertex.getValue() == 0L) {
                setNewVertexValue(minDistance);
            }
        }
    }
}

For the usage of these iteration functions, please refer to the Gelly documentation.
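
As a quick usage illustration, a driver for the scatter-gather BFS above might look like this (the source vertex id 1 and the iteration bound 10 are only example values):

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
Graph<Long, Long, Long> graph = Graph.fromCsvReader("data\\taro.v", "data\\taro.e", env)
        .types(Long.class, Long.class, Long.class);

// vertex values of the result hold the BFS traversal order; 0 means "not reached"
Graph<Long, Long, Long> result = new BFS(10, 1L).run(graph);
result.getVertices().print();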

K-core Decomposition

A k-core of G can be obtained by recursively removing all vertices of degree less than k, until all vertices in the remaining graph have degree at least k (https://hal.archives-ouvertes.fr/hal-00004807v2/document).


import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.graph.Edge;
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.Vertex;
import org.apache.flink.graph.VertexJoinFunction;
import org.apache.flink.graph.spargel.GatherFunction;
import org.apache.flink.graph.spargel.MessageIterator;
import org.apache.flink.graph.spargel.ScatterFunction;
import org.apache.flink.types.LongValue;

import java.util.HashMap;
import java.util.Objects;

public class KCore {
    private final Long k;
    private int maxIterations = 0;

    public KCore(int maxIterations, Long k) {
        this.k = k;
        this.maxIterations = maxIterations;
    }

    public Graph<Long, Long, Long> run(Graph<Long, Long, Long> graph) {
        // set the vertex values to the in-degrees, run the iteration, then drop the removed vertices
        return graph
                .joinWithVertices(graph.inDegrees(), new VertexUpdateFunction())
                .runScatterGatherIteration(new VertexDegreeMessenger(this.k), new VertexDegreeUpdater(), this.maxIterations)
                .filterOnVertices(new FilterFunction<Vertex<Long, Long>>() {
                    @Override
                    public boolean filter(Vertex<Long, Long> value) throws Exception {
                        return value.getValue() > 0;
                    }
                });
    }

    /**
     * use the in-degree as the new vertex value
     */
    public static final class VertexUpdateFunction implements VertexJoinFunction<Long, LongValue> {
        @Override
        public Long vertexJoin(Long vertexValue, LongValue inputValue) throws Exception {
            return inputValue.getValue();
        }
    }

    /**
     * if the vertex degree is less than k, notify its neighbors that one of their edges is gone,
     * and send a "0" indicator to the vertex itself so it will be removed
     */
    public static final class VertexDegreeMessenger extends ScatterFunction<Long, Long, String, Long> {

        private final Long k;

        public VertexDegreeMessenger(Long k) {
            this.k = k;
        }

        @Override
        public void sendMessages(Vertex<Long, Long> vertex) throws Exception {

            if (vertex.getValue() < this.k) {
                for (Edge<Long, Long> edge : getEdges()) {
                    sendMessageTo(edge.getTarget(), vertex.getId() + " " + edge.getTarget());
                }

                sendMessageTo(vertex.getId(), "0");
            }
        }
    }

    /**
     * based on the received messages, check whether the degree has already been updated for this edge;
     * if not, decrease the degree by 1. If the "0" indicator is received, set the degree to 0.
     */
    public static final class VertexDegreeUpdater extends GatherFunction<Long, Long, String> {

        private final HashMap<String, Boolean> map;

        public VertexDegreeUpdater() {
            this.map = new HashMap<>();
        }

        @Override
        public void updateVertex(Vertex<Long, Long> vertex, MessageIterator<String> inMessages) throws Exception {
            for (String msg : inMessages) {
                if (Objects.equals(msg, "0")) {
                    setNewVertexValue(0L);
                    break;
                }
                if (map.containsKey(msg)) continue;
                map.put(msg, true);

                setNewVertexValue(vertex.getValue() - 1);
                break;
            }
        }
    }
}
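
Usage is analogous to BFS (the k value 2 and the iteration bound 20 are only example values):

// keep only the 2-core of the graph and print the surviving vertices
Graph<Long, Long, Long> core = new KCore(20, 2L).run(graph);
core.getVertices().print();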

Connected Component

Connected component detection in Flink Gelly:


import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.graph.Edge;
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.Vertex;
import org.apache.flink.graph.spargel.GatherFunction;
import org.apache.flink.graph.spargel.MessageIterator;
import org.apache.flink.graph.spargel.ScatterFunction;

/**
 * connected component detection: the smallest vertex id in a component becomes the label
 * of every vertex in that component
 */
public class CC {
    private int maxIterations = 0;

    public CC(int maxIterations) {
        this.maxIterations = maxIterations;
    }

    public Graph<Long, Long, Long> run(Graph<Long, Long, Long> graph) {

        return graph
                .mapVertices(new InitVertex())
                .runScatterGatherIteration(
                        new MapOrderMessenger(),
                        new VertexOrderUpdater(),
                        this.maxIterations);
    }

    /**
     * use the vertex id as the initial component label
     */
    public static final class InitVertex implements MapFunction<Vertex<Long, Long>, Long> {
        @Override
        public Long map(Vertex<Long, Long> value) throws Exception {
            return value.getId();
        }
    }

    /**
     * propagate the current component label (the smallest vertex id seen so far) to the neighbors;
     * also message the vertex itself so that it stays part of the iteration
     */
    public static final class MapOrderMessenger extends ScatterFunction<Long, Long, Long, Long> {

        @Override
        public void sendMessages(Vertex<Long, Long> vertex) throws Exception {
            for (Edge<Long, Long> edge : getEdges()) {
                sendMessageTo(edge.getTarget(), vertex.getValue());
            }
            sendMessageTo(vertex.getId(), vertex.getId());
        }
    }

    /**
     * keep the smallest label received as the component label
     */
    public static final class VertexOrderUpdater extends GatherFunction<Long, Long, Long> {

        @Override
        public void updateVertex(Vertex<Long, Long> vertex, MessageIterator<Long> inMessages) throws Exception {

            Long minDistance = Long.MAX_VALUE;
            for (Long msg : inMessages) {
                if (msg < minDistance) {
                    minDistance = msg;
                }
            }

            if (vertex.getValue() > minDistance) {
                setNewVertexValue(minDistance);
            }
        }
    }
}

Single Source Shortest Path

Given a source vertex, find the shortest path from it to every other reachable vertex:


import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.graph.Edge;
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.Vertex;
import org.apache.flink.graph.spargel.GatherFunction;
import org.apache.flink.graph.spargel.MessageIterator;
import org.apache.flink.graph.spargel.ScatterFunction;

/**
 * single source shortest path
 */
public class SSSP {
    private int maxIterations = 0;
    private Long src = null;

    public SSSP(int maxIterations, Long src) {
        this.src = src;
        this.maxIterations = maxIterations;
    }

    public Graph<Long, Long, Long> run(Graph<Long, Long, Long> graph) {

        return graph
                .mapVertices(new InitVertex(this.src))
                .runScatterGatherIteration(
                        new MapOrderMessenger(),
                        new VertexOrderUpdater(),
                        this.maxIterations)
                .filterOnVertices(new FilterFunction<Vertex<Long, Long>>() {
                    @Override
                    public boolean filter(Vertex<Long, Long> value) throws Exception {
                        // drop vertices that are not reachable from the source
                        return value.getValue() != Long.MAX_VALUE;
                    }
                });
    }

    /**
     * the source vertex starts with distance 0, all other vertices with "infinity"
     */
    public static final class InitVertex implements MapFunction<Vertex<Long, Long>, Long> {
        private final Long src;

        public InitVertex(Long src) {
            this.src = src;
        }

        @Override
        public Long map(Vertex<Long, Long> value) throws Exception {
            if (value.getId().equals(this.src)) return 0L;
            return Long.MAX_VALUE;
        }
    }

    /**
     * scatter phase: a reached vertex offers each neighbor its own distance plus the edge value
     */
    public static final class MapOrderMessenger extends ScatterFunction<Long, Long, Long, Long> {

        @Override
        public void sendMessages(Vertex<Long, Long> vertex) throws Exception {

            if (vertex.getValue() != Long.MAX_VALUE) {
                for (Edge<Long, Long> edge : getEdges()) {
                    sendMessageTo(edge.getTarget(), vertex.getValue() + edge.getValue());
                }
            }
        }
    }

    /**
     * gather phase: keep the smallest distance received so far
     */
    public static final class VertexOrderUpdater extends GatherFunction<Long, Long, Long> {

        @Override
        public void updateVertex(Vertex<Long, Long> vertex, MessageIterator<Long> inMessages) throws Exception {

            Long minDistance = Long.MAX_VALUE;
            for (Long msg : inMessages) {
                if (msg < minDistance) {
                    minDistance = msg;
                }
            }

            if (vertex.getValue() > minDistance) {
                setNewVertexValue(minDistance);
            }
        }
    }
}
----- End. Thanks for reading -----