WritableComparable全排序案例

此案例在 MapReduce 序列化案例的基础上完成。

FlowBean
public class FlowBean implements WritableComparable<FlowBean> {
······
/**
 * Orders beans by total flow, largest first.
 *
 * <p>The operands are deliberately swapped relative to natural ordering so that a bean with a
 * larger {@code sumFlow} compares as "smaller" and is therefore emitted earlier by the shuffle
 * sort. Beans with equal {@code sumFlow} compare as 0.
 *
 * @param o the bean to compare against
 * @return a negative value if this bean's total flow is larger, positive if smaller, 0 if equal
 */
@Override
public int compareTo(FlowBean o) {
    return Long.compare(o.getSumFlow(), this.sumFlow);
}
······
}
FlowSortMapper
package flowsum3;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * Emits (FlowBean, phone) pairs so that the shuffle phase sorts records by
 * {@link FlowBean#compareTo(FlowBean)}.
 */
public class FlowSortMapper extends Mapper<LongWritable, Text, FlowBean, Text> {

    /** Output value (the phone column); reused across map() calls instead of allocating per record. */
    private final Text phone = new Text();

    /** Output key that drives the sort; reused across map() calls instead of allocating per record. */
    private final FlowBean flow = new FlowBean();

    /**
     * Parses one tab-separated input line and writes it out keyed by its flow bean.
     *
     * <p>Field 1 is taken as the phone number. The third-from-last and second-from-last fields
     * are parsed as upFlow and downFlow.
     *
     * @param key byte offset of the line (unused)
     * @param value one tab-separated input line
     * @param context job context used to emit the (flow, phone) pair
     * @throws NumberFormatException if the up/down flow fields are not valid longs
     */
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String[] fields = value.toString().split("\t");
        phone.set(fields[1]);
        long upFlow = Long.parseLong(fields[fields.length - 3]);
        long downFlow = Long.parseLong(fields[fields.length - 2]);
        // NOTE(review): reusing the bean assumes FlowBean.set(up, down) overwrites every field it
        // serializes (including any derived sum) rather than accumulating -- confirm in FlowBean.
        flow.set(upFlow, downFlow);
        // Hadoop's map output collector serializes the pair on write, so mutating the reused
        // objects on the next call does not affect already-emitted records.
        context.write(flow, phone);
    }
}

FlowSortReducer
package flowsum3;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
 * Receives phone numbers grouped by their (already sorted) flow bean and writes each one back
 * out with the phone number as the output key and the bean as the output value.
 */
public class FlowSortReducer extends Reducer<FlowBean, Text,Text, FlowBean> {
    /**
     * Emits one (phone, flow) line per phone number in the group.
     *
     * <p>Beans with equal {@code sumFlow} compare as equal and can land in the same reduce call.
     * Hadoop refreshes {@code key} as the values are iterated, so each write pairs a phone number
     * with its own bean.
     *
     * @param key the flow bean shared by this group
     * @param values phone numbers associated with the key
     * @param context job context used to emit output
     */
    @Override
    protected void reduce(FlowBean key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        java.util.Iterator<Text> phones = values.iterator();
        while (phones.hasNext()) {
            // Swap roles: the phone number becomes the output key.
            context.write(phones.next(), key);
        }
    }
}

FlowSortDriver
package flowsum3;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;


/**
 * Configures and submits the flow total-sort job: FlowSortMapper keys each record by its
 * FlowBean, the shuffle sorts by FlowBean.compareTo, and FlowSortReducer writes the records
 * back out in that order.
 */
public class FlowSortDriver {

    /** Input glob used when no paths are given on the command line. */
    private static final String DEFAULT_INPUT = "d://test/input/*";

    /** Output directory used when no paths are given on the command line. */
    private static final String DEFAULT_OUTPUT = "d://test/output";

    /**
     * Runs the job and exits with status 0 on success, 1 on failure.
     *
     * <p>WARNING: the output directory is deleted recursively if it already exists.
     *
     * @param args optional {@code <input> <output>} paths; when fewer than two arguments are
     *     given, {@value #DEFAULT_INPUT} and {@value #DEFAULT_OUTPUT} are used
     * @throws Exception if job setup, filesystem access, or job execution fails
     */
    public static void main(String[] args) throws Exception {
        boolean pathsGiven = args != null && args.length >= 2;
        Path in = new Path(pathsGiven ? args[0] : DEFAULT_INPUT);
        Path out = new Path(pathsGiven ? args[1] : DEFAULT_OUTPUT);

        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        // Wire up the sort job's own classes: FlowSortMapper is the mapper that emits the
        // FlowBean keys / Text values declared below (the FlowCount* classes belong to a
        // different job).
        job.setJarByClass(FlowSortDriver.class);
        job.setMapperClass(FlowSortMapper.class);
        job.setReducerClass(FlowSortReducer.class);

        job.setMapOutputKeyClass(FlowBean.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);

        // A total order across the whole result needs exactly one reducer (one sorted part file);
        // otherwise each part file is only sorted internally.
        job.setNumReduceTasks(1);

        // Resolve the filesystem from the output path itself so non-default schemes work.
        FileSystem fs = out.getFileSystem(conf);
        if (fs.exists(out)) {
            fs.delete(out, true);
        }

        FileInputFormat.setInputPaths(job, in);
        FileOutputFormat.setOutputPath(job, out);

        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}

part-r-00000
14586865192	1994	1915	3909
14701964954 1991 1871 3862
14793112239 1982 1872 3854
14559149184 1999 1744 3743
18167826533 1863 1874 3737
14543635180 1738 1973 3711
15349358275 1763 1911 3674
13872302605 1892 1763 3655
13640490595 1946 1683 3629
15980678136 1773 1836 3609
15056758327 1775 1824 3599
13929240982 1780 1815 3595
14551050233 1695 1873 3568
13088994565 1665 1902 3567
15984368085 1771 1796 3567
14708731769 1625 1936 3561
13652391923 1776 1772 3548
14551297518 1883 1664 3547
14785890081 1875 1655 3530
18595974724 1752 1777 3529
13681443952 1998 1529 3527
13388838008 1974 1544 3518
14506664448 1932 1583 3515
17733893011 1597 1909 3506
18122118349 1672 1818 3490
13561155968 1608 1877 3485
17696066325 1889 1562 3451
14753787683 1794 1645 3439
14572782882 1880 1552 3432
17742910217 1804 1626 3430