Hadoop 中 HDFS 与 MapReducer 在Java之上的应用

概述

主要原因是因为很快就要参加云计算的技能大赛。原本以为是明年三月份,结果定在了十月初的样子,囫囵吞枣的把Hadoop相关的过了一遍。

Hadoop 分布式环境是组员给搭建的,就不再纂述。(其实是因为我搭半天懒得学了2333...

所谓 HDFS 是指一套分布式存储系统,主要目的是为了能够容下更多的数据。常理来看,一台电脑的容量总是有限,而如果这台电脑可以无线增加容量,便可以达到 "无限" 的容量。简单来说,HDFS 就是这样一套存储系统。

而 MapReducer 则是计算数据的一个框架,其实应该分为两个单词: Mapper 和 Reducer 。Mapper 是指对数据的映射,而 Reducer则是对映射(Mapper)到的数据进行整理、输出。

配置

我们需要引入:hadoop-common 、hadoop-hdfs 、hdoop-client 三个包。
需要注意的是,这些版本最好高于你所安装的版本,但也不要太高,我所安装的 Hadoop 是2.7.6,而包的版本都是2.8。

    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-common</artifactId>
        <version>2.8.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-hdfs</artifactId>
        <version>2.8.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>2.8.0</version>
    </dependency>

但配置这些远远是不够的,我们虽然大部分操作都是基于远程调用,但有些操作 Hadoop 依然需要你将其下载在本地,并配置环境变量,以及 winutils , 否则会报错!这里就不在纂述过程。

HDFS

首先我们需要去获得 HDFS 的基本操作对象 FileSystem 。首先第一个参数为你的 HDFS 运行在那个端口。然后是第二个参数: Configuration 可以通过它去设置 Hadoop 的默认设置,不过这里没有用到。
其次,第三个参数代表你使用的用户名,如果用户名不具有操作权限,则报错。

@Before
public void start() throws URISyntaxException, IOException, InterruptedException {
    Configuration configuration = new Configuration();
    fileSystem = FileSystem.get(new URI("hdfs://192.168.100.10:9000")
            , configuration, "root");

}

创建文件

@Test
public void create() throws IOException {
    Path path = new Path("/WordCount/hello1.txt");
    FSDataOutputStream out = fileSystem.create(path);
    out.writeUTF("hello word hello");
    out.flush();
}

获取路径下的所有文件

/**
 * 获取路径下的所有文件
 * @throws IOException
 */
@Test
public void listFiles() throws IOException {
    Path path = new Path("/hdfsapi/test/");
    FileStatus[] files = fileSystem.listStatus(path);
    //递归显示
    //FileStatus[] files = fileSystem.listFiles(path,true);
    for (FileStatus file : files) {
        System.out.println(file.getPath());
    }

}

上传大文件:带进度线程

/**
 * 上传大文件:带进度显示
 * @throws IOException
 */
@Test
public void copyFormLocalBigFile() throws IOException {
    InputStream in = new BufferedInputStream(new FileInputStream(new File("C:\\Users\\Administrator\\Desktop\\jdk-8u181-linux-x64.tar.gz")));
    Path path = new Path("/hdfsapi/test/jdk.tar.gz");
    FSDataOutputStream out = fileSystem.create(path, new Progressable() {
        public void progress() {
            System.out.println(".");
        }
    });

    IOUtils.copyBytes(in,out,4096);

}

方法该有的都有,大多相似,仅记录到此。

使用 HDFS 编写词频计数

简单的来说,就是去记录文本文件中,一个单词的出现次数。以下代码涉及到:获取文件列表、创建文件。

public static void main(String[] args) throws Exception {
    FileSystem fs = FileSystem.get(new URI("hdfs://192.168.100.10:9000"),
            new Configuration(), "root");
    Path path = new Path("/WordCount/");
    RemoteIterator<LocatedFileStatus> files = fs.listFiles(path, false);
    HashMap<String, Integer> result = new HashMap<String, Integer>();
    while (files.hasNext()) {
        LocatedFileStatus file = files.next();
        FSDataInputStream in = fs.open(file.getPath());
        BufferedReader reader = new BufferedReader(new InputStreamReader(in));
        String line = "";
        while ((line = reader.readLine()) != null) {
            String[] split = line.split(" ");
            for (String s : split) {
                s = s.trim();
                if (result.get(s) != null){
                    result.put(s,result.get(s) + 1);
                }else{
                    result.put(s,1);
                }
            }
        }
        in.close();
        reader.close();
    }
    FSDataOutputStream out = fs.create(new Path("/WordCount/out.txt"));
    out.writeUTF(result.toString());
    out.flush();
}

MapReducer

在概述中我们就说到,我们 MapReducer 是指映射和整理两个功能。那么当我们获得一个数据集之时,则要先创建对其的映射。这里我们仍然使用 HDFS 中词频统计的样例。

Mapper

我们需要创建一个继承于 Mapper(org.apache.hadoop.mapreduce.Mapper) 的类,并实现其 map 方法。我们需要声明四个类型,前两者代表我们拿的数据的行数与内容。

简单的说,我们拿到一个数据文本,一般是采用以一行为分割依据,那么第一个参数代表这一行的首字符位置,其次则是这一行的内容。那么为什么本该是 Long 的类型要写成 LongWritable 呢?因为这些自定义的类型本身拥有序列号以及反序列化的特性。

2019080712210397.png

而后两个参数则是指,当我们映射完毕之后输出的 map 的类型,这里需要知道,我们的数据一般是采用 map 形式进行输出!

public class WordCountMapper extends Mapper<LongWritable, Text,Text, IntWritable> 

map 方法其中有三个参数,第一个是指当前行的首字母的位置,也就是从 0 位到当前位置的偏移值。其二,当前行的内容,其三,输出 map 的对象。我们在前面说,我们映射完的对象是一个 map 形式的。而我们需要使用 context 对象进行写入这个 map。

因此代码就很简洁明了,我们将一行数据以空格为分隔符去分割,然后我们得到一系列的字母,并赋其值计次为1。

@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
    String[] words = value.toString().split(" ");
    for (String word : words) {
        context.write(new Text(word.trim()),new IntWritable(1));
    }
}

Reduce

同样的,我们需要创建一个继承于 Reducer(org.apache.hadoop.mapreduce.Reducer) 的类。也是需要声明四个类型,前两者为接收到的映射类型,后两者为整理完毕输出的类型。

public class WordCountReducer  extends Reducer<Text, IntWritable,Text, IntWritable>

实现其方法 reduce。我们得到三个参数,第一个很显然是之前我们在 mapper 中写入的key,而第二个参数倒不是 value,而是value的一个迭代对象,它包含了数据中所有key值为当前key指的 value。为什么呢? 因为在执行 Reduce 整理操作之前实际上经历了 shuffing 层,该层会将其一个个的进行对应,然后发送到 Reduce 层。

14019352-0c530b046636edd0.jpg

@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
    Iterator<IntWritable> iterator = values.iterator();
    int count = 0;
    while(iterator.hasNext()){
        IntWritable value = iterator.next();
        count += value.get();
    }
    context.write(key,new IntWritable(count));

}

job(开始任务)

当完成以上任务的时候,我们就只剩下去提交任务了。

    //设置当前用户为root,与 HDFS中 FileSystem.get() 方法中我们输入的用户名功能相同。
    System.setProperty("HADOOP_USER_NAME","root");

    Configuration configuration = new Configuration();
    configuration.set("fs.defaultFS","hdfs://192.168.100.10:9000");

    //创建提交任务的对象
    Job job = Job.getInstance(configuration);

    //设置Jab在那个类执行
    job.setJarByClass(WordCountApp.class);
    //设置Mapper类
    job.setMapperClass(WordCountMapper.class);
    //设置Reduce类
    job.setReducerClass(WordCountReducer.class);

    //设置Mapper输出的key和value的类型
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(IntWritable.class);

    //设置Mapper结束后的Combiner操作,对Mapper做一次聚合,这样可以减少IO并提升性能,然后在shuffle层进行整理。
    //但是在某些计算任务的时候使用它则会出现错误,如:求平均值。
    job.setCombinerClass(WordCountReducer.class);

    //设置Reduce输出的key和value的类型
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    
    //作业输出和输入的路径
    FileInputFormat.setInputPaths(job,new Path("/WordCount/"));
    FileOutputFormat.setOutputPath(job,new Path("/WordCount/out1.txt"));

    //提交任务
    boolean result = job.waitForCompletion(true);
    System.out.println(result);

实现复杂类型

当我们将处理复杂的数据集的时候,我们不可能通过 Hadoop 提供的数据类型达到我们的目的,因此我们需要自定义一个类型。

我们需要实现 Writable(org.apache.hadoop.io.Writable)接口,并分别实现 write 方法和 readFields 方法。从名称可以看出,二者分别是读取和写入。

public class Access implements Writable {
    private String phone;
    private long up;
    private long down;
    private long sum;
    
public void write(DataOutput out) throws IOException {
    out.writeUTF(phone);
    out.writeLong(up);
    out.writeLong(down);
    out.writeLong(sum);
}

public void readFields(DataInput in) throws IOException {
    this.phone = in.readUTF();
    this.up = in.readLong();
    this.down = in.readLong();
    this.sum = in.readLong();

}

这样我们就可以复杂类型了。

# Hadoop