Spark source code - code analysis of core RDD part (I)
2022-07-19 05:48:00 【Stewed seafood in a pot】
1. Overview of the RDD abstract class
Constructor and member properties
abstract class RDD[T: ClassTag](
    @transient private var _sc: SparkContext,
    @transient private var deps: Seq[Dependency[_]]
  ) extends Serializable with Logging {
The constructor encapsulates two data members, neither of which is serialized (both are marked @transient): a SparkContext instance (_sc) and a sequence of dependencies (deps). A Dependency describes how the current RDD depends on other RDDs.
SparkContext is the only entry point to a Spark cluster; it is used to create RDDs, accumulators, and broadcast variables on the cluster. It is also one of the most important objects in a Spark application: it is the core of the application's job scheduling (not of resource scheduling).
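As a quick, illustrative sketch (not part of the source under analysis; the application name, data, and variable names are made up), this is the usual way a SparkContext is obtained and then used to create an RDD, a broadcast variable, and an accumulator:

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf().setAppName("rdd-demo").setMaster("local[*]")
val sc   = new SparkContext(conf)

val nums      = sc.parallelize(1 to 100, numSlices = 4) // an RDD with 4 partitions
val factor    = sc.broadcast(10)                        // a broadcast variable
val errorsAcc = sc.longAccumulator("errors")            // an accumulator

val scaled = nums.map(_ * factor.value) // a transformation using the broadcast value
println(scaled.sum())                   // an action that triggers the computation

sc.stop()

The later sketches in this article reuse this illustrative sc.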
2. Getting the SparkContext (sc)
private def sc: SparkContext = { // accessor for the SparkContext variable _sc
  if (_sc == null) { // if _sc has never been set, an error is reported
    throw new SparkException(
      "This RDD lacks a SparkContext. It could happen in the following cases: \n(1) RDD " +
      "transformations and actions are NOT invoked by the driver, but inside of other " +
      "transformations; for example, rdd1.map(x => rdd2.values.count() * x) is invalid " +
      "because the values transformation and count action cannot be performed inside of the " +
      "rdd1.map transformation. For more information, see SPARK-5063.\n(2) When a Spark " +
      "Streaming job recovers from checkpoint, this exception will be hit if a reference to " +
      "an RDD not defined by the streaming job is used in DStream operations. For more " +
      "information, See SPARK-13758.")
  }
  _sc
}
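The first case in that error message is the one most commonly hit in practice. A minimal sketch of the invalid pattern and one common workaround, reusing the illustrative sc from the sketch in section 1 (rdd1 and rdd2 follow the names used in the error message):

val rdd1 = sc.parallelize(1 to 10)
val rdd2 = sc.parallelize(Seq(("a", 1), ("b", 2))) // a pair RDD, so that .values exists

// Invalid: rdd2 is used inside a transformation of rdd1, so its SparkContext would
// be needed inside an executor task (case (1) of the error message, SPARK-5063).
// val bad = rdd1.map(x => rdd2.values.count() * x)

// Workaround: run the action on the driver first and capture only the plain result.
val rdd2Count = rdd2.values.count()          // action executed on the driver
val ok        = rdd1.map(x => rdd2Count * x) // the closure captures only a Long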
3. Auxiliary constructor
def this(@transient oneParent: RDD[_]) =
  this(oneParent.context, List(new OneToOneDependency(oneParent)))
private[spark] def conf = sc.conf
In addition to the primary constructor there is an auxiliary constructor that takes a single RDD, oneParent, as its only parameter; it builds the new RDD from oneParent's SparkContext and a one-to-one dependency (OneToOneDependency) on that parent. A sketch of a subclass that uses this constructor appears after the method descriptions in section 4.
4. Abstract methods of the RDD class
def compute(split: Partition, context: TaskContext): Iterator[T]
protected def getPartitions: Array[Partition]
protected def getDependencies: Seq[Dependency[_]] = deps
compute(): computes the data of one partition (split) of the RDD and returns an iterator over elements of the corresponding type.
getPartitions(): returns the array of all partitions of the RDD.
getDependencies(): returns all dependencies of the RDD; the default implementation simply returns deps.
getPreferredLocations(): returns the preferred locations for computing the partition split (for example, the locations of the corresponding blocks on HDFS); overriding it is optional.
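To make the contract concrete, here is a minimal, hypothetical subclass (not taken from the Spark source) that uses the one-parent auxiliary constructor from section 3 and implements the two required methods:

import org.apache.spark.{Partition, TaskContext}
import org.apache.spark.rdd.RDD

// A made-up RDD that doubles every element of its parent. In real code one would
// simply call parent.map(_ * 2); this only shows what a subclass has to provide.
class DoubledRDD(parent: RDD[Int]) extends RDD[Int](parent) {

  // Reuse the parent's partitioning unchanged.
  override protected def getPartitions: Array[Partition] = parent.partitions

  // Compute one partition by reading the parent's iterator and doubling each value.
  override def compute(split: Partition, context: TaskContext): Iterator[Int] =
    parent.iterator(split, context).map(_ * 2)
}

Given a SparkContext sc, new DoubledRDD(sc.parallelize(1 to 4)).collect() would return Array(2, 4, 6, 8).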
5. persist (persistence)
def persist(newLevel: StorageLevel): this.type = { // persist this RDD at the given storage level
  if (isLocallyCheckpointed) {
    // The user previously called localCheckpoint(), which already marked this RDD as
    // persistent. Here we override the old storage level with the one explicitly
    // requested by the user (after adapting it to use disk).
    persist(LocalRDDCheckpointData.transformStorageLevel(newLevel), allowOverride = true)
  } else {
    persist(newLevel, allowOverride = false)
  }
}
/**
 * Persist this RDD with the default storage level (`MEMORY_ONLY`).
 */
def persist(): this.type = persist(StorageLevel.MEMORY_ONLY) // persist the RDD at the default storage level
/**
 * Persist this RDD with the default storage level (`MEMORY_ONLY`).
 */
def cache(): this.type = persist() // cache() simply delegates to persist()
6. Why persistence
In Spark, RDDs are evaluated lazily: each time an action is encountered, the computation is performed from the very beginning of the lineage. If the whole Spark program contains only one action, this is not a problem, but in some cases we need to invoke several different actions, and each of them then triggers a full recomputation from scratch. For iterative computation, which typically reuses the same data set many times, this cost is high; persistence (caching) avoids the overhead of such repeated computation. In addition, when an error occurs during computation and data has to be recomputed, Spark goes back to the parent RDD for the data, so persisting the parent RDD helps in that case as well.
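A minimal sketch of this pattern, reusing the illustrative sc from section 1 (the file path and variable names are made up): the RDD is cached before the first action, so the second action reads the cached partitions instead of recomputing the whole lineage:

import org.apache.spark.storage.StorageLevel

val words = sc.textFile("hdfs:///tmp/input.txt").flatMap(_.split(" "))

words.cache()                                   // same as persist(StorageLevel.MEMORY_ONLY)
// words.persist(StorageLevel.MEMORY_AND_DISK) // alternative: spill to disk when memory is short

println(words.count())            // first action: computes the lineage and fills the cache
println(words.distinct().count()) // second action: reuses the cached partitions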
7. unpersist (removing persisted data)
def unpersist(blocking: Boolean = false): this.type = { // mark the RDD as non-persistent and remove all of its blocks from memory and disk
  logInfo(s"Removing RDD $id from persistence list")
  sc.unpersistRDD(id, blocking)
  storageLevel = StorageLevel.NONE
  this
}
Spark automatically monitors cache usage on each node and evicts old data blocks from memory in least-recently-used (LRU) order.
If you want to remove an RDD manually, rather than wait for Spark to evict it automatically, you can use the RDD.unpersist() method.
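For example, continuing the illustrative words RDD from the sketch in section 6:

words.unpersist()                  // non-blocking by default (blocking = false)
// words.unpersist(blocking = true) // wait until all blocks have actually been removed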
8. dependencies
In RDD, the field dependencies_ is used to store the sequence of the current RDD's dependencies on its parents.
final def dependencies: Seq[Dependency[_]] = {
  checkpointRDD.map(r => List(new OneToOneDependency(r))).getOrElse {
    if (dependencies_ == null) {
      stateLock.synchronized {
        if (dependencies_ == null) {
          dependencies_ = getDependencies
        }
      }
    }
    dependencies_
  }
}
1). First try to obtain the checkpointed RDD; if it exists, wrap it in a OneToOneDependency list and return that as the RDD's dependencies.
2). If there is no checkpoint and dependencies_ is null, call getDependencies (under stateLock, with double-checked locking) to assign dependencies_, and finally return dependencies_.
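As an illustration (a sketch with made-up variable names, again assuming the sc from section 1), dependencies can be inspected directly and shows the different Dependency subclasses:

val pairs   = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)))
val mapped  = pairs.mapValues(_ * 10)
val reduced = mapped.reduceByKey(_ + _)

// A narrow, one-to-one dependency on the parent RDD.
println(mapped.dependencies)  // e.g. List(org.apache.spark.OneToOneDependency@...)

// A wide dependency introduced by the shuffle.
println(reduced.dependencies) // e.g. List(org.apache.spark.ShuffleDependency@...)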