0
点赞
收藏
分享

微信扫一扫

【回顾】Spark 分布式计算模拟



文章目录

  • ​​前言​​
  • ​​一、模拟客户端、服务端数据通信​​
  • ​​二、模拟客户端向服务端发送计算任务​​
  • ​​三、客户端、服务端分布式集群计算模拟​​

前言

上一篇我们提到过,对于 Spark 框架有两个核心组件:Driver​Excutor​
1、Driver
Spark 驱动器节点,用于执行 Spark 任务中的 main 方法,负责实际代码的执行工作。Driver 在 Spark 作业执行时主要负责:
        ➢ 将用户程序转化为作业(job)
        ➢ 在 Executor 之间调度任务 (task)
        ➢ 跟踪 Executor 的执行情况
        ➢ 通过 UI 展示查询运行情况
实际上,我们无法准确地描述 Driver 的定义,因为在整个的编程过程中没有看到任何有关Driver 的字眼。所以简单理解,所谓的 Driver 就是驱使整个应用运行起来的程序,也称之为Driver 类。
2、Executor
Spark Executor 是集群中工作节点(Worker)中的一个 JVM 进程,负责在 Spark 作业中运行具体任务(Task),任务彼此之间相互独立Spark 应用启动时,Executor 节点被同时启动,并且始终伴随着整个 Spark 应用的生命周期而存在。如果有 Executor 节点发生了故障或崩溃,Spark 应用也可以继续执行,会将出错节点上的任务调度到其他 Executor 节点上继续运行。
Executor 有两个核心功能:
        ➢ 负责运行组成 Spark 应用的任务,并将结果返回给驱动器进程
        ➢ 它们通过自身的块管理器(Block Manager)为用户程序中要求缓存的 RDD 提供内存式存储。RDD 是直接缓存在 Executor 进程内的,因此任务可以在运行时充分利用缓存数据加速运算。

一、模拟客户端、服务端数据通信

【回顾】Spark 分布式计算模拟_spark


通过以上的概念描述,我们可以将Driver、Excutor看成是客户端、服务端的数据通信模式。首先我们先来模拟回顾一下客户端、服务端数据通信的流程。

我们需要创建两个类 Driver、Executor 来分别充当客户端、服务端。

  • 客户端 Driver :需要请求连接到服务端;连接成功后向其发送数据;发送数据后结束。
  • 服务端 Executor:需要启动服务,等待客户端的请求;客户端请求后,接收客户端的连接请求;接收客户端发送的数据后,结束。

其实,还有一步应该是服务端将结果返回给客户端,客户端同时接收服务端的数据,这里就不再深入。

具体代码实现如下:

package test02

import java.io.{ObjectOutputStream, OutputStream}
import java.net.Socket

object Driver {
def main(args: Array[String]): Unit = {
// 建立客户端,准备发送数据
val client: Socket = new Socket("localhost",9999)
println("连接服务端成功!")

// 客户端发送数据
val out: OutputStream = client.getOutputStream
out.write(2)
out.flush()
println("数据传输完毕!")

// 关闭资源
out.close()
client.close()
}
}

package test02

import java.io.InputStream
import java.net.{ServerSocket, Socket}

object Executor {
def main(args: Array[String]): Unit = {
// 启动服务器,准备接收数据
val server: ServerSocket = new ServerSocket(9999)
println("正在等待客户端的连接中......")

// 等待客户端的连接
val client: Socket = server.accept()
println("客户端的连接成功!")
// 接收信息
val in: InputStream = client.getInputStream
val data: Int = in.read()
println("服务端接收的数据是:",data)

// 关闭资源
in.close()
client.close()
server.close()
}
}

客户端、服务端数据通信演示:

【回顾】Spark 分布式计算模拟_big data_02


首先运行Executor启动服务端,等待客户端的请求连接。然后运行Driver请求连接服务器,服务器端接手请求后,客户端开始发送数据2,服务端获取到传送的数据后进行输出。

​​返回顶部​​

二、模拟客户端向服务端发送计算任务

【回顾】Spark 分布式计算模拟_big data_03


Driver 在 Spark 作业执行时主要作用就是:
      ➢ 将用户程序转化为作业(job)
      ➢ 在 Executor 之间调度任务 (task)
      ➢ 跟踪 Executor 的执行情况

Executor主要任务就是运行Spark应用。

这里我们将进一步模拟任务的调度,新建一个Task类含有数据、逻辑(处理方式)、计算功能。整体思路就是,在通信的时候,将数据转为计算任务(将List中的每个数据乘2)。以类的形式模拟任务task调度到Executor中,并在Executor中调用类的计算功能实现应用的运行。

具体代码实现如下:

package test02

class Task extends Serializable{
// 数据
val data: Seq[Int] = List(1,2,3)
// 逻辑 --- 匿名函数
val logic: Int =>Int = _*2
// 计算
def compute(): Seq[Int] = {
data.map(item => logic(item))
}
}

package test02

import java.io.{ObjectOutputStream, OutputStream}
import java.net.Socket

object Driver {
def main(args: Array[String]): Unit = {
// 建立客户端,准备发送数据
val client: Socket = new Socket("localhost",9999)
println("连接服务端成功!")

// 客户端发送数据
/*val out: OutputStream = client.getOutputStream
out.write(2)
out.flush()
println("数据传输完毕!")*/
// 发送task任务
val out = client.getOutputStream
val outputObject = new ObjectOutputStream(out)
val task = new Task()
outputObject.writeObject(task)
outputObject.flush()
println("任务发送完成!")

// 关闭资源
out.close()
outputObject.close()
client.close()
}
}

package test02

import java.io.{InputStream, ObjectInputStream}
import java.net.{ServerSocket, Socket}

object Executor {
def main(args: Array[String]): Unit = {
// 启动服务器,准备接收数据
val server: ServerSocket = new ServerSocket(9999)
println("正在等待客户端的连接中......")

// 等待客户端的连接
val client: Socket = server.accept()
println("客户端的连接成功!")
// 接收信息
/*val in: InputStream = client.getInputStream
val data: Int = in.read()
println("服务端接收的数据是:",data)*/
val in = client.getInputStream
val inputObject = new ObjectInputStream(in)
val task = inputObject.readObject().asInstanceOf[Task]
val result: Seq[Int] = task.compute()
println("计算的结果为:",result)

// 关闭资源
in.close()
inputObject.close()
client.close()
server.close()
}
}

区别于之前的简单数据通信,这里在进行网络传输的时候是一个Task类对象(需要实现序列化,继承 Serializable),在流中也是以对象的流进行传输。特别提醒,在服务端接收到序列化对象的时候需要进行类型转换,scala中使用asInstanceOf算子实现对象类型的转换。

模拟客户端向服务端发送计算任务演示:

【回顾】Spark 分布式计算模拟_服务端_04

​​返回顶部​​

三、客户端、服务端分布式集群计算模拟

【回顾】Spark 分布式计算模拟_客户端_05


二中的单节点之间的处理并不能真正体现出分布式的效果,这里我们在添加一个Excutor(JVM进程),模拟集群中工作节点(Worker)下不止有一个Executor运行的情况。

集群:多个人在一起作同样的事 。
分布式 :多个人在一起作不同的事 。

当多个人在处理同一件事情的时候,会将总工作量拆分为一小份一小份,分发给不同的人去做,最终合并起来完成同一个任务。这里我们将Task类修改为数据、逻辑主体类,看作是整个大任务,新建SubTask类就是拆分的小任务,每个小任务中都有计算功能,其工作量具体由整个集群来进行分配。

package test02
// Task 提供数据
class Task extends Serializable{
// 数据
val data: List[Int] = List(1,2,3,4)
// 逻辑 --- 匿名函数
val logic: Int =>Int = _*2
}

package test02
// SubTask 对拆分的数据进行计算
class SubTask extends Serializable {
var data: List[Int] = _
var logic: Int => Int = _
// 计算
def compute(): Seq[Int] = {
data.map(item => logic(item))
}
}

在进行通信的时候,我们创建两个 client 来模拟两份 SubTask 子任务的分发,每个任务的逻辑也就是对数据的处理逻辑是与总任务保持一致的,数据量这里我们就进行均分:take()算子返回RDD中的前n个元素​takeRight()算子返回从右边开始起的n个算子​

package test02

import java.io.ObjectOutputStream
import java.net.Socket

object Driver {
def main(args: Array[String]): Unit = {
// 建立客户端,准备发送数据
val client1 = new Socket("localhost",9999)
val client2 = new Socket("localhost",8888)
println("连接服务端成功!")
// 1、客户端发送数据
/*val out: OutputStream = client.getOutputStream
out.write(2)
out.flush()
println("数据传输完毕!")*/
// 2、发送task任务
/*val out = client.getOutputStream
val outputObject = new ObjectOutputStream(out)
val task = new Task()
outputObject.writeObject(task)
outputObject.flush()
println("任务发送完成!")*/
// 3、拆分数据发送 task
// 第一份
val out1=client1.getOutputStream
val objectOutput1 = new ObjectOutputStream(out1)
val task = new Task()
val subTask1 = new SubTask()
subTask1.logic = task.logic
subTask1.data = task.data.take(2)
objectOutput1.writeObject(subTask1)
objectOutput1.flush()
// 第二份
val out2=client2.getOutputStream
val objectOutput2 = new ObjectOutputStream(out2)
val subTask2 = new SubTask()
subTask2.logic = task.logic
subTask2.data = task.data.takeRight(2)
objectOutput2.writeObject(subTask2)
objectOutput2.flush()
println("客户端数据发送完毕!")
// 关闭资源
out1.close()
objectOutput1.close()
client1.close()
}
}

接着使用两个Executor类接收来自client1、client2的同一个任务的两个不同子任务,完成分布式集群计算。

package test02

import java.io.ObjectInputStream
import java.net.{ServerSocket, Socket}

object Executor1 {
def main(args: Array[String]): Unit = {
// 启动服务器,准备接收数据
val server: ServerSocket = new ServerSocket(9999)
println("正在等待客户端的连接中......")
// 等待客户端的连接
val client: Socket = server.accept()
println("客户端的连接成功!")
// 接收信息
/*val in: InputStream = client.getInputStream
val data: Int = in.read()
println("服务端接收的数据是:",data)*/
val in = client.getInputStream
val inputObject = new ObjectInputStream(in)
val task = inputObject.readObject().asInstanceOf[SubTask]
val result: Seq[Int] = task.compute()
println("计算节点[9999]的结果为:",result)

// 关闭资源
in.close()
inputObject.close()
client.close()
server.close()
}
}

package test02

import java.io.ObjectInputStream
import java.net.{ServerSocket, Socket}

object Executor2 {
def main(args: Array[String]): Unit = {
// 启动服务器,准备接收数据
val server: ServerSocket = new ServerSocket(8888)
println("正在等待客户端的连接中......")

// 等待客户端的连接
val client: Socket = server.accept()
println("客户端的连接成功!")
// 接收信息
/*val in: InputStream = client.getInputStream
val data: Int = in.read()
println("服务端接收的数据是:",data)*/
// 接收数据计算
val in = client.getInputStream
val inputObject = new ObjectInputStream(in)
val task = inputObject.readObject().asInstanceOf[SubTask]
val result: Seq[Int] = task.compute()
println("计算节点[8888]的结果为:",result)

// 关闭资源
in.close()
inputObject.close()
client.close()
server.close()
}
}

客户端、服务端分布式集群计算模拟演示:

【回顾】Spark 分布式计算模拟_spark_06


在上述模拟过程基础上,Spark 计算框架为了能够进行高并发​高吞吐​的数据处理,封装了三大数据结构,用于处理不同的应用场景。三大数据结构分别是:
        ➢ RDD : 弹性分布式数据集
        ➢ 累加器:分布式共享只写变量
        ➢ 广播变量:分布式共享只读变量

​​返回顶部​​


举报

相关推荐

0 条评论