NLineInputFormat使用案例

输入数据(共 15 行,每行是以空格分隔的单词):
hadoop hello hadoop
mapreduce hello
spark hello scala
hdfs java hadoop
yarn hello java
hadoop hello hadoop
mapreduce hello
spark2 hello scala
hdfs java hadoop
yarn2 hello java
hadoop2 hello hadoop
mapreduce hello
spark4 hello scala
hdfs java hadoop
yarn hello java
NLMapper(把每一行切分成单词,对每个单词输出 <单词, 1>):
package NLInputFormat;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * Word-count mapper: emits a {@code (word, 1)} pair for every word found in
 * each input line.
 *
 * <p>Lines are split on runs of whitespace, and empty tokens (produced by
 * blank lines or leading whitespace) are skipped so that the empty string is
 * never counted as a word.
 */
public class NLMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    /** Constant count attached to every emitted word. */
    private static final IntWritable ONE = new IntWritable(1);

    /**
     * Output key holder, reused for every token instead of allocating a new
     * {@link Text} per word (same pattern as the official Hadoop WordCount
     * example).
     */
    private final Text word = new Text();

    /**
     * Splits one input line into words and writes {@code (word, 1)} for each.
     *
     * @param key     input key supplied by the input format; not used here
     * @param value   one line of input text
     * @param context task context that receives the emitted pairs
     * @throws IOException          if writing to the context fails
     * @throws InterruptedException if the task is interrupted while writing
     */
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        for (String token : value.toString().split("\\s+")) {
            // A blank line or leading whitespace yields an empty first token.
            if (token.isEmpty()) {
                continue;
            }
            word.set(token);
            context.write(word, ONE);
        }
    }
}

NLReducer(对同一个单词的计数求和):
package NLInputFormat;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
 * Reducer that adds up every count received for a key and emits
 * {@code (key, total)}.
 */
public class NLReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    /**
     * Writes the key together with the sum of all of its values.
     *
     * @param key     the key being reduced
     * @param values  the counts grouped under {@code key}
     * @param context task context that receives the result
     * @throws IOException          if writing to the context fails
     * @throws InterruptedException if the task is interrupted while writing
     */
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        context.write(key, new IntWritable(total(values)));
    }

    /** Returns the sum of the given counts. */
    private static int total(Iterable<IntWritable> counts) {
        int acc = 0;
        for (IntWritable count : counts) {
            acc = acc + count.get();
        }
        return acc;
    }
}

NLDriver(配置并提交作业:使用 NLineInputFormat,每个切片 3 行):
package NLInputFormat;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.NLineInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * Driver that configures and submits the word-count job using
 * {@link NLineInputFormat} with a fixed number of lines per split.
 *
 * <p>Usage: {@code NLDriver [inputPath [outputPath]]}. When an argument is
 * omitted, the corresponding default path below is used.
 */
public class NLDriver {
    /** Input path used when no first argument is given. */
    private static final String DEFAULT_INPUT = "d://test/input/*";

    /** Output path used when no second argument is given. */
    private static final String DEFAULT_OUTPUT = "d://test/output";

    /** Number of input lines placed in each split. */
    private static final int LINES_PER_SPLIT = 3;

    /**
     * Builds the job, removes any existing output directory, runs the job and
     * exits with status 0 on success or 1 on failure.
     *
     * @param args optional {@code [inputPath [outputPath]]}
     * @throws Exception if job setup, file-system access or job execution fails
     */
    public static void main(String[] args) throws Exception {
        String inputLocation = args.length > 0 ? args[0] : DEFAULT_INPUT;
        String outputLocation = args.length > 1 ? args[1] : DEFAULT_OUTPUT;

        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        NLineInputFormat.setNumLinesPerSplit(job, LINES_PER_SPLIT);

        job.setJarByClass(NLDriver.class);
        job.setMapperClass(NLMapper.class);
        job.setReducerClass(NLReducer.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        job.setInputFormatClass(NLineInputFormat.class);

        Path out = new Path(outputLocation);
        // Resolve the file system from the path itself rather than taking the
        // configured default, so the existence check and delete act on the
        // file system that actually holds the output location.
        FileSystem fs = out.getFileSystem(conf);
        if (fs.exists(out)) {
            // The job refuses to start if the output directory already exists.
            fs.delete(out, true);
        }

        FileInputFormat.setInputPaths(job, new Path(inputLocation));
        FileOutputFormat.setOutputPath(job, out);

        boolean succeeded = job.waitForCompletion(true);
        System.exit(succeeded ? 0 : 1);
    }
}