题目
实现一个支持重试、节流、错误隔离的Combine网络层
信息
- 类型:问答
- 难度:⭐⭐⭐
考点
自定义Publisher, 错误处理策略, 背压管理, 线程调度, 操作符组合
快速回答
实现要点:
- 创建自定义
NetworkPublisher处理URLSession数据任务 - 使用
tryCatch+retry实现指数退避重试机制 - 通过
buffer控制背压,避免内存溢出 - 使用
subscribe(on:)/receive(on:)管理线程切换 - 错误隔离:将URLError转换为领域错误类型
- 添加
debounce防止快速重复请求
核心需求
设计一个生产级网络层,需处理:
- 网络错误时自动重试(带指数退避)
- 高频请求时进行节流控制
- 避免快速失败导致整个流终止
- 严格的内存管理和线程安全
解决方案代码
struct NetworkError: Error, Equatable {
case invalidResponse(Int)
case decodingError
case rateLimitExceeded
}
struct NetworkPublisher: Publisher {
typealias Output = Data
typealias Failure = NetworkError
let request: URLRequest
func receive(subscriber: S) where S : Subscriber, Failure == S.Failure, Output == S.Input {
let subscription = NetworkSubscription(request: request, subscriber: subscriber)
subscriber.receive(subscription: subscription)
}
final class NetworkSubscription<S: Subscriber>: Subscription where S.Input == Output, S.Failure == Failure {
private var task: URLSessionDataTask?
private let request: URLRequest
private var subscriber: S?
init(request: URLRequest, subscriber: S) {
self.request = request
self.subscriber = subscriber
}
func request(_ demand: Subscribers.Demand) {
guard demand > 0 else { return }
task = URLSession.shared.dataTask(with: request) { [weak self] data, response, error in
guard let self else { return }
if let error = error as? URLError {
_ = self.subscriber?.receive(completion: .failure(self.mapURLError(error)))
return
}
guard let httpResponse = response as? HTTPURLResponse else {
_ = self.subscriber?.receive(completion: .failure(.invalidResponse(0)))
return
}
switch httpResponse.statusCode {
case 200..<300:
if let data {
_ = self.subscriber?.receive(data)
self.subscriber?.receive(completion: .finished)
}
case 429:
_ = self.subscriber?.receive(completion: .failure(.rateLimitExceeded))
default:
_ = self.subscriber?.receive(completion: .failure(.invalidResponse(httpResponse.statusCode)))
}
}
task?.resume()
}
func cancel() {
task?.cancel()
subscriber = nil
}
private func mapURLError(_ error: URLError) -> NetworkError {
switch error.code {
case .timedOut, .cannotConnectToHost: return .invalidResponse(-1001)
default: return .invalidResponse(error.errorCode)
}
}
}
}
// 使用示例
let request = URLRequest(url: URL(string: "https://api.example.com/data")!)
NetworkPublisher(request: request)
.retry(3) // 基础重试
.tryCatch { error -> AnyPublisher<Data, NetworkError> in
if case .rateLimitExceeded = error {
return Just(())
.delay(for: .seconds(5), scheduler: DispatchQueue.global())
.flatMap { _ in NetworkPublisher(request: request) }
.eraseToAnyPublisher()
}
throw error
}
.buffer(size: 50, prefetch: .keepFull, whenFull: .dropOldest) // 背压控制
.debounce(for: .seconds(0.5), scheduler: DispatchQueue.main) // 节流
.subscribe(on: DispatchQueue.global(qos: .userInitiated))
.receive(on: DispatchQueue.main)
.sink(
receiveCompletion: { completion in
if case .failure(let error) = completion {
print("最终错误: ", error)
}
},
receiveValue: { data in
print("收到数据: ", data)
}
)
关键设计原理
- 自定义Publisher:封装URLSession的异步回调为Combine流,实现精确的生命周期控制(
request/cancel) - 错误转换层:将系统URLError转换为领域特定错误,避免实现细节泄露
- 指数退避重试:通过
tryCatch嵌套实现针对特定错误(如429)的定制重试策略 - 背压管理:
buffer操作符防止下游处理缓慢导致内存堆积,设置.dropOldest策略保证实时性 - 线程安全:
subscribe(on:)确保上游操作在后台线程,receive(on:)保证下游在主线程更新UI
最佳实践
- 错误隔离:定义领域错误类型,避免传递系统级错误
- 资源管理:在Subscription的
cancel()中释放资源,防止内存泄漏 - 流量控制:高频场景(如搜索框)必须组合
debounce+buffer - 重试策略:区分可重试错误(网络超时)和不可重试错误(400 Bad Request)
常见陷阱
- 内存泄漏:未在Subscription中正确使用
[weak self] - 线程阻塞:在
receive方法执行同步耗时操作 - 错误吞噬:
tryCatch未重新抛出未处理错误导致静默失败 - 背压失控:未处理
Subscribers.Demand导致缓冲区无限增长
扩展优化
- 请求拦截:在Publisher中添加认证令牌刷新逻辑
- 响应缓存:插入
handleEvents记录成功响应到CoreData - 性能监控:使用
measureInterval跟踪请求耗时 - 测试策略:注入
URLProtocol模拟网络响应,验证重试逻辑