MapReduce学生成绩(二)

需求2:求该成绩表每门课程当中出现了相同分数的分数、该分数出现的次数,以及取得该分数的学生名单

返回结果的格式:
科目 分数 次数 取得该分数的学生名单
例子:
computer 85 3 huangzitao,liujialing,huangxiaoming

解题思路:

对于mapper阶段,输出的key-value分别是:

key: 课程,分数

value: 名字

对于reducer阶段,reduce方法接收的参数是:

key: 课程,分数

values: 课程中的某个分数的多个学生的名字的迭代器

看代码实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
package com.xiaojia.stu2;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class StuMapper extends Mapper<LongWritable, Text, Text, Text> {

    // Reused across map() calls to avoid allocating two Text objects per record.
    private final Text outKey = new Text();
    private final Text outValue = new Text();

    /**
     * Emits one (course \t score) -> name pair per input line.
     *
     * Input line format (per the original field mapping): "course,name,score".
     * Malformed lines with fewer than 3 comma-separated fields are skipped
     * instead of throwing ArrayIndexOutOfBoundsException and failing the task.
     *
     * @param key     byte offset of the line in the input split (unused)
     * @param value   one raw input line
     * @param context used to emit the (course\tscore, name) pair
     */
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String[] fields = value.toString().split(",");
        if (fields.length < 3) {
            return; // skip malformed records rather than crash the job
        }
        String course = fields[0];
        String name = fields[1];
        String score = fields[2];
        outKey.set(course + "\t" + score);
        outValue.set(name);
        context.write(outKey, outValue);
    }
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
package com.xiaojia.stu2;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class StuReduer extends Reducer<Text, Text, Text, Text> {

    /**
     * For one (course \t score) key, counts the students who got that score
     * and joins their names with commas.
     *
     * Output value format: "count \t name1,name2,...". Fix vs. the original:
     * the original appended "," after every name, producing a trailing comma
     * (e.g. "huangdou,") that does not match the required output format.
     * Also uses StringBuilder instead of the needlessly synchronized
     * StringBuffer — the reduce call is single-threaded.
     *
     * @param key     "course \t score" produced by the mapper
     * @param values  names of all students with that score in that course
     * @param context used to emit (key, "count \t joined-names")
     */
    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        StringBuilder names = new StringBuilder();
        int count = 0;
        for (Text value : values) {
            if (count > 0) {
                names.append(',');
            }
            // Hadoop reuses the Text instance between iterations;
            // toString() copies the current contents, which is safe here.
            names.append(value.toString());
            count++;
        }
        context.write(key, new Text(count + "\t" + names));
    }
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
package com.xiaojia.stu2;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class StuDriver {

    /**
     * Configures and submits the "duplicate score per course" MapReduce job.
     *
     * Usage: StuDriver &lt;input path&gt; &lt;output path&gt;
     *
     * Exits with 0 on job success, 1 on job failure, 2 on bad arguments.
     */
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // Fail fast with a usable message instead of an
        // ArrayIndexOutOfBoundsException when paths are missing.
        if (args.length < 2) {
            System.err.println("Usage: StuDriver <input path> <output path>");
            System.exit(2);
        }
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://hadoop01:9000");
        FileSystem fs = FileSystem.get(conf);

        Job job = Job.getInstance(conf);
        job.setJarByClass(StuDriver.class);
        job.setMapperClass(StuMapper.class);
        job.setReducerClass(StuReduer.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        Path input = new Path(args[0]);
        Path output = new Path(args[1]);
        // Remove a stale output directory so re-runs do not fail with
        // FileAlreadyExistsException.
        if (fs.exists(output)) {
            fs.delete(output, true);
        }
        FileInputFormat.setInputPaths(job, input);
        FileOutputFormat.setOutputPath(job, output);

        boolean res = job.waitForCompletion(true);
        System.exit(res ? 0 : 1);
    }
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
algorithm       42      1       huangdou,
algorithm 75 1 liuyifei,
algorithm 76 1 huanglei,
algorithm 81 1 huangzitao,
algorithm 85 2 liutao,huangjiaju,
computer 48 1 huangdatou,
computer 54 1 xuzheng,
computer 75 1 liuyifei,
computer 85 4 liutao,huangzitao,liujialing,huangxiaoming,
computer 86 1 huangbo,
computer 88 1 huangjiaju,
computer 99 1 huanglei,
english 48 1 huangdatou,
english 57 1 zhaobenshan,
english 75 1 liujialing,
english 76 1 liuyifei,
english 85 4 huangbo,huanglei,zhouqi,liuyifei,
english 96 1 huangxiaoming,
math 48 1 liutao,
math 54 1 xuzheng,
math 76 1 huanglei,
math 85 4 wangbaoqiang,huangjiaju,huangxiaoming,liujialing,