Both HDFS and MapReduce are inefficient at handling small files, yet having to process large numbers of small files is often unavoidable, so a dedicated solution is needed. One option is to merge the small files by implementing a custom InputFormat.

Requirement

Merge multiple small files into a single SequenceFile (a SequenceFile is Hadoop's file format for storing binary key-value pairs). The SequenceFile holds all of the original files: each record uses the file path plus file name as the key and the file contents as the value.
Custom InputFormat class

```java
package inputformat;

import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import java.io.IOException;

public class WholeFileInputFormat extends FileInputFormat<Text, BytesWritable> {

    @Override
    public RecordReader<Text, BytesWritable> createRecordReader(InputSplit inputSplit, TaskAttemptContext taskAttemptContext)
            throws IOException, InterruptedException {
        // Hand each split to the custom RecordReader, which reads the whole file as one record
        WholeRecordReader recordReader = new WholeRecordReader();
        recordReader.initialize(inputSplit, taskAttemptContext);
        return recordReader;
    }
}
```
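The input format above relies on each small file becoming exactly one record. To make that guarantee explicit, FileInputFormat also lets an input format declare its files non-splittable. A minimal sketch of such an addition (not part of the original code, to be placed inside WholeFileInputFormat) could look like this:

```java
    // Optional refinement: declare files non-splittable so each small file
    // is always handed to a single RecordReader in one piece.
    @Override
    protected boolean isSplitable(org.apache.hadoop.mapreduce.JobContext context,
                                  org.apache.hadoop.fs.Path filename) {
        return false;
    }
```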
Custom RecordReader class
```java
package inputformat;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

import java.io.IOException;

public class WholeRecordReader extends RecordReader<Text, BytesWritable> {

    FileSplit split;
    Configuration configuration;
    Text k = new Text();
    BytesWritable v = new BytesWritable();
    // true until the single record (the whole file) has been emitted
    boolean isProgress = true;

    @Override
    public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext)
            throws IOException, InterruptedException {
        this.split = (FileSplit) inputSplit;
        configuration = taskAttemptContext.getConfiguration();
    }

    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
        if (isProgress) {
            // Read the entire file backing this split into memory in one shot
            byte[] buf = new byte[(int) split.getLength()];
            Path path = split.getPath();
            FileSystem fs = path.getFileSystem(configuration);

            FSDataInputStream fis = fs.open(path);
            IOUtils.readFully(fis, buf, 0, buf.length);

            // Value: the raw file contents; key: the full file path (including the file name)
            v.set(buf, 0, buf.length);
            k.set(path.toString());

            IOUtils.closeStream(fis);

            isProgress = false;
            return true;
        }
        return false;
    }

    @Override
    public Text getCurrentKey() throws IOException, InterruptedException {
        return k;
    }

    @Override
    public BytesWritable getCurrentValue() throws IOException, InterruptedException {
        return v;
    }

    @Override
    public float getProgress() throws IOException, InterruptedException {
        return 0;
    }

    @Override
    public void close() throws IOException {
    }
}
```
SequenceFileMapper
```java
package inputformat;

import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class SequenceFileMapper extends Mapper<Text, BytesWritable, Text, BytesWritable> {

    @Override
    protected void map(Text key, BytesWritable value, Context context)
            throws IOException, InterruptedException {
        // Each call receives one whole file: key = file path, value = file contents
        context.write(key, value);
    }
}
```
SequenceFileReducer
```java
package inputformat;

import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class SequenceFileReducer extends Reducer<Text, BytesWritable, Text, BytesWritable> {

    @Override
    protected void reduce(Text key, Iterable<BytesWritable> values, Context context)
            throws IOException, InterruptedException {
        // Each file path appears exactly once, so this simply passes the record through
        for (BytesWritable bytesWritable : values) {
            context.write(key, bytesWritable);
        }
    }
}
```
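Since every key (file path) is unique, the reducer is a pass-through; its real job is to funnel all records into a single output SequenceFile. The driver below relies on the default of one reduce task for this. If you prefer to state that assumption explicitly, a one-line addition to the driver (an optional sketch, not in the original code) would be:

```java
// Force a single reducer so all small files end up in one SequenceFile
job.setNumReduceTasks(1);
```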
SequenceFileDriver
```java
package inputformat;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;

public class SequenceFileDriver {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        job.setJarByClass(SequenceFileDriver.class);
        job.setMapperClass(SequenceFileMapper.class);
        job.setReducerClass(SequenceFileReducer.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(BytesWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(BytesWritable.class);

        // Use the custom InputFormat for input and write the result as a SequenceFile
        job.setInputFormatClass(WholeFileInputFormat.class);
        job.setOutputFormatClass(SequenceFileOutputFormat.class);

        // Delete the output directory if it already exists
        FileSystem fs = FileSystem.get(conf);
        Path out = new Path("d://test/output");
        if (fs.exists(out)) {
            fs.delete(out, true);
        }

        FileInputFormat.setInputPaths(job, new Path("d://test/input/*"));
        FileOutputFormat.setOutputPath(job, out);

        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}
```
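To check the merged result, the SequenceFile can be read back with SequenceFile.Reader. The sketch below is only an illustration and not part of the original code: the class name SequenceFileChecker is hypothetical, and it assumes the same local output path as the driver above (the reducer output is typically named part-r-00000; adjust the path if yours differs). It prints each stored file path together with the size of that file's contents.

```java
package inputformat;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;

public class SequenceFileChecker {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Merged file written by the driver above (hypothetical path; adjust as needed)
        Path path = new Path("d://test/output/part-r-00000");

        Text key = new Text();
        BytesWritable value = new BytesWritable();

        try (SequenceFile.Reader reader =
                     new SequenceFile.Reader(conf, SequenceFile.Reader.file(path))) {
            // Iterate over every (file path, file contents) record in the SequenceFile
            while (reader.next(key, value)) {
                System.out.println(key + " : " + value.getLength() + " bytes");
            }
        }
    }
}
```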