Flink Gelly
Gelly is Flink’s Graph API. It contains a set of methods and utilities designed to simplify the development of graph analysis applications in Flink. Graphs can be created, transformed and modified with high-level functions similar to those of Flink's batch (DataSet) API, and Gelly also provides a library of graph algorithms.
Load Graph
In Gelly, a Graph consists of a DataSet of vertices and a DataSet of edges. Vertex is implemented as a subclass of Tuple2, and Edge as a subclass of Tuple3. A Vertex is defined by a unique ID and a value. Vertex IDs should implement the Comparable interface. Vertices without a value can be represented by setting the value type to NullValue. An Edge is defined by a source ID (the ID of the source Vertex), a target ID (the ID of the target Vertex) and an optional value. The source and target IDs should be of the same type as the Vertex IDs. Edges with no value have a NullValue value type. (https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/libs/gelly/graph_api/).
A Graph can be created from a DataSet<Vertex<K, VV>> and a DataSet<Edge<K, EV>> with Graph.fromDataSet, or from DataSets of tuples with Graph.fromTupleDataSet. To make loading graphs easier, we can also read them directly from CSV files:
```java
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
```
The CSV files use the following format.
Vertices file (vertex ID, vertex value):
```
1,1
```
Edges file (source ID, target ID, edge value):
```
1,2,0
```
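Each line of these files corresponds to one Vertex (ID, value) or one Edge (source ID, target ID, value). To make that mapping explicit, here is a hypothetical plain-Java sketch (not Gelly code; CsvGraphSketch, EdgeRow, parseVertices and parseEdges are illustrative names) that parses both layouts by hand, assuming Long IDs and Long values:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical plain-Java sketch of the vertex/edge CSV layouts (not Gelly code). */
public class CsvGraphSketch {

    /** One edge line "source,target,value"; Gelly's Edge is likewise a Tuple3. */
    public static final class EdgeRow {
        public final long source;
        public final long target;
        public final long value;

        public EdgeRow(long source, long target, long value) {
            this.source = source;
            this.target = target;
            this.value = value;
        }
    }

    /** Parses "id,value" lines into an id -> value map (Gelly's Vertex is a Tuple2). */
    public static Map<Long, Long> parseVertices(List<String> lines) {
        Map<Long, Long> vertices = new LinkedHashMap<>();
        for (String line : lines) {
            if (line.trim().isEmpty()) {
                continue; // skip blank lines
            }
            String[] f = line.split(",");
            vertices.put(Long.parseLong(f[0].trim()), Long.parseLong(f[1].trim()));
        }
        return vertices;
    }

    /** Parses "source,target,value" lines into edge rows. */
    public static List<EdgeRow> parseEdges(List<String> lines) {
        List<EdgeRow> edges = new ArrayList<>();
        for (String line : lines) {
            if (line.trim().isEmpty()) {
                continue; // skip blank lines
            }
            String[] f = line.split(",");
            edges.add(new EdgeRow(Long.parseLong(f[0].trim()),
                    Long.parseLong(f[1].trim()), Long.parseLong(f[2].trim())));
        }
        return edges;
    }
}
```

In Gelly itself this parsing is done for you: assuming the file paths are held in (hypothetical) variables verticesPath and edgesPath, Graph.fromCsvReader(verticesPath, edgesPath, env).types(Long.class, Long.class, Long.class) should yield a Graph<Long, Long, Long> for this layout.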
Tips
When loading a graph with generic types, for example with a flatMap function, you may encounter the following exception:
```
Caused by: org.apache.flink.api.common.functions.InvalidTypesException: Type of TypeVariable 'K' in 'class org.dkr.graph.io.utils.VertexSplitter' could not be determined. This is most likely a type erasure problem. The type extraction currently supports types with generic variables only in cases where all variables in the return type can be deduced from the input type(s). Otherwise the type has to be specified explicitly using type information.
```
It can be solved by explicitly declaring the return type of the operator with returns(...). For example:
```java
DataSet<Tuple2<K, VV>> vertices = data
```
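The exception above comes from Java type erasure: inside a generic class such as VertexSplitter, the type variable K exists only at compile time, so it cannot be recovered at runtime unless someone supplies it. The following stdlib-only sketch is an analogy, not Flink code (ErasureSketch, Splitter and keyTypeName are hypothetical names): it shows that the type argument really is gone at runtime, and that handing the type over explicitly is what makes it available again.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stdlib-only analogy for Flink's InvalidTypesException (not Flink code). */
public class ErasureSketch {

    /** Differently parameterised lists share one runtime class: the type argument is erased. */
    public static boolean typeArgumentErased() {
        List<Long> longs = new ArrayList<>();
        List<String> strings = new ArrayList<>();
        return longs.getClass() == strings.getClass();
    }

    /** A generic function object, like VertexSplitter: it cannot discover K by itself. */
    public static class Splitter<K> {
        private final Class<K> keyType; // explicit type information, or null if not given

        public Splitter(Class<K> keyType) {
            this.keyType = keyType;
        }

        /** Mimics type extraction: succeeds only when the key type was declared explicitly. */
        public String keyTypeName() {
            if (keyType == null) {
                throw new IllegalStateException("Type of TypeVariable 'K' could not be determined");
            }
            return keyType.getSimpleName();
        }
    }
}
```

In Flink, that explicit information is passed to the operator with returns(...), typically as a TypeInformation describing the operator's output type.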
Another odd issue: if you load edge data as Edge<K, EV>, the Edge object only carries two type parameters (the vertex ID type and the edge value type), and this cannot be fixed with the returns function. It makes some functions unusable, such as graph.inDegrees(). Using Tuple3<K, K, EV> instead of Edge<K, EV> works around this issue.
Basic Graph Algorithms
BFS
There are two ways to implement the BFS algorithm in Gelly: with runVertexCentricIteration or with runScatterGatherIteration.
runVertexCentricIteration approach:
```java
import org.apache.flink.api.common.functions.MapFunction;
```
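To show what a vertex-centric BFS does in each superstep, the following plain-Java sketch simulates it on a single machine. It is a hypothetical illustration, not the Gelly API: the graph is assumed to be an adjacency map of Long IDs, the vertex value is the BFS level (Long.MAX_VALUE meaning unreached), and a vertex only sends messages when its level improves, so the loop ends once no messages are in flight.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical single-machine simulation of vertex-centric BFS (not the Gelly API). */
public class VertexCentricBfsSketch {

    public static Map<Long, Long> bfs(Map<Long, List<Long>> outEdges, long source, int maxSupersteps) {
        // Every vertex, including pure edge targets, starts unreached.
        Map<Long, Long> level = new HashMap<>();
        for (Map.Entry<Long, List<Long>> e : outEdges.entrySet()) {
            level.putIfAbsent(e.getKey(), Long.MAX_VALUE);
            for (Long t : e.getValue()) {
                level.putIfAbsent(t, Long.MAX_VALUE);
            }
        }
        level.putIfAbsent(source, Long.MAX_VALUE);

        Map<Long, List<Long>> inbox = new HashMap<>();
        for (int superstep = 1; superstep <= maxSupersteps; superstep++) {
            // Superstep 1 runs every vertex; later supersteps only vertices that got messages.
            List<Long> active = new ArrayList<Long>(superstep == 1 ? level.keySet() : inbox.keySet());
            Map<Long, List<Long>> outbox = new HashMap<>();
            for (Long v : active) {
                // Per-vertex "compute": the source proposes 0, messages propose parent level + 1.
                long candidate = v == source ? 0L : Long.MAX_VALUE;
                for (Long msg : inbox.getOrDefault(v, Collections.emptyList())) {
                    candidate = Math.min(candidate, msg);
                }
                if (candidate < level.get(v)) {
                    level.put(v, candidate);
                    for (Long n : outEdges.getOrDefault(v, Collections.emptyList())) {
                        outbox.computeIfAbsent(n, k -> new ArrayList<>()).add(candidate + 1);
                    }
                }
            }
            if (outbox.isEmpty()) {
                break; // no messages in flight, so the iteration terminates
            }
            inbox = outbox;
        }
        return level;
    }
}
```

In Gelly, the per-vertex body corresponds to ComputeFunction.compute(...), using sendMessageTo(...) and setNewVertexValue(...), and the superstep loop is driven by graph.runVertexCentricIteration(...).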
runScatterGatherIteration approach:
```java
import org.apache.flink.api.common.functions.MapFunction;
```
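The scatter-gather model splits each superstep into two phases: a scatter phase, in which vertices updated in the previous superstep send messages along their out-edges, and a gather phase, in which each vertex folds its incoming messages into a new value. The plain-Java simulation below is a hypothetical sketch of BFS in that style, not Gelly code; it assumes an adjacency map of Long IDs and uses Long.MAX_VALUE as the level of unreached vertices.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical single-machine simulation of scatter-gather BFS (not the Gelly API). */
public class ScatterGatherBfsSketch {

    public static Map<Long, Long> bfs(Map<Long, List<Long>> outEdges, long source, int maxIterations) {
        // Initial vertex values: 0 for the source, unreached for everyone else.
        Map<Long, Long> level = new HashMap<>();
        for (Map.Entry<Long, List<Long>> e : outEdges.entrySet()) {
            level.putIfAbsent(e.getKey(), Long.MAX_VALUE);
            for (Long t : e.getValue()) {
                level.putIfAbsent(t, Long.MAX_VALUE);
            }
        }
        level.put(source, 0L);

        // In the first superstep every vertex takes part in the scatter phase.
        Set<Long> updated = new HashSet<>(level.keySet());
        for (int i = 0; i < maxIterations && !updated.isEmpty(); i++) {
            // Scatter phase: recently updated vertices send level + 1 along their out-edges.
            Map<Long, List<Long>> messages = new HashMap<>();
            for (Long v : updated) {
                long lv = level.get(v);
                if (lv == Long.MAX_VALUE) {
                    continue; // unreached vertices have nothing to propagate
                }
                for (Long n : outEdges.getOrDefault(v, Collections.emptyList())) {
                    messages.computeIfAbsent(n, k -> new ArrayList<>()).add(lv + 1);
                }
            }
            // Gather phase: each recipient keeps the smallest proposed level.
            Set<Long> next = new HashSet<>();
            for (Map.Entry<Long, List<Long>> m : messages.entrySet()) {
                long best = Collections.min(m.getValue());
                if (best < level.get(m.getKey())) {
                    level.put(m.getKey(), best);
                    next.add(m.getKey());
                }
            }
            updated = next;
        }
        return level;
    }
}
```

In Gelly, the two phases are supplied as a ScatterFunction and a GatherFunction to graph.runScatterGatherIteration(...).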
For details on these functions, please refer to the Gelly documentation.
K-core Decomposition
The k-core of a graph G is the maximal subgraph in which every vertex has degree at least k. It can be obtained by recursively removing all vertices of degree less than k, until all vertices in the remaining graph have degree at least k (https://hal.archives-ouvertes.fr/hal-00004807v2/document).
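The peeling procedure described above can be sketched on a single machine in plain Java. This is a hypothetical illustration, not Gelly code: it assumes an undirected graph given as a symmetric adjacency map with Long IDs (if u lists v, v lists u), and uses a queue so each vertex is removed at most once instead of rescanning the whole graph after every removal.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical plain-Java k-core sketch (not Gelly code). */
public class KCoreSketch {

    /** Returns the vertices of the k-core of an undirected graph (symmetric adjacency map). */
    public static Set<Long> kCore(Map<Long, Set<Long>> adjacency, int k) {
        // Copy the graph so the caller's map is not modified.
        Map<Long, Set<Long>> g = new HashMap<>();
        for (Map.Entry<Long, Set<Long>> e : adjacency.entrySet()) {
            g.put(e.getKey(), new HashSet<>(e.getValue()));
        }
        // Seed the queue with every vertex whose degree is already below k.
        Deque<Long> toRemove = new ArrayDeque<>();
        for (Map.Entry<Long, Set<Long>> e : g.entrySet()) {
            if (e.getValue().size() < k) {
                toRemove.add(e.getKey());
            }
        }
        // Removing a vertex lowers its neighbours' degrees, which may push them below k too.
        while (!toRemove.isEmpty()) {
            Long v = toRemove.poll();
            Set<Long> neighbours = g.remove(v);
            if (neighbours == null) {
                continue; // already removed
            }
            for (Long n : neighbours) {
                Set<Long> ns = g.get(n);
                if (ns == null) {
                    continue; // neighbour already removed
                }
                ns.remove(v);
                if (ns.size() == k - 1) {
                    toRemove.add(n); // degree has just dropped below k
                }
            }
        }
        return new HashSet<>(g.keySet());
    }
}
```

A distributed version would conceptually repeat the same step: compute vertex degrees (Gelly's Graph offers getDegrees()), drop the vertices below k, and iterate until no vertex is removed.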
Connected Component
Gelly can also be used to detect the connected components of a graph.
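A common way to find connected components in this iterative style is minimum-label propagation: every vertex starts with its own ID as its label and repeatedly adopts the smallest label offered by a neighbour until no label changes. Gelly's library includes a ConnectedComponents algorithm that, according to its documentation, works on this principle. The plain-Java simulation below is a hypothetical sketch, not the Gelly API; it assumes an undirected graph given as a symmetric adjacency map with Long IDs, and each round reads only the previous round's labels, like a superstep.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

/** Hypothetical plain-Java simulation of minimum-label propagation (not Gelly code). */
public class LabelPropagationCcSketch {

    /** Maps every vertex to the smallest vertex ID in its connected component. */
    public static Map<Long, Long> components(Map<Long, Set<Long>> adjacency) {
        // The initial label of each vertex is its own ID.
        Map<Long, Long> label = new HashMap<>();
        for (Map.Entry<Long, Set<Long>> e : adjacency.entrySet()) {
            label.putIfAbsent(e.getKey(), e.getKey());
            for (Long n : e.getValue()) {
                label.putIfAbsent(n, n);
            }
        }
        boolean changed = true;
        while (changed) {
            changed = false;
            // Each round reads the previous round's labels and writes into a fresh map.
            Map<Long, Long> next = new HashMap<>(label);
            for (Map.Entry<Long, Set<Long>> e : adjacency.entrySet()) {
                long own = label.get(e.getKey());
                for (Long n : e.getValue()) {
                    if (own < next.get(n)) {
                        next.put(n, own); // neighbour adopts the smaller label
                        changed = true;
                    }
                }
            }
            label = next;
        }
        return label;
    }
}
```

With Gelly, the library algorithm is applied through graph.run(...), and, to our understanding, it takes the initial labels from the vertex values, so those are usually initialised to the vertex IDs.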
Single Source Shortest Path
Given a source vertex, find the shortest paths from it to all other reachable vertices.
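Single-source shortest paths fits the same superstep model: the source starts at distance 0 and every other vertex at infinity, and in each round the vertices whose distance changed offer distance + edge weight to their out-neighbours, which keep the minimum (a Bellman-Ford-style relaxation). Gelly's library contains a SingleSourceShortestPaths algorithm built around this idea. The sketch below is a hypothetical plain-Java simulation, not Gelly code; WeightedEdge is an illustrative helper, IDs are Long and edge weights are doubles.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical plain-Java simulation of superstep-based SSSP (not Gelly code). */
public class SsspSketch {

    /** Illustrative weighted, directed edge. */
    public static final class WeightedEdge {
        public final long source;
        public final long target;
        public final double weight;

        public WeightedEdge(long source, long target, double weight) {
            this.source = source;
            this.target = target;
            this.weight = weight;
        }
    }

    public static Map<Long, Double> shortestPaths(List<WeightedEdge> edges, long source, int maxIterations) {
        // The source starts at 0, every other vertex at infinity.
        Map<Long, Double> dist = new HashMap<>();
        for (WeightedEdge e : edges) {
            dist.putIfAbsent(e.source, Double.POSITIVE_INFINITY);
            dist.putIfAbsent(e.target, Double.POSITIVE_INFINITY);
        }
        dist.put(source, 0.0);

        Set<Long> updated = new HashSet<>(Collections.singleton(source));
        for (int i = 0; i < maxIterations && !updated.isEmpty(); i++) {
            Map<Long, Double> next = new HashMap<>(dist);
            Set<Long> changed = new HashSet<>();
            for (WeightedEdge e : edges) {
                if (!updated.contains(e.source)) {
                    continue; // only vertices whose distance changed send new offers
                }
                double candidate = dist.get(e.source) + e.weight;
                if (candidate < next.get(e.target)) {
                    next.put(e.target, candidate); // relax: keep the shorter distance
                    changed.add(e.target);
                }
            }
            dist = next;
            updated = changed;
        }
        return dist;
    }
}
```

Vertices that cannot be reached from the source, such as vertex 5 in a graph whose only edge touching it is 5 → 1, keep the distance Double.POSITIVE_INFINITY.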