侧边栏壁纸
博主头像
colo

欲买桂花同载酒

  • 累计撰写 1823 篇文章
  • 累计收到 0 条评论

设计一个具有容错能力的分布式计算系统

2025-12-12 / 0 评论 / 5 阅读

题目

设计一个具有容错能力的分布式计算系统

信息

  • 类型:问答
  • 难度:⭐⭐

考点

Actor监管策略, 消息协议设计, 状态管理, 超时处理, 容错恢复

快速回答

实现要点:

  • 创建Master-Worker层级结构,Master使用OneForOneStrategy监管Worker
  • 定义ComputeTask/Result/FailureReport消息协议
  • Master使用ask模式配合Timeout处理响应超时
  • Worker捕获计算异常并通过Status.Failure反馈
  • Master使用become管理任务状态(等待/运行中)
## 解析

场景需求

设计一个分布式计算系统:Master接收计算任务,分发给Worker执行。要求:
1. Worker崩溃后自动重启
2. 单任务失败不影响整体
3. 处理响应超时
4. Master自身故障时停止所有子Actor

核心实现代码

// 消息协议
sealed trait Command
case class ComputeTask(data: Seq[Int]) extends Command
case class Result(value: Int) extends Command
case class FailureReport(reason: Throwable) extends Command

// Worker Actor
class Worker extends Actor {
  def receive: Receive = {
    case ComputeTask(data) => 
      try {
        val res = data.sum // 实际计算逻辑
        sender() ! Result(res)
      } catch {
        case e: Exception => 
          sender() ! Status.Failure(e) // 异常封装
      }
  }
}

// Master Actor
class Master extends Actor {
  import akka.actor.SupervisorStrategy._
  import scala.concurrent.duration._

  // 监管策略:Worker异常时1分钟内最多重启3次
  override val supervisorStrategy: SupervisorStrategy = 
    OneForOneStrategy(maxNrOfRetries = 3, withinTimeRange = 1.minute) {
      case _: ArithmeticException => Resume  // 计算错误继续处理
      case _: NullPointerException => Restart // 状态错误重启
      case _: Exception => Escalate           // 未知异常升级
    }

  private var pendingTasks = Map[String, ActorRef]()
  private val worker = context.actorOf(Props[Worker](), "worker")

  def receive: Receive = idle

  // 状态管理
  def idle: Receive = {
    case task: ComputeTask => 
      implicit val timeout: Timeout = Timeout(5.seconds) // 响应超时设置
      val askSender = sender()
      val taskId = java.util.UUID.randomUUID.toString

      (worker ? task).onComplete {
        case Success(Result(value)) => 
          askSender ! s"Result: $value"
          context.become(idle)
        case Success(FailureReport(ex)) => 
          context.system.log.error(s"Task failed: ${ex.getMessage}")
        case Failure(_) => 
          context.system.log.warning(s"Task $taskId timeout")
      }(context.dispatcher)

      pendingTasks += taskId -> askSender
      context.become(processing(taskId))
  }

  def processing(taskId: String): Receive = {
    case _: ComputeTask => sender() ! "System busy" // 拒绝新任务
    // 处理其他状态转换...
  }
}

关键设计原理

  • 监管策略:Master通过supervisorStrategy定义Worker的故障处理规则:
    • Resume:保留状态继续处理(适合临时错误)
    • Restart:重建Actor(解决状态污染)
    • Escalate:移交监管给父Actor(严重故障)
  • 消息协议
    • 使用sealed trait确保模式匹配完整性
    • Status.Failure包装异常实现跨Actor错误传递
  • 状态管理
    • become/unbecome实现状态机切换(idle/processing)
    • pendingTasks跟踪进行中任务

最佳实践

  • 超时处理:所有ask操作必须设置Timeout,避免资源泄漏
  • 错误隔离:Worker崩溃不影响Master,通过监管树实现故障隔离
  • 非阻塞设计:计算逻辑需异步,避免阻塞Actor线程
  • 幂等消息:任务消息包含唯一ID,支持重复发送

常见错误

  • 全局状态共享:在Actor内部使用var而非通过消息传递状态
  • 阻塞操作:在receive中执行同步I/O导致线程饥饿
  • 监管过度:错误使用AllForOneStrategy导致无关Worker被重启
  • 消息丢失:未处理DeadLetter或缺少ACK机制

扩展知识

  • 集群分片:通过ClusterSharding横向扩展Worker
  • 持久化:使用PersistentActor保存任务状态应对Master故障
  • 路由策略Router配合SmallestMailbox实现负载均衡
  • Circuit Breaker:集成断路器模式防止级联故障