题目
设计一个基于协程的线程安全缓存系统,支持过期时间和最大容量限制
信息
- 类型:问答
- 难度:⭐⭐⭐
考点
结构化并发, 协程状态管理, 并发冲突处理, 资源限制优化
快速回答
实现要点:
- 使用
Mutex保护共享状态,避免并发修改冲突 - 通过
CoroutineScope管理后台清理协程的生命周期 - 结合
LinkedHashMap实现 LRU 淘汰策略 - 使用
Delay和Channel实现高效过期清理 - 处理协程取消时的资源释放
核心需求分析
需要实现一个支持以下特性的缓存:
1. 线程安全的并发访问
2. 条目过期自动失效
3. 最大容量限制(LRU淘汰)
4. 高效的后台清理机制
完整实现方案
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import java.util.*
import kotlin.time.Duration
class ExpiringLruCache<K, V>(
private val maxSize: Int,
private val defaultExpiration: Duration
) {
// 共享状态保护
private val mutex = Mutex()
// 存储结构:双向链表实现LRU + 哈希表快速访问
private val cacheMap = LinkedHashMap<K, CacheEntry<V>>(maxSize, 0.75f, true)
// 过期清理通道
private val cleanupChannel = Channel<CleanupCommand>(Channel.UNLIMITED)
// 结构化并发作用域
private val scope = CoroutineScope(SupervisorJob() + Dispatchers.Default)
init {
// 启动后台清理协程
scope.launch { processCleanupCommands() }
}
private data class CacheEntry<V>(
val value: V,
val expireAt: Long = System.currentTimeMillis() + defaultExpiration.inWholeMilliseconds,
var cleanupJob: Job? = null
)
private sealed interface CleanupCommand {
data class Expired<K>(val key: K) : CleanupCommand
object CapacityFull : CleanupCommand
}
suspend fun put(key: K, value: V, customExpiration: Duration? = null) = mutex.withLock {
// 取消现有清理任务(如果存在)
cacheMap[key]?.cleanupJob?.cancel()
// 创建新条目
val expiration = customExpiration ?: defaultExpiration
val entry = CacheEntry(
value = value,
expireAt = System.currentTimeMillis() + expiration.inWholeMilliseconds
).apply {
// 启动新的过期清理任务
cleanupJob = scope.launch {
delay(expiration)
cleanupChannel.send(CleanupCommand.Expired(key))
}
}
cacheMap[key] = entry
// 触发容量检查
if (cacheMap.size > maxSize) {
cleanupChannel.trySend(CleanupCommand.CapacityFull)
}
}
suspend fun get(key: K): V? = mutex.withLock {
cacheMap[key]?.let { entry ->
if (entry.expireAt > System.currentTimeMillis()) entry.value
else {
remove(key)
null
}
}
}
private suspend fun remove(key: K) = mutex.withLock {
cacheMap.remove(key)?.cleanupJob?.cancel()
}
private suspend fun processCleanupCommands() {
for (command in cleanupChannel) {
when (command) {
is CleanupCommand.Expired<*> -> {
@Suppress("UNCHECKED_CAST")
remove(command.key as K)
}
CleanupCommand.CapacityFull -> mutex.withLock {
while (cacheMap.size > maxSize) {
val eldestKey = cacheMap.keys.first()
cacheMap.remove(eldestKey)?.cleanupJob?.cancel()
}
}
}
}
}
suspend fun close() {
scope.coroutineContext.cancelChildren()
scope.cancel()
cleanupChannel.close()
}
}
关键技术解析
- 结构化并发:通过 CoroutineScope 统一管理所有后台协程,确保缓存关闭时自动取消所有任务
- 并发控制:
- Mutex 保护 cacheMap 的读写操作
- Channel 解耦清理触发和执行,避免阻塞主操作
- LRU 实现:利用 LinkedHashMap 的访问顺序特性(第三个参数=true)
- 过期处理优化:
- 每个条目独立计时器:精准控制过期时间
- 延迟任务通过 Channel 通知:避免在延迟任务中直接操作共享状态
最佳实践
- 资源释放:在 remove/close 中显式取消协程,防止内存泄漏
- 背压处理:使用 trySend 避免容量清理阻塞 put 操作
- 取消传播:使用 SupervisorJob 防止单个条目清理失败影响整体
常见错误
- 协程泄漏:忘记取消已移除条目的 delay 协程
- 并发修改异常:未用 Mutex 保护 LinkedHashMap 的结构修改
- 取消处理不当:在清理协程中未处理 CancellationException
- 性能瓶颈:在全局锁内执行 delay 等挂起操作
扩展优化方向
- 时间轮算法:当条目量极大时,用时间轮替代独立 Delay 提升性能
- 分层过期队列:按过期时间分桶管理,减少扫描开销
- 异步加载:集成 suspend () -> V 实现 "getOrLoad" 模式
- 命中率统计:添加监控指标用于容量调优