本文基于Hadoop 3.X

简介

Hadoop MapReduce 最初理论源于 Google 2004年论文<MapReduce: Simplified Data Processing on Large Clusters> ^1, 是一个分布式计算程序框架,具备一定的可靠性与容错性。常用于处理大数据并行计算分析。MapReduce 框架包含两个重要的任务类型,即 Map 和 Reduce。

计算作业将输入数据集拆分为独立的文件块,这些文件块由 Map 任务以并行的方式处理,其中单个元素被分解为键-值对。随后MR框架对映射的输出进行排序,再将数据再输入给 Reduce 任务。作业的输入和输出一般都存储在文件系统 (HDFS) 中。

系统架构

MapReduce 系统包含一个主资源管理器 (ResourceManager),每个节点的 NodeManager 以及 每个计算程序的 MRAppMaster(详情见 YARN 架构)。一个完整的 MapReduce 计算程序在分布式运行时有三类进程:

  • MRAppMaster: 既Driver程序,负责整个程序的过程调度及状态协调。

  • MapTask:负责 Map 阶段的整个数据处理流程。

  • ReduceTask:负责 Reduce 阶段的整个数据处理流程。

MapReduce-Flow

适用场景

适用于海量大数据离线计算分析(OLAP),对于计算耗时敏感性不高(非毫秒级或者秒级返回结果),以及需要使用代码实现的复杂计算逻辑。

优点

  1. 支持海量大数据 (PB 级以上) 离线计算;可从 HDFS中 读取多种格式的数据文件。

  2. 编程简单:通常实现 org.apache.hadoop.mapred.Mapperorg.apache.hadoop.mapred.Reducer 等接口以及编写“Driver” 即可完成分布式并行运算。

  3. 较强的扩展性:当计算资源不足的时,可通过简单的机器扩容以达到水平扩展,计算程序或系统本身无需代码更变。

  4. 较高的容错性:当一个或多个计算任务失败时,系统会将其分配到其他机器节点上进行重试,从而保证任务不至于一次性失败。

缺点

  1. 时效性较低:不支持实时计算,无法达到像普通 RDBMS 一样的毫秒级查询分析。
  2. 无法进行流式计算:由于流式计算的数据源为动态的,但 MapReduce 框架所针对的均为静态数据,所以无法进行流式计算。
  3. 不能较好处理数据流转:每个 MapReduce 任务必须将数据写入文件系统(HDFS)中, 从而在处理有前后依赖性(DAG/有向无环图)的计算任务时,需频繁将中间数据写入磁盘,造成大量 IO 消耗。

核心组件

以下罗列出MapReduce框架中的主要核心组件,并阐明功能与使用方法。随着Hadoop MapReduce版本的更新,需不断对照源码进行理解。

1
2
3
4
5
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-core</artifactId>
<version>${hadoop.mr.version}</version>
</dependency>

Writable

由于 Java 自身的序列化机制 (java.io.Serializable) 较重,对象在被序列化时会携带额外信息。因此为了减少存储占用,以及网络传输带宽,Hadoop 设计了自己的序列化机制。该序列化机制 (Writable) 更紧凑,高效,且兼容性更强。需要序列化的自定义类对象可实现org.apache.hadoop.io.Writableorg.apache.hadoop.io.WritableComparable接口。

1
2
3
4
5
6
7
8
9
@InterfaceAudience.Public
@InterfaceStability.Stable
public interface Writable {
/** Serialize the fields of this object to <code>out</code> */
void write(DataOutput out) throws IOException;

/** Deserialize the fields of this object from <code>in</code> */
void readFields(DataInput in) throws IOException;
}

注意: 序列化与反序列化时,Class-Field顺序必须保持一致!

1
2
3
4
5
6
7
8
9
10
@InterfaceAudience.Public
@InterfaceStability.Stable
public interface WritableComparable<T> extends Writable, Comparable<T> {

void write(DataOutput out) throws IOException;

void readFields(DataInput in) throws IOException;

int compareTo(T object);
}

注意: 如果自定义类对象需作为 MR 中的 KEY,则必须实现 java.lang.Comparable接口。因为在 Shuffle 过程中会针对 KEY 进行排序!

常用数据类型对比

Java 类型 Hadoop Writable 类型
boolean BooleanWritable
byte ByteWritable
int IntWritable
float FloatWritable
long LongWritable
double DoubleWritable
String Text
Map MapWritable
Array ArrayWritable
null NullWritable

InputFormat

InputFormat 负责对原始数据源进行读取操作。通常会先针对数据文件切片,并通过对应的 RecordReader 进行读取。默认的抽象类为org.apache.hadoop.mapred.FileInputFormat

1
2
3
4
5
6
7
8
9
@InterfaceAudience.Public
@InterfaceStability.Stable
public interface InputFormat<K, V> {
/** Logically split the set of input files for the job. */
InputSplit[] getSplits(JobConf job, int numSplits) throws IOException;

/** Get the RecordReader */
RecordReader<K, V> getRecordReader(InputSplit split, JobConf job, Reporter reporter) throws IOException;
}
在默认情况下切片大小等于 HDFS 中存储的文件块 (Block) 大小 (128或256MB)。也可以通过 mapreduce.input.fileinputformat.split.minsize 设置切片最小字节数,以及 mapreduce.input.fileinputformat.split.maxsize 设置切片最大字节数。

RecordReader 具体可参考此[文档](RecordReader (Apache Hadoop Main 3.2.1 API))。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
/** RecordReader reads <key, value> pairs from an InputSplit. */
@InterfaceAudience.Public
@InterfaceStability.Stable
public interface RecordReader<K, V> extends Closeable {

/** Reads the next key/value pair from the input for processing */
boolean next(K key, V value) throws IOException;

/** Create an object of the appropriate type to be used as a key */
K createKey();

/** Create an object of the appropriate type to be used as a value */
V createValue();

/** Returns the current position in the input */
long getPos() throws IOException;

/** Close the RecordReader */
@Override
void close() throws IOException;

/** Return how much of the input has been processed by [0.0, 1.0] */
float getProgress() throws IOException;
}

Mapper

Mapper负责MapReduce - MapTask的核心逻辑。具体可参考此文档

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
@InterfaceAudience.Public
@InterfaceStability.Stable
public class Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
public abstract class Context implements MapContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT> {}
/** Task Begins */
protected void setup(Context context ) throws IOException, InterruptedException { /* Custom Logic If Necessary */ }

@SuppressWarnings("unchecked")
protected void map(KEYIN key, VALUEIN value, Context context) throws IOException, InterruptedException {
/* Custom Logic */
context.write((KEYOUT) _key, (VALUEOUT) _value);
}
/** Task Ends */
protected void cleanup(Context context) throws IOException, InterruptedException { /* Custom Logic If Necessary */}
/** Advanced Functions */
public void run(Context context) throws IOException, InterruptedException { /* ... */}
}

Reducer

Reducer负责MapReduce - ReduceTask的核心逻辑。具体可参考此文档

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
@Checkpointable
@InterfaceAudience.Public
@InterfaceStability.Stable
public class Reducer<KEYIN,VALUEIN,KEYOUT,VALUEOUT> {
public abstract class Context implements ReduceContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT> {}
/** Task Begins */
protected void setup(Context context) throws IOException, InterruptedException {/* Custom Logic If Necessary */}
/** Key with Values */
@SuppressWarnings("unchecked")
protected void reduce(KEYIN key, Iterable<VALUEIN> values, Context context) throws IOException, InterruptedException {
/* Custom Logic 1 */
for (VALUEIN value: values) {
/* Custom Logic 2 */
context.write((KEYOUT) key, (VALUEOUT) value);
}
}
/** Task Ends */
protected void cleanup(Context context) throws IOException, InterruptedException { /* Custom Logic If Necessary */}
/** Advanced Usage */
public void run(Context context) throws IOException, InterruptedException { /* ... */}
}

Partitioner

当需要将数据按照不同类别 / 属性分开存储时,则需要使用 Partitioner。 默认分区是根据 KEY 的 hashCode 和 reduceTasks 个数取余得到 (org.apache.hadoop.mapreduce.lib.partition.HashPartitioner)。若按照自定义条件进行分区,则需要继承org.apache.hadoop.mapreduce.Partitioner类并重写 getPartition(…) 方法。由此MR框架可根据自定义逻辑将结果 Key-Value 进行分区。

1
2
3
4
5
6
7
8
9
@InterfaceAudience.Public
@InterfaceStability.Stable
public abstract class Partitioner<KEY, VALUE> {
/**
* Get the partition number for a given key & value & total number of partitions.
* @return the partition number for the key.
*/
public abstract int getPartition(KEY key, VALUE value, int numPartitions);
}

在Job中设置自定义分区逻辑后,还要根据 Partitioner 的条件在Job里设置相应的 Reduce Task 数量

注意:

  • ReduceTask 数量 大于 getPartition(…) 返回的最大数值,则系统会多生成(ReduceTask数 - getPartition 最大值)空的输出文件,任务成功。

  • ReduceTask 数量 大于 1 且 小于 getPartition(…) 返回的最大数值,则有一部分分区数据无法存储,系统抛出 Exception,任务失败。

  • ReduceTask 数量 等于 1,则无论MapTask端输出多少个分区文件,最终都交给一个 ReduceTask,结果存储在一个文件中 (part-r-00000)。任务成功,但耗时可能较高。

Combiner

Combiner 是 MapReduce 程序之外的一种组件,其父类依旧是org.apache.hadoop.mapred.Reducer。Combiner在每个MapTask所在节点运行,而Reducer则负责处理全局Mapper的输出结果。Combiner的作用是为了对每个MapTask的输出进行局部汇总,以减少网络开销。但并非所有计算场景都可以使用Combiner

OutputFormat

OutputFormat 负责 MapReduce 计算结果的最终输出所需的 RecordWriter 创建/获取,在ReduceTask之后执行。所有MR输出均需要实现该类 (org.apache.hadoop.mapred.OutputFormat)。系统默认的输出格式为 TextOutputFormat,继承于org.apache.hadoop.mapred.FileOutputFormat

OutputFormat 具体可参考此文档

1
2
3
4
5
6
7
8
@InterfaceAudience.Public
@InterfaceStability.Stable
public interface OutputFormat<K, V> {
/** Return RecordWriter to write the output for the job */
RecordWriter<K, V> getRecordWriter(FileSystem ignored, JobConf job, String name, Progressable progress) throws IOException;

void checkOutputSpecs(FileSystem ignored, JobConf job) throws IOException;
}

RecordWriter 具体可参考此文档

1
2
3
4
5
6
7
8
9
@InterfaceAudience.Public
@InterfaceStability.Stable
public interface RecordWriter<K, V> {
/** Writes a key/value pair */
void write(K key, V value) throws IOException;

/** Close the RecordWriter to future operations */
void close(Reporter reporter) throws IOException;
}

Driver

MapReduce Driver 程序包含整个MR过程 Main 方法入口,负责创建并配置计算任务 (org.apache.hadoop.mapreduce.Job)。通过 Job.getInstance(config) 可创建任务。其中基础参数由org.apache.hadoop.conf.Configurationorg.apache.hadoop.mapred.JobConf设置。

Job类中常用方法如下,具体可参考此文档

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
@Public
@Evolving
public class Job extends JobContextImpl implements JobContext, AutoCloseable {
// …… Fields & Methods ……
public void setJobName(String name) throws IllegalStateException { /* …… */ }

public void setUser(String user) { /* …… */ }

public void setJarByClass(Class<?> cls) { /* …… */ }

public void addCacheFile(URI uri) { /* …… */ }
public void setCacheFiles(URI[] files) { /* …… */ }

public void setMapperClass(Class<? extends Mapper> cls) throws IllegalStateException { /* …… */ }
public void setReducerClass(Class<? extends Reducer> cls) throws IllegalStateException { /* …… */ }

public void setPartitionerClass(Class<? extends Partitioner> cls) throws IllegalStateException { /* …… */ }
public void setCombinerClass(Class<? extends Reducer> cls) throws IllegalStateException { /* …… */ }

public void setMapOutputKeyClass(Class<?> theClass) throws IllegalStateException { /* …… */ }
public void setMapOutputValueClass(Class<?> theClass) throws IllegalStateException { /* …… */ }

public void setOutputKeyClass(Class<?> theClass) throws IllegalStateException { /* …… */ }
public void setOutputValueClass(Class<?> theClass) throws IllegalStateException { /* …… */ }

public void setNumReduceTasks(int tasks) throws IllegalStateException { /* …… */ }

public void submit() throws Exception { /* …… */ }
public boolean waitForCompletion(boolean verbose) throws Exception { /* …… */ }
}

工作原理

MR-Basic-Diagram

MapTask

MapTask主要运行阶段

  • 数据读取:由 InputFormat 获取相应的 RecordReader 以读取数据源文件。之后通过 InputSplit 解析出相对的 Key - Value
  • Map阶段:基于输入的 Key - Value 执行用户自定义的 Mapper 逻辑,随后输出一组新的 Key - Value
  • 数据采集:Mapper 输出的数据在通过OutputCollector.collect(K key, V value) 方法后被写入环形缓冲区 (kvbuffer, 当前默认100MB) 进行分区快速排序
  • 数据溢写:当存储量达到环形缓冲区大小 80% 后进行反向溢写 (Spill) ,MapReduce 将数据写到本地磁盘上,形成一个临时文件。
  • 文件合并:将多个溢写的临时小文件合并成一个数据文件,同时生成索引文件 以等待 ReduceTask 读取。

MapTask并行度

一个 MapReduce 任务 Map 阶段的并行度与客户端提交的数据切片个数相关。系统会为每一个切片分配一个 MapTask 以并行处理。在默认情况下切片大小 等于 HDFS 中的文件块 (Block) 大小。也可以通过 mapreduce.input.fileinputformat.split.minsize 设置切片最小字节数,mapreduce.input.fileinputformat.split.maxsize 设置切片最大字节数。

org.apache.hadoop.mapred.FileInputFormat 类中的 computeSplitSize 方法包含具体取值逻辑。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
@InterfaceAudience.Public
@InterfaceStability.Stable
public abstract class FileInputFormat<K, V> implements InputFormat<K, V> {
public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
// Prepare Logic
long goalSize = totalSize / (numSplits == 0 ? 1 : numSplits);
long minSize = Math.max(job.getLong(FileInputFormat.SPLIT_MINSIZE, 1), minSplitSize);
long blockSize = file.getBlockSize();
long splitSize = computeSplitSize(goalSize, minSize, blockSize);
// Return Logic
}
// 计算方法
protected long computeSplitSize(long goalSize, long minSize, long blockSize) {
return Math.max(minSize, Math.min(goalSize, blockSize));
}
}

ReduceTask

ReduceTask主要运行阶段

  • 数据拉取:每个 ReduceTask 主动从HDFS中读取相应分区的数据,并复制到本地磁盘中(如在阈值之内则直接写入内存)。

  • 归并排序:将同一分区,源于不同 MapTask 的数据文件进行归并排序

  • Reduce阶段:执行用户自定义 Reducer 逻辑。

  • 结果输出:由 OutputFormat 获取相应的 RecorderWriter 以按照一定格式将结果输出。

ReduceTask并行度

ReduceTask 并行度与数据分区密切相关,可通过 Job.setNumReduceTasks(Int number) 或者 mapreduce.job.reduces 设置。默认分区计算逻辑由 HashPartitioner 承载。也可以通过自定义分区器来实现不同的分区逻辑。

若一个MapReduce任务 ReduceTask 为数量0,则表示没有Reduce阶段,结果文件数与MapTask个数一致。 ReduceTask 默认个数为1,如果数据分布不均匀,则会在ReduceTask产生数据倾斜!

完整流程

MapReduce-Diagram

输入分片 -> Map阶段 -> Combiner阶段 (可选) -> Shuffle阶段 -> Reduce阶段 -> 结果输出

常见问题

什么是Shuffle

在 Mapper.map(…) 方法之后,Reducer.reduce(…) 方法之前的处理过程,通常叫做Shuffle。其中Shuffle的缓冲区大小会影响到 计算的执行效率,缓冲区越大,磁盘 IO 的次数越少,执行速度就越快。

Reduce Join

Reduce Join 用于关联2个数据源数据,实现 SQL 中 JOIN 的功能。

在 Map 阶段,读取来自不同数据源文件输入的 Key - Value 对,并打标签以区分。 随后使用关联字段作为 Key,其余部分和标志作为Value,最后输出。
在 Reduce 阶段,将每一个分组中来源自不同数据源的 Value(根据标签)分开,并按一定逻辑进行合并输出。

Map Join

当两个需要关联的数据源中,存在一个数据量级较小的时候,可将该数据源全部加载至内存,按关键字建立索引。数据量级较大的则作为 Map 部分输入,在Maper.map() 方法中针对每一个键值对,使其按关联字段与事先加载到内存的小数据集进行连接。最后将结果按 Key 输出,即可得到关联后的数据。

Map Join 需使用Hadoop中的 DistributedCache 把数量级小的数据源分发到各个计算节点;在每个 Mapper 的 setup 阶段加载到内存中进行缓存。Map Join 过程中无ReduceTask。

Map Join 方法常用于解决普通Join过程中 数据倾斜 的问题。

当前直接使用 MapReduce 框架编程以实现大数据计算的方案逐渐被淘汰,可选择 Apache Hive + UDF,或 Apache Spark 等更先进的框架/计算引擎代替!

作者后续也将着重介绍相关技术与实战经验。

Medivh's Diary