0
点赞
收藏
分享

微信扫一扫

scala学习复习笔记超详细(并发编程模型Akka)

独兜曲 2022-04-13 阅读 26
scala

文章目录

Scala并发编程模型Akka

1. Akka简介

Akka是什么?

Actor 模型解决什么问题?

2. Akka中Actor模型

Actor模型及其说明

Actor模型工作机制说明

在这里插入图片描述

3. Actor模型快速入门

SayHelloActor

package com.atguigu.akka01.actor

import akka.actor.{Actor}
/**
  * @Date 2021/4/6 14:13
  * @Version 10.21
  * @Author DuanChaojie
  * 继承Actor后,SayHelloActor就是一个Actor
  * 重写核心方法receive
  */
class SayHelloActor extends Actor {
  /**
    * 1、receive方法,会被该SayHelloActor的MailBox调用
    * 2、当该SayHelloActor的MailBox接收到消息,就会调用receive方法
    * 3、type Receive = scala.PartialFunction[scala.Any, scala.Unit],即Receive表示偏函数接收的参数类型是Any,返回类型是Unit
    * 4、isDefinedAt(x: Any) 如果返回true ,就会去调用 apply 构建对象实例,如果是false,过滤
    *
    * @return Receive
    */
  override def receive: Receive = {
    case "Hello" => println("SayHelloActor:Hello tom")
    case "Ok" => println("SayHelloActor:Ok jack")
    case "exit" => {
      println("SayHelloActor:退出系统...")
      // 停止actoref
      /**

        */

      /** 1、context.stop(xxx)阻止被指定的Actor,这是一个异步操作,即涉及一个消息发送。
        * 2、如果此方法应用于来自参与者内部的"self"引用,
        *  则该参与者保证不会在此调用后处理任何其他消息;
        *  请注意,当前消息的处理将继续,此方法不会立即终止此参与者。
        * 3、The 'self' field holds the ActorRef for this actor
        * 4、self可用于向自身发送消息,格式为:self ! message
        */
      context.stop(self)

      /**
        * Terminates this actor system
        */
      context.system.terminate()
    }
    case _ => println("SayHelloActor:匹配失败!")
  }

}

SayHelloActorDemo

package com.atguigu.akka01.main

import akka.actor.{ActorRef, ActorSystem, Props}
import com.atguigu.akka01.actor.SayHelloActor

import scala.io.StdIn
import scala.util.control.Breaks.{break, breakable}

/**
  * @Date 2021/4/6 15:42
  * @Version 10.21
  * @Author DuanChaojie
  */
object SayHelloActorDemo {
  // 1、创建一个ActorSystem,专门用于创建Actor
  private val actorFactory = ActorSystem("actorFactory")

  /**
    * 创建SayHelloActor的同时,返回SayHelloActor的sayHelloActorRef
    * 1、Props[SayHelloActor] 通过反射创建一个SayHelloActor实例
    * 2、给创建的Actor(SayHelloActor)取名为sayHelloActor
    * 3、sayHelloActorRef: ActorRef 就是 Props[SayHelloActor] 的ActorRef
    * 4、创建的SayHelloActor实例被ActorSystem接管
    */
  private val sayHelloActorRef: ActorRef = actorFactory.actorOf(Props[SayHelloActor], "sayHelloActor")


  def main(args: Array[String]): Unit = {

    // 向sayHelloActorRef发送消息
    breakable {
      while (true) {
        println(Console.GREEN + "请输入你想发的消息(提示输入完毕按回车):")
        val command = StdIn.readLine()
        sayHelloActorRef ! command
        Thread.sleep(1000)
        if (command == "exit") {
          break()
        }
      }
    }

  }
}

效果如图:

在这里插入图片描述

小结和说明:

4. Actor模型应用实例-Actor间通讯

DdActor

package com.atguigu.akka02.actor

import akka.actor.{Actor, ActorRef}

/**
  * @Date 2021/4/6 15:38
  * @Version 10.21
  * @Author DuanChaojie
  */
class DdActor(mmActorRef: ActorRef) extends Actor {
  val myMmActorRef = mmActorRef

  override def receive: Receive = {
    case "Go!" => {
      println("3s后开启世界大战!")
      Thread.sleep(3000)
      myMmActorRef ! "DD"
    }
    case "MM" => {
      Thread.sleep(1000)
      println("MM:一起喵喵喵喵喵~")
      myMmActorRef ! "DD"
    }
  }
}

MmActor

package com.atguigu.akka02.actor

import akka.actor.Actor

/**
  * @Date 2021/4/6 15:38
  * @Version 10.21
  * @Author DuanChaojie
  */
class MmActor extends Actor {
  override def receive: Receive = {
    case "DD" => {
      Thread.sleep(1000)
      println("DD:我们一起学猫叫~")
      sender() ! "MM"
    }
  }
}

Main

package com.atguigu.akka02.main

import akka.actor.{ActorRef, ActorSystem, Props}
import com.atguigu.akka02.actor.{DdActor, MmActor}

import scala.io.StdIn

/**
  * @Date 2021/4/6 15:40
  * @Version 10.21
  * @Author DuanChaojie
  */
object Main extends App {
  private val actorFactory = ActorSystem("actorFactory")

  private val mmActorRef: ActorRef = actorFactory.actorOf(Props[MmActor], "mmActor")
  private val ddActorRef: ActorRef = actorFactory.actorOf(Props(new DdActor(mmActorRef)), "ddActor")

  println(Console.GREEN + "请输入Start...")

  val command = StdIn.readLine()

  if (command.toLowerCase == "start") {
    ddActorRef ! "Go!"
  }
}

效果图:

在这里插入图片描述

小结:

5. Akka网络编程

网络编程基础知识

TCP/IP模型
IP地址
端口port

Akka网络编程-小黄鸡客服案例

服务端–ServerMain
package com.atguigu.akka03.server

import akka.actor.{ActorRef, ActorSystem, Props}
import com.typesafe.config.ConfigFactory

/**
  * @Date 2021/4/6 16:18
  * @Version 10.21
  * @Author DuanChaojie
  */
object ServerMain extends App{
  val serverHost = "127.0.0.1"
  val serverPort = 8888

  /**
    * 对于此字符串中的每一行:
    * 从行中删除由空格或控制字符('|')组成的前导前缀。
    * 创建config对象,指定协议类型,监听的ip和端口
    */
  val config = ConfigFactory.parseString(
    s"""
       |akka.actor.provider="akka.remote.RemoteActorRefProvider"
       |akka.remote.netty.tcp.hostname=$serverHost
       |akka.remote.netty.tcp.port=$serverPort
        """.stripMargin)

  private val serverActorFactory = ActorSystem("serverActorFactory",config)

  private val yellowChickenServerRef: ActorRef = serverActorFactory.actorOf(Props[YellowChickenServer],"yellowChickenServer")

  // ServerMain启动
  yellowChickenServerRef ! "start".toLowerCase
}
服务端–YellowChickenServer
package com.atguigu.akka03.server

import akka.actor.Actor
import com.atguigu.akka03.common.{ClientMessage, ServerMessage}

/**
  * @Date 2021/4/6 16:17
  * @Version 10.21
  * @Author DuanChaojie
  */
class YellowChickenServer extends Actor {
  override def receive: Receive = {
    case "start" => println(Console.BLUE + "YellowChickenServer已经启动....")
    case ClientMessage(msg) => {
      // TODO match模糊匹配
      msg match {
        case "java" => sender() ! ServerMessage("Java 是由 Sun Microsystems 公司于 1995 年 5 月推出的高级程序设计语言。")
        case "javascript" => sender() ! ServerMessage("JavaScript在1995年由Netscape公司的Brendan Eich,在网景导航者浏览器上首次设计实现而成。因为Netscape与Sun合作,Netscape管理层希望它外观看起来像Java,因此取名为JavaScript。但实际上它的语法风格与Self及Scheme较为接近。")
        case "大数据" => sender() ! ServerMessage("大数据(big data),IT行业术语,是指无法在一定时间范围内用常规软件工具进行捕捉、管理和处理的数据集合,是需要新处理模式才能具有更强的决策力、洞察发现力和流程优化能力的海量、高增长率和多样化的信息资产。")
        case "作者" => sender() ! ServerMessage("https://blog.csdn.net/weixin_45267102/article/details/111472987")
        case _ => println("Nothing~")
      }
    }
  }
}
客户端–ClientMain
package com.atguigu.akka03.client

import akka.actor.{ActorRef, ActorSystem, Props}
import com.atguigu.akka01.main.SayHelloActorDemo.sayHelloActorRef
import com.typesafe.config.ConfigFactory

import scala.io.StdIn
import scala.util.control.Breaks.{break, breakable}

/**
  * @Date 2021/4/6 16:18
  * @Version 10.21
  * @Author DuanChaojie
  */
object ClientMain extends App {
  val (clientHost, clientPort, serverHost, serverPort) = ("127.0.0.1", 6666, "127.0.0.1", 8888)

  /**
    * 对于此字符串中的每一行:
    * 从行中删除由空格或控制字符('|')组成的前导前缀。
    * 创建config对象,指定协议类型,监听的ip和端口
    */
  val config = ConfigFactory.parseString(
    s"""
       |akka.actor.provider="akka.remote.RemoteActorRefProvider"
       |akka.remote.netty.tcp.hostname=$clientHost
       |akka.remote.netty.tcp.port=$clientPort
        """.stripMargin)
  val clientActorFactory = ActorSystem("clientActorFactory", config)
  val yellowChickenClientRef: ActorRef = clientActorFactory.actorOf(Props(new YellowChickenClient(serverHost, serverPort)), "yellowChickenClient")

  // ClientMain启动
  yellowChickenClientRef ! "start".toLowerCase

  // 向yellowChickenClientRef发送消息
  breakable {
    while (true) {
      Thread.sleep(1000)
      println(Console.GREEN + "请输入你咨询的问题(提示输入完毕按回车):")
      val command = StdIn.readLine()
      yellowChickenClientRef ! command
      if (command == "exit") {
        break()
      }
    }
  }
}
客户端–YellowChickenClient
package com.atguigu.akka03.client

import akka.actor.{Actor, ActorSelection}
import com.atguigu.akka03.common.{ClientMessage, ServerMessage}


/**
  * @Date 2021/4/6 16:18
  * @Version 10.21
  * @Author DuanChaojie
  */
class YellowChickenClient(serverHost: String, serverPort: Int) extends Actor {

  var yellowChickenServerRef: ActorSelection = _

  /**
    * 1、在Actor中有一个方法PreStart方法,他会在actor运行前执行
    * 2、在akka的开发中,通常将初始化的工作,放在preStart方法
    */
  override def preStart(): Unit = {
    /** 注意:
      * serverActorFactory 是server端 ActorSystem("serverActorFactory",config)
      * user/后面是  serverActorFactory.actorOf(Props[YellowChickenServer],"yellowChickenServer")
      */
    yellowChickenServerRef = context.actorSelection(s"akka.tcp://serverActorFactory@${serverHost}:${serverPort}/user/yellowChickenServer")
    println(yellowChickenServerRef)
  }

  override def receive: Receive = {
    case "start" => println(Console.BLUE + "YellowChickenClient已经启动....")
    // 将咨询的问题发送到Server端
    case msg: String => {
      yellowChickenServerRef ! ClientMessage(msg.toLowerCase)
    }
    case ServerMessage(msg) => {
      println(s"YellowChickenServer: $msg")
    }
  }

}
MessageProtocol
/**
  * @Date 2021/4/6 16:19
  * @Version 10.21
  * @Author DuanChaojie
  */
class MessageProtocol {
}
/**
  * 使用样例类来构建协议
  * 客户端发给服务器协议(序列化的对象)
  * @param mes
  */
case class ClientMessage(mes: String)

/**
  * 服务端发给客户端的协议(样例类对象)
  * @param mes
  */
case class ServerMessage(mes: String)
项目结构图:

在这里插入图片描述

效果图:

在这里插入图片描述
在这里插入图片描述

6. Spark Master Worker 进程通讯项目

项目意义:

项目需求分析:

在这里插入图片描述

功能实现:

SparkMaster
package com.atguigu.spark.master

import akka.actor.{Actor, ActorSystem, Props}
import com.atguigu.spark.common.{HeartBeat, RegisterWorkerInfo, RegisteredWorkerInfo, RemoveTimeOutWorker, StartTimeOutWorker, WorkerInfo}
import com.typesafe.config.ConfigFactory
import scala.concurrent.duration._
import scala.collection.mutable

/**
  * @Date 2021/4/6 18:35
  * @Version 10.21
  * @Author DuanChaojie
  */
class SparkMaster extends Actor {

  /**
    * SparkMaster维护一个SparkWorker的mutable.map(id,WorkerInfo(id, cpu, ram))
    */
  val workers = mutable.Map[String, WorkerInfo]()


  override def receive: Receive = {
    case "start" => {
      println(Console.BLUE + "SparkMaster启动了...")
      self ! StartTimeOutWorker
    }
    case RegisterWorkerInfo(id, cpu, ram) => {
      // SparkMaster处理注册信息
      if (!workers.contains(id)) {
        val workerInfo = new WorkerInfo(id, cpu, ram)
        // 将wokerInfo加入到workers中
        workers += (id -> workerInfo)
        // 告诉SparkWorker注册成功
        sender() ! RegisteredWorkerInfo
      }
      println("workers = " + workers)
    }
    case HeartBeat(id) => {
      val workerInfo = workers(id)
      workerInfo.lastHeartBeat = System.currentTimeMillis()
      println("SparkMaster更新了id = " + id + "的心跳时间为:" + workerInfo.lastHeartBeat)
    }
    case StartTimeOutWorker => {
      import context.dispatcher
      context.system.scheduler.schedule(0 millis, 9000 millis, self, RemoveTimeOutWorker)
    }
    case RemoveTimeOutWorker => {
      //拿到所有的workerInfo
      val workerInfos = workers.values
      val deadWorkerInfos = workerInfos.filter(workerInfo => (System.currentTimeMillis() - workerInfo.lastHeartBeat) > 6000)
      deadWorkerInfos.foreach(workerInfo => workers.remove(workerInfo.id))
      println(s"当前有${workers.size}个sparkWorker存活~")
    }
  }
}

object SparkMaster {
  /**
    * @param args (0) serverHost args(1) serverPort
    */
  def main(args: Array[String]): Unit = {

    val masterHost = args(0)
    val masterPort = args(1).toInt

    /** 创建config对象,指定协议类型,监听的ip和端口 */
    val config = ConfigFactory.parseString(
      s"""
         |akka.actor.provider="akka.remote.RemoteActorRefProvider"
         |akka.remote.netty.tcp.hostname=$masterHost
         |akka.remote.netty.tcp.port=$masterPort
        """.stripMargin)

    val sparkMasterSystem = ActorSystem("sparkMasterSystem", config)
    val sparkMaster = sparkMasterSystem.actorOf(Props[SparkMaster], "sparkMaster")

    sparkMaster ! "start"
  }
}
SparkWorker
package com.atguigu.spark.worker

import akka.actor.{Actor, ActorSelection, ActorSystem, Props}
import com.atguigu.spark.common.{HeartBeat, RegisterWorkerInfo, RegisteredWorkerInfo, SendHeartBeat}
import com.typesafe.config.ConfigFactory
import scala.concurrent.duration._

/**
  * @Date 2021/4/6 18:36
  * @Version 10.21
  * @Author DuanChaojie
  */
class SparkWorker(masterHost: String, masterPort: Int) extends Actor {

  var sparkMaster: ActorSelection = _

  override def preStart(): Unit = {
    sparkMaster = context.actorSelection(s"akka.tcp://sparkMasterSystem@${masterHost}:${masterPort}/user/sparkMaster")
  }

  // 使用UUID生成sparkWorker的id
  private val id: String = java.util.UUID.randomUUID().toString

  override def receive: Receive = {
    case "start" => {
      println(Console.BLUE + "SparkMaster启动了...")
      println("正在向SparkMaster注册自己...")
      //RegisterWorkerInfo(id: String, cpu: Int, ram: Int)
      sparkMaster ! RegisterWorkerInfo(id, 4, 256 * 1024)
    }
    case RegisteredWorkerInfo => {
      println("已经向SparkMaster注册成功!")

      import  context.dispatcher
      context.system.scheduler.schedule(0 millis, 3000 millis, self, SendHeartBeat)
    }

    case SendHeartBeat => {
      println("给SparkMaster发送心跳~")
      sparkMaster ! HeartBeat(id)
    }
  }
}

object SparkWorker {
  /**
    * @param args
    *       workerHost       args(0)
    *       workerPort       args(1)
    *       masterHost       args(2)
    *       masterPort       args(3)
    */
  def main(args: Array[String]): Unit = {

    val (workerHost, workerPort, masterHost, masterPort) = (args(0),args(1).toInt,args(2),args(3).toInt)
    /** 创建config对象,指定协议类型,监听的ip和端口 */
    val config = ConfigFactory.parseString(
      s"""
         |akka.actor.provider="akka.remote.RemoteActorRefProvider"
         |akka.remote.netty.tcp.hostname=$workerHost
         |akka.remote.netty.tcp.port=$workerPort
        """.stripMargin)
    val sparkWorkerSystem = ActorSystem("sparkWorkerSystem", config)
    val sparkWorker = sparkWorkerSystem.actorOf(Props(new SparkWorker(masterHost, masterPort)), "sparkWorker")

    sparkWorker ! "start"
  }
}
MessageProtocol
package com.atguigu.spark.common

/**
  * @Date 2021/4/6 18:37
  * @Version 10.21
  * @Author DuanChaojie
  * MessageProtocol.scala
  */
class MessageProtocol {
}

/**
  * worker注册信息
  * @param id
  * @param cpu
  * @param ram
  */
case class RegisterWorkerInfo(id: String, cpu: Int, ram: Int)


/**
  * 这个是WorkerInfo, 这个信息将来是保存到master的 hm(该hashmap是用于管理worker)
  * 将来这个WorkerInfo会扩展(比如增加worker上一次的心跳时间)
  * @param id
  * @param cpu
  * @param ram
  */
class WorkerInfo(val id: String, val cpu: Int, val ram: Int) {
  var lastHeartBeat : Long = System.currentTimeMillis()
}

/**
  * 当worker注册成功,服务器返回一个RegisteredWorkerInfo 对象
  */
case object RegisteredWorkerInfo

/**
  * worker每隔一定时间由定时器发给自己的一个消息
  */
case object SendHeartBeat

/**
  * worker每隔一定时间由定时器触发,而向master发现的协议消息
  * @param id
  */
case class HeartBeat(id: String)


/**
  * master给自己发送一个触发检查超时worker的信息
  */
case object StartTimeOutWorker

/**
  * master给自己发消息,检测worker,对于心跳超时的
  */
case object RemoveTimeOutWorker
项目结构图:

在这里插入图片描述

项目效果图:

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

进行分布式部署(Linux系统)
#依次执行以下命令
java -jar SparkMaster.jar 127.0.0.1 7777

java -jar SparkWorker.jar 127.0.0.1 6666 127.0.0.1 7777
java -jar SparkWorker.jar 127.0.0.1 5555 127.0.0.1 7777
java -jar SparkWorker.jar 127.0.0.1 4444 127.0.0.1 7777

举报

相关推荐

0 条评论