MapReduce的类型
-
默认的MR作业
- 默认的mapper是Mapper类,它将输入的键和值原封不动地写到输出中
- 默认的partitioner是HashPartitioner,它对每条记录的键进行哈希操作以决定该记录应该属于哪个分区(每个分区对应于一个reduce任务)
- 默认的reducer是Reducer类,它将所有的输入写到输出中
- map任务的数量等于输入文件被划分成的块数
- reduce任务的个数的选择: 一个经验法则是目标reducer保持在每个运行5分钟左右且产生至少一个HDFS块的输出比较合适
- 默认的输入格式是TexInputFormat,输出是TextOutpFormat
-
默认的streaming作业
输入格式
输入分片与记录
- 一个输入分片就是由单个map操作来处理的数据块,并且每一个map只处理一个分片、
- 每个输入分片分为若干个记录,每条记录就是 一个键值对,map将一个接一个地处理记录
- 输入分片和记录都是逻辑概念,不一定对应着文件,也可能对应其他数据形式,如对于数据库,输入分片就是对应于一个表上的若干行,一条记录对应着其中的一行
- 输入分片只是指向数据的引用,不包含数据本身
- InputSpilt接口(Java中的实现,开发人员不必直接处理InputSplit,因为它是由InputFormat创建的),包含
- 以字节为单位的长度,表示分片的大小,用以排序分片,以便优先处理最大的分片,从而最小化作业运行时间
- 一组存储位置,供MR系统使用一边将map任务尽可能放在分片数据附近
- 该接口由InputFormat创建
- InputFormat
- 运行作业的客户端使用getSplits方法计算分片,并将结果告知application master,后者使用其存储信息来调度map任务从而在集群集群上处理这些分片数据
- map任务将输入分片传给createRecordReader方法来获取这个分片的RecordReader(就像是记录上的迭代器),map任务用这个RecordReader来生成记录的键值对,然后再将键值对传递给map函数(参见run方法)
InputFormat是MapReduce中一个很常用的概念,它在程序的运行中到底起到了什么作用呢?
InputFormat其实是一个接口,包含了两个方法:
public interface InputFormat<K, V> {
InputSplit[] getSplits(JobConf job, int numSplits) throws IOException;
RecordReader<K, V> getRecordReader(InputSplit split,
JobConf job,
Reporter reporter) throws IOException;
}
这两个方法有分别完成着以下工作: 方法getSplits将输入数据切分成splits,splits的个数即为map tasks的个数,splits的大小默认为块大小,即128M 方法getRecordReader将每个split解析成records, 再依次将record解析成<K,V>对
也就是说InputFormat完成以下工作:
InputFile –> splits –> <K,V> : map任务把输入分片传给InputFormat的createRecordReader()方法来获得这个分片的RecordReader, RecordReader就像是记录上的迭代器,map任务用一个RecordReader来生成记录的键-值对,然后再传递给map函数进行处理。
系统常用的 InputFormat 又有哪些呢?
在领会自定义 InputFormat 之前,需要弄懂一下几个抽象类、接口及其之间的关系:
InputFormat(interface), FileInputFormat(abstract class), TextInputFormat(class), RecordReader(interface), LineRecordReader(class)的关系
FileInputFormat implements InputFormat TextInputFormat extends FileInputFormat TextInputFormat.getRecordReader calls LineRecordReader LineRecordReader implements RecordReader 对于InputFormat接口,上面已经有详细的描述再看看FileInputFormat,它实现了InputFormat接口中的getSplits方法,而将getRecordReader与isSplitable留给具体类(如TextInputFormat)实现,isSplitable方法通常不用修改,所以只需要在自定义的InputFormat中实现getRecordReader方法即可,而该方法的核心是调用LineRecordReader(即由LineRecorderReader类来实现 “将每个split解析成records, 再依次将record解析成<K,V>对”),该方法实现了接口RecordReader
public interface RecordReader<K, V> {
boolean next(K key, V value) throws IOException;
K createKey();
V createValue();
long getPos() throws IOException;
public void close() throws IOException;
float getProgress() throws IOException;
}
因此自定义InputFormat的核心是自定义一个实现接口RecordReader类似于LineRecordReader的类,该类的核心也正是重写接口RecordReader中的几大方法, 定义一个InputFormat的核心是定义一个类似于LineRecordReader的,自己的RecordReader
示例,数据每一行为 “物体,x坐标,y坐标,z坐标” ball 3.5,12.7,9.0 car 15,23.76,42.23 device 0.0,12.4,-67.1 每一行将要被解析为<Text, Point3D>(Point3D是自定义的数据类型)
则自定义的ObjectPositionInputFormat 类的编写如下
public class ObjectPositionInputFormat extends FileInputFormat<Text, Point3D> {
//如果是要指定的输入文件不被分片,则重写isSplitable()方法
@override
protected boolean isSplitable(JobContext context, Path file){
return false; /默认是true false表示不分片
}
@override
public RecordReader<Text, Point3D> getRecordReader (InputSplit input, JobConf job, Reporter reporter)throws IOException {
reporter.setStatus(input.toString());
return new ObjPosRecordReader(job, (FileSplit)input);
}
}