本文基于Hadoop 3.X
简介
Hadoop MapReduce 最初理论源于 Google 2004年论文<MapReduce: Simplified Data Processing on Large Clusters> ^1, 是一个分布式计算程序框架,具备一定的可靠性与容错性。常用于处理大数据并行计算分析。MapReduce 框架包含两个重要的任务类型,即 Map 和 Reduce。
计算作业将输入数据集拆分为独立的文件块,这些文件块由 Map 任务以并行的方式处理,其中单个元素被分解为键-值对。随后MR框架对映射的输出进行排序,再将数据再输入给 Reduce 任务。作业的输入和输出一般都存储在文件系统 (HDFS) 中。
系统架构
MapReduce 系统包含一个主资源管理器 (ResourceManager),每个节点的 NodeManager 以及 每个计算程序的 MRAppMaster(详情见 YARN 架构)。一个完整的 MapReduce 计算程序在分布式运行时有三类进程:
MRAppMaster: 既Driver程序,负责整个程序的过程调度及状态协调。
MapTask:负责 Map 阶段的整个数据处理流程。
ReduceTask:负责 Reduce 阶段的整个数据处理流程。
适用场景
适用于海量大数据离线计算分析(OLAP),对于计算耗时敏感性不高(非毫秒级或者秒级返回结果),以及需要使用代码实现的复杂计算逻辑。
优点
支持海量大数据 (PB 级以上) 离线计算;可从 HDFS中 读取多种格式的数据文件。
编程简单:通常实现
org.apache.hadoop.mapred.Mapper
与org.apache.hadoop.mapred.Reducer
等接口以及编写“Driver”
即可完成分布式并行运算。较强的扩展性:当计算资源不足的时,可通过简单的机器扩容以达到水平扩展,计算程序或系统本身无需代码更变。
较高的容错性:当一个或多个计算任务失败时,系统会将其分配到其他机器节点上进行重试,从而保证任务不至于一次性失败。
缺点
- 时效性较低:不支持实时计算,无法达到像普通 RDBMS 一样的毫秒级查询分析。
- 无法进行流式计算:由于流式计算的数据源为动态的,但 MapReduce 框架所针对的均为静态数据,所以无法进行流式计算。
- 不能较好处理数据流转:每个 MapReduce 任务必须将数据写入文件系统(HDFS)中, 从而在处理有前后依赖性(DAG/有向无环图)的计算任务时,需频繁将中间数据写入磁盘,造成大量 IO 消耗。
核心组件
以下罗列出MapReduce框架中的主要核心组件,并阐明功能与使用方法。随着Hadoop MapReduce版本的更新,需不断对照源码进行理解。
1 | <dependency> |
Writable
由于 Java 自身的序列化机制 (java.io.Serializable) 较重,对象在被序列化时会携带额外信息。因此为了减少存储占用,以及网络传输带宽,Hadoop 设计了自己的序列化机制。该序列化机制 (Writable) 更紧凑,高效,且兼容性更强。需要序列化的自定义类对象可实现org.apache.hadoop.io.Writable
或org.apache.hadoop.io.WritableComparable
接口。
1 | .Public |
注意: 序列化与反序列化时,Class-Field顺序必须保持一致!
1 | .Public |
注意: 如果自定义类对象需作为 MR 中的 KEY,则必须实现 java.lang.Comparable
接口。因为在 Shuffle 过程中会针对 KEY 进行排序!
常用数据类型对比
Java 类型 | Hadoop Writable 类型 |
---|---|
boolean | BooleanWritable |
byte | ByteWritable |
int | IntWritable |
float | FloatWritable |
long | LongWritable |
double | DoubleWritable |
String | Text |
Map | MapWritable |
Array | ArrayWritable |
null | NullWritable |
InputFormat
InputFormat 负责对原始数据源进行读取操作。通常会先针对数据文件切片,并通过对应的 RecordReader 进行读取。默认的抽象类为org.apache.hadoop.mapred.FileInputFormat
1 | .Public |
在默认情况下切片大小等于 HDFS 中存储的文件块 (Block) 大小 (128或256MB)。也可以通过 mapreduce.input.fileinputformat.split.minsize 设置切片最小字节数,以及 mapreduce.input.fileinputformat.split.maxsize 设置切片最大字节数。
RecordReader 具体可参考此[文档](RecordReader (Apache Hadoop Main 3.2.1 API))。
1 | /** RecordReader reads <key, value> pairs from an InputSplit. */ |
Mapper
Mapper负责MapReduce - MapTask的核心逻辑。具体可参考此文档。
1 | .Public |
Reducer
Reducer负责MapReduce - ReduceTask的核心逻辑。具体可参考此文档。
1 |
|
Partitioner
当需要将数据按照不同类别 / 属性分开存储时,则需要使用 Partitioner。 默认分区是根据 KEY 的 hashCode 和 reduceTasks 个数取余得到 (org.apache.hadoop.mapreduce.lib.partition.HashPartitioner)。若按照自定义条件进行分区,则需要继承org.apache.hadoop.mapreduce.Partitioner
类并重写 getPartition(…) 方法。由此MR框架可根据自定义逻辑将结果 Key-Value 进行分区。
1 | .Public |
在Job中设置自定义分区逻辑后,还要根据 Partitioner 的条件在Job里设置相应的 Reduce Task 数量。
注意:
当 ReduceTask 数量 大于 getPartition(…) 返回的最大数值,则系统会多生成(ReduceTask数 - getPartition 最大值)空的输出文件,任务成功。
当 ReduceTask 数量 大于 1 且 小于 getPartition(…) 返回的最大数值,则有一部分分区数据无法存储,系统抛出 Exception,任务失败。
当 ReduceTask 数量 等于 1,则无论MapTask端输出多少个分区文件,最终都交给一个 ReduceTask,结果存储在一个文件中 (part-r-00000)。任务成功,但耗时可能较高。
Combiner
Combiner 是 MapReduce 程序之外的一种组件,其父类依旧是org.apache.hadoop.mapred.Reducer
。Combiner在每个MapTask所在节点运行,而Reducer则负责处理全局Mapper的输出结果。Combiner的作用是为了对每个MapTask的输出进行局部汇总,以减少网络开销。但并非所有计算场景都可以使用Combiner。
OutputFormat
OutputFormat 负责 MapReduce 计算结果的最终输出所需的 RecordWriter 创建/获取,在ReduceTask之后执行。所有MR输出均需要实现该类 (org.apache.hadoop.mapred.OutputFormat
)。系统默认的输出格式为 TextOutputFormat,继承于org.apache.hadoop.mapred.FileOutputFormat
。
OutputFormat 具体可参考此文档。
1 | .Public |
RecordWriter 具体可参考此文档。
1 | .Public |
Driver
MapReduce Driver 程序包含整个MR过程 Main 方法入口,负责创建并配置计算任务 (org.apache.hadoop.mapreduce.Job
)。通过 Job.getInstance(config) 可创建任务。其中基础参数由org.apache.hadoop.conf.Configuration
或org.apache.hadoop.mapred.JobConf
设置。
Job类中常用方法如下,具体可参考此文档。
1 |
|
工作原理
MapTask
MapTask主要运行阶段
- 数据读取:由 InputFormat 获取相应的 RecordReader 以读取数据源文件。之后通过 InputSplit 解析出相对的 Key - Value。
- Map阶段:基于输入的 Key - Value 执行用户自定义的 Mapper 逻辑,随后输出一组新的 Key - Value。
- 数据采集:Mapper 输出的数据在通过OutputCollector.collect(K key, V value) 方法后被写入环形缓冲区 (kvbuffer, 当前默认100MB) 进行分区和快速排序。
- 数据溢写:当存储量达到环形缓冲区大小 80% 后进行反向溢写 (Spill) ,MapReduce 将数据写到本地磁盘上,形成一个临时文件。
- 文件合并:将多个溢写的临时小文件合并成一个数据文件,同时生成索引文件 以等待 ReduceTask 读取。
MapTask并行度
一个 MapReduce 任务 Map 阶段的并行度与客户端提交的数据切片个数相关。系统会为每一个切片分配一个 MapTask 以并行处理。在默认情况下切片大小 等于 HDFS 中的文件块 (Block) 大小。也可以通过 mapreduce.input.fileinputformat.split.minsize
设置切片最小字节数,mapreduce.input.fileinputformat.split.maxsize
设置切片最大字节数。
org.apache.hadoop.mapred.FileInputFormat
类中的 computeSplitSize 方法包含具体取值逻辑。
1 | .Public |
ReduceTask
ReduceTask主要运行阶段
数据拉取:每个 ReduceTask 主动从HDFS中读取相应分区的数据,并复制到本地磁盘中(如在阈值之内则直接写入内存)。
归并排序:将同一分区,源于不同 MapTask 的数据文件进行归并排序。
Reduce阶段:执行用户自定义 Reducer 逻辑。
结果输出:由 OutputFormat 获取相应的 RecorderWriter 以按照一定格式将结果输出。
ReduceTask并行度
ReduceTask 并行度与数据分区密切相关,可通过 Job.setNumReduceTasks(Int number) 或者 mapreduce.job.reduces 设置。默认分区计算逻辑由 HashPartitioner 承载。也可以通过自定义分区器来实现不同的分区逻辑。
若一个MapReduce任务 ReduceTask 为数量0,则表示没有Reduce阶段,结果文件数与MapTask个数一致。 ReduceTask 默认个数为1,如果数据分布不均匀,则会在ReduceTask产生数据倾斜!
完整流程
输入分片 -> Map阶段 -> Combiner阶段 (可选) -> Shuffle阶段 -> Reduce阶段 -> 结果输出
常见问题
什么是Shuffle
在 Mapper.map(…) 方法之后,Reducer.reduce(…) 方法之前的处理过程,通常叫做Shuffle。其中Shuffle的缓冲区大小会影响到 计算的执行效率,缓冲区越大,磁盘 IO 的次数越少,执行速度就越快。
Reduce Join
Reduce Join 用于关联2个数据源数据,实现 SQL 中 JOIN 的功能。
在 Map 阶段,读取来自不同数据源文件输入的 Key - Value 对,并打标签以区分。 随后使用关联字段作为 Key,其余部分和标志作为Value,最后输出。
在 Reduce 阶段,将每一个分组中来源自不同数据源的 Value(根据标签)分开,并按一定逻辑进行合并输出。
Map Join
当两个需要关联的数据源中,存在一个数据量级较小的时候,可将该数据源全部加载至内存,按关键字建立索引。数据量级较大的则作为 Map 部分输入,在Maper.map() 方法中针对每一个键值对,使其按关联字段与事先加载到内存的小数据集进行连接。最后将结果按 Key 输出,即可得到关联后的数据。
Map Join 需使用Hadoop中的 DistributedCache 把数量级小的数据源分发到各个计算节点;在每个 Mapper 的 setup 阶段加载到内存中进行缓存。Map Join 过程中无ReduceTask。
Map Join 方法常用于解决普通Join过程中 数据倾斜 的问题。
当前直接使用 MapReduce 框架编程以实现大数据计算的方案逐渐被淘汰,可选择 Apache Hive + UDF,或 Apache Spark 等更先进的框架/计算引擎代替!
作者后续也将着重介绍相关技术与实战经验。