Spark-最大化内存的使用效率

发布于 — 2022 年 04 月 01 日
#Spark

Reserved Memory

堆内内存可以划分为

  1. Reserved Memory,
  2. User Memory
  3. Storage Memory
  4. Execution Memory

其中Reserved Memory固定为300MB.

User Memory

再看一下之前在广播变量里提到的一段代码

1
2
3
4
val dict = List(spark, tune)
val words = spark.sparkContext.textFile(~/words.csv)
val keywords = words.filter(word => dict.contains(word))
keywords.map((_, 1)).reduceByKey(_ + _).collect

这里的dict变量, 会被分发到每个Executor上, 每个Executor上会同时存在N份(N为当前Executor上并行执行的task数量). 这些数据被存储到了堆内内存的User Memory区域

当使用广播变量将这个变量进行分发后, 这个数据就只会在Executor上存储一份. 并且这部分数据也不再存储到User Memory中, 转到了Storage Memory的存储区域

估算公式

User Memroy = 应用内自定义数据结构的对象总大小 * Executor的线程池大小

计算公式:

(spark.executor.memory - 300MB) * ( 1- spark.memory.fraction )

Storage Memory

Spark存储系统主要有3个对象

  • Shuffle中间文件
  • RDD缓存
  • 广播变量

它们都由Executor上的BlockManager进行管理, 对于数据在内存和磁盘中的存储, BlockManager利用MemoryStoreDiskStore进行抽象和封装

广播变量所携带的数据内容会物化到MemoryStore中, 以Executor为粒度为所有Task提供唯一的一份数据拷贝.

广播变量消耗的就是 Storage Memory 内存区域

估算公式

Storage Memory = ( 所有broadcast变量的大小 + 数据缓存大小 ) / Executor数量

计算公式:

(spark.executor.memory - 300MB) * spark.memory.fraction * spark.memory.storageFraction

Execution Memory

估算公式

Execution Memory = 线程数 * dataSet大小 / 并行度

计算公式:

(spark.executor.memory - 300MB) * spark.memory.fraction * ( 1 - spark.memory.storageFraction )