说明:
- 数据结构
- 聚合 => 类HashMap
- 排序 => 类Array
- Shuffle方式
- 2.x版本,shuffle方式全部统一到 Sort Shuffle
Shuffle Write(map task)
shuffle write
的通用框架需要执行的三个步骤是:数据聚合(combine
)(可选) => 排序(可选) => 分区
BypassMergeSortShuffleWriter
特点:不需要map端聚合和排序
触发条件:1. 没有map端聚合 2.分区数不多(
nums < 200
)优缺点:
优点:
- 速度快,直接将record输出到不同的文件中
缺点:
- 每个分区都需要一个buffer缓冲区(
spark.shuffle.file.buffer,默认32k
)(内存IO) - 同时每个shuffle write需要创建分区个数个临时shuffle文件(磁盘IO)
文件个数:
每个MapTask,都会产生分区个数个临时shuffle文件和产生1个最终shuffle文件+1个索引文件,
即,如果有N个mapTask(2个分区),最终会产生2N个文件
第1个mapTask => 产生2个临时shuffle文件 => 1个最终shuffle文件+1个索引文件
第2个mapTask => 产生2个临时shuffle文件 => 1个最终shuffle文件+1个索引文件
源码解析:
// 1.为每个分区创建一个临时的shuffle文件
for (int i = 0; i < numPartitions; i++) {
final File file = tempShuffleBlockIdPlusFile._2()
// 封装每个分区的输出流对象(DiskBlockObjectWriter)
partitionWriters[i] =
blockManager.getDiskWriter(blockId, file, serInstance, fileBufferSize, writeMetrics);
}
// 2.遍历数据,将数据写入到对应的BufferedOutputStream中
while (records.hasNext()) {
final Product2<K, V> record = records.next();
final K key = record._1();
partitionWriters[partitioner.getPartition(key)]
.write(key, record._2());
}
// 3.将每个分区的数据写入到磁盘上
for (int i = 0; i < numPartitions; i++) {
final DiskBlockObjectWriter writer = partitionWriters[i];
// 1.flush 2.new fileSegment(file, offset, length)
partitionWriterSegments[i] = writer.commitAndGet();
}
// 4.构建最终的shuffle输出文件
File output = shuffleBlockResolver.getDataFile(shuffleId, mapId);
File tmp = Utils.tempFileWith(output);
try {
// 4.1 将每个分区的临时shuffle文件合并成一个最终的shuffle文件
partitionLengths = writePartitionedFile(tmp);
// 4.2 将每个分区的offset写入index文件
shuffleBlockResolver.writeIndexFileAndCommit(shuffleId, mapId, partitionLengths, tmp);
}过程:
- 为每个分区创建一个临时的shuffle文件 (文件句柄)
- 遍历数据,将数据写入到对应的BufferedOutputStream中(这里会用到buffer缓冲区,带来内存IO)
- 将每个分区的数据写入到磁盘上(磁盘IO)
- 将每个分区的临时shuffle文件合并成一个最终的shuffle文件和一个index文件(合并过程会带来额外的磁盘IO)
SortShuffleWriter
- UnsafeShuffleWriter