题目
设计一个基于Combine的复杂数据流处理系统,处理网络请求、数据转换、错误处理和线程调度
信息
- 类型:问答
- 难度:⭐⭐⭐
考点
Publisher链式操作, 错误处理策略, 线程调度优化, 内存管理, 自定义Publisher
快速回答
实现要点:
- 使用
URLSession.dataTaskPublisher发起网络请求 - 通过
tryMap进行JSON解码和数据类型转换 - 使用
catch和retry实现错误恢复机制 - 利用
subscribe(on:)和receive(on:)精确控制线程 - 通过
share()共享Publisher避免重复请求 - 使用
AnyCancellable管理订阅生命周期
场景需求
设计一个从多个API获取数据并合并处理的系统:
1. 从API-A获取用户ID列表
2. 根据ID并发请求API-B获取用户详情
3. 过滤无效数据并转换格式
4. 实现错误重试和降级逻辑
5. 确保线程安全及内存管理
完整实现方案
import Combine
struct UserService {
// 1. 获取用户ID列表
func fetchUserIDs() -> AnyPublisher<[String], Error> {
URLSession.shared.dataTaskPublisher(for: URL(string: "https://api.example.com/ids")!)
.subscribe(on: DispatchQueue.global(qos: .userInitiated))
.tryMap { data, _ in
try JSONDecoder().decode([String].self, from: data)
}
.eraseToAnyPublisher()
}
// 2. 获取用户详情(带错误恢复)
func fetchUserDetail(id: String) -> AnyPublisher<UserDetail, Error> {
let url = URL(string: "https://api.example.com/user/\(id)")!
return URLSession.shared.dataTaskPublisher(for: url)
.tryMap { output in
guard let httpResponse = output.response as? HTTPURLResponse,
httpResponse.statusCode == 200 else {
throw URLError(.badServerResponse)
}
return try JSONDecoder().decode(UserDetail.self, from: output.data)
}
.retry(2) // 最多重试2次
.catch { _ in // 降级处理
Just(UserDetail.fallbackData())
.setFailureType(to: Error.self)
}
.eraseToAnyPublisher()
}
// 3. 组合数据流
func consolidatedUserData() -> AnyPublisher<[ProcessedUser], Error> {
fetchUserIDs()
.flatMap { ids -> AnyPublisher<ProcessedUser, Error> in
Publishers.MergeMany(ids.prefix(5).map { id in // 限制并发数量
self.fetchUserDetail(id: id)
.tryMap { detail -> ProcessedUser in
guard detail.isValid else { throw DataError.invalidFormat }
return ProcessedUser(detail: detail)
}
.subscribe(on: DispatchQueue.global())
})
.eraseToAnyPublisher()
}
.collect() // 聚合结果
.receive(on: DispatchQueue.main) // 切换到主线程更新UI
.share() // 避免重复请求
.eraseToAnyPublisher()
}
}
// 内存管理示例
class UserViewModel: ObservableObject {
@Published var users: [ProcessedUser] = []
private var cancellables = Set<AnyCancellable>()
func loadData() {
UserService().consolidatedUserData()
.sink(receiveCompletion: { completion in
if case .failure(let error) = completion {
print("处理错误: ", error.localizedDescription)
}
}, receiveValue: { [weak self] users in
self?.users = users
})
.store(in: &cancellables)
}
}核心考察点解析
- Publisher链式操作:
通过flatMap嵌套异步操作,MergeMany处理并发流,collect聚合结果 - 错误处理策略:
retry自动重试失败请求,catch提供降级数据,tryMap抛出转换错误 - 线程调度优化:
subscribe(on:)指定上游操作线程,receive(on:)控制下游接收线程 - 内存管理关键:
使用share()避免重复创建Publisher,[weak self]打破循环引用,AnyCancellable管理订阅 - 性能优化:
ids.prefix(5)限制并发数量,防止资源耗尽
常见错误
- 线程使用不当:在后台线程更新UI导致崩溃
- 内存泄漏:未使用
[weak self]或未存储AnyCancellable - 错误处理缺失:未处理
tryMap可能抛出的异常 - 过度并发:未限制
MergeMany的并发量导致系统资源耗尽 - 重复请求:多个订阅者导致重复网络请求(应使用
share())
最佳实践
- 使用
eraseToAnyPublisher隐藏复杂类型 - 对网络请求添加超时控制:
.timeout(.seconds(10), scheduler: DispatchQueue.global()) - 关键操作添加日志:
.handleEvents(receiveOutput: { print("Received: ", $0) }) - 使用
CombineLatest处理多数据源合并 - 实现
Backpressure策略:通过buffer或自定义Subscriber控制流量
扩展知识
- 自定义Publisher:实现
Publisher协议处理特定数据源(如数据库观察) - 背压管理:使用
Subscribers.Demand控制数据请求速率 - 测试策略:利用
TestScheduler控制虚拟时间进行流测试 - 与SwiftUI集成:
@Published属性包装器驱动视图更新 - 性能监控:使用
.breakpointOnError()调试错误流