题目
设计一个具有容错能力的分布式计算系统
信息
- 类型:问答
- 难度:⭐⭐
考点
Actor监管策略, 消息协议设计, 状态管理, 超时处理, 容错恢复
快速回答
实现要点:
- 创建Master-Worker层级结构,Master使用
OneForOneStrategy监管Worker - 定义
ComputeTask/Result/FailureReport消息协议 - Master使用
ask模式配合Timeout处理响应超时 - Worker捕获计算异常并通过
Status.Failure反馈 - Master使用
become管理任务状态(等待/运行中)
场景需求
设计一个分布式计算系统:Master接收计算任务,分发给Worker执行。要求:
1. Worker崩溃后自动重启
2. 单任务失败不影响整体
3. 处理响应超时
4. Master自身故障时停止所有子Actor
核心实现代码
// 消息协议
sealed trait Command
case class ComputeTask(data: Seq[Int]) extends Command
case class Result(value: Int) extends Command
case class FailureReport(reason: Throwable) extends Command
// Worker Actor
class Worker extends Actor {
def receive: Receive = {
case ComputeTask(data) =>
try {
val res = data.sum // 实际计算逻辑
sender() ! Result(res)
} catch {
case e: Exception =>
sender() ! Status.Failure(e) // 异常封装
}
}
}
// Master Actor
class Master extends Actor {
import akka.actor.SupervisorStrategy._
import scala.concurrent.duration._
// 监管策略:Worker异常时1分钟内最多重启3次
override val supervisorStrategy: SupervisorStrategy =
OneForOneStrategy(maxNrOfRetries = 3, withinTimeRange = 1.minute) {
case _: ArithmeticException => Resume // 计算错误继续处理
case _: NullPointerException => Restart // 状态错误重启
case _: Exception => Escalate // 未知异常升级
}
private var pendingTasks = Map[String, ActorRef]()
private val worker = context.actorOf(Props[Worker](), "worker")
def receive: Receive = idle
// 状态管理
def idle: Receive = {
case task: ComputeTask =>
implicit val timeout: Timeout = Timeout(5.seconds) // 响应超时设置
val askSender = sender()
val taskId = java.util.UUID.randomUUID.toString
(worker ? task).onComplete {
case Success(Result(value)) =>
askSender ! s"Result: $value"
context.become(idle)
case Success(FailureReport(ex)) =>
context.system.log.error(s"Task failed: ${ex.getMessage}")
case Failure(_) =>
context.system.log.warning(s"Task $taskId timeout")
}(context.dispatcher)
pendingTasks += taskId -> askSender
context.become(processing(taskId))
}
def processing(taskId: String): Receive = {
case _: ComputeTask => sender() ! "System busy" // 拒绝新任务
// 处理其他状态转换...
}
}关键设计原理
- 监管策略:Master通过
supervisorStrategy定义Worker的故障处理规则:Resume:保留状态继续处理(适合临时错误)Restart:重建Actor(解决状态污染)Escalate:移交监管给父Actor(严重故障)
- 消息协议:
- 使用
sealed trait确保模式匹配完整性 Status.Failure包装异常实现跨Actor错误传递
- 使用
- 状态管理:
become/unbecome实现状态机切换(idle/processing)pendingTasks跟踪进行中任务
最佳实践
- 超时处理:所有
ask操作必须设置Timeout,避免资源泄漏 - 错误隔离:Worker崩溃不影响Master,通过监管树实现故障隔离
- 非阻塞设计:计算逻辑需异步,避免阻塞Actor线程
- 幂等消息:任务消息包含唯一ID,支持重复发送
常见错误
- 全局状态共享:在Actor内部使用
var而非通过消息传递状态 - 阻塞操作:在receive中执行同步I/O导致线程饥饿
- 监管过度:错误使用
AllForOneStrategy导致无关Worker被重启 - 消息丢失:未处理
DeadLetter或缺少ACK机制
扩展知识
- 集群分片:通过
ClusterSharding横向扩展Worker - 持久化:使用
PersistentActor保存任务状态应对Master故障 - 路由策略:
Router配合SmallestMailbox实现负载均衡 - Circuit Breaker:集成断路器模式防止级联故障