Hadoop Partition And Secondary Sort

This article describes an implementation and walkthrough of Hadoop partitioning and secondary sorting. First we need to briefly understand the workflow of a Hadoop MapReduce job. In the previous article we explained Hadoop's operating mechanism: each mapper task reads its input split and processes the data, then passes its results to the shuffle phase. The shuffle phase partitions and sorts all of the map outputs and hands the partitioned, sorted data to the reducers; key-value pairs with the same key are sent to the same reducer.
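Before diving in, it may help to see that flow as a toy model. The sketch below is only a mental model written in Python, not Hadoop's actual implementation; all names in it are ours:

from itertools import groupby

def toy_mapreduce(lines, mapper, reducer):
    # Map phase: each input line can emit any number of (key, value) pairs.
    pairs = [kv for line in lines for kv in mapper(line)]
    # "Shuffle": sort so equal keys become adjacent, then group by key
    # and hand each key with all of its values to the reducer.
    pairs.sort(key=lambda kv: kv[0])
    return [reducer(key, [v for _, v in group])
            for key, group in groupby(pairs, key=lambda kv: kv[0])]

# Word count example: prints [('a', 2), ('b', 1)]
print(toy_mapreduce(["a b a"],
                    lambda line: [(w, 1) for w in line.split()],
                    lambda key, values: (key, sum(values))))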

The default partitioning method used by Hadoop is based on the hash of the key, but in practice this does not always distribute the work efficiently or the way we require. For example, after partitioning, one node’s reducer might be assigned 20 records while another is assigned 10 million records; imagine how inefficient that is. Or we may want the output files to follow a certain rule: assuming there are two reducers, we might want records starting with “a” stored in part-00000 and all other records in part-00001, which the default partitioner cannot do. So we need to customize our own partitioner to choose the reducer for each record according to our requirements.
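Conceptually, the default partitioner hashes the key, and a custom partitioner simply replaces that hash with our own rule. A minimal sketch of both ideas (the function names are illustrative, not Hadoop APIs):

# Default idea (Hadoop's HashPartitioner uses the Java hash of the key,
# not Python's hash; this only illustrates the principle).
def hash_partition(key, num_reducers):
    return hash(key) % num_reducers

# Custom rule from the example above: records whose key starts with "a"
# go to reducer 0 (part-00000), everything else to reducer 1 (part-00001).
def rule_partition(key, num_reducers=2):
    return 0 if key.startswith("a") else 1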

Why partition

We can use Hadoop to produce an ordered file with a single reducer (because the data is sorted before it reaches the reducer). However, if the input is huge (e.g. 10 GB), a single reducer is very inefficient, which completely loses the advantage of the parallel architecture provided by MapReduce. Instead, we can use a partitioner that splits the data into ordered key ranges, so that when each reducer processes its data, the data within that range is guaranteed to be already sorted. One partition corresponds to one reducer, so every reducer's output is ordered; we just need to concatenate the output files in partition order and the combined file must be ordered.
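A few lines of Python show why this works; the split point "m" and the sample keys are made up for illustration:

# Toy range partitioner: keys below "m" go to reducer 0, the rest to reducer 1.
keys = ["pear", "apple", "melon", "kiwi", "zebra", "banana"]
parts = [[], []]
for k in keys:
    parts[0 if k < "m" else 1].append(k)

# The framework sorts each partition before its reducer sees it, so
# concatenating the reducer outputs in partition order is a global sort.
merged = sorted(parts[0]) + sorted(parts[1])
assert merged == sorted(keys)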

Hadoop Streaming Partition

mapred streaming \
-D mapreduce.job.output.key.comparator.class=org.apache.hadoop.mapreduce.lib.partition.KeyFieldBasedComparator \
-D stream.map.output.field.separator=, \
-D stream.num.map.output.key.fields=2 \
-D mapreduce.map.output.key.field.separator=, \
-D mapreduce.partition.keycomparator.options=-k2,2nr \
-D mapreduce.job.reduces=2 \
-input input \
-output output \
-mapper /bin/cat \
-reducer /bin/cat \
-partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner
-D stream.map.output.field.separator=, means “,” is the field separator for the map outputs
-D stream.num.map.output.key.fields=2 means the prefix up to the second “,” in a line will be the key and the rest of the line (excluding the second “,”) will be the value.

e.g.:
given the line 1,2,3, 'stream.map.output.field.separator' splits the line by “,” and 'stream.num.map.output.key.fields=2' makes (1,2) the key and 3 the value.

-D mapreduce.map.output.key.field.separator=, means the separator used to split the key itself into fields is also “,”

-D mapreduce.partition.keypartitioner.options=-k1,2 means MapReduce will partition the map outputs by the first two fields of the keys.
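Similarly, -D mapreduce.partition.keycomparator.options=-k2,2nr in the command above means keys are compared on their second field, numerically (n) and in reverse order (r); these options follow the syntax of Unix sort. To make the splitting and partitioning concrete, here is a small Python sketch of the semantics (illustrative only; Hadoop's KeyFieldBasedPartitioner hashes the selected fields with Java's string hash, so the actual reducer numbers will differ):

line = "muse,curious,1"

# stream.map.output.field.separator=, with stream.num.map.output.key.fields=2:
fields = line.split(",")
key, value = ",".join(fields[:2]), ",".join(fields[2:])
# key == "muse,curious", value == "1"

# mapreduce.partition.keypartitioner.options=-k1,2:
# partition on key fields 1 through 2.
selected = tuple(key.split(",")[0:2])
reducer = hash(selected) % 2   # with mapreduce.job.reduces=2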

For example, the input file (input.txt) looks like this:

muse,curious,1
muse,days,2
muse,do,3
muse,growing,4
muse,grown,5
muse,in,1
cat,invent,2
muse,more,3
muse,please,4
cat,subject,5
cat,ten,2
muse,these,3
muse,this,2
muse,times,5
muse,to,6
cat,want,8
muse,with,9
muse,worth,0
muse,*,10
muse,age,12

Put this file into HDFS:

hdfs dfs -put input.txt input

We partition by two different ranges of key fields (set via mapreduce.partition.keypartitioner.options); e.g. for the line muse,curious,1 the key is (muse, curious) and the value is 1. The two settings below give two different results.

The first command partitions based on the first field of the key.

hadoop jar $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-*.jar \
-D stream.map.output.field.separator=, \
-D stream.num.map.output.key.fields=2 \
-D mapreduce.map.output.key.field.separator=, \
-D mapreduce.partition.keypartitioner.options=-k1,1 \
-D mapreduce.job.reduces=3 \
-input input \
-output ou2 \
-mapper /bin/cat \
-reducer /bin/cat \
-partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner

The first reducer gets:

cat,invent	2
cat,subject 5
cat,ten 2
cat,want 8

The second reducer gets:

muse,*	10
muse,age 12
muse,curious 1
muse,days 2
muse,do 3
muse,growing 4
muse,grown 5
muse,in 1
muse,more 3
muse,please 4
muse,these 3
muse,this 2
muse,times 5
muse,to 6
muse,with 9
muse,worth 0

The last reducer's output is empty. From these results we can see that we get the desired partitioning.

The second command partitions based on the second field of the key.

hadoop jar $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-*.jar \
-D stream.map.output.field.separator=, \
-D stream.num.map.output.key.fields=2 \
-D mapreduce.map.output.key.field.separator=, \
-D mapreduce.partition.keypartitioner.options=-k2,2 \
-D mapreduce.job.reduces=3 \
-input input \
-output ou1 \
-mapper /bin/cat \
-reducer /bin/cat \
-partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner

Reducer 1:

cat,subject	5
cat,ten 2
muse,* 10
muse,more 3
muse,these 3
muse,times 5
muse,with 9
muse,worth 0

Reducer 2:

cat,invent	2
cat,want 8
muse,age 12
muse,curious 1
muse,days 2
muse,do 3

Reducer 3:

muse,growing	4
muse,grown 5
muse,in 1
muse,please 4
muse,this 2
muse,to 6

Comparing the two outputs, we can see that different key field ranges lead to completely different partitioning results.

MRJob example

Using the same input file, I wrote an example with mrjob.

from mrjob.job import MRJob

class Example(MRJob):
    """Sum the values per (word1, word2) key, partitioning by the
    first key field across three reducers."""

    # Sort the values for each key before they reach the reducer.
    SORT_VALUES = True

    # Use Hadoop's key-field-based partitioner.
    PARTITIONER = 'org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner'

    JOBCONF = {
        'mapreduce.map.output.key.field.separator': ',',
        'mapreduce.job.reduces': 3,
        'mapreduce.partition.keypartitioner.options': '-k1,1',
        'mapreduce.partition.keycomparator.options': '-k1,1 -k2,2n',
        'mapreduce.job.output.key.comparator.class':
            'org.apache.hadoop.mapreduce.lib.partition.KeyFieldBasedComparator',
    }

    def mapper(self, _, line):
        # Split "word1,word2,count" into a "word1,word2" key and an int value.
        words = line.split(',')
        yield words[0] + "," + words[1], int(words[2])

    def reducer(self, key, values):
        yield key, sum(values)


if __name__ == '__main__':
    Example.run()
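Here SORT_VALUES = True makes mrjob sort the values for each key, and the comparator options '-k1,1 -k2,2n' order the keys by their first field as text and their second field numerically. Expressed in plain Python, this is the ordering (a sketch assuming the second key field is numeric, which is not actually the case for the word keys above):

# Python equivalent of the comparator -k1,1 -k2,2n for keys like "muse,5":
def comparator_key(key):
    field1, field2 = key.split(",")[:2]
    return (field1, float(field2))

print(sorted(["muse,10", "cat,2", "muse,2"], key=comparator_key))
# ['cat,2', 'muse,2', 'muse,10']  (text on field 1, numeric on field 2)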

Run the code:

python3 example.py -r hadoop hdfs:///user/xx/example.txt -o hdfs:///user/xx/output

The results from the three reducers:

  1. None

  2. muse,*	10
    muse,age 12
    muse,curious 1
    muse,days 2
    muse,do 3
    muse,growing 4
    muse,grown 5
    muse,in 1
    muse,more 3
    muse,please 4
    muse,these 3
    muse,this 2
    muse,times 5
    muse,to 6
    muse,with 9
    muse,worth 0
  3. cat,invent	2
    cat,subject 5
    cat,ten 2
    cat,want 8

Partition and secondary sort video

----- End. Thanks for reading -----