KeyValueTextInputFormat案例

输入数据
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
hadoop hello hadoop
mapreduce hello
spark hello scala
hdfs java hadoop
yarn hello java
hadoop hello hadoop
mapreduce hello
spark2 hello scala
hdfs java hadoop
yarn2 hello java
hadoop2 hello hadoop
mapreduce hello
spark4 hello scala
hdfs java hadoop
yarn hello java
KVTextMapper
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
package keyvaluetest;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * Mapper for the key/value text word-count example.
 *
 * <p>Emits {@code (key, 1)} for every input record; the record's value is ignored.
 * NOTE(review): with {@code KeyValueTextInputFormat} the key is expected to be the
 * part of each line before the configured separator - confirm against the job setup.
 */
public class KVTextMapper extends Mapper<Text, Text, Text, IntWritable> {

    /**
     * The constant count written for every record. Shared instead of being
     * allocated per call, so no garbage is created for each input line.
     */
    private static final IntWritable ONE = new IntWritable(1);

    /**
     * Writes the incoming key with a count of one.
     *
     * @param key     the record key, forwarded unchanged as the output key
     * @param value   the record value; not used
     * @param context the task context that receives the output pair
     * @throws IOException          if writing the output pair fails
     * @throws InterruptedException if the task is interrupted while writing
     */
    @Override
    protected void map(Text key, Text value, Context context) throws IOException, InterruptedException {
        context.write(key, ONE);
    }
}

KVTextReducer
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
package keyvaluetest;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
 * Reducer for the key/value text word-count example.
 *
 * <p>Sums all counts received for a key and emits {@code (key, sum)}.
 */
public class KVTextReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    /**
     * Reusable output holder. It is overwritten for every key, which avoids
     * allocating a new Writable per reduce call.
     */
    private final IntWritable total = new IntWritable();

    /**
     * Adds up the counts for one key and writes the total.
     *
     * @param key     the key shared by all values in this group
     * @param values  the counts emitted for this key
     * @param context the task context that receives the output pair
     * @throws IOException          if writing the output pair fails
     * @throws InterruptedException if the task is interrupted while writing
     */
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable value : values) {
            sum += value.get();
        }
        total.set(sum);
        context.write(key, total);
    }
}

KVTextDriver
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
package keyvaluetest;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueLineRecordReader;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * Driver that configures and runs the key/value text word-count job.
 *
 * <p>Usage: {@code KVTextDriver [inputPath [outputPath]]}. Arguments that are
 * omitted fall back to the original hard-coded local paths.
 */
public class KVTextDriver {

    /** Input path (glob) used when no first argument is given. */
    private static final String DEFAULT_INPUT = "d://test/input/*";

    /** Output directory used when no second argument is given. */
    private static final String DEFAULT_OUTPUT = "d://test/output";

    /**
     * Builds the job, removes a pre-existing output directory, runs the job and
     * exits with status 0 on success or 1 on failure.
     *
     * @param args optional {@code [inputPath, outputPath]}; missing entries use the defaults
     * @throws Exception if job setup, file-system access or job execution fails
     */
    public static void main(String[] args) throws Exception {
        String input = args.length > 0 ? args[0] : DEFAULT_INPUT;
        String output = args.length > 1 ? args[1] : DEFAULT_OUTPUT;

        // The separator must be set before Job.getInstance, which takes its own
        // copy of the configuration.
        Configuration conf = new Configuration();
        conf.set(KeyValueLineRecordReader.KEY_VALUE_SEPERATOR, " ");

        Job job = Job.getInstance(conf);
        job.setJarByClass(KVTextDriver.class);

        job.setInputFormatClass(KeyValueTextInputFormat.class);
        job.setMapperClass(KVTextMapper.class);
        job.setReducerClass(KVTextReducer.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // Resolve the file system from the output path itself so the check below
        // targets the file system that actually owns the path.
        Path out = new Path(output);
        FileSystem fs = out.getFileSystem(conf);
        // Remove any previous output recursively so the job can be rerun.
        if (fs.exists(out)) {
            fs.delete(out, true);
        }

        FileInputFormat.setInputPaths(job, new Path(input));
        FileOutputFormat.setOutputPath(job, out);

        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}