MapReduce学生成绩(五)

题目描述

关于学生成绩的练习题,之前是一个入门级别的需求,现在对这些需求进行增强。首先看数据的变化:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
computer,huangxiaoming,85,86,41,75,93,42,85
computer,xuzheng,54,52,86,91,42
computer,huangbo,85,42,96,38
english,zhaobenshan,54,52,86,91,42,85,75
english,liuyifei,85,41,75,21,85,96,14
algorithm,liuyifei,75,85,62,48,54,96,15
computer,huangjiaju,85,75,86,85,85
english,liuyifei,76,95,86,74,68,74,48
english,huangdatou,48,58,67,86,15,33,85
algorithm,huanglei,76,95,86,74,68,74,48
algorithm,huangjiaju,85,75,86,85,85,74,86
computer,huangdatou,48,58,67,86,15,33,85
english,zhouqi,85,86,41,75,93,42,85,75,55,47,22
english,huangbo,85,42,96,38,55,47,22
algorithm,liutao,85,75,85,99,66
computer,huangzitao,85,86,41,75,93,42,85
math,wangbaoqiang,85,86,41,75,93,42,85
computer,liujialing,85,41,75,21,85,96,14,74,86
computer,liuyifei,75,85,62,48,54,96,15
computer,liutao,85,75,85,99,66,88,75,91
computer,huanglei,76,95,86,74,68,74,48
english,liujialing,75,85,62,48,54,96,15
math,huanglei,76,95,86,74,68,74,48
math,huangjiaju,85,75,86,85,85,74,86
math,liutao,48,58,67,86,15,33,85
english,huanglei,85,75,85,99,66,88,75,91
math,xuzheng,54,52,86,91,42,85,75
math,huangxiaoming,85,75,85,99,66,88,75,91
math,liujialing,85,86,41,75,93,42,85,75
english,huangxiaoming,85,86,41,75,93,42,85
algorithm,huangdatou,48,58,67,86,15,33,85
algorithm,huangzitao,85,86,41,75,93,42,85,75

一、数据解释

数据字段个数不固定:
第一个是课程名称,总共四门课程:computer、math、english、algorithm;
第二个是学生姓名,后面是每次考试的分数

二、统计需求:

1、统计每门课程的参考人数和课程平均分

2、统计每门课程参考学生的平均分,并且按课程存入不同的结果文件,要求一门课程一个结果文件,并且按平均分从高到低排序,分数保留一位小数

*3、求出每门课程参考学生成绩最高的2个学生的信息:课程,姓名和平均分*

三、解题思路

mapper阶段的输出:

key: CourseScore(即下文代码中的 FlowBean)

value: NullWritable

reducer阶段的输出:

key: CourseScore(即下文代码中的 FlowBean)

value:NullWritable

实现难点:

分组条件(课程)和排序规则(课程,成绩)不一致,所以需要自定义分组

自定义分组的代码 CourseScoreGroupComparator.java 在 MR 程序里头

四、代码实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
package com.xiaojia.stu5;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * Mapper that turns one input line into one {@link FlowBean} key.
 *
 * <p>Each line is expected to look like {@code course,name,score1,score2,...}
 * with a variable number of integer scores. The mapper emits a
 * {@code FlowBean(course, name, average)} key with a {@link NullWritable} value.
 */
public class StuMapper extends Mapper<LongWritable, Text, FlowBean, NullWritable> {

    /** Index of the first score field; fields 0 and 1 hold the course and the student name. */
    private static final int FIRST_SCORE_INDEX = 2;

    /**
     * Parses a single record and writes the student's average score for the course.
     *
     * @param key     input key supplied by the framework (not used here)
     * @param value   one comma-separated input line
     * @param context job context used to emit the output key
     * @throws NumberFormatException if a score field is not a valid integer
     */
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String[] fields = value.toString().split(",");

        // A blank line or a record without any score would otherwise divide by zero.
        int scoreCount = fields.length - FIRST_SCORE_INDEX;
        if (scoreCount <= 0) {
            return;
        }

        int sum = 0;
        for (int i = FIRST_SCORE_INDEX; i < fields.length; i++) {
            sum += Integer.parseInt(fields[i].trim());
        }

        // Cast before dividing: int / int would truncate the average to a whole number.
        double avg = (double) sum / scoreCount;

        FlowBean bean = new FlowBean();
        bean.setCourse(fields[0]);
        bean.setName(fields[1]);
        bean.setScore(avg);
        context.write(bean, NullWritable.get());
    }
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
package com.xiaojia.stu5;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
 * Reducer that emits the first {@code topN} records of every reduce group.
 *
 * <p>NOTE(review): this relies on the job's sort order (course, then score
 * descending) and on the course-only grouping comparator configured in the
 * driver, so that the first records of a group are the best-scoring students.
 */
public class StuReduer extends Reducer<FlowBean, NullWritable, FlowBean,NullWritable> {
    /** Number of records to emit per group. */
    int topN=2;

    /**
     * Writes the key once per value until {@code topN} records have been written.
     *
     * @param flow    the key of the current group
     * @param values  the values of the group (placeholders only)
     * @param context job context used to emit output records
     */
    @Override
    protected void reduce(FlowBean flow, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
        int remaining = topN;
        for (NullWritable placeholder : values) {
            // NOTE(review): the same `flow` reference is written on every pass; this presumably
            // depends on Hadoop refreshing the key object's fields as the values iterator
            // advances - confirm against the Reducer documentation.
            context.write(flow, placeholder);
            remaining--;
            if (remaining == 0) {
                return;
            }
        }
    }
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
package com.xiaojia.stu5;

import com.xiaojia.stu4.StuPartitioner;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

/**
 * Job driver: wires {@link StuMapper}, {@link StuReduer} and
 * {@link CourseScoreGroupComparator} together and submits the job.
 *
 * <p>Usage: {@code StuDriver <input path> <output path>}. An existing output
 * path is deleted recursively before the job starts.
 */
public class StuDriver {

    /**
     * Configures and runs the job, exiting with 0 on success and 1 on failure.
     *
     * @param args {@code args[0]} is the input path, {@code args[1]} the output path
     */
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // Fail fast with a readable message instead of an ArrayIndexOutOfBoundsException
        // after the cluster connection has already been set up.
        if (args.length < 2) {
            System.err.println("Usage: StuDriver <input path> <output path>");
            System.exit(2);
        }

        Configuration conf = new Configuration();
        conf.set("fs.defaultFS","hdfs://hadoop01:9000");
        FileSystem fs = FileSystem.get(conf);

        Job job = Job.getInstance(conf);
        job.setJarByClass(StuDriver.class);
        job.setMapperClass(StuMapper.class);
        job.setReducerClass(StuReduer.class);
        // Left disabled as in the original; the job runs with the default partitioner
        // and the default number of reduce tasks.
        // job.setPartitionerClass(StuPartitioner.class);
        // job.setNumReduceTasks(4);
        job.setMapOutputKeyClass(FlowBean.class);
        job.setMapOutputValueClass(NullWritable.class);
        job.setOutputKeyClass(FlowBean.class);
        job.setOutputValueClass(NullWritable.class);
        // Group reduce input by course only, while the sort order also includes the score.
        job.setGroupingComparatorClass(CourseScoreGroupComparator.class);

        Path input = new Path(args[0]);
        Path output = new Path(args[1]);
        // Remove a previous result directory so the job can write to the same path again.
        if (fs.exists(output)) {
            fs.delete(output, true);
        }
        FileInputFormat.setInputPaths(job, input);
        FileOutputFormat.setOutputPath(job, output);

        boolean res = job.waitForCompletion(true);
        System.exit(res ? 0 : 1);
    }
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
package com.xiaojia.stu5;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

/**
 * Grouping comparator that treats two {@link FlowBean} keys as equal when
 * their course names are equal, regardless of student name or score.
 */
public class CourseScoreGroupComparator extends WritableComparator {

    /** Registers {@link FlowBean} as the key class, passing {@code true} as the second superclass argument. */
    CourseScoreGroupComparator(){
        super(FlowBean.class, true);
    }

    /**
     * Compares two keys by course name only.
     *
     * @param a first key, expected to be a {@link FlowBean}
     * @param b second key, expected to be a {@link FlowBean}
     * @return the result of comparing the two course strings
     */
    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        String leftCourse = ((FlowBean) a).getCourse();
        String rightCourse = ((FlowBean) b).getCourse();
        return leftCourse.compareTo(rightCourse);
    }
}


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
package com.xiaojia.stu5;

import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class FlowBean implements WritableComparable<FlowBean> {

private String course;
private String name;
private double score;

public FlowBean() {
}

public FlowBean(String course, String name, double score) {
this.course = course;
this.name = name;
this.score = score;
}

public String getCourse() {
return course;
}

public void setCourse(String course) {
this.course = course;
}

public String getName() {
return name;
}

public void setName(String name) {
this.name = name;
}

public double getScore() {
return score;
}

public void setScore(double score) {
this.score = score;
}

public int compareTo(FlowBean o) {
int compareTo = this.course.compareTo(o.getCourse());

if (compareTo == 0) {
double diff = o.getScore() - this.score;
if (diff >
0) {
return 1;
} else if (diff < 0) {
return -1;
} else {
return 0;
}
} else {
return compareTo;
}

}

public void write(DataOutput out) throws IOException {
out.writeUTF(course);
out.writeUTF(name);
out.writeDouble(score);
}

public void readFields(DataInput in) throws IOException {
this.course = in.readUTF();
this.name = in.readUTF();
this.score = in.readDouble();
}

@Override
public String toString() {
return course + "\t" + name + "\t" + score;
}

}

1
2
3
4
5
6
7
8
algorithm huangjiaju 82.0
algorithm liutao 82.0
computer huangjiaju 83.0
computer liutao 83.0
english huanglei 83.0
english liuyifei 74.0
math huangxiaoming 83.0
math huangjiaju 82.0