MapReduce Serialization Example

Input Data
id	phone number	ip	website	up flow	down flow	request status
1	14530531269	102.132.54.44	www.ldbzo.ltd	1431	1360	404
2	17690836832	88.58.44.150	ftp.j.icu	1135	718	302
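
The job groups these records by phone number and totals the upstream and downstream traffic per phone. Only the phone number and the two traffic columns are consumed; as the mapper below shows, the traffic fields are indexed from the end of the record, so records with extra middle columns still parse correctly.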
FlowBean
package flowsum;

import org.apache.hadoop.io.Writable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class FlowBean implements Writable {
    private long upFlow;   // upstream traffic
    private long downFlow; // downstream traffic
    private long sumFlow;  // total traffic

    // A no-argument constructor is required so Hadoop can
    // instantiate the bean via reflection during deserialization.
    public FlowBean() {
    }

    public FlowBean(long upFlow, long downFlow) {
        this.upFlow = upFlow;
        this.downFlow = downFlow;
        sumFlow = upFlow + downFlow;
    }

    public long getUpFlow() {
        return upFlow;
    }

    public void setUpFlow(long upFlow) {
        this.upFlow = upFlow;
    }

    public long getDownFlow() {
        return downFlow;
    }

    public void setDownFlow(long downFlow) {
        this.downFlow = downFlow;
    }

    public long getSumFlow() {
        return sumFlow;
    }

    public void setSumFlow(long sumFlow) {
        this.sumFlow = sumFlow;
    }

    // Serialization: write the fields to the output stream.
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(upFlow);
        out.writeLong(downFlow);
        out.writeLong(sumFlow);
    }

    // Deserialization: read the fields back in exactly the
    // same order they were written.
    @Override
    public void readFields(DataInput in) throws IOException {
        upFlow = in.readLong();
        downFlow = in.readLong();
        sumFlow = in.readLong();
    }

    // Refill all three fields at once so the reused bean stays consistent.
    public void set(long upFlow, long downFlow) {
        this.upFlow = upFlow;
        this.downFlow = downFlow;
        sumFlow = upFlow + downFlow;
    }

    @Override
    public String toString() {
        return upFlow + "\t" + downFlow + "\t" + sumFlow;
    }
}
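
Because readFields must consume exactly what write produced, a quick round trip through in-memory streams is a handy sanity check. The class below is a minimal sketch for local testing only; its name and main method are illustrative, not part of the job:

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class FlowBeanRoundTrip {
    public static void main(String[] args) throws IOException {
        FlowBean original = new FlowBean(1431, 1360);

        // Serialize into an in-memory buffer.
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        original.write(new DataOutputStream(buffer));

        // Deserialize from the same bytes.
        FlowBean restored = new FlowBean();
        restored.readFields(new DataInputStream(
                new ByteArrayInputStream(buffer.toByteArray())));

        // Both lines should print "1431	1360	2791".
        System.out.println(original);
        System.out.println(restored);
    }
}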

FlowCountMapper
package flowsum;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class FlowCountMapper extends Mapper<LongWritable, Text, Text, FlowBean> {
    // Reuse the key/value objects across map() calls instead of
    // allocating a new pair for every input line.
    Text k = new Text();
    FlowBean v = new FlowBean();

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String line = value.toString();
        String[] fields = line.split("\t");
        // fields[1] is the phone number; the traffic columns are indexed
        // from the end so extra middle columns do not break the parsing.
        k.set(fields[1]);
        long upFlow = Long.parseLong(fields[fields.length - 3]);
        long downFlow = Long.parseLong(fields[fields.length - 2]);
        // set() also refreshes sumFlow, so the serialized bean is complete.
        v.set(upFlow, downFlow);
        context.write(k, v);
    }
}
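
To see exactly what the mapper extracts, you can run the split logic on the first sample record by itself. This standalone sketch (class name hypothetical) assumes the input file is tab-separated, matching the split("\t") above:

public class SplitDemo {
    public static void main(String[] args) {
        String line = "1\t14530531269\t102.132.54.44\twww.ldbzo.ltd\t1431\t1360\t404";
        String[] fields = line.split("\t");
        System.out.println("phone    = " + fields[1]);                 // 14530531269
        System.out.println("upFlow   = " + fields[fields.length - 3]); // 1431
        System.out.println("downFlow = " + fields[fields.length - 2]); // 1360
    }
}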

FlowCountReducer
package flowsum;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class FlowCountReducer extends Reducer<Text, FlowBean, Text, FlowBean> {
    // Reused output bean; refilled for each key via set().
    FlowBean flowBean = new FlowBean();

    @Override
    protected void reduce(Text key, Iterable<FlowBean> values, Context context)
            throws IOException, InterruptedException {
        long sumUpFlow = 0;
        long sumDownFlow = 0;
        // Hadoop reuses the bean handed out by this iterator, so copy the
        // primitive values out immediately rather than keeping the reference.
        for (FlowBean bean : values) {
            sumUpFlow += bean.getUpFlow();
            sumDownFlow += bean.getDownFlow();
        }
        flowBean.set(sumUpFlow, sumDownFlow);
        context.write(key, flowBean);
    }
}
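
Because the reduction is a plain sum, it is associative, and the reducer's input and output types match, so the same class could optionally be registered as a combiner to pre-aggregate on the map side and shrink the shuffle. This is an optional line you could add in the driver below, not part of the original job:

// Optional: pre-aggregate per phone number on the map side.
job.setCombinerClass(FlowCountReducer.class);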

FlowCountDriver
package flowsum;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;


public class FlowCountDriver {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(FlowCountDriver.class);
        job.setMapperClass(FlowCountMapper.class);
        job.setReducerClass(FlowCountReducer.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(FlowBean.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);

        // Delete the output directory if it already exists; otherwise
        // the job fails with a FileAlreadyExistsException.
        FileSystem fs = FileSystem.get(conf);
        Path out = new Path("d://test/output");
        if (fs.exists(out)) {
            fs.delete(out, true);
        }

        FileInputFormat.setInputPaths(job, new Path("d://test/input/*"));
        FileOutputFormat.setOutputPath(job, out);
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}
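
The paths above are hardcoded for a local Windows run. For a cluster submission you would typically take them from the command line instead; a hedged sketch of the replacement lines inside main (argument checking omitted):

// Hypothetical variant: read the input/output paths from the command
// line so the same driver works both locally and on a cluster.
FileInputFormat.setInputPaths(job, new Path(args[0]));
Path out = new Path(args[1]);
FileOutputFormat.setOutputPath(job, out);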

Results
13004073160	1494	75	1569
13022901544	490	1739	2229
13024422212	1281	1394	2675
13025000634	874	863	1737
13044775816	1496	1644	3140
13045176110	1587	1603	3190
13045696799	1617	1690	3307
13046838963	166	1480	1646
13049614982	1159	1096	2255
13058945986	355	1228	1583
13067031190	59	523	582
13068667095	1856	363	2219
13080199681	963	156	1119
13083433223	1943	574	2517
13087093131	730	231	961
13089835861	104	115	219
13120345600	403	1317	1720
13122159620	857	200	1057
13127085035	1683	1634	3317
13133388281	248	1916	2164
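
Each output line is the phone number followed by the total upstream traffic, total downstream traffic, and their sum, joined by tabs via FlowBean.toString(); e.g. 1494 + 75 = 1569 for the first entry.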