package
import
import
import
import
import
import
import
import
import
import scala.concurrent.ExecutionContext.Implicits.global
import
import
case class HeartBeat(taskid:Int,parent:String)
case class TaskFinished(taskid:Int)
case object
case object
case class TaskFailure(taskid:Int)
case class
case class
case class
class Parent(jobid:String,tasknum:Int) extends
val log=Logging(this.context.system,this)
var tasks=Array[ActorRef]()
var replySender=this.context.system.deadLetters
varcount=0;//存在问题
def
case
this.replySender=this.context.sender
tasks=(1 to tasknum).map(id=>this.context.actorOf(Props(new
tasks.foreach(actor=>(actor
}
case heartBeat:HeartBeat=>{
println("taskid-0000"+heartBeat.taskid+",finished:"+heartBeat.parent)
}
case TaskFinished(taskid)=>{
println("taskid-0000"+taskid+"finished...")
this.self
}
case Terminated(actor)=>
actor.path.toString()+"stop")
case TaskFailure(taskid)=>{
//restart task
valrestartActor=this.context.actorOf(Props(new Child(taskid*10)))
restartActor
}
case
this.count+=1
if(this.count==tasknum){
this.replySender ! akka.actor.Status.Success("all task finished")
}
this.count)
}
}
}
class Child(taskid:Int) extends
val log=Logging(this.context.system,this)
def
case
1000)
this.context.parent ! HeartBeat(taskid,"10%")
2000)
this.context.parent ! HeartBeat(taskid,"70%")
//task failed
this.context.stop(this.self)
if(taskid%3==0){
this.context.parent ! TaskFailure(this.taskid)
log.info("taskid="+taskid+" task failed")
else{
this.context.parent ! TaskFinished(this.taskid)
}
}
case
log.info(taskid+" restart...")
this.context.parent ! TaskFinished(this.taskid)
}
}
}
object
def
val system=ActorSystem("actorSystem")
val jobActor=system.actorOf(Props(new Parent("DataSplit-job",10)),"DataSplitJob")
val jobListener=ask(jobActor,JobStart)(10000)
jobListener.onComplete(result => resultmatch{
case Success(result)=>{
println("job finished...,message:"+result)
}
case Failure(result)=>{
"job failed...,message:"+result.getMessage())
}
})
}
}