0
点赞
收藏
分享

微信扫一扫

scala akka 修炼之路1(使用actor实现一个job的并发计算和task失败重启)


package

 

import
import
import
import
import
import
import
import
import
import scala.concurrent.ExecutionContext.Implicits.global
import
import

 

case class HeartBeat(taskid:Int,parent:String)
case class TaskFinished(taskid:Int)
case object
case object
case class TaskFailure(taskid:Int)
case class
case class
case class

 

class Parent(jobid:String,tasknum:Int) extends
val log=Logging(this.context.system,this)
var tasks=Array[ActorRef]()
var replySender=this.context.system.deadLetters
  varcount=0;//存在问题 
def
case
this.replySender=this.context.sender
tasks=(1 to tasknum).map(id=>this.context.actorOf(Props(new
tasks.foreach(actor=>(actor
    }
case heartBeat:HeartBeat=>{
      println("taskid-0000"+heartBeat.taskid+",finished:"+heartBeat.parent)
    }
case TaskFinished(taskid)=>{
      println("taskid-0000"+taskid+"finished...")
this.self
    }
case Terminated(actor)=>
actor.path.toString()+"stop")
case TaskFailure(taskid)=>{
      //restart task
valrestartActor=this.context.actorOf(Props(new Child(taskid*10)))
restartActor
    } 
case
this.count+=1
if(this.count==tasknum){
this.replySender ! akka.actor.Status.Success("all task finished")
      }
this.count)
    }
  }
}

class Child(taskid:Int) extends
val log=Logging(this.context.system,this)
def
case
1000)
this.context.parent ! HeartBeat(taskid,"10%")
2000)
this.context.parent ! HeartBeat(taskid,"70%")
      
      //task failed 
this.context.stop(this.self)
if(taskid%3==0){
this.context.parent ! TaskFailure(this.taskid)
log.info("taskid="+taskid+" task failed")
else{
this.context.parent ! TaskFinished(this.taskid)
      }     
    }

case
log.info(taskid+" restart...")
this.context.parent ! TaskFinished(this.taskid)
    }
  }
}

object
def
val system=ActorSystem("actorSystem")
val jobActor=system.actorOf(Props(new Parent("DataSplit-job",10)),"DataSplitJob")
val jobListener=ask(jobActor,JobStart)(10000)
jobListener.onComplete(result => resultmatch{
case Success(result)=>{
      println("job finished...,message:"+result)
    }
case Failure(result)=>{
"job failed...,message:"+result.getMessage())
    }
  })
}
}





举报

相关推荐

0 条评论