侧边栏壁纸
博主头像
colo

欲买桂花同载酒

  • 累计撰写 1823 篇文章
  • 累计收到 0 条评论

实现一个支持重试、节流、错误隔离的Combine网络层

2025-12-12 / 0 评论 / 4 阅读

题目

实现一个支持重试、节流、错误隔离的Combine网络层

信息

  • 类型:问答
  • 难度:⭐⭐⭐

考点

自定义Publisher, 错误处理策略, 背压管理, 线程调度, 操作符组合

快速回答

实现要点:

  • 创建自定义NetworkPublisher处理URLSession数据任务
  • 使用tryCatch+retry实现指数退避重试机制
  • 通过buffer控制背压,避免内存溢出
  • 使用subscribe(on:)/receive(on:)管理线程切换
  • 错误隔离:将URLError转换为领域错误类型
  • 添加debounce防止快速重复请求
## 解析

核心需求

设计一个生产级网络层,需处理:

  • 网络错误时自动重试(带指数退避)
  • 高频请求时进行节流控制
  • 避免快速失败导致整个流终止
  • 严格的内存管理和线程安全

解决方案代码

struct NetworkError: Error, Equatable {
    case invalidResponse(Int)
    case decodingError
    case rateLimitExceeded
}

struct NetworkPublisher: Publisher {
    typealias Output = Data
    typealias Failure = NetworkError

    let request: URLRequest

    func receive(subscriber: S) where S : Subscriber, Failure == S.Failure, Output == S.Input {
        let subscription = NetworkSubscription(request: request, subscriber: subscriber)
        subscriber.receive(subscription: subscription)
    }

    final class NetworkSubscription<S: Subscriber>: Subscription where S.Input == Output, S.Failure == Failure {
        private var task: URLSessionDataTask?
        private let request: URLRequest
        private var subscriber: S?

        init(request: URLRequest, subscriber: S) {
            self.request = request
            self.subscriber = subscriber
        }

        func request(_ demand: Subscribers.Demand) {
            guard demand > 0 else { return }

            task = URLSession.shared.dataTask(with: request) { [weak self] data, response, error in
                guard let self else { return }

                if let error = error as? URLError {
                    _ = self.subscriber?.receive(completion: .failure(self.mapURLError(error)))
                    return
                }

                guard let httpResponse = response as? HTTPURLResponse else {
                    _ = self.subscriber?.receive(completion: .failure(.invalidResponse(0)))
                    return
                }

                switch httpResponse.statusCode {
                case 200..<300:
                    if let data {
                        _ = self.subscriber?.receive(data)
                        self.subscriber?.receive(completion: .finished)
                    }
                case 429:
                    _ = self.subscriber?.receive(completion: .failure(.rateLimitExceeded))
                default:
                    _ = self.subscriber?.receive(completion: .failure(.invalidResponse(httpResponse.statusCode)))
                }
            }
            task?.resume()
        }

        func cancel() {
            task?.cancel()
            subscriber = nil
        }

        private func mapURLError(_ error: URLError) -> NetworkError {
            switch error.code {
            case .timedOut, .cannotConnectToHost: return .invalidResponse(-1001)
            default: return .invalidResponse(error.errorCode)
            }
        }
    }
}

// 使用示例
let request = URLRequest(url: URL(string: "https://api.example.com/data")!)

NetworkPublisher(request: request)
    .retry(3) // 基础重试
    .tryCatch { error -> AnyPublisher<Data, NetworkError> in
        if case .rateLimitExceeded = error {
            return Just(())
                .delay(for: .seconds(5), scheduler: DispatchQueue.global())
                .flatMap { _ in NetworkPublisher(request: request) }
                .eraseToAnyPublisher()
        }
        throw error
    }
    .buffer(size: 50, prefetch: .keepFull, whenFull: .dropOldest) // 背压控制
    .debounce(for: .seconds(0.5), scheduler: DispatchQueue.main) // 节流
    .subscribe(on: DispatchQueue.global(qos: .userInitiated))
    .receive(on: DispatchQueue.main)
    .sink(
        receiveCompletion: { completion in
            if case .failure(let error) = completion {
                print("最终错误: ", error)
            }
        },
        receiveValue: { data in
            print("收到数据: ", data)
        }
    )

关键设计原理

  • 自定义Publisher:封装URLSession的异步回调为Combine流,实现精确的生命周期控制(request/cancel
  • 错误转换层:将系统URLError转换为领域特定错误,避免实现细节泄露
  • 指数退避重试:通过tryCatch嵌套实现针对特定错误(如429)的定制重试策略
  • 背压管理buffer操作符防止下游处理缓慢导致内存堆积,设置.dropOldest策略保证实时性
  • 线程安全subscribe(on:)确保上游操作在后台线程,receive(on:)保证下游在主线程更新UI

最佳实践

  • 错误隔离:定义领域错误类型,避免传递系统级错误
  • 资源管理:在Subscription的cancel()中释放资源,防止内存泄漏
  • 流量控制:高频场景(如搜索框)必须组合debounce+buffer
  • 重试策略:区分可重试错误(网络超时)和不可重试错误(400 Bad Request)

常见陷阱

  • 内存泄漏:未在Subscription中正确使用[weak self]
  • 线程阻塞:在receive方法执行同步耗时操作
  • 错误吞噬tryCatch未重新抛出未处理错误导致静默失败
  • 背压失控:未处理Subscribers.Demand导致缓冲区无限增长

扩展优化

  • 请求拦截:在Publisher中添加认证令牌刷新逻辑
  • 响应缓存:插入handleEvents记录成功响应到CoreData
  • 性能监控:使用measureInterval跟踪请求耗时
  • 测试策略:注入URLProtocol模拟网络响应,验证重试逻辑
    载入天数...载入时分秒...