你也许注意到了,今天的源码阅读计划后面有一个伪。没错,我们今天不读源码了,主要是因为我懒,想玩王者荣耀觉得前一章的 shuff write 源码的篇幅太长,害怕大家看完之后摸不着头脑。于是今天制作了 shuffle read 的图解版,顺便解决学习 spark 以来困扰我许久的几个问题。
问题
key 与如何与分区对应的
主要取决于分区器
分区发生在什么时候
建立初始RDD 以及 shuffle的时候
key 与分区是一对一映射吗
不是。一个分区可以包含多个key,但同一个 key 只能存在一个分区中(如果使用hash 分区器)
如果同一个分区会存在多个key,那么在使用聚合函数的时候会不会把多个key的value聚合到一起?
当然不会,如果这样聚合就出错了。虽然聚合函数的最小处理对象是一个分区,但是实际上还是对 key 进行聚合。
combineByKeyWithClassTag 中的 mergeValue 和 mergeCombiners 是否前者作用与分区内,后者作用于分区外(网上大部分都是这么说的,但其实是错误的)
不是,具体作用下面会讲解
图解 shuffle write
步骤一、分区 聚合
这里是使用了 hash 表存放了聚合后的数据。与普通的 map 不同,数据的存储在一个 data 数组中。规则为索引为偶数的元素存放 KEY ,而对应的 value 存放在 KEY 的下一个元素,注意存放的 KEY 已经不是原始数据的 key,而是 key 以及其对的分区。由于在插入时使用二次探测法,数组中可能存在空值。用k1,v1表示原始数据键值对;key=>p(key)表示分区函数。数据表示流程图如下:
步骤二、排序并写入临时文件
如果判断到 data 数组所占用的内存达到一定的阈值,就会对其中的数据进行排序。排序的方式为先对分区进行排序,分区内按照 key 的 hash 值进行排序。然后将排序好的数据写入临时文件,并产生一个空的 data 数组。这里可能发生多次写入临时文件的操作,也有可能最后仍有数据驻留在内存中而没有写入临时文件。例如,本分区一共有 10M 数据,内存阈值为 3M,则一共会发生 3 次写临时文件的操作以及最后会驻留1M的数据在内存中。需要注意的是这里并没有写入分区信息,只是写入原始的 key 以及对应聚合好的数据。
步骤三、对所有的临时文件以及内存中驻留的文件进行排序
由于存在多个临时文件并且内存中还有驻留的数据,因此需要对所有的数据进行排序并写入一个大文件中以减少临时文件的个数。这里的排序算法使用的方法大概是这样的:
每次取出第一行的第一个元素就能得到排序结果,图中每一个竖向的数组就可以看做一个排序完成的临时文件。只不过在 shuffle write 并不是取前一行,而是取前一个分区。虽然文件中只存在 k c。但是在写临时文件时记录了每个分区对应的数据条数,因此仍然可以按分区取数据。
这里还需要注意的一个地方是对于同一个 key 可能分布在不同的临时文件中,因此在排序的时候仍然需要对相同的 key 进行聚合。聚合方法其实很简单,例如获取到一条数据 (k1,c1),如果下一条数据 (key,c) key == k1 就将 mergeCombiners 将 c1 与 c聚合,否则 key == k1 的数据已经全部读取完成,也就是剩下的数据中不会存在 key == k1 的数据。
按照上面的方法不断的读取,聚合数据,写入文件就能将临时文件和内存中的所有数据进行聚合,排序并输出到一个大文件中。随后将每一个分区对应数据条数写入索引文件中,以便于记录大文件中某一条记录属于哪一个分区。完成以上步骤便完成了 shuffle write。
总结
读者: 等等,我要先吐个槽。说好的图解呢,怎么就两个图?
作者: 本来有很多图的,但是我室友找我开黑要去跑步就只画了两个图。
其实搞清楚 shuffle write 的大致步骤还是很简单的,但是具体到每一个细节就需要仔细的阅读源码了。这里推荐一波作者的spark源码阅读计划。很混乱详细的解析了关于 shuffle write 每一部分代码,配合今天的图解食用更佳。