spark内存管理器--MemoryManager源码解析

MemoryManager内存管理器内存管理器可以说是spark内核中最重要的基础模块之一,shuffle时的排序,rdd缓存,展开内存,广播变量,Task运行结果的存储等等,凡是需要使用内存的地方都需要向内存管理器定额申请。我认为内存管理器的主要作用是为了尽可能减小内存溢出的同时提高内存利用率。旧版本的spark的内存管理是静态内存管理器StaticMemoryManager,而新版本(应该是从...

MemoryManager内存管理器

内存管理器可以说是spark内核中最重要的基础模块之一,shuffle时的排序,rdd缓存,展开内存,广播变量,Task运行结果的存储等等,凡是需要使用内存的地方都需要向内存管理器定额申请。我认为内存管理器的主要作用是为了尽可能减小内存溢出的同时提高内存利用率。旧版本的spark的内存管理是静态内存管理器StaticMemoryManager,而新版本(应该是从1.6之后吧,记不清了)则改成了统一内存管理器UnifiedMemoryManager,同一内存管理器相对于静态内存管理器最大的区别在于执行内存和存储内存二者之间没有明确的界限,可以相互借用,但是执行内存的优先级更高,也就是说如果执行内存不够用就会挤占存储内存,这时会将一部分缓存的rdd溢写到磁盘上直到腾出足够的空间。但是执行内存任何情况下都不会被挤占,想想这也可以理解,毕竟执行内存是用于shuffle时排序的,这只能在内存中进行,而rdd缓存的要求就没有这么严格。
有几个参数控制各个部分内存的使用比例,

  • spark.memory.fraction,默认值0.6,这个参数控制spark内存管理器管理的内存占内存存的比例(准确地说是:堆内存-300m,300m是为永久代预留),也就是说执行内存和存储内存加起来只有(堆内存-300m)的0.6,剩余的0.4是用于用户代码执行过程中的内存占用,比如你的代码中可能会加载一些较大的文件到内存中,或者做一些排序,用户代码使用的内存并不受内存管理器管理,所以需要预留一定的比例。
  • spark.memory.storageFraction,默认值0.5,顾名思义,这个值决定了存储内存的占比,注意是占内存管理器管理的那部分内存的比例,剩余的部分用作执行内存。例如,默认情况下,存储内存占堆内存的比例是0.6 * 0.5 = 0.3(当然准确地说是占堆内存-300m的比例)。

MemoryManager概述

我们首先整体看一下MemoryManager这个类,

 maxOnHeapStorageMemory maxOffHeapStorageMemory setMemoryStore acquireStorageMemory acquireUnrollMemory acquireExecutionMemory releaseExecutionMemory releaseAllExecutionMemoryForTask releaseStorageMemory releaseAllStorageMemory releaseUnrollMemory executionMemoryUsed storageMemoryUsed getExecutionMemoryUsageForTask

可以发现,MemoryManager内部的方法比较少而且是有规律的,它将内存在功能上分为三种:StorageMemory,UnrollMemory,ExecutionMemory,
针对这三种内存分别有申请内存的方法和释放内存的方法,并且三种申请内存的方法都是抽象方法,由子类实现。
此外,我们看一下MemoryManager内部有哪些成员变量:

 protected val onHeapStorageMemoryPool = new StorageMemoryPool(this, MemoryMode.ON_HEAP) protected val offHeapStorageMemoryPool = new StorageMemoryPool(this, MemoryMode.OFF_HEAP) protected val onHeapExecutionMemoryPool = new ExecutionMemoryPool(this, MemoryMode.ON_HEAP) protected val offHeapExecutionMemoryPool = new ExecutionMemoryPool(this, MemoryMode.OFF_HEAP)

这四个成员变量分别代表四种内存池。这里要注意的是,MemoryPool的构造其中有一个Object类型参数用于同步锁,MemoryPool内部的一些方法会获取该对象锁用于同步。
我们看一下他们的初始化:

 onHeapStorageMemoryPool.incrementPoolSize(onHeapStorageMemory) onHeapExecutionMemoryPool.incrementPoolSize(onHeapExecutionMemory) offHeapExecutionMemoryPool.incrementPoolSize(maxOffHeapMemory - offHeapStorageMemory) offHeapStorageMemoryPool.incrementPoolSize(offHeapStorageMemory)

MemoryManager.releaseExecutionMemory

其实就是调用ExecutionMemoryPool的相关方法,

  private[memory]  def releaseExecutionMemory(numBytes: Long,taskAttemptId: Long,memoryMode: MemoryMode): Unit = synchronized { memoryMode match {case MemoryMode.ON_HEAP => onHeapExecutionMemoryPool.releaseMemory(numBytes, taskAttemptId)case MemoryMode.OFF_HEAP => offHeapExecutionMemoryPool.releaseMemory(numBytes, taskAttemptId) }  }

ExecutionMemoryPool.releaseMemory

代码逻辑很简单,就不多说了。
其实从这个方法,我们大概可以看出,spark内存管理的含义,其实spark的内存管理说到底就是对内存使用量的记录和管理,而并不是像操作系统或jvm那样真正地进行内存的分配和回收。

def releaseMemory(numBytes: Long, taskAttemptId: Long): Unit = lock.synchronized {// 从内部的簿记量中获取该任务使用的内存val curMem = memoryForTask.getOrElse(taskAttemptId, 0L)// 检查要释放的内存是否超过了该任务实际使用的内存,并打印告警日志var memoryToFree = if (curMem < numBytes) {  logWarning( s“Internal error: release called on $numBytes bytes but task only has $curMem bytes “  s“of memory from the $poolName pool“)  curMem} else {  numBytes}if (memoryForTask.contains(taskAttemptId)) {  // 更新簿记量  memoryForTask(taskAttemptId) -= memoryToFree  // 如果该任务的内存使用量小于等于0,那么从簿记量中移除该任务  if (memoryForTask(taskAttemptId) <= 0) { memoryForTask.remove(taskAttemptId)  }}// 最后通知其他等待的线程// 因为可能会有其他的任务在等待获取执行内存lock.notifyAll() // Notify waiters in acquireMemory() that memory has been freed}

MemoryManager.releaseAllExecutionMemoryForTask

把堆上的执行内存和直接内存的执行内存中该任务使用的内存都释放掉,
onHeapExecutionMemoryPool和offHeapExecutionMemoryPool是同一个类,只是一个记录执行内存对直接内存的使用,一个记录执行内存对堆内存的使用。

private[memory] def releaseAllExecutionMemoryForTask(taskAttemptId: Long): Long = synchronized {onHeapExecutionMemoryPool.releaseAllMemoryForTask(taskAttemptId)    offHeapExecutionMemoryPool.releaseAllMemoryForTask(taskAttemptId)}

MemoryManager.releaseStorageMemory

对于存储内存的使用的记录并没有执行内存那么细,不会记录每个RDD使用了多少内存

def releaseStorageMemory(numBytes: Long, memoryMode: MemoryMode): Unit = synchronized {memoryMode match {  case MemoryMode.ON_HEAP => onHeapStorageMemoryPool.releaseMemory(numBytes)  case MemoryMode.OFF_HEAP => offHeapStorageMemoryPool.releaseMemory(numBytes)}}

MemoryManager.releaseUnrollMemory

这里,我们看一下释放展开内存的方法,发现展开内存使用的就是存储内存。回顾一下BlockManager部分,展开内存的申请主要是在将数据通过MemoryStore存储成块时需要将数据临时放在内存中,这时就需要申请展开内存。

final def releaseUnrollMemory(numBytes: Long, memoryMode: MemoryMode): Unit = synchronized {releaseStorageMemory(numBytes, memoryMode)}

小结

从上面分析的几个释放内存的方法不难看出,所谓的释放内存其实只是对内存管理器内部的一些簿记量的改变,这就要求外部的调用者必须确保它们确实释放了这么多的内存,否则内存管理就会和实际的内存使用情况出现很大偏差。当然,好在内存管理器是spark内部的模块,并不向用户开放,所以在用户代码中不会调用内存管理模块。

UnifiedMemoryManager

开篇我们讲到,spark的内存管理器分为两种,而新的版本默认都是使用统一内存管理器UnifiedMemoryManager,后面静态内存管理器会逐渐启用,所以这里我们也重点分析统一内存管理。
前面,我们分析了父类MemoryManager中释放内存的几个方法,而申请内存的几个方法都是抽象方法,这些方法的实现都是在子类中,也就是UnifiedMemoryManager中实现的。

UnifiedMemoryManager.acquireExecutionMemory

这个方法是用来申请执行内存的。其中定义了几个局部方法,maybeGrowExecutionPool方法用来挤占存储内存以扩展执行内存空间;
computeMaxExecutionPoolSize方法用来计算最大的执行内存大小。
最后调用了executionPool.acquireMemory方法实际申请执行内存。

override private[memory] def acquireExecutionMemory(  numBytes: Long,  taskAttemptId: Long,  memoryMode: MemoryMode): Long = synchronized {// 检查内存大小是否正确assertInvariants()assert(numBytes >= 0)// 根据堆内存还是直接内存决定使用不同的内存池和内存大小val (executionPool, storagePool, storageRegionSize, maxMemory) = memoryMode match {  case MemoryMode.ON_HEAP => ( onHeapExecutionMemoryPool, onHeapStorageMemoryPool, onHeapStorageRegionSize, maxHeapMemory)  case MemoryMode.OFF_HEAP => ( offHeapExecutionMemoryPool, offHeapStorageMemoryPool, offHeapStorageMemory, maxOffHeapMemory)}/** * Grow the execution pool by evicting cached blocks, thereby shrinking the storage pool. * * When acquiring memory for a task, the execution pool may need to make multiple * attempts. Each attempt must be able to evict storage in case another task jumps in * and caches a large block between the attempts. This is called once per attempt. */// 通过挤占存储内存来扩张执行内存,// 通过将缓存的块溢写到磁盘上,从而为执行内存腾出空间def maybeGrowExecutionPool(extraMemoryNeeded: Long): Unit = {  if (extraMemoryNeeded > 0) { // There is not enough free memory in the execution pool, so try to reclaim memory from // storage. We can reclaim any free memory from the storage pool. If the storage pool // has grown to become larger than `storageRegionSize`, we can evict blocks and reclaim // the memory that storage has borrowed from execution. // 我们可以将剩余的存储内存都借过来用作执行内存 // 另外,如果存储内存向执行内存借用了一部分内存,也就是说此时存储内存的实际大小大于配置的值 // 那么我们就将所有的借用的存储内存都还回来 val memoryReclaimableFromStorage = math.max(storagePool.memoryFree,storagePool.poolSize - storageRegionSize) if (memoryReclaimableFromStorage > 0) {// Only reclaim as much space as is necessary and available:// 只腾出必要大小的内存空间,这个方法会将内存中的block挤到磁盘中val spaceToReclaim = storagePool.freeSpaceToShrinkPool(  math.min(extraMemoryNeeded, memoryReclaimableFromStorage))// 更新一些簿记量,存储内存少了这么多内存,相应的执行内存增加了这么多内存storagePool.decrementPoolSize(spaceToReclaim) 
源文地址:https://www.guoxiongfei.cn/cntech/19275.html