Spark源码解析-Shuffle

说明:

  1. 数据结构
    1. 聚合 => 类HashMap
    2. 排序 => 类Array
  2. Shuffle方式
    1. 2.x版本,shuffle方式全部统一到 Sort Shuffle

Shuffle Write(map task)

shuffle write的通用框架需要执行的三个步骤是:数据聚合(combine)(可选) => 排序(可选) => 分区

  • BypassMergeSortShuffleWriter

    特点:不需要map端聚合和排序

    触发条件:1. 没有map端聚合 2.分区数不多(nums < 200)

    优缺点:

    ​ 优点:

    1. 速度快,直接将record输出到不同的文件中

    ​ 缺点:

    1. 每个分区都需要一个buffer缓冲区(spark.shuffle.file.buffer,默认32k)(内存IO)
    2. 同时每个shuffle write需要创建分区个数个临时shuffle文件(磁盘IO)

    文件个数:

    ​ 每个MapTask,都会产生分区个数个临时shuffle文件和产生1个最终shuffle文件+1个索引文件,

    ​ 即,如果有N个mapTask(2个分区),最终会产生2N个文件

    ​ 第1个mapTask => 产生2个临时shuffle文件 => 1个最终shuffle文件+1个索引文件

    ​ 第2个mapTask => 产生2个临时shuffle文件 => 1个最终shuffle文件+1个索引文件

    源码解析:

    // 1.为每个分区创建一个临时的shuffle文件
    for (int i = 0; i < numPartitions; i++) {
    final File file = tempShuffleBlockIdPlusFile._2()
    // 封装每个分区的输出流对象(DiskBlockObjectWriter)
    partitionWriters[i] =
    blockManager.getDiskWriter(blockId, file, serInstance, fileBufferSize, writeMetrics);
    }

    // 2.遍历数据,将数据写入到对应的BufferedOutputStream中
    while (records.hasNext()) {
    final Product2<K, V> record = records.next();
    final K key = record._1();
    partitionWriters[partitioner.getPartition(key)]
    .write(key, record._2());
    }

    // 3.将每个分区的数据写入到磁盘上
    for (int i = 0; i < numPartitions; i++) {
    final DiskBlockObjectWriter writer = partitionWriters[i];
    // 1.flush 2.new fileSegment(file, offset, length)
    partitionWriterSegments[i] = writer.commitAndGet();
    }

    // 4.构建最终的shuffle输出文件
    File output = shuffleBlockResolver.getDataFile(shuffleId, mapId);
    File tmp = Utils.tempFileWith(output);
    try {
    // 4.1 将每个分区的临时shuffle文件合并成一个最终的shuffle文件
    partitionLengths = writePartitionedFile(tmp);
    // 4.2 将每个分区的offset写入index文件
    shuffleBlockResolver.writeIndexFileAndCommit(shuffleId, mapId, partitionLengths, tmp);
    }

    过程:

    1. 为每个分区创建一个临时的shuffle文件 (文件句柄)
    2. 遍历数据,将数据写入到对应的BufferedOutputStream中(这里会用到buffer缓冲区,带来内存IO)
    3. 将每个分区的数据写入到磁盘上(磁盘IO)
    4. 将每个分区的临时shuffle文件合并成一个最终的shuffle文件和一个index文件(合并过程会带来额外的磁盘IO)
  • SortShuffleWriter

  • UnsafeShuffleWriter

Shuffle Read(reduce task)

Author: Red
Link: http://yoursite.com/2019/05/04/archives/spark/Spark%E6%BA%90%E7%A0%81%E8%A7%A3%E6%9E%90-Shuffle/
Copyright Notice: All articles in this blog are licensed under CC BY-NC-SA 4.0 unless stating additionally.