章二 关于MapReduce

MapReduce是一种可用于数据处理的编程模型

所以对比一下,我加上:Hadoop是一个分布式系统基础架构

MapReduce程序本质上是并行运行的,因此可以将大规模的数据分析任务分发给任何一个具有足够多机器的数据中心

MapReduce的优势在于处理大规模数据集

2.1 气象数据集

数据格式?*

2.2 使用Unix工具来分析数据*

按行处理的话,所需时间过长

为了加快处理速度,需要并行处理程序来进行数据分析:

使用计算机上所有可用的硬件线程处理,每个线程负责处理不同年份的数据

2.3 使用Hadoop来分析数据

需要把查询表示成Mapreduce作业

2.3.1 map和reduce?*

MapReduce任务过程分为两个处理阶段:map阶段和reduce阶段

每个阶段都以键值对作为输入和输出,其类型由程序员选择。程序员还需要写两个函数map和reduce函数

输入:NCDC原始数据,以文本格式输入,将数据集的每一行作为文本输入。

键:某一行起始位置相对于文本起始位置的偏移量(忽略)

map:取出年份和气温

map函数的输出经由MapReduce框架处理后,最后发送到reduce函数 处理基于键来对键值对进行排序和分组

map实例,无import

public class MaxTemperatureMapper
    extends MapReduceBase implements Mapper<LongWritable,Text,Text,IntWritable>
    //Mapper类是一个泛值类型,它有四个形参类型,分别指定map函数的输入值,输入值,输入值类型,输出值类型
    /**-
     * 长整型数偏移量
     * 一行文本
     * 年份
     * 气温(整数)
     */
    /*Hadoop本身提供了一套可优化的网络序列化传输的基本类型,而不是之间使用Java基本类型,这些类型都在
    org.apache.hadoop.io包中
    LongWritable:Long; Text:String; IntWritable:Integer;
    */
    {
        private static final int MISSING =9999;   

        @Override
        /*@Override 的作用是:如果想重写父类的方法,比如toString()方法的话,在方法前面加上@Override 系统可以帮你检查方法的正确性。
        Override的用法:Override:java.lang.Override是一个marker annotation类型,它被用作标注方法。
        它说明了被标注的方法重载了父类的方法,起到了断言的作用。*/ 
        public void map(LongWritable key,Text value,Context context) //map方法的输入是:一个键和一个值;map方法话提供Contexrt实例用来 输出内容的写入
            throws IOException,InterruptedException 

            {
                String line = value.toString();  //将包含有一行输入的Text值转换成Java的String类型,
                String year = line.substring(15,19);//之后用substring方法来提取我们感兴趣的列
                int airTemperature;
                if (line.charAt(87)=='+')
                { //parseInt doesn't like leading plus signs
                    airTemperature =Integer.parseInt(line.substring(88,92));
                }
                else
                {
                    airTemperature =Integer.parseInt(line.substring(87,98));
                }
                String quality =line.substring(92,93);
                if(airTemperature!=MISSING && quality.matches("[01459]"))//只有气温数据不缺失,且对应质量代码显示为正确的气温读书时,这些数据才会被写入输出记录中
                {
                    context.write(new Text(year),new IntWritable(airTemperature)); //将年份数据按Text对象进行读/写,将气温值封装在IntWritable类型中
                }

            }

    }



reduce实例

public class MaxTemperatureRedycer
        extends Reducer<Text,IntWritable,Text,IntWritable> //同样reduce函数也有四个形式参数用于指定输入和输出类型,且输入类型必须匹配map的输出类型
        {
            @Override
            public void reduce(Text key,Iterable<IntWritable> values,Context context)
            throws IOException,InterruptedException
            {
                int maxValue = Integer.MAX_VALUE;
                for(IntWritable value: values)
                {
                    maxValue=Math.max(maxValue,value.get());
                }
                context.write(key,new IntWritable(maxValue));
            }
        }

运行MapReduce作业

public class MaxTemperature
        {
            public static void main(String[] args)  throws Exception
            {
                if(args.length!=2)
                {  
                    System.out.println("Usage:MaxTemperature <input path> <output path>");
                    System.exit(-1);
                }

                Job job =new Job();
                //Job对象指定作业规范,我们可以用他来控制整个作业的运行
                //在Hadoop集群上运行这个作业时,要把代码打包成一个JAR文件。不必明确指定JAR文件的名称,在JoB对象的setJarByClass()方法中传递一个类即可
                //Hadoop可以利用这个类来查找包含它的JAR文件,从而找到相关的JAR文件

                job.setJarByClass("MaxTemperature.class");
                job.setJobName("Max Temperature");    

                //构建了Job对象后,需要指定输入和输出数据的路径,
                FileInputFormat.addInputPath(job,new Path(args[0]));
                //调用 FileInputFormat 类的静态方法 addInputPath()来的定义输出数据的路径 
                //这个路径:单个文件,一个目录(此时,将此目录下所有的文件当作输入),符合特定文件模式的一系列文件
                //多次调用 addInputPath() 实现多路径的输出
                FileOutputFormat.addOutputPath(job,new Path(args[1]));
                //调用 FileOutputFormat 类的静态方法 addOutputPath() 来指定一个输出路径
                //只能有一个输出路径
                //这个方法指定的是 reduce 函数输出文件的写入目录
                //此目录在运行作业前不能存在,否则,Hadoop将会报错并拒绝运行作业(防止意外覆盖)




                job.setMapperClass(MaxTemperatureMapper.class); //setMapperClass():指定要用的map类型
                job.setReduceClass(MaxTemperatureReducer.class);//setReduceClass(): 指定要用的reduce类型

                job.setOutputKeyClass(Text.class); 
                job.setOutputValueClass(IntWritable.class);
                //setOutputKeyClass() setOutputValueClass():reduce函数的输出类型,必须和Reduce类产生的相匹配
                //map 函数的输出类型默认情况下和 reduce 函数是相同的 如果 mapper 产出和 reducer 相同的类型时,不需要单独设置
                //如果不同,必须要使用 setMapOutputKeyClass() 和 setMapOutputValueClass() 来设置map函数的输出类型

                //输入的类型通过输入格式来控制

                System.exit(job.waitForCompletion(true)?0:1);
                //设置完 map 和 reduce 函数的类后,可以开始作业 waitForCompletion() 方法提交作业并等待执行完成
                //此方法的唯一一个参数是一个标识,指示是否已经详细输出,当标识为ture时,作业讲其进度信息写到控制台
                //返回一个bool值,标识执行的ture/false

            }
        }

2.3.2 Java MapReduce

reduce阶段

基于来对键值对进行排序和分组

遍历整个列表并从中找出最大的数

2.3.2 Java MapReduce*?

2.3.2.1 运行测试

2.4 横向扩展scaling out

为了实现横向扩展:需要把数据存储在分布式文件系统中(如HDFS),通过使用Hadoop资源管理系统YARN,Hadoop可以将MapReduce计算转移到存储有部分数据的各台机器上

2.4.1 数据流

MapReduce job:是客户端需要执行的一个工作单元,包括输入数据,MapReduc程序和配置信息。

Hadoop将任务分成若干个task执行,其中包括两类任务:map任务和reduce任务

这些任务运行在集群的节点上,并通过YARN进行调度。如果一个任务失败,它将会在不同的节点上重新自动运行

Hadoop将MapRduce的输入数据分成等长的小数据块,称为输入分片(input split),Hadoop为每一个分片构建一个map任务,并让该任务来运行用户自定义的mao函数,从而处理分片的每条记录

大量的分片使得处理每个分片所需的时间<处理整个输入数据所花的时间

如果分片切的太小,那么管理分片的总时间和构建MAP任务的总时间将决定作业的整个执行时间

一般:分片大小趋于HDHF的一个块的大小:128MB(可调整)

数据本地优化:Hadoop在存有输入数据的节点岛上运行map任务,避免使用集群带宽

有时候,本地节点在运行其他任务,此时作业调度需要从某一数据块所在的机架中的一个节点上寻找一个空闲的map槽(slot)来运行该map任务分片,极少时候,会使用其他机架的节点(意味着有机架与机架的网络传输)

为何最佳分片应该与块大小相同:确保可以存储在单个节点的最大输入块的大小,如果超过,则需要两个及以上的HDFS节点,那么分片中的部分数据,需要网络传输到map任务运行的节点(效率变低)

map任务将其输出存储到本地硬盘

1.map输入为中间结果,由reduce任务处理才有最终结果

2.作业完成,map结果无需保存

如果运行map任务的节点 在map中间结果传输给reduc任务前失败,Hadoop将在另一个节点上运行这个map任务,以再次构建map中间结果

reduce任务并不具有数据本地化优势:单个reduce任务的输入通常来源于所有mapper的输出

排序过的map输出需通过网络传给运行reduce任务的节点,数据在reduce端合并,然后由用户定义的reduce函数处理,reduce的输入通常存储在HDFS上

reduce任务的数量是独立指定

如果reduce任务过多,每个map任务就对其输入进行分区(partition)即为每个reduce任务建立一个分区,每个分区有许多键及其对应值,每个件的对应 键-值 记录都在同一分区中,分区可以由用户定义的分区函数控制,但是通常用默认的partitioner通过哈希函数来分区

map任务和reduce任务之间的数据流被称为:混洗shuffle

当数据处理完全并行,无需混洗,可能无reduce任务,那么此种情况下,唯一的非本地节点数据传输是map任务将结果写入HDFS

2.4.2 combiner函数

集群上的可用带宽限制了MapRedcue作业的数量,因此尽量避免map和reduce任务之间的传输

Hadoop容许用户针对map任务的输出指定一个combiner,combiner含的输出作为reduce含的输入

由于combiner属于优化方案,Hadoop无法确定要对一个指定的map任务需要调用多少次combiner(因此无论调用combiner多少次,reduce的输出结果都是一样的)

combiner 函数不能取代 reduce 函数

指定一个combiner

        public class MaxTemperatureWithCombiners
        {
            public static void main(String[] args) throws Exception
            {
                if(args.length!=2)
                {
                    System.err.println("Usage: MaxTemperatureWithCombiner<input path>"+"<output path>");
                    System.exit(1);
                }

                Job job = new Job();
                job.setJarByClass(MaxTemperatureWithCombiner.class); 
                job.setJobName("MaxTemperature");

                FileInputFormat.addInputPath(job,new Path(args[0]));
                FileOutputFormat.addOutputPath(job,new Path(args[1]));

                job.setMapperClass(MaxTemperatureMapper.class);
                job.setCombinerClass(MaxTemperatureReducer.class);
                Job.setReduceClass(MaxTemperatureReducer.class);

                job.setOutputKeyClass(Text.class);
                job.setOutputValueClass(Integer.class);

                System.exit(job.waitForCompletion(true)?0:1);
            }
        }

2.4.3 运行分布式的MapReduce作业

nothing

2.5 Hadoop Streaming

Hadoop 提供了 MapReduce 的 API ,允许你使用非 Java 的其他语言来写自己的map和reduce函数

Hadoop Steaming 使用了 Unix 标准流为 Hadoop 和应用程序值之间的接口

map 的输入数据通过标准输入流传递给 map 函数,并且是一行一行地传输,最后将结果写到标准输出

map 输出的键-值对是一个以制表符分隔的行,reduce 函数的输入格式和此相同并通过标注输入流进行传输。reduce 函数从标准输入流中读取输入行,改输入已由 Hadoop 框架根据键排过序,最后将结果写入标准输出

2.5.1 Ruby版本

不会Ruby

2.5.2 Python版本

暂略

发表评论