case smt: ShuffleMapTask =>
  val shuffleStage = stage.asInstanceOf[ShuffleMapStage]
  val status = event.result.asInstanceOf[MapStatus]
  val execId = status.location.executorId
  mapOutputTracker.registerMapOutput(
    shuffleStage.shuffleDep.shuffleId, smt.partitionId, status)
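For orientation, mapOutputTracker.registerMapOutput records where each map task's shuffle output lives, so that reducers can later look up which executors to fetch blocks from. Below is a minimal, hypothetical sketch of that idea in plain Scala; the names and fields are illustrative and are not Spark's MapOutputTracker API.

// Minimal sketch (hypothetical names, not Spark's MapOutputTracker): a registry that
// records, per shuffle and per map partition, where that map task wrote its output.
import scala.collection.mutable

final case class MapOutput(executorId: String, sizesByReducer: Array[Long])

class ToyMapOutputRegistry {
  // shuffleId -> (map partition id -> output metadata)
  private val outputs = mutable.Map.empty[Int, mutable.Map[Int, MapOutput]]

  def registerMapOutput(shuffleId: Int, mapPartitionId: Int, status: MapOutput): Unit =
    outputs.getOrElseUpdate(shuffleId, mutable.Map.empty).update(mapPartitionId, status)

  // A reducer asks: which executors hold output for my reduce partition, and how large is it?
  def outputsForReducer(shuffleId: Int, reduceId: Int): Seq[(String, Long)] =
    outputs.getOrElse(shuffleId, mutable.Map.empty).values.toSeq
      .map(o => (o.executorId, o.sizesByReducer(reduceId)))
}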
// Deserialize each block's input stream into a key/value record iterator
val recordIter = wrappedStreams.flatMap { case (blockId, wrappedStream) =>
  // Note: the asKeyValueIterator below wraps a key/value iterator inside of a
  // NextIterator. The NextIterator makes sure that close() is called on the
  // underlying InputStream when all records have been read.
  serializerInstance.deserializeStream(wrappedStream).asKeyValueIterator
}
// Update the context task metrics for each record read.
val readMetrics = context.taskMetrics.createTempShuffleReadMetrics()
val metricIter = CompletionIterator[(Any, Any), Iterator[(Any, Any)]](
  recordIter.map { record =>
    readMetrics.incRecordsRead(1)
    record
  },
  context.taskMetrics().mergeShuffleReadMetrics())
// An interruptible iterator must be used here in order to support task cancellation
val interruptibleIter = new InterruptibleIterator[(Any, Any)](context, metricIter)

val aggregatedIter: Iterator[Product2[K, C]] = if (dep.aggregator.isDefined) {
  // Aggregation is required
  if (dep.mapSideCombine) {
    // We are reading values that are already combined
    val combinedKeyValuesIterator = interruptibleIter.asInstanceOf[Iterator[(K, C)]]
    dep.aggregator.get.combineCombinersByKey(combinedKeyValuesIterator, context)
  } else {
    // We don't know the value type, but also don't care -- the dependency *should*
    // have made sure it's compatible with this aggregator, which will convert the value
    // type to the combined type C
    val keyValuesIterator = interruptibleIter.asInstanceOf[Iterator[(K, Nothing)]]
    dep.aggregator.get.combineValuesByKey(keyValuesIterator, context)
  }
} else {
  interruptibleIter.asInstanceOf[Iterator[Product2[K, C]]]
}
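Conceptually, combineValuesByKey folds raw (K, V) records into per-key combiners of type C using the aggregator's createCombiner and mergeValue functions (combineCombinersByKey does the same over already-combined values using mergeCombiners). Here is a minimal in-memory sketch of that fold; it is not Spark's Aggregator implementation and ignores spilling:

// Minimal sketch (not Spark's code): fold (K, V) records into combiners C using the
// kind of functions a Spark Aggregator carries. Names here are illustrative only.
object CombineSketch {
  def combineValuesByKey[K, V, C](
      records: Iterator[(K, V)],
      createCombiner: V => C,
      mergeValue: (C, V) => C): Iterator[(K, C)] = {
    val combiners = scala.collection.mutable.HashMap.empty[K, C]
    records.foreach { case (k, v) =>
      combiners(k) = combiners.get(k) match {
        case Some(c) => mergeValue(c, v)   // key seen before: fold the new value in
        case None    => createCombiner(v)  // first value seen for this key
      }
    }
    combiners.iterator
  }

  def main(args: Array[String]): Unit = {
    // Word-count style combine: C = Int, createCombiner = identity, mergeValue = sum
    val input = Iterator("a" -> 1, "b" -> 1, "a" -> 1)
    println(combineValuesByKey[String, Int, Int](input, identity, _ + _).toList)
    // prints something like List((a,2), (b,1))
  }
}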
override def iterator: Iterator[(K, C)] = {
  if (currentMap == null) {
    throw new IllegalStateException(
      "ExternalAppendOnlyMap.iterator is destructive and should only be called once.")
  }
  // If nothing has been spilled to disk, iterate over the in-memory map only
  if (spilledMaps.isEmpty) {
    CompletionIterator[(K, C), Iterator[(K, C)]](
      destructiveIterator(currentMap.iterator), freeCurrentMap())
  } else {
    new ExternalIterator()
  }
}
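The CompletionIterator used above wraps another iterator and runs a cleanup action (here freeCurrentMap()) once the last record has been read. A minimal sketch of that pattern, not Spark's CompletionIterator, looks like this:

// Minimal sketch (not Spark's CompletionIterator): run a cleanup callback exactly once,
// right after the wrapped iterator is exhausted.
class CleanupIterator[A](underlying: Iterator[A], cleanup: () => Unit) extends Iterator[A] {
  private var cleaned = false

  override def hasNext: Boolean = {
    val more = underlying.hasNext
    if (!more && !cleaned) {
      cleaned = true
      cleanup()  // e.g. free the in-memory map once all records have been read
    }
    more
  }

  override def next(): A = underlying.next()
}

// Usage: the buffer backing the iterator can be released as soon as iteration finishes.
// val it = new CleanupIterator(Iterator(1, 2, 3), () => println("freed"))
// it.foreach(println)  // prints 1, 2, 3, then "freed"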
override def next(): (K, C) = {
  if (mergeHeap.isEmpty) {
    throw new NoSuchElementException
  }
  // Select a key from the StreamBuffer that holds the lowest key hash
  // Note that the pairs inside each StreamBuffer are sorted by key hash, and the
  // StreamBuffers in mergeHeap are themselves ordered by minKeyHash. So the StreamBuffer
  // dequeued from mergeHeap always holds the smallest key hash among all buffers currently
  // in the heap; once you see this, the merge below is straightforward.
  val minBuffer = mergeHeap.dequeue()
  val minPairs = minBuffer.pairs
  val minHash = minBuffer.minKeyHash
  val minPair = removeFromBuffer(minPairs, 0)
  val minKey = minPair._1
  var minCombiner = minPair._2
  assert(hashKey(minPair) == minHash)
  // For all other streams that may have this key (i.e. have the same minimum key hash),
  // merge in the corresponding value (if any) from that stream
  val mergedBuffers = ArrayBuffer[StreamBuffer](minBuffer)
  // If the next StreamBuffer has the same minKeyHash, it may contain the same key and must
  // be merged too. Because the heap is ordered, we never have to scan every StreamBuffer:
  // as soon as one buffer has a different minKeyHash, no later buffer can match minHash.
  while (mergeHeap.nonEmpty && mergeHeap.head.minKeyHash == minHash) {
    val newBuffer = mergeHeap.dequeue()
    // Merge the value if this buffer contains the same key.
    // Note that an equal hash does not imply an equal key.
    minCombiner = mergeIfKeyExists(minKey, minCombiner, newBuffer)
    mergedBuffers += newBuffer
  }
  // Repopulate each visited stream buffer and add it back to the queue if it is non-empty
  mergedBuffers.foreach { buffer =>
    // If this buffer has been fully consumed, read the next batch of pairs that share a
    // key hash from its underlying stream into the buffer's array
    if (buffer.isEmpty) {
      readNextHashCode(buffer.iterator, buffer.pairs)
    }
    if (!buffer.isEmpty) {
      // The dequeued buffer may still hold pairs that were not merged, or freshly read
      // pairs; either way it goes back into mergeHeap for further merging
      mergeHeap.enqueue(buffer)
    }
  }
  // Return the merged pair. Note that keys are only ordered by hash here; sorting by a
  // user-defined key ordering happens later, in ExternalSorter.
  (minKey, minCombiner)
}
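To make the hash-ordered merge above concrete, here is a small self-contained sketch of the same idea: each stream is already sorted by key hash, a min-heap orders the streams by the hash of their head pair, and only streams whose minimum hash equals the current one are examined for the same key. The names and types are simplified and hypothetical; this is not ExternalAppendOnlyMap's code.

// Minimal sketch (not Spark's code) of a hash-ordered k-way merge.
import scala.collection.mutable

object HashMergeSketch {
  // A buffered "spill stream": its remaining (keyHash, key, value) triples, sorted by keyHash.
  final case class Stream(var pairs: List[(Int, String, Int)]) {
    def minHash: Int = pairs.head._1
  }

  def merge(streams: Seq[Stream], mergeValue: (Int, Int) => Int): List[(String, Int)] = {
    // Min-heap ordered by each stream's smallest remaining key hash
    val heap = mutable.PriorityQueue.empty[Stream](Ordering.by[Stream, Int](_.minHash).reverse)
    streams.filter(_.pairs.nonEmpty).foreach(s => heap.enqueue(s))
    val out = mutable.ListBuffer.empty[(String, Int)]
    while (heap.nonEmpty) {
      // The stream at the head of the heap holds the globally smallest key hash
      val min = heap.dequeue()
      val (h, k, v0) = min.pairs.head
      min.pairs = min.pairs.tail
      var combined = v0
      // Only streams whose minimum hash equals h can possibly contain the same key
      // (an equal hash does not imply an equal key, so keys are still compared below)
      val visited = mutable.ArrayBuffer(min)
      while (heap.nonEmpty && heap.head.minHash == h) {
        visited += heap.dequeue()
      }
      visited.foreach { s =>
        val (same, rest) = s.pairs.partition { case (oh, ok, _) => oh == h && ok == k }
        same.foreach { case (_, _, ov) => combined = mergeValue(combined, ov) }
        s.pairs = rest
      }
      // Put non-empty streams back so they participate in later rounds
      visited.filter(_.pairs.nonEmpty).foreach(s => heap.enqueue(s))
      out += ((k, combined))
    }
    out.toList
  }

  def main(args: Array[String]): Unit = {
    // Two "spilled files", each already sorted by key hash; values are summed on merge.
    val s1 = Stream(List((1, "a", 2), (3, "c", 1)))
    val s2 = Stream(List((1, "a", 5), (2, "b", 4)))
    println(merge(Seq(s1, s2), _ + _)) // List((a,7), (b,4), (c,1))
  }
}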
val resultIter = dep.keyOrdering match {
  case Some(keyOrd: Ordering[K]) =>
    // Create an ExternalSorter to sort the data.
    val sorter =
      new ExternalSorter[K, C, C](context, ordering = Some(keyOrd), serializer = dep.serializer)
    sorter.insertAll(aggregatedIter)
    context.taskMetrics().incMemoryBytesSpilled(sorter.memoryBytesSpilled)
    context.taskMetrics().incDiskBytesSpilled(sorter.diskBytesSpilled)
    context.taskMetrics().incPeakExecutionMemory(sorter.peakMemoryUsedBytes)
    // Use completion callback to stop sorter if task was finished/cancelled.
    context.addTaskCompletionListener(_ => {
      sorter.stop()
    })
    CompletionIterator[Product2[K, C], Iterator[Product2[K, C]]](sorter.iterator, sorter.stop())
  case None =>
    aggregatedIter
}
resultIter match {
  case _: InterruptibleIterator[Product2[K, C]] => resultIter
  case _ =>
    // Use another interruptible iterator here to support task cancellation as aggregator
    // or(and) sorter may have consumed previous interruptible iterator.
    new InterruptibleIterator[Product2[K, C]](context, resultIter)
}
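The point of wrapping the result in an InterruptibleIterator is to give the reducer a cancellation point between records. A minimal sketch of that pattern using a plain cancellation flag (Spark instead consults the TaskContext and kills the task) could look like this:

// Minimal sketch (not Spark's InterruptibleIterator): check a cancellation flag on every
// hasNext call so that a long aggregation or sort can be stopped between records.
import java.util.concurrent.atomic.AtomicBoolean

class CancellableIterator[A](cancelled: AtomicBoolean, delegate: Iterator[A]) extends Iterator[A] {
  override def hasNext: Boolean = {
    if (cancelled.get()) {
      throw new InterruptedException("task was cancelled")
    }
    delegate.hasNext
  }

  override def next(): A = delegate.next()
}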