Flink Gelly
Gelly is Flink’s Graph API. It contains a set of methods and utilities designed to simplify the development of graph analysis applications in Flink. In Gelly, graphs can be transformed and modified using high-level functions similar to those provided by the batch (DataSet) API. Gelly provides methods for creating, transforming and modifying graphs, as well as a library of graph algorithms.
Load Graph
In Gelly, a Graph consists of a DataSet of vertices and a DataSet of edges. Vertex extends Tuple2 and Edge extends Tuple3. A Vertex is defined by a unique ID and a value. Vertex IDs should implement the Comparable interface. Vertices without a value can be represented by setting the value type to NullValue. An Edge is defined by a source ID (the ID of the source Vertex), a target ID (the ID of the target Vertex) and an optional value. The source and target IDs should be of the same type as the Vertex IDs. Edges with no value have a NullValue value type. (https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/libs/gelly/graph_api/).
A Graph can be created from a DataSet<Vertex<K, VV>> and a DataSet<Edge<K, EV>> with Graph.fromDataSet, or from DataSets of Tuple2 and Tuple3 with Graph.fromTupleDataSet. To load graphs more easily, we can read them from CSV files:
```java
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
```
The CSV files use the following format.
Vertices file (vertex ID, vertex value):
```
1,1
```
Edges file (source ID, target ID, edge value):
```
1,2,0
```
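To make the two formats concrete, the following plain-Java sketch (no Flink dependency) parses lines like the ones above. `VertexRow` and `EdgeRow` are hypothetical stand-ins for Gelly's `Vertex` (a `Tuple2`) and `Edge` (a `Tuple3`), and the sketch assumes `long` IDs and values with no header line. In a Flink job, the same splitting would typically happen in a map or flatMap over the lines read from the files.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins for Gelly's Vertex<K, VV> (a Tuple2) and Edge<K, EV> (a Tuple3)
record VertexRow(long id, long value) {}
record EdgeRow(long source, long target, long value) {}

class GraphCsv {
    // Vertices file: one "id,value" pair per line
    static List<VertexRow> parseVertices(List<String> lines) {
        List<VertexRow> out = new ArrayList<>();
        for (String line : lines) {
            if (line.isBlank()) continue;
            String[] f = line.split(",");
            out.add(new VertexRow(Long.parseLong(f[0].trim()), Long.parseLong(f[1].trim())));
        }
        return out;
    }

    // Edges file: one "sourceId,targetId,value" triple per line
    static List<EdgeRow> parseEdges(List<String> lines) {
        List<EdgeRow> out = new ArrayList<>();
        for (String line : lines) {
            if (line.isBlank()) continue;
            String[] f = line.split(",");
            out.add(new EdgeRow(Long.parseLong(f[0].trim()),
                                Long.parseLong(f[1].trim()),
                                Long.parseLong(f[2].trim())));
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(parseVertices(List.of("1,1")));
        System.out.println(parseEdges(List.of("1,2,0")));
    }
}
```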
Tips
When loading a graph with generic types, for example through a flatMap whose output type contains a type variable, you may encounter the following error:
```
Caused by: org.apache.flink.api.common.functions.InvalidTypesException: Type of TypeVariable 'K' in 'class org.dkr.graph.io.utils.VertexSplitter' could not be determined. This is most likely a type erasure problem. The type extraction currently supports types with generic variables only in cases where all variables in the return type can be deduced from the input type(s). Otherwise the type has to be specified explicitly using type information.
```
It can be solved by declaring the return type explicitly with the returns(...) method. For example:
```java
DataSet<Tuple2<K, VV>> vertices = data
```
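The root cause is Java type erasure: a type variable such as `K` has no runtime representation, so Flink's type extractor cannot recover it from the function object alone. The plain-Java sketch below (no Flink dependency) shows the erasure itself and the anonymous-subclass trick that makes type hints possible: a subclass written as `new Hint<...>() {}` records its type argument in its generic superclass, where reflection can read it back. `Hint` is a hypothetical helper; as far as we know, Flink's `TypeHint` relies on the same idea, but check the Flink type-extraction docs for the exact `returns(...)` overloads.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.ArrayList;
import java.util.List;

class ErasureDemo {
    // A generic factory: the caller chooses K, but K does not exist at runtime
    static <K> List<K> newList() {
        return new ArrayList<>();
    }

    // Hypothetical helper: an anonymous subclass keeps T in its generic superclass
    abstract static class Hint<T> {
        final Type captured;

        Hint() {
            ParameterizedType superType = (ParameterizedType) getClass().getGenericSuperclass();
            captured = superType.getActualTypeArguments()[0];
        }
    }

    static boolean runtimeClassesMatch() {
        List<Long> longs = newList();
        List<String> strings = newList();
        // Both are plain ArrayList objects at runtime: the type arguments were erased
        return longs.getClass() == strings.getClass();
    }

    static String capturedTypeName() {
        Hint<List<Long>> hint = new Hint<List<Long>>() {};
        return hint.captured.getTypeName();
    }

    public static void main(String[] args) {
        System.out.println(runtimeClassesMatch()); // true
        System.out.println(capturedTypeName());    // java.util.List<java.lang.Long>
    }
}
```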
Another odd issue: if you use Edge<K, EV> as the type when loading edge data, the Edge object only carries two type parameters (the vertex ID type and the edge value type). This cannot be fixed with the returns(...) method, and it makes some methods unusable, such as graph.inDegrees(). Using Tuple3<K, K, EV> instead of Edge<K, EV> solves this issue.
Basic Graph Algorithms
BFS
There are two ways to implement BFS in Gelly: with runVertexCentricIteration or with runScatterGatherIteration.
runVertexCentricIteration Approach:
```java
import org.apache.flink.api.common.functions.MapFunction;
```
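To show what a vertex-centric BFS computes, here is a single-machine, plain-Java sketch. It assumes an unweighted directed graph with `long` vertex IDs and only mimics Gelly's superstep model; the class and method names are hypothetical, and this is not Gelly's `ComputeFunction` API. In each superstep, every vertex that received messages runs one `compute` step: it takes the smallest hop count it was sent and, if that improves its value, sends `value + 1` to its out-neighbors.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class VertexCentricBfs {
    static final long UNREACHED = Long.MAX_VALUE;

    // Builds a directed adjacency list from {source, target} pairs
    static Map<Long, List<Long>> fromEdges(long[][] edges) {
        Map<Long, List<Long>> adj = new HashMap<>();
        for (long[] e : edges) {
            adj.computeIfAbsent(e[0], k -> new ArrayList<>()).add(e[1]);
            adj.computeIfAbsent(e[1], k -> new ArrayList<>());
        }
        return adj;
    }

    // One vertex's compute step: read messages, update its value, message its neighbors
    static void compute(long v, List<Long> messages, Map<Long, Long> value,
                        Map<Long, List<Long>> adj, Map<Long, List<Long>> nextInbox) {
        long best = Collections.min(messages);
        if (best < value.getOrDefault(v, UNREACHED)) {
            value.put(v, best);
            for (long n : adj.getOrDefault(v, List.of())) {
                nextInbox.computeIfAbsent(n, k -> new ArrayList<>()).add(best + 1);
            }
        }
    }

    // Hop distances from source; unreachable vertices keep UNREACHED
    static Map<Long, Long> bfs(Map<Long, List<Long>> adj, long source, int maxIterations) {
        Map<Long, Long> value = new HashMap<>();
        for (long v : adj.keySet()) value.put(v, UNREACHED);
        // Seed: the source receives distance 0 before the first superstep
        Map<Long, List<Long>> inbox = new HashMap<>();
        inbox.put(source, List.of(0L));
        for (int step = 0; step < maxIterations && !inbox.isEmpty(); step++) {
            Map<Long, List<Long>> nextInbox = new HashMap<>();
            // Only vertices with incoming messages run compute in this superstep
            for (Map.Entry<Long, List<Long>> e : inbox.entrySet()) {
                compute(e.getKey(), e.getValue(), value, adj, nextInbox);
            }
            inbox = nextInbox; // messages are delivered in the next superstep
        }
        return value;
    }

    public static void main(String[] args) {
        Map<Long, List<Long>> adj = fromEdges(new long[][] {{1, 2}, {1, 3}, {2, 4}, {3, 4}, {5, 1}});
        System.out.println(bfs(adj, 1L, 10));
    }
}
```

In `main`, vertex 5 only has an edge into vertex 1, so it is never reached from 1 and keeps the `UNREACHED` value.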
runScatterGatherIteration Approach:
```java
import org.apache.flink.api.common.functions.MapFunction;
```
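The scatter-gather variant splits each superstep into two phases. In the sketch below (plain Java on a single machine, hypothetical names, not Gelly's `ScatterFunction`/`GatherFunction` API), the scatter phase lets every vertex updated in the previous superstep send `distance + 1` along its out-edges, and the gather phase lets every vertex that received messages keep the minimum. `maxIterations` caps the number of supersteps, like the maximum-iterations argument of Gelly's iteration methods.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

class ScatterGatherBfs {
    static final long UNREACHED = Long.MAX_VALUE;

    // Builds a directed adjacency list from {source, target} pairs
    static Map<Long, List<Long>> fromEdges(long[][] edges) {
        Map<Long, List<Long>> adj = new HashMap<>();
        for (long[] e : edges) {
            adj.computeIfAbsent(e[0], k -> new ArrayList<>()).add(e[1]);
            adj.computeIfAbsent(e[1], k -> new ArrayList<>());
        }
        return adj;
    }

    static Map<Long, Long> bfs(Map<Long, List<Long>> adj, long source, int maxIterations) {
        Map<Long, Long> value = new HashMap<>();
        for (long v : adj.keySet()) value.put(v, UNREACHED);
        value.put(source, 0L);
        Set<Long> updated = new HashSet<>(Set.of(source));
        for (int step = 0; step < maxIterations && !updated.isEmpty(); step++) {
            // Scatter phase: vertices updated in the previous superstep message their out-neighbors
            Map<Long, List<Long>> inbox = new HashMap<>();
            for (long v : updated) {
                long next = value.get(v) + 1;
                for (long n : adj.getOrDefault(v, List.of())) {
                    inbox.computeIfAbsent(n, k -> new ArrayList<>()).add(next);
                }
            }
            // Gather phase: each messaged vertex keeps the smallest distance it received
            Set<Long> changed = new HashSet<>();
            for (Map.Entry<Long, List<Long>> e : inbox.entrySet()) {
                long candidate = Collections.min(e.getValue());
                if (candidate < value.getOrDefault(e.getKey(), UNREACHED)) {
                    value.put(e.getKey(), candidate);
                    changed.add(e.getKey());
                }
            }
            updated = changed;
        }
        return value;
    }

    public static void main(String[] args) {
        Map<Long, List<Long>> adj = fromEdges(new long[][] {{1, 2}, {2, 3}, {1, 3}});
        System.out.println(bfs(adj, 1L, 10));
    }
}
```

With `maxIterations = 1`, vertex 3 on a path 1 -> 2 -> 3 stays unreached, because its distance only arrives in the second superstep.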
For the usage of these methods, please refer to the Gelly documentation on iterative graph processing.
K-core Decomposition
A k-core of G can be obtained by recursively removing all vertices of degree less than k, until all vertices in the remaining graph have degree at least k (https://hal.archives-ouvertes.fr/hal-00004807v2/document).
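The peeling procedure described above can be sketched on a single machine in plain Java. It illustrates the definition rather than a Gelly implementation; it assumes an undirected simple graph given as `{u, v}` pairs with `long` IDs, and `KCore`, `fromEdges` and `kCore` are hypothetical names. A queue holds vertices whose degree is below `k`; removing one lowers its neighbors' degrees, which may push them into the queue in turn.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

class KCore {
    // Builds an undirected simple graph from {u, v} pairs (self-loops are skipped)
    static Map<Long, Set<Long>> fromEdges(long[][] edges) {
        Map<Long, Set<Long>> adj = new HashMap<>();
        for (long[] e : edges) {
            if (e[0] == e[1]) continue;
            adj.computeIfAbsent(e[0], x -> new HashSet<>()).add(e[1]);
            adj.computeIfAbsent(e[1], x -> new HashSet<>()).add(e[0]);
        }
        return adj;
    }

    // Peels off vertices of degree < k until none are left; the survivors form the k-core
    static Set<Long> kCore(Map<Long, Set<Long>> adj, int k) {
        Map<Long, Integer> degree = new HashMap<>();
        Deque<Long> queue = new ArrayDeque<>();
        for (Map.Entry<Long, Set<Long>> e : adj.entrySet()) {
            degree.put(e.getKey(), e.getValue().size());
            if (e.getValue().size() < k) queue.add(e.getKey());
        }
        Set<Long> removed = new HashSet<>();
        while (!queue.isEmpty()) {
            long v = queue.poll();
            if (!removed.add(v)) continue; // already peeled
            for (long n : adj.get(v)) {
                if (removed.contains(n)) continue;
                // Removing v lowers n's degree; n may now fall below k
                int d = degree.merge(n, -1, Integer::sum);
                if (d < k) queue.add(n);
            }
        }
        Set<Long> core = new TreeSet<>(adj.keySet());
        core.removeAll(removed);
        return core;
    }

    public static void main(String[] args) {
        // A triangle 1-2-3 with a pendant vertex 4 attached to 3
        Map<Long, Set<Long>> g = fromEdges(new long[][] {{1, 2}, {2, 3}, {3, 1}, {3, 4}});
        System.out.println(kCore(g, 2)); // [1, 2, 3]
        System.out.println(kCore(g, 3)); // []
    }
}
```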
Connected Components
Connected component detection in Flink Gelly:
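Gelly ships a `ConnectedComponents` library algorithm that, according to its documentation, propagates minimum component IDs with a scatter-gather iteration. The plain-Java sketch below simulates that idea on a single machine and is not Gelly code: it assumes undirected edges given as `{u, v}` pairs with `long` IDs, and the names are hypothetical. Each vertex starts with its own ID as its label, sends it to its neighbors, and adopts any smaller label it receives, until no label changes or `maxIterations` is reached.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

class MinLabelComponents {
    // Builds an undirected adjacency map from {u, v} pairs
    static Map<Long, Set<Long>> fromEdges(long[][] edges) {
        Map<Long, Set<Long>> adj = new HashMap<>();
        for (long[] e : edges) {
            adj.computeIfAbsent(e[0], x -> new HashSet<>()).add(e[1]);
            adj.computeIfAbsent(e[1], x -> new HashSet<>()).add(e[0]);
        }
        return adj;
    }

    // Given enough iterations, maps each vertex to the smallest vertex ID in its component
    static Map<Long, Long> components(Map<Long, Set<Long>> adj, int maxIterations) {
        Map<Long, Long> label = new HashMap<>();
        for (long v : adj.keySet()) label.put(v, v); // every vertex starts in its own component
        Set<Long> active = new HashSet<>(adj.keySet());
        for (int step = 0; step < maxIterations && !active.isEmpty(); step++) {
            // Scatter: vertices whose label changed send it to all neighbors
            Map<Long, Long> smallest = new HashMap<>();
            for (long v : active) {
                for (long n : adj.get(v)) {
                    smallest.merge(n, label.get(v), Long::min);
                }
            }
            // Gather: a vertex adopts a received label only if it is smaller
            Set<Long> changed = new HashSet<>();
            for (Map.Entry<Long, Long> e : smallest.entrySet()) {
                if (e.getValue() < label.get(e.getKey())) {
                    label.put(e.getKey(), e.getValue());
                    changed.add(e.getKey());
                }
            }
            active = changed;
        }
        return label;
    }

    public static void main(String[] args) {
        Map<Long, Set<Long>> g = fromEdges(new long[][] {{1, 2}, {2, 3}, {4, 5}});
        System.out.println(components(g, 10));
    }
}
```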
Single Source Shortest Path
Given a source vertex, find the shortest paths from it to all other reachable vertices:
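Gelly also ships a `SingleSourceShortestPaths` library algorithm for weighted graphs, which, according to its documentation, runs as a scatter-gather iteration. The plain-Java sketch below simulates that superstep scheme on a single machine; it is not Gelly's API, `WeightedEdge` and the other names are hypothetical, and it assumes there are no negative-weight cycles. In each superstep, vertices whose distance improved offer `distance + weight` along their out-edges, and each target keeps the smallest offer if it beats its current distance.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

class SuperstepSssp {
    // Hypothetical weighted, directed edge
    record WeightedEdge(long source, long target, double weight) {}

    // Distances from source; unreachable vertices keep POSITIVE_INFINITY
    static Map<Long, Double> shortestPaths(List<WeightedEdge> edges, long source, int maxIterations) {
        Map<Long, List<WeightedEdge>> out = new HashMap<>();
        Map<Long, Double> dist = new HashMap<>();
        for (WeightedEdge e : edges) {
            out.computeIfAbsent(e.source(), k -> new ArrayList<>()).add(e);
            dist.put(e.source(), Double.POSITIVE_INFINITY);
            dist.put(e.target(), Double.POSITIVE_INFINITY);
        }
        dist.put(source, 0.0);
        Set<Long> updated = new HashSet<>(Set.of(source));
        for (int step = 0; step < maxIterations && !updated.isEmpty(); step++) {
            // Scatter: each updated vertex offers dist + weight along its out-edges
            Map<Long, Double> best = new HashMap<>();
            for (long v : updated) {
                for (WeightedEdge e : out.getOrDefault(v, List.of())) {
                    best.merge(e.target(), dist.get(v) + e.weight(), Double::min);
                }
            }
            // Gather: keep the smallest offer if it improves the current distance
            Set<Long> changed = new HashSet<>();
            for (Map.Entry<Long, Double> e : best.entrySet()) {
                if (e.getValue() < dist.get(e.getKey())) {
                    dist.put(e.getKey(), e.getValue());
                    changed.add(e.getKey());
                }
            }
            updated = changed;
        }
        return dist;
    }

    public static void main(String[] args) {
        List<WeightedEdge> edges = List.of(
                new WeightedEdge(1, 2, 1.0),
                new WeightedEdge(2, 3, 2.0),
                new WeightedEdge(1, 3, 5.0));
        System.out.println(shortestPaths(edges, 1L, 10).get(3L)); // 3.0
    }
}
```

In `main`, vertex 3 first accepts the direct edge from 1 (distance 5.0) and then improves to 3.0 via vertex 2 in the next superstep.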