MapReduce Grouping (Secondary Sort)

Input Data

order_id  product_id  price
000000001 Pdt_01 222.8
000000002 Pdt_05 722.4
000000001 Pdt_02 33.8
000000003 Pdt_02 232.8
000000003 Pdt_02 33.8
000000002 Pdt_03 522.8
000000002 Pdt_04 122.4
Requirement

Find the most expensive product in each order. The idea: use OrderBean (order_id + price) as the map output key, sort by order_id ascending and then by price descending, and group reduce input by order_id only, so the first key of each reduce group is that order's most expensive product.
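
For reference, on this input the job below should emit one line per order, roughly as follows (a sketch: order ids are parsed as int, so the leading zeros drop, and the output format comes from OrderBean.toString()):

1 222.8
2 722.4
3 232.8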

OrderBean

package order;

import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class OrderBean implements WritableComparable<OrderBean> {

    private int order_id;
    private double price;

    public OrderBean() {
    }

    public OrderBean(int order_id, double price) {
        this.order_id = order_id;
        this.price = price;
    }

    // Sort by order_id ascending; within the same order, by price descending,
    // so the most expensive record of an order comes first.
    @Override
    public int compareTo(OrderBean o) {
        if (order_id > o.getOrder_id()) {
            return 1;
        } else if (order_id < o.getOrder_id()) {
            return -1;
        } else {
            if (price > o.getPrice()) {
                return -1;
            } else if (price < o.getPrice()) {
                return 1;
            } else {
                return 0;
            }
        }
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(order_id);
        out.writeDouble(price);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        order_id = in.readInt();
        price = in.readDouble();
    }

    public int getOrder_id() {
        return order_id;
    }

    public void setOrder_id(int order_id) {
        this.order_id = order_id;
    }

    public double getPrice() {
        return price;
    }

    public void setPrice(double price) {
        this.price = price;
    }

    @Override
    public String toString() {
        return order_id + " " + price;
    }
}

OrderMapper

package order;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class OrderMapper extends Mapper<LongWritable, Text, OrderBean, NullWritable> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Each line looks like "000000001 Pdt_01 222.8" (order id, product id, price).
        String[] fields = value.toString().split(" ");
        OrderBean orderBean = new OrderBean();
        orderBean.setOrder_id(Integer.parseInt(fields[0]));
        orderBean.setPrice(Double.parseDouble(fields[2]));
        // The bean itself is the key; the value carries no information.
        context.write(orderBean, NullWritable.get());
    }
}

OrderReducer

package order;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class OrderReducer extends Reducer<OrderBean, NullWritable, OrderBean, NullWritable> {
    @Override
    protected void reduce(OrderBean key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
        // All records of one order arrive as a single group sorted by price descending,
        // so the first key of the group is that order's most expensive product.
        context.write(key, NullWritable.get());
    }
}

OrderDriver

package order;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class OrderDriver {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        // Delete the output directory if it already exists.
        FileSystem fs = FileSystem.get(conf);
        Path out = new Path("d://test/output");
        if (fs.exists(out)) {
            fs.delete(out, true);
        }

        job.setJarByClass(OrderDriver.class);
        job.setMapperClass(OrderMapper.class);
        job.setReducerClass(OrderReducer.class);

        job.setMapOutputKeyClass(OrderBean.class);
        job.setMapOutputValueClass(NullWritable.class);
        job.setOutputKeyClass(OrderBean.class);
        job.setOutputValueClass(NullWritable.class);

        FileInputFormat.setInputPaths(job, new Path("d://test/input/*"));
        FileOutputFormat.setOutputPath(job, out);

        // Group reduce input by order_id only (secondary sort).
        job.setGroupingComparatorClass(OrderGroupingComparator.class);

        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}

OrderGroupingComparator

package order;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

public class OrderGroupingComparator extends WritableComparator {

    protected OrderGroupingComparator() {
        // true: create OrderBean instances so compare() receives deserialized objects.
        super(OrderBean.class, true);
    }

    // Compare by order_id only, so all records of the same order
    // fall into one reduce() call regardless of price.
    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        OrderBean aBean = (OrderBean) a;
        OrderBean bBean = (OrderBean) b;
        if (aBean.getOrder_id() > bBean.getOrder_id()) {
            return 1;
        } else if (aBean.getOrder_id() < bBean.getOrder_id()) {
            return -1;
        } else {
            return 0;
        }
    }
}
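
Both comparators matter here: OrderBean.compareTo controls the sort order of the shuffled keys (price descending within an order), while OrderGroupingComparator only controls which consecutive keys land in the same reduce() call. Without the grouping comparator, each distinct (order_id, price) pair would form its own group and the output would contain one line per pair instead of one per order.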

Extension

Find the top 3 most expensive products in each order (only the reducer changes).

package order;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class OrderReducer extends Reducer<OrderBean, NullWritable, OrderBean, NullWritable> {
    @Override
    protected void reduce(OrderBean key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
        int count = 0;

        // Hadoop reuses the key object: each step of the values iterator deserializes
        // the next record of the group into `key`, so writing `key` inside the loop
        // emits that order's records one by one, already sorted by price descending.
        // Stop after the first three.
        for (NullWritable value : values) {
            if (count == 3) {
                break;
            }
            context.write(key, NullWritable.get());
            count += 1;
        }
    }
}
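
With the same input, this version should emit up to three lines per order (a sketch; orders 1 and 3 only have two records each):

1 222.8
1 33.8
2 722.4
2 522.8
2 122.4
3 232.8
3 33.8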