CS-Notes/notes/笔记/MapReduce.md.txt
2018-02-22 14:47:22 +08:00

177 lines
6.3 KiB
Plaintext
Raw Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# 直观理解
统计一堆牌中有多少张黑桃最简单的方法是一张一张去数。而 MapReduce 的方法是
1. 把这堆牌分配给多个玩家;
2. 让每个玩家数自己手中有多少张黑桃;
3. 把所有玩家数出的黑桃数加起来就是最后的结果。
通过让多个玩家来并行地进行统计MapReduce 可以将统计时间大大缩短。
# 大数据
从大量数据中快速提取出有价值的信息。注意到这里的数据不一定需要同种类型的数据,可以是多种多样的数据,包括非结构化数据、半结构化数据以及结构化数据。
# 基本原理
MapReduce 用来对大数据中的海量数据进行离线分析。
MapReduce 分为两个阶段Map 阶段和 Reduce 阶段。
在下图中file 包含多个 block每个块代表一个海量数据。file 被划分为多个 split每个分片由一个 Mapper Task 去处理。Map 过程中输入的是 [K1, V1] 数据这种是一种键值对形式的数据键为泛型 K1 的类型值为泛型 V1 的类型。输入也是一个泛型的键值对 [K2, V2]。Shuffle 过程主要是对 Map 阶段输出的键值对进行整合将键相同的键值对整合到一组中得到 [K2, {V2,...}] 的整合数据作为 Reudce 阶段的输入。Reduce 处理完之后得到 [K3, V3] 键值对Hadoop 会将输出的数据存到分布式文件系统 HDFS 中。
![](index_files/e2026020-d669-4faa-9e16-ca726619bb9f.jpg)
# WordCount 实战
## 开发环境配置
创建 Maven 项目 pom.xml 中添加以下依赖
```
    <properties>
        <hadoopVersion>3.0.0</hadoopVersion>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>${hadoopVersion}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>${hadoopVersion}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-core</artifactId>
            <version>${hadoopVersion}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoopVersion}</version>
        </dependency>
    </dependencies>
```
## 文本
```html
the weather is good
today is good
good weather is good
today has good weather
```
将文本的每一行分成一片每一片的数据提供给 Mapper 进行处理因此需要 4  Mapper。其中 K1 为行号V1 位该行的字符串。
```html
Split-0: [0, "the weather is good]
Split-1: [1, "today is good"]
Split-2: [2, "good weather is good"]
Split-3: [3, "today has good weather]
```
## Mapper
Mapper 类需要提供四个泛型分别为 K1, V1, K2, V2。
```java
class WordCountMappper extends Mapper<LongWritable, Text, Text, IntWritable> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String line = value.toString();
        String[] words = line.split(" ");
        for (String word : words) {
            context.write(new Text(word), new IntWritable(1));
        }
    }
}
```
```html
Mapper-0: ["the", 1], ["weather", 1], ["is", 1], ["good", 1]
Mapper-1: ["today", 1], ["is", 1], ["good", 1]
Mapper-2: ["good", 2], ["weather", 1], ["is", 1]
Mapper-3: [today", 1], ["has", 1], ["good", 1], ["weater", 1]
```
## Shuffle
排序并将拥有相同键的数据整合。
```html
["good", {1, 1, 2, 1}]
["has", {1}]
["is", {1, 1, 1}]
["the", {1}]
["today", {1, 2}]
["weater", {1,1}]
```
## Reducer
Reducer 同样要指定 4 个泛型K2, V2, K3, V3
```java
public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        Integer count = 0;
        for (IntWritable value : values) {
            count += value.get();
        }
        context.write(key, new IntWritable(count));
    }
}
```
```html
Reducer-0: ["good", 5]
Reducer-1: ["has", 1]
Reducer-2: ["is", 3]
Reducer-3: ["the", 1]
Reducer-4: ["today", 2]
Reducer-5: ["weather", 3]
```
## Job
将项目打包成 jar 包后使用 Hadoop 来运行。
```java
public class WordCountMapReduce {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
// 新建 Job
        Job job = Job.getInstance(conf, "wordcount");
        job.setJarByClass(WordCountMapReduce.class);
        job.setMapperClass(WordCountMappper.class);
        job.setReducerClass(WordCountReducer.class);
        // 设置 Mapper  Reducer  Key Value 类型
        job.setMapOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
// 设置 HDFS 路径
        FileInputFormat.setInputPaths(job, new Path("hdfs://domain:8020/words"));
        FileOutputFormat.setOutputPath(job, new Path("hdfs://domain:8020/words"));
// 运行
        boolean b = job.waitForCompletion(true);
        if (!b) {
            System.err.println("This task has failed!!!");
        }
    }
}
```
# 参考资料
- [Hadoop MapReduce 入门](http://www.jikexueyuan.com/course/2686.html)