protected def maybeSpill(collection: C, currentMemory: Long): Boolean = {
  var shouldSpill = false
  if (elementsRead % 32 == 0 && currentMemory >= myMemoryThreshold) {
    // Claim up to double our current memory from the shuffle memory pool
    val amountToRequest = 2 * currentMemory - myMemoryThreshold
    val granted = acquireMemory(amountToRequest)
    myMemoryThreshold += granted
    // If we were granted too little memory to grow further (either tryToAcquire returned 0,
    // or we already had more memory than myMemoryThreshold), spill the current collection
    shouldSpill = currentMemory >= myMemoryThreshold
  }
  shouldSpill = shouldSpill || _elementsRead > numElementsForceSpillThreshold
  // Actually spill
  if (shouldSpill) {
    _spillCount += 1
    logSpillage(currentMemory)
    spill(collection)
    _elementsRead = 0
    _memoryBytesSpilled += currentMemory
    releaseMemory()
  }
  shouldSpill
}
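The key idea is that a spill is only forced when the memory manager refuses to grow the sorter's budget: every 32 elements, once usage crosses the current threshold, the sorter asks for enough memory to double its footprint, and spills only if the grant still leaves the threshold below current usage. Below is a minimal, self-contained sketch of that heuristic; SimplePool and SpillDecider are illustrative names, not Spark classes.

// A runnable sketch of the doubling heuristic in maybeSpill (illustrative names only).
object SpillHeuristicDemo {

  // A fixed-size pool that grants as much of each request as it can spare.
  class SimplePool(var available: Long) {
    def acquire(request: Long): Long = {
      val granted = math.min(request, available)
      available -= granted
      granted
    }
    def release(amount: Long): Unit = available += amount
  }

  class SpillDecider(pool: SimplePool, initialThreshold: Long) {
    private var threshold = initialThreshold
    private var elementsRead = 0L

    // Returns true if the caller should spill its in-memory collection now.
    def maybeSpill(currentMemory: Long): Boolean = {
      elementsRead += 1
      var shouldSpill = false
      // Only check every 32 elements, and only once usage crosses the threshold
      if (elementsRead % 32 == 0 && currentMemory >= threshold) {
        // Ask for enough to double our footprint relative to the current threshold
        val amountToRequest = 2 * currentMemory - threshold
        threshold += pool.acquire(amountToRequest)
        // Spill only if the pool could not push the threshold above current usage
        shouldSpill = currentMemory >= threshold
      }
      if (shouldSpill) {
        // Like releaseMemory(): give back what we borrowed and start over
        pool.release(threshold - initialThreshold)
        threshold = initialThreshold
        elementsRead = 0
      }
      shouldSpill
    }
  }

  def main(args: Array[String]): Unit = {
    val pool = new SimplePool(available = 8L * 1024 * 1024)            // 8 MB to share
    val decider = new SpillDecider(pool, initialThreshold = 1L * 1024 * 1024)
    var usage = 0L
    var spills = 0
    for (_ <- 1 to 200000) {
      usage += 256                       // pretend each record costs 256 bytes
      if (decider.maybeSpill(usage)) {
        spills += 1
        usage = 0                        // spilling frees the in-memory collection
      }
    }
    println(s"spilled $spills times, pool has ${pool.available} bytes free")
  }
}

Note also the escape hatch in the real method: regardless of estimated memory, a spill is forced once _elementsRead exceeds numElementsForceSpillThreshold.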
private[this] def spillMemoryIteratorToDisk(inMemoryIterator: WritablePartitionedIterator)
    : SpilledFile = {
  val (blockId, file) = diskBlockManager.createTempShuffleBlock()

  // These variables are reset after each flush
  var objectsWritten: Long = 0
  val spillMetrics: ShuffleWriteMetrics = new ShuffleWriteMetrics
  val writer: DiskBlockObjectWriter =
    blockManager.getDiskWriter(blockId, file, serInstance, fileBufferSize, spillMetrics)

  // List of batch sizes (bytes) in the order they are written to disk
  val batchSizes = new ArrayBuffer[Long]

  // How many elements we have in each partition
  // The records written to the file do not carry their partition ID, so we count the elements
  // per partition; a record's offset in the file then tells us which partition it belongs to.
  val elementsPerPartition = new Array[Long](numPartitions)

  // Flush the disk writer's contents to disk, and update relevant variables.
  // The writer is committed at the end of this process.
  def flush(): Unit = {
    val segment = writer.commitAndGet()
    batchSizes += segment.length
    _diskBytesSpilled += segment.length
    objectsWritten = 0
  }

  var success = false
  try {
    while (inMemoryIterator.hasNext) {
      val partitionId = inMemoryIterator.nextPartition()
      require(partitionId >= 0 && partitionId < numPartitions,
        s"partition Id: ${partitionId} should be in the range [0, ${numPartitions})")
      inMemoryIterator.writeNext(writer)
      elementsPerPartition(partitionId) += 1
      objectsWritten += 1

      if (objectsWritten == serializerBatchSize) {
        // Flush the buffer once every serializerBatchSize records have been written.
        // serializerBatchSize is defined in the class as follows; if it is not configured,
        // it defaults to 10000 records:
        // private val serializerBatchSize = conf.getLong("spark.shuffle.spill.batchSize", 10000)
        flush()
      }
    }
    if (objectsWritten > 0) {
      flush()
    } else {
      writer.revertPartialWritesAndClose()
    }
    success = true
  } finally {
    if (success) {
      writer.close()
    } else {
      // This code path only happens if an exception was thrown above before we set success;
      // close our stuff and let the exception be thrown further
      writer.revertPartialWritesAndClose()
      if (file.exists()) {
        if (!file.delete()) {
          logWarning(s"Error deleting ${file}")
        }
      }
    }
  }

  // Finally, record the metadata of this temp file
  SpilledFile(file, blockId, batchSizes.toArray, elementsPerPartition)
}
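Two pieces of metadata make the spill file readable later even though partition IDs are never written into it: batchSizes records the byte length of every flushed batch, and elementsPerPartition records how many records each partition contributed, in partition order. The toy sketch below shows the same bookkeeping; it is not Spark code, and ToySpilledFile plus the plain DataOutputStream serialization are stand-ins.

// A toy illustration of the spill-file bookkeeping (illustrative names, not Spark code).
import java.io.{DataOutputStream, File, FileOutputStream}
import scala.collection.mutable.ArrayBuffer

object SpillFileDemo {
  // Metadata kept for one spill, mirroring the fields of SpilledFile
  case class ToySpilledFile(file: File, batchSizes: Array[Long], elementsPerPartition: Array[Long])

  def spill(records: Iterator[(Int, String)], numPartitions: Int, batchSize: Int): ToySpilledFile = {
    val file = File.createTempFile("toy-spill", ".bin")
    val data = new DataOutputStream(new FileOutputStream(file))

    val batchSizes = new ArrayBuffer[Long]
    val elementsPerPartition = new Array[Long](numPartitions)
    var objectsWritten = 0
    var bytesAtLastFlush = 0L

    // "Flushing" here just records how many bytes the batch that just ended occupied
    def flush(): Unit = {
      data.flush()
      batchSizes += data.size() - bytesAtLastFlush
      bytesAtLastFlush = data.size()
      objectsWritten = 0
    }

    for ((partitionId, value) <- records) {
      data.writeUTF(value)                     // the partition ID itself is NOT written...
      elementsPerPartition(partitionId) += 1   // ...only counted, as in the code above
      objectsWritten += 1
      if (objectsWritten == batchSize) flush()
    }
    if (objectsWritten > 0) flush()
    data.close()

    ToySpilledFile(file, batchSizes.toArray, elementsPerPartition)
  }

  def main(args: Array[String]): Unit = {
    // 25 records already sorted by partition: 10 in partition 0, 10 in partition 1, 5 in partition 2
    val records = Iterator.tabulate(25)(i => (i / 10, s"record-$i"))
    val spilled = spill(records, numPartitions = 3, batchSize = 8)
    println(s"batch sizes (bytes): ${spilled.batchSizes.mkString(", ")}")
    println(s"elements per partition: ${spilled.elementsPerPartition.mkString(", ")}")
  }
}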
// Track location of each range in the output file
val lengths = new Array[Long](numPartitions)
val writer = blockManager.getDiskWriter(blockId, outputFile, serInstance, fileBufferSize,
  context.taskMetrics().shuffleWriteMetrics)
// spills here holds the temp files produced just now. If it is empty the logic is simple:
// the in-memory data is sorted and written straight to the output file.
if (spills.isEmpty) {
  // Case where we only have in-memory data
  val collection = if (aggregator.isDefined) map else buffer
  // destructiveSortedWritablePartitionedIterator is used again here to do the sorting
  val it = collection.destructiveSortedWritablePartitionedIterator(comparator)
  while (it.hasNext) {
    val partitionId = it.nextPartition()
    while (it.hasNext && it.nextPartition() == partitionId) {
      it.writeNext(writer)
    }
    val segment = writer.commitAndGet()
    lengths(partitionId) = segment.length
  }
} else {
  // The important case is when spills is non-empty: the partitionedIterator method is called here
  // We must perform merge-sort; get an iterator by partition and write everything directly.
  for ((id, elements) <- this.partitionedIterator) {
    if (elements.hasNext) {
      for (elem <- elements) {
        writer.write(elem._1, elem._2)
      }
      val segment = writer.commitAndGet()
      lengths(id) = segment.length
    }
  }
}
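Both branches end up with a single data file and a lengths array holding the byte size of each partition's segment. Those lengths are what later allow a reducer to seek directly to its partition inside the file; the helper below is a rough sketch of that idea (hypothetical names, not the actual index-file code in Spark):

// Turning per-partition lengths into cumulative offsets (illustrative sketch only).
object PartitionIndexDemo {
  // scanLeft produces numPartitions + 1 offsets: offsets(i) .. offsets(i + 1) is partition i
  def offsetsFromLengths(lengths: Array[Long]): Array[Long] =
    lengths.scanLeft(0L)(_ + _)

  def byteRangeOf(partitionId: Int, offsets: Array[Long]): (Long, Long) =
    (offsets(partitionId), offsets(partitionId + 1))

  def main(args: Array[String]): Unit = {
    val lengths = Array(120L, 0L, 340L, 75L)             // bytes written per partition
    val offsets = offsetsFromLengths(lengths)
    println(offsets.mkString(", "))                      // 0, 120, 120, 460, 535
    println(byteRangeOf(2, offsets))                     // (120, 460)
  }
}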
def partitionedIterator: Iterator[(Int, Iterator[Product2[K, C]])] = {
  val usingMap = aggregator.isDefined
  val collection: WritablePartitionedPairCollection[K, C] = if (usingMap) map else buffer
  if (spills.isEmpty) {
    // Whether spills is empty is checked once more here; go straight to the branch below
    // that handles the temp files.
    // Special case: if we have only in-memory data, we don't need to merge streams, and perhaps
    // we don't even need to sort by anything other than partition ID
    if (!ordering.isDefined) {
      // The user hasn't requested sorted keys, so only sort by partition ID, not key
      groupByPartition(destructiveIterator(
        collection.partitionedDestructiveSortedIterator(None)))
    } else {
      // We do need to sort by both partition ID and key
      groupByPartition(destructiveIterator(
        collection.partitionedDestructiveSortedIterator(Some(keyComparator))))
    }
  } else {
    // Merge spilled and in-memory data
    // The temp files (spills) and the sorted in-memory data are passed in here
    merge(spills, destructiveIterator(
      collection.partitionedDestructiveSortedIterator(comparator)))
  }
}
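When an ordering is defined, merge has to combine several iterators that are each sorted by key within the same partition: one per spilled file plus the in-memory one. The sketch below shows the underlying k-way merge with a priority queue of buffered iterator heads; it is a simplified stand-in, not Spark's merge/mergeSort code.

// A simplified k-way merge of key-sorted iterators (illustrative, not Spark code).
import scala.collection.mutable

object MergeSortDemo {
  def mergeSort[K, C](iterators: Seq[Iterator[(K, C)]],
                      keyOrdering: Ordering[K]): Iterator[(K, C)] = {
    type Iter = BufferedIterator[(K, C)]
    val bufferedIters: Seq[Iter] = iterators.filter(_.hasNext).map(_.buffered)
    // PriorityQueue is a max-heap, so reverse the comparison to pop the smallest head first
    val headOrdering = new Ordering[Iter] {
      override def compare(x: Iter, y: Iter): Int = keyOrdering.compare(y.head._1, x.head._1)
    }
    val heap = new mutable.PriorityQueue[Iter]()(headOrdering)
    heap.enqueue(bufferedIters: _*)
    new Iterator[(K, C)] {
      override def hasNext: Boolean = heap.nonEmpty
      override def next(): (K, C) = {
        val firstBuf = heap.dequeue()
        val firstPair = firstBuf.next()
        if (firstBuf.hasNext) heap.enqueue(firstBuf)   // re-insert if it still has elements
        firstPair
      }
    }
  }

  def main(args: Array[String]): Unit = {
    val spill1 = Iterator(("a", 1), ("c", 3), ("e", 5))
    val spill2 = Iterator(("b", 2), ("d", 4))
    val inMemory = Iterator(("a", 10), ("f", 6))
    // Keys come out in sorted order; records with equal keys may interleave arbitrarily
    println(mergeSort(Seq(spill1, spill2, inMemory), Ordering.String).toList)
  }
}

With an aggregator defined, the real merge additionally combines values for equal keys as they come off the merged stream; without an ordering it simply concatenates the per-partition iterators.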