MapReduce: Student Scores (Part 1)

computer,huangxiaoming,85
computer,xuzheng,54
computer,huangbo,86
computer,liutao,85
computer,huanglei,99
computer,liujialing,85
computer,liuyifei,75
computer,huangdatou,48
computer,huangjiaju,88
computer,huangzitao,85
english,zhaobenshan,57
english,liuyifei,85
english,liuyifei,76
english,huangdatou,48
english,zhouqi,85
english,huangbo,85
english,huangxiaoming,96
english,huanglei,85
english,liujialing,75
algorithm,liuyifei,75
algorithm,huanglei,76
algorithm,huangjiaju,85
algorithm,liutao,85
algorithm,huangdou,42
algorithm,huangzitao,81
math,wangbaoqiang,85
math,huanglei,76
math,huangjiaju,85
math,liutao,48
math,xuzheng,54
math,huangxiaoming,85
math,liujialing,85

The lines above are the input data. Each line contains three fields: course, name, and score.

Requirement 1: for each course, compute the highest score, the lowest score, and the average score.

Expected output format (one line per course):
course max min avg
For example, with the data above:
computer 99 48 79

Approach:

To get each course's highest, lowest, and average score, we simply group the records by course and then apply max, min, and avg aggregations to the scores in each group.

The requirement itself is not complicated.
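To make the grouping and aggregation concrete before bringing in Hadoop, here is a minimal plain-Java sketch (the class name and the hard-coded sample lines are just for illustration, not part of the job) that computes the same per-course max/min/avg on a few of the records:

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class LocalAggregationSketch {
    public static void main(String[] args) {
        // A few of the sample records, in the same course,name,score format.
        String[] lines = {
                "computer,huangxiaoming,85", "computer,xuzheng,54", "computer,huanglei,99",
                "english,zhaobenshan,57", "english,huangxiaoming,96"
        };
        // Group scores by course; in MapReduce the shuffle phase does this for us.
        Map<String, List<Integer>> byCourse = new HashMap<>();
        for (String line : lines) {
            String[] fields = line.split(",");
            byCourse.computeIfAbsent(fields[0], k -> new ArrayList<>())
                    .add(Integer.parseInt(fields[2]));
        }
        // Aggregate max, min and truncated average per course, which is what the reducer will do.
        for (Map.Entry<String, List<Integer>> entry : byCourse.entrySet()) {
            List<Integer> scores = entry.getValue();
            int sum = 0;
            for (int s : scores) {
                sum += s;
            }
            System.out.println(entry.getKey() + " " + Collections.max(scores)
                    + " " + Collections.min(scores) + " " + sum / scores.size());
        }
    }
}

The MapReduce job below splits this same logic in two: the mapper does the field extraction, and the reducer does the per-group aggregation.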

In the mapper phase, the output key-value pair is:

key: the course

value: the score

In the reducer phase, the reduce method receives:

key: the course

values: an iterable over all scores belonging to that course

For any MapReduce program, being clear about what the mapper and reducer phases take as input and produce as output is the key to writing it correctly.
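For this data set the picture looks roughly like this: the mapper turns each line into a (course, score) pair, and after the shuffle each reduce call sees one course together with all of its scores (the order of the values inside a group is not guaranteed):

mapper output:  (computer, 85), (computer, 54), (english, 57), (algorithm, 75), ...

reducer input:  computer -> [85, 54, 86, 85, 99, 85, 75, 48, 88, 85]
                english  -> [57, 85, 76, 48, 85, 85, 96, 85, 75]
                ...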

The code implementation:

package com.xiaojia.stu;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class StuMapper extends Mapper<LongWritable, Text, Text, Text> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Each input line looks like: course,name,score
        String line = value.toString();
        String[] split = line.split(",");
        // Emit the course as the key and the score as the value.
        context.write(new Text(split[0]), new Text(split[2]));
    }
}

package com.xiaojia.stu;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class StuReduer extends Reducer<Text, Text, Text, Text> {
    // Reused across reduce() calls; cleared at the start of each call.
    List<Integer> list = new ArrayList<Integer>();

    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        list.clear();
        // Collect all scores of the current course.
        for (Text value : values) {
            list.add(Integer.parseInt(value.toString()));
        }
        Integer max = Collections.max(list);
        Integer min = Collections.min(list);
        int sum = 0;
        for (Integer l : list) {
            sum += l;
        }
        // Integer division: the average is truncated to an int.
        int avg = sum / list.size();
        context.write(key, new Text(max + "," + min + "," + avg));
    }
}

package com.xiaojia.stu;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class StuDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://hadoop01:9000");
        FileSystem fs = FileSystem.get(conf);

        Job job = Job.getInstance(conf);
        job.setJarByClass(StuDriver.class);
        job.setMapperClass(StuMapper.class);
        job.setReducerClass(StuReduer.class);
        // Both the map output and the final output are Text/Text pairs.
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        Path input = new Path(args[0]);
        Path output = new Path(args[1]);
        // Delete the output directory if it already exists, otherwise the job fails.
        if (fs.exists(output)) {
            fs.delete(output, true);
        }
        FileInputFormat.setInputPaths(job, input);
        FileOutputFormat.setOutputPath(job, output);

        boolean res = job.waitForCompletion(true);
        System.exit(res ? 0 : 1);
    }
}
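To run the job, package the three classes into a jar and submit it with hadoop jar. The jar name and the HDFS input/output paths below are only placeholders; adjust them to your environment:

hadoop jar stu-score.jar com.xiaojia.stu.StuDriver /stu/input /stu/output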

The final execution result:

algorithm	85	42	74
computer	99	48	79
english	96	48	77
math	85	48	74