文章目录
Scala并发编程模型Akka
1. Akka简介
Akka是什么?
Actor 模型解决什么问题?
2. Akka中Actor模型
Actor模型及其说明
Actor模型工作机制说明
3. Actor模型快速入门
SayHelloActor
package com.atguigu.akka01.actor
import akka.actor.{Actor}
/**
* @Date 2021/4/6 14:13
* @Version 10.21
* @Author DuanChaojie
* 继承Actor后,SayHelloActor就是一个Actor
* 重写核心方法receive
*/
class SayHelloActor extends Actor {
/**
* 1、receive方法,会被该SayHelloActor的MailBox调用
* 2、当该SayHelloActor的MailBox接收到消息,就会调用receive方法
* 3、type Receive = scala.PartialFunction[scala.Any, scala.Unit],即Receive表示偏函数接收的参数类型是Any,返回类型是Unit
* 4、isDefinedAt(x: Any) 如果返回true ,就会去调用 apply 构建对象实例,如果是false,过滤
*
* @return Receive
*/
override def receive: Receive = {
case "Hello" => println("SayHelloActor:Hello tom")
case "Ok" => println("SayHelloActor:Ok jack")
case "exit" => {
println("SayHelloActor:退出系统...")
// 停止actoref
/**
*/
/** 1、context.stop(xxx)阻止被指定的Actor,这是一个异步操作,即涉及一个消息发送。
* 2、如果此方法应用于来自参与者内部的"self"引用,
* 则该参与者保证不会在此调用后处理任何其他消息;
* 请注意,当前消息的处理将继续,此方法不会立即终止此参与者。
* 3、The 'self' field holds the ActorRef for this actor
* 4、self可用于向自身发送消息,格式为:self ! message
*/
context.stop(self)
/**
* Terminates this actor system
*/
context.system.terminate()
}
case _ => println("SayHelloActor:匹配失败!")
}
}
SayHelloActorDemo
package com.atguigu.akka01.main
import akka.actor.{ActorRef, ActorSystem, Props}
import com.atguigu.akka01.actor.SayHelloActor
import scala.io.StdIn
import scala.util.control.Breaks.{break, breakable}
/**
* @Date 2021/4/6 15:42
* @Version 10.21
* @Author DuanChaojie
*/
object SayHelloActorDemo {
// 1、创建一个ActorSystem,专门用于创建Actor
private val actorFactory = ActorSystem("actorFactory")
/**
* 创建SayHelloActor的同时,返回SayHelloActor的sayHelloActorRef
* 1、Props[SayHelloActor] 通过反射创建一个SayHelloActor实例
* 2、给创建的Actor(SayHelloActor)取名为sayHelloActor
* 3、sayHelloActorRef: ActorRef 就是 Props[SayHelloActor] 的ActorRef
* 4、创建的SayHelloActor实例被ActorSystem接管
*/
private val sayHelloActorRef: ActorRef = actorFactory.actorOf(Props[SayHelloActor], "sayHelloActor")
def main(args: Array[String]): Unit = {
// 向sayHelloActorRef发送消息
breakable {
while (true) {
println(Console.GREEN + "请输入你想发的消息(提示输入完毕按回车):")
val command = StdIn.readLine()
sayHelloActorRef ! command
Thread.sleep(1000)
if (command == "exit") {
break()
}
}
}
}
}
效果如图:
小结和说明:
4. Actor模型应用实例-Actor间通讯
DdActor
package com.atguigu.akka02.actor
import akka.actor.{Actor, ActorRef}
/**
* @Date 2021/4/6 15:38
* @Version 10.21
* @Author DuanChaojie
*/
class DdActor(mmActorRef: ActorRef) extends Actor {
val myMmActorRef = mmActorRef
override def receive: Receive = {
case "Go!" => {
println("3s后开启世界大战!")
Thread.sleep(3000)
myMmActorRef ! "DD"
}
case "MM" => {
Thread.sleep(1000)
println("MM:一起喵喵喵喵喵~")
myMmActorRef ! "DD"
}
}
}
MmActor
package com.atguigu.akka02.actor
import akka.actor.Actor
/**
* @Date 2021/4/6 15:38
* @Version 10.21
* @Author DuanChaojie
*/
class MmActor extends Actor {
override def receive: Receive = {
case "DD" => {
Thread.sleep(1000)
println("DD:我们一起学猫叫~")
sender() ! "MM"
}
}
}
Main
package com.atguigu.akka02.main
import akka.actor.{ActorRef, ActorSystem, Props}
import com.atguigu.akka02.actor.{DdActor, MmActor}
import scala.io.StdIn
/**
* @Date 2021/4/6 15:40
* @Version 10.21
* @Author DuanChaojie
*/
object Main extends App {
private val actorFactory = ActorSystem("actorFactory")
private val mmActorRef: ActorRef = actorFactory.actorOf(Props[MmActor], "mmActor")
private val ddActorRef: ActorRef = actorFactory.actorOf(Props(new DdActor(mmActorRef)), "ddActor")
println(Console.GREEN + "请输入Start...")
val command = StdIn.readLine()
if (command.toLowerCase == "start") {
ddActorRef ! "Go!"
}
}
效果图:
小结:
5. Akka网络编程
网络编程基础知识
TCP/IP模型
IP地址
端口port
Akka网络编程-小黄鸡客服案例
服务端–ServerMain
package com.atguigu.akka03.server
import akka.actor.{ActorRef, ActorSystem, Props}
import com.typesafe.config.ConfigFactory
/**
* @Date 2021/4/6 16:18
* @Version 10.21
* @Author DuanChaojie
*/
object ServerMain extends App{
val serverHost = "127.0.0.1"
val serverPort = 8888
/**
* 对于此字符串中的每一行:
* 从行中删除由空格或控制字符('|')组成的前导前缀。
* 创建config对象,指定协议类型,监听的ip和端口
*/
val config = ConfigFactory.parseString(
s"""
|akka.actor.provider="akka.remote.RemoteActorRefProvider"
|akka.remote.netty.tcp.hostname=$serverHost
|akka.remote.netty.tcp.port=$serverPort
""".stripMargin)
private val serverActorFactory = ActorSystem("serverActorFactory",config)
private val yellowChickenServerRef: ActorRef = serverActorFactory.actorOf(Props[YellowChickenServer],"yellowChickenServer")
// ServerMain启动
yellowChickenServerRef ! "start".toLowerCase
}
服务端–YellowChickenServer
package com.atguigu.akka03.server
import akka.actor.Actor
import com.atguigu.akka03.common.{ClientMessage, ServerMessage}
/**
* @Date 2021/4/6 16:17
* @Version 10.21
* @Author DuanChaojie
*/
class YellowChickenServer extends Actor {
override def receive: Receive = {
case "start" => println(Console.BLUE + "YellowChickenServer已经启动....")
case ClientMessage(msg) => {
// TODO match模糊匹配
msg match {
case "java" => sender() ! ServerMessage("Java 是由 Sun Microsystems 公司于 1995 年 5 月推出的高级程序设计语言。")
case "javascript" => sender() ! ServerMessage("JavaScript在1995年由Netscape公司的Brendan Eich,在网景导航者浏览器上首次设计实现而成。因为Netscape与Sun合作,Netscape管理层希望它外观看起来像Java,因此取名为JavaScript。但实际上它的语法风格与Self及Scheme较为接近。")
case "大数据" => sender() ! ServerMessage("大数据(big data),IT行业术语,是指无法在一定时间范围内用常规软件工具进行捕捉、管理和处理的数据集合,是需要新处理模式才能具有更强的决策力、洞察发现力和流程优化能力的海量、高增长率和多样化的信息资产。")
case "作者" => sender() ! ServerMessage("https://blog.csdn.net/weixin_45267102/article/details/111472987")
case _ => println("Nothing~")
}
}
}
}
客户端–ClientMain
package com.atguigu.akka03.client
import akka.actor.{ActorRef, ActorSystem, Props}
import com.atguigu.akka01.main.SayHelloActorDemo.sayHelloActorRef
import com.typesafe.config.ConfigFactory
import scala.io.StdIn
import scala.util.control.Breaks.{break, breakable}
/**
* @Date 2021/4/6 16:18
* @Version 10.21
* @Author DuanChaojie
*/
object ClientMain extends App {
val (clientHost, clientPort, serverHost, serverPort) = ("127.0.0.1", 6666, "127.0.0.1", 8888)
/**
* 对于此字符串中的每一行:
* 从行中删除由空格或控制字符('|')组成的前导前缀。
* 创建config对象,指定协议类型,监听的ip和端口
*/
val config = ConfigFactory.parseString(
s"""
|akka.actor.provider="akka.remote.RemoteActorRefProvider"
|akka.remote.netty.tcp.hostname=$clientHost
|akka.remote.netty.tcp.port=$clientPort
""".stripMargin)
val clientActorFactory = ActorSystem("clientActorFactory", config)
val yellowChickenClientRef: ActorRef = clientActorFactory.actorOf(Props(new YellowChickenClient(serverHost, serverPort)), "yellowChickenClient")
// ClientMain启动
yellowChickenClientRef ! "start".toLowerCase
// 向yellowChickenClientRef发送消息
breakable {
while (true) {
Thread.sleep(1000)
println(Console.GREEN + "请输入你咨询的问题(提示输入完毕按回车):")
val command = StdIn.readLine()
yellowChickenClientRef ! command
if (command == "exit") {
break()
}
}
}
}
客户端–YellowChickenClient
package com.atguigu.akka03.client
import akka.actor.{Actor, ActorSelection}
import com.atguigu.akka03.common.{ClientMessage, ServerMessage}
/**
* @Date 2021/4/6 16:18
* @Version 10.21
* @Author DuanChaojie
*/
class YellowChickenClient(serverHost: String, serverPort: Int) extends Actor {
var yellowChickenServerRef: ActorSelection = _
/**
* 1、在Actor中有一个方法PreStart方法,他会在actor运行前执行
* 2、在akka的开发中,通常将初始化的工作,放在preStart方法
*/
override def preStart(): Unit = {
/** 注意:
* serverActorFactory 是server端 ActorSystem("serverActorFactory",config)
* user/后面是 serverActorFactory.actorOf(Props[YellowChickenServer],"yellowChickenServer")
*/
yellowChickenServerRef = context.actorSelection(s"akka.tcp://serverActorFactory@${serverHost}:${serverPort}/user/yellowChickenServer")
println(yellowChickenServerRef)
}
override def receive: Receive = {
case "start" => println(Console.BLUE + "YellowChickenClient已经启动....")
// 将咨询的问题发送到Server端
case msg: String => {
yellowChickenServerRef ! ClientMessage(msg.toLowerCase)
}
case ServerMessage(msg) => {
println(s"YellowChickenServer: $msg")
}
}
}
MessageProtocol
/**
* @Date 2021/4/6 16:19
* @Version 10.21
* @Author DuanChaojie
*/
class MessageProtocol {
}
/**
* 使用样例类来构建协议
* 客户端发给服务器协议(序列化的对象)
* @param mes
*/
case class ClientMessage(mes: String)
/**
* 服务端发给客户端的协议(样例类对象)
* @param mes
*/
case class ServerMessage(mes: String)
项目结构图:
效果图:
6. Spark Master Worker 进程通讯项目
项目意义:
项目需求分析:
功能实现:
SparkMaster
package com.atguigu.spark.master
import akka.actor.{Actor, ActorSystem, Props}
import com.atguigu.spark.common.{HeartBeat, RegisterWorkerInfo, RegisteredWorkerInfo, RemoveTimeOutWorker, StartTimeOutWorker, WorkerInfo}
import com.typesafe.config.ConfigFactory
import scala.concurrent.duration._
import scala.collection.mutable
/**
* @Date 2021/4/6 18:35
* @Version 10.21
* @Author DuanChaojie
*/
class SparkMaster extends Actor {
/**
* SparkMaster维护一个SparkWorker的mutable.map(id,WorkerInfo(id, cpu, ram))
*/
val workers = mutable.Map[String, WorkerInfo]()
override def receive: Receive = {
case "start" => {
println(Console.BLUE + "SparkMaster启动了...")
self ! StartTimeOutWorker
}
case RegisterWorkerInfo(id, cpu, ram) => {
// SparkMaster处理注册信息
if (!workers.contains(id)) {
val workerInfo = new WorkerInfo(id, cpu, ram)
// 将wokerInfo加入到workers中
workers += (id -> workerInfo)
// 告诉SparkWorker注册成功
sender() ! RegisteredWorkerInfo
}
println("workers = " + workers)
}
case HeartBeat(id) => {
val workerInfo = workers(id)
workerInfo.lastHeartBeat = System.currentTimeMillis()
println("SparkMaster更新了id = " + id + "的心跳时间为:" + workerInfo.lastHeartBeat)
}
case StartTimeOutWorker => {
import context.dispatcher
context.system.scheduler.schedule(0 millis, 9000 millis, self, RemoveTimeOutWorker)
}
case RemoveTimeOutWorker => {
//拿到所有的workerInfo
val workerInfos = workers.values
val deadWorkerInfos = workerInfos.filter(workerInfo => (System.currentTimeMillis() - workerInfo.lastHeartBeat) > 6000)
deadWorkerInfos.foreach(workerInfo => workers.remove(workerInfo.id))
println(s"当前有${workers.size}个sparkWorker存活~")
}
}
}
object SparkMaster {
/**
* @param args (0) serverHost args(1) serverPort
*/
def main(args: Array[String]): Unit = {
val masterHost = args(0)
val masterPort = args(1).toInt
/** 创建config对象,指定协议类型,监听的ip和端口 */
val config = ConfigFactory.parseString(
s"""
|akka.actor.provider="akka.remote.RemoteActorRefProvider"
|akka.remote.netty.tcp.hostname=$masterHost
|akka.remote.netty.tcp.port=$masterPort
""".stripMargin)
val sparkMasterSystem = ActorSystem("sparkMasterSystem", config)
val sparkMaster = sparkMasterSystem.actorOf(Props[SparkMaster], "sparkMaster")
sparkMaster ! "start"
}
}
SparkWorker
package com.atguigu.spark.worker
import akka.actor.{Actor, ActorSelection, ActorSystem, Props}
import com.atguigu.spark.common.{HeartBeat, RegisterWorkerInfo, RegisteredWorkerInfo, SendHeartBeat}
import com.typesafe.config.ConfigFactory
import scala.concurrent.duration._
/**
* @Date 2021/4/6 18:36
* @Version 10.21
* @Author DuanChaojie
*/
class SparkWorker(masterHost: String, masterPort: Int) extends Actor {
var sparkMaster: ActorSelection = _
override def preStart(): Unit = {
sparkMaster = context.actorSelection(s"akka.tcp://sparkMasterSystem@${masterHost}:${masterPort}/user/sparkMaster")
}
// 使用UUID生成sparkWorker的id
private val id: String = java.util.UUID.randomUUID().toString
override def receive: Receive = {
case "start" => {
println(Console.BLUE + "SparkMaster启动了...")
println("正在向SparkMaster注册自己...")
//RegisterWorkerInfo(id: String, cpu: Int, ram: Int)
sparkMaster ! RegisterWorkerInfo(id, 4, 256 * 1024)
}
case RegisteredWorkerInfo => {
println("已经向SparkMaster注册成功!")
import context.dispatcher
context.system.scheduler.schedule(0 millis, 3000 millis, self, SendHeartBeat)
}
case SendHeartBeat => {
println("给SparkMaster发送心跳~")
sparkMaster ! HeartBeat(id)
}
}
}
object SparkWorker {
/**
* @param args
* workerHost args(0)
* workerPort args(1)
* masterHost args(2)
* masterPort args(3)
*/
def main(args: Array[String]): Unit = {
val (workerHost, workerPort, masterHost, masterPort) = (args(0),args(1).toInt,args(2),args(3).toInt)
/** 创建config对象,指定协议类型,监听的ip和端口 */
val config = ConfigFactory.parseString(
s"""
|akka.actor.provider="akka.remote.RemoteActorRefProvider"
|akka.remote.netty.tcp.hostname=$workerHost
|akka.remote.netty.tcp.port=$workerPort
""".stripMargin)
val sparkWorkerSystem = ActorSystem("sparkWorkerSystem", config)
val sparkWorker = sparkWorkerSystem.actorOf(Props(new SparkWorker(masterHost, masterPort)), "sparkWorker")
sparkWorker ! "start"
}
}
MessageProtocol
package com.atguigu.spark.common
/**
* @Date 2021/4/6 18:37
* @Version 10.21
* @Author DuanChaojie
* MessageProtocol.scala
*/
class MessageProtocol {
}
/**
* worker注册信息
* @param id
* @param cpu
* @param ram
*/
case class RegisterWorkerInfo(id: String, cpu: Int, ram: Int)
/**
* 这个是WorkerInfo, 这个信息将来是保存到master的 hm(该hashmap是用于管理worker)
* 将来这个WorkerInfo会扩展(比如增加worker上一次的心跳时间)
* @param id
* @param cpu
* @param ram
*/
class WorkerInfo(val id: String, val cpu: Int, val ram: Int) {
var lastHeartBeat : Long = System.currentTimeMillis()
}
/**
* 当worker注册成功,服务器返回一个RegisteredWorkerInfo 对象
*/
case object RegisteredWorkerInfo
/**
* worker每隔一定时间由定时器发给自己的一个消息
*/
case object SendHeartBeat
/**
* worker每隔一定时间由定时器触发,而向master发现的协议消息
* @param id
*/
case class HeartBeat(id: String)
/**
* master给自己发送一个触发检查超时worker的信息
*/
case object StartTimeOutWorker
/**
* master给自己发消息,检测worker,对于心跳超时的
*/
case object RemoveTimeOutWorker
项目结构图:
项目效果图:
进行分布式部署(Linux系统)
#依次执行以下命令
java -jar SparkMaster.jar 127.0.0.1 7777
java -jar SparkWorker.jar 127.0.0.1 6666 127.0.0.1 7777
java -jar SparkWorker.jar 127.0.0.1 5555 127.0.0.1 7777
java -jar SparkWorker.jar 127.0.0.1 4444 127.0.0.1 7777