0
点赞
收藏
分享

微信扫一扫

Spark_rdd_1


文章目录

  • ​​1.RDD特点:​​
  • ​​2.RDD的 5大属性​​
  • ​​3.RDD的执行原理​​
  • ​​4.Spark的核心组件​​

1.RDD特点:

Spark_rdd_1_移动计算

可变:

  • 存储的弹性
  • 容错的弹性
  • 计算的弹性
  • 分片的弹性

RDD 代码中是一个抽象类, 代表弹性的, 不可变, 可分区, 里面的元素可并行计算的集合, 为弹性分布式数据集。

RDD 不保存数据, 但是有血缘关系。

不可变的是逻辑, 如果想加入新的逻辑, 必须封装。

Spark_rdd_1_封装_02

2.RDD的 5大属性

Spark_rdd_1_封装_03

  1. 分区列表
  2. 分区计算函数
  3. 多个RDD有依赖关系
  4. 分区器: 一个分区的规则, 和Kafka 类似
  5. 首选位置, 判断发送给哪一个节点最优

Spark_rdd_1_封装_04
上图就是首选位置的优处

移动节点的性能不如移动计算。

Spark_rdd_1_移动计算_05

3.RDD的执行原理

启动Yarn的执行原理:

Spark_rdd_1_移动计算_06
Spark在Yarn中的执行原理:

Spark_rdd_1_移动计算_07
Spark在Driver中的RDD的执行原理:

Spark_rdd_1_spark_08
Spark在Driver和Executor的转换

Spark_rdd_1_封装_09

4.Spark的核心组件

  1. Master和Worker: 计算相关组件
  2. Driver, Executor: 资源相关组件
  3. Driver -> ApplicationMaster -> Master

RDD, 累加器 只写, 广播变量 只读
Driver: Client Executor: Server

模拟一个Driver 发送RDD 到Executor的代码逻辑

Driver:

object Driver {
def main(args: Array[String]): Unit = {
val socket = new Socket("localhost", 9999)
val socket2 = new Socket("localhost", 8888)
val task = new Task()

val outputStream = socket.getOutputStream
val objectOutputStream = new ObjectOutputStream(outputStream)

val subTask = new SubTask()
subTask.datas = task.datas.take(2)
subTask.logic = task.logic

objectOutputStream.writeObject(subTask)
objectOutputStream.flush()
objectOutputStream.close()
socket.close()

val outputStream1 = socket2.getOutputStream
val objectOutputStream1 = new ObjectOutputStream(outputStream1)

val subTask1 = new SubTask()
subTask1.datas = task.datas.takeRight(2)
subTask1.logic = task.logic

objectOutputStream1.writeObject(subTask1)
objectOutputStream1.flush()
objectOutputStream1.close()
socket2.close()
}

}

Task: 相对于 RDD

class Task extends Serializable {
val datas = List(1, 2, 3, 4)
// val logic = (num: Int) => {
// num * 2
//
// }
RDD
val logic: (Int) => Int = {_ * 2}


}

SubTask: 相对于一个一个Task

class SubTask extends Serializable {
var datas: List[Int] = _
var logic: (Int) => Int = _


def computer() = {
datas.map(logic)
}


}

Executor:

object Executor {
def main(args: Array[String]): Unit = {
val serverSocket = new ServerSocket(8888)

val socket = serverSocket.accept()
val inputStream = socket.getInputStream
val objectInputStream = new ObjectInputStream(inputStream)
val task = objectInputStream.readObject().asInstanceOf[SubTask]
val ints = task.computer()
println("计算结果为" + ints)
objectInputStream.close()
socket.close()
serverSocket.close()
}

}

Executor2 :

object Executor2 {
def main(args: Array[String]): Unit = {
val serverSocket = new ServerSocket(9999)

val socket = serverSocket.accept()
val inputStream = socket.getInputStream
val objectInputStream = new ObjectInputStream(inputStream)
val task = objectInputStream.readObject().asInstanceOf[SubTask]
val ints = task.computer()
println("计算结果为" + ints)
objectInputStream.close()
socket.close()
serverSocket.close()
}

}

Spark_rdd_1_Spark_10
Spark_rdd_1_移动计算_11

举报

相关推荐

0 条评论