输入数据
1001 01 1
1002 02 2
1003 03 3
1004 01 4
1005 02 5
1006 03 6
01 魅族
02 华为
03 小米
TableJoin
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91
| package table;
import org.apache.hadoop.io.Writable;
import java.io.DataInput; import java.io.DataOutput; import java.io.IOException;
/**
 * Value bean for the reduce-side join: carries either an order record
 * (id, pid, amount) or a product record (pid, pname), distinguished by
 * {@code flag} ("order" vs "pd").
 *
 * NOTE(review): field order in write()/readFields() must stay in sync —
 * Hadoop serialization is positional.
 */
public class TableBean implements Writable {

    private String id;     // order id (empty for product records)
    private String pid;    // product id — the join key
    private int amount;    // order quantity (0 for product records)
    private String pname;  // product name (empty for order records until joined)
    private String flag;   // record source marker: "order" or "pd"

    /** No-arg constructor required by Hadoop for deserialization. */
    public TableBean() {
    }

    public TableBean(String id, String pid, int amount, String pname, String flag) {
        this.id = id;
        this.pid = pid;
        this.amount = amount;
        this.pname = pname;
        this.flag = flag;
    }

    /** Serializes fields in declaration order; must mirror {@link #readFields}. */
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(id);
        out.writeUTF(pid);
        out.writeInt(amount);
        out.writeUTF(pname);
        out.writeUTF(flag);
    }

    /** Deserializes fields in the exact order written by {@link #write}. */
    @Override
    public void readFields(DataInput in) throws IOException {
        this.id = in.readUTF();
        this.pid = in.readUTF();
        this.amount = in.readInt();
        this.pname = in.readUTF();
        this.flag = in.readUTF();
    }

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public String getPid() {
        return pid;
    }

    public void setPid(String pid) {
        this.pid = pid;
    }

    public int getAmount() {
        return amount;
    }

    public void setAmount(int amount) {
        this.amount = amount;
    }

    public String getPname() {
        return pname;
    }

    public void setPname(String pname) {
        this.pname = pname;
    }

    public String getFlag() {
        return flag;
    }

    public void setFlag(String flag) {
        this.flag = flag;
    }

    /** Output line format: id&lt;TAB&gt;amount&lt;TAB&gt;pname. */
    @Override
    public String toString() {
        StringBuilder sb = new StringBuilder();
        sb.append(id).append("\t").append(amount).append("\t").append(pname);
        return sb.toString();
    }
}
|
TableMapper
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46
| package table;
import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import java.io.IOException;
/**
 * Tags each input record by its source file and emits it keyed on the
 * product id (pid), so orders and products for the same pid meet in one
 * reduce call.
 *
 * Files whose name starts with "order" are treated as order records
 * (id TAB pid TAB amount); everything else is treated as the product
 * table (pid TAB pname).
 */
public class TableMapper extends Mapper<LongWritable, Text, Text, TableBean> {

    // Name of the file this split belongs to; decided once per task in setup().
    private String name;

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        FileSplit split = (FileSplit) context.getInputSplit();
        this.name = split.getPath().getName();
    }

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String[] cols = value.toString().split("\t");
        TableBean bean = new TableBean();
        Text outKey = new Text();

        if (name.startsWith("order")) {
            // Order record: id, pid, amount — pname unknown at this stage.
            bean.setId(cols[0]);
            bean.setPid(cols[1]);
            bean.setAmount(Integer.parseInt(cols[2]));
            bean.setPname("");
            bean.setFlag("order");
            outKey.set(cols[1]);
        } else {
            // Product record: pid, pname — order-only fields get neutral values.
            bean.setId("");
            bean.setPid(cols[0]);
            bean.setAmount(0);
            bean.setPname(cols[1]);
            bean.setFlag("pd");
            outKey.set(cols[0]);
        }

        context.write(outKey, bean);
    }
}
|
TableReducer
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47
| package table;
import org.apache.commons.beanutils.BeanUtils; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException; import java.lang.reflect.InvocationTargetException; import java.util.ArrayList;
/**
 * Joins the buffered order records for one pid with that pid's product
 * record, filling in pname on each order before emitting it.
 *
 * Fix: the original copied beans with reflective BeanUtils.copyProperties
 * and swallowed IllegalAccessException / InvocationTargetException via
 * printStackTrace. Copying the five fields directly removes the checked
 * exceptions (nothing left to swallow) and the reflection overhead, with
 * identical join output.
 */
public class TableReducer extends Reducer<Text, TableBean, TableBean, NullWritable> {

    /**
     * Returns an independent copy of {@code src}. Required because Hadoop
     * reuses the same TableBean instance for every element of {@code values};
     * buffering the iterator's object directly would alias one record.
     */
    private static TableBean copyOf(TableBean src) {
        TableBean copy = new TableBean();
        copy.setId(src.getId());
        copy.setPid(src.getPid());
        copy.setAmount(src.getAmount());
        copy.setPname(src.getPname());
        copy.setFlag(src.getFlag());
        return copy;
    }

    @Override
    protected void reduce(Text key, Iterable<TableBean> values, Context context)
            throws IOException, InterruptedException {
        // All order records sharing this pid; the single matching product record.
        ArrayList<TableBean> orderBeans = new ArrayList<>();
        TableBean pdBean = new TableBean();

        for (TableBean bean : values) {
            if ("order".equals(bean.getFlag())) {
                orderBeans.add(copyOf(bean));
            } else {
                pdBean = copyOf(bean);
            }
        }

        // Stamp the product name onto each order and emit it.
        for (TableBean order : orderBeans) {
            order.setPname(pdBean.getPname());
            context.write(order, NullWritable.get());
        }
    }
}
|
TableDriver
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34
| package table;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
/**
 * Job driver for the reduce-side join.
 *
 * Fix/generalization: the original ignored {@code args} entirely and
 * hard-coded local Windows paths. Paths can now be supplied on the command
 * line (args[0] = input glob, args[1] = output dir); with no args the
 * original hard-coded defaults are used, so existing invocations behave
 * identically.
 */
public class TableDriver {

    public static void main(String[] args) throws Exception {
        // Fall back to the original hard-coded paths when none are given.
        String inputPath = args.length >= 2 ? args[0] : "d://test/input/*";
        String outputPath = args.length >= 2 ? args[1] : "d://test/output";

        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        // Remove a stale output directory so the job can rerun cleanly.
        FileSystem fs = FileSystem.get(conf);
        Path out = new Path(outputPath);
        if (fs.exists(out)) {
            fs.delete(out, true);
        }

        job.setJarByClass(TableDriver.class);
        job.setMapperClass(TableMapper.class);
        job.setReducerClass(TableReducer.class);

        // Map output: pid -> tagged bean; final output: joined bean, no value.
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(TableBean.class);
        job.setOutputKeyClass(TableBean.class);
        job.setOutputValueClass(NullWritable.class);

        FileInputFormat.setInputPaths(job, new Path(inputPath));
        FileOutputFormat.setOutputPath(job, out);

        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}
|
输出数据
1004 4 魅族
1001 1 魅族
1005 5 华为
1002 2 华为
1006 6 小米
1003 3 小米