从单机到分布式:一致性实战(三)引入缓存:多存储一致性
从这里开始 小明的二手书平台在读写分离上线后,数据库终于稳定了。但好景不长,运营同学兴奋地告诉他:“我们要搞一次促销活动!” 活动当天,流量暴涨。虽然有了从库分担读请求,但热门书籍的详情页依然让数据库不堪重负。监控显示,相同书籍的查询每秒重复了上万次。 “这些数据明明很少变化,为什么每次都要查数据库?“小明决定引入 Redis 缓存。 代码很快写好了: async fn get_book(pool: &PgPool, redis: &RedisClient, book_id: i64) -> Result<Book> { // 先查缓存 if let Some(cached) = redis.get(&format!("book:{}", book_id)).await? { return Ok(serde_json::from_str(&cached)?); } // 缓存未命中,查数据库 let book = sqlx::query_as!(Book, "SELECT * FROM books WHERE id = $1", book_id) .fetch_one(pool) .await?; // 写入缓存 redis.set_ex(&format!("book:{}", book_id), &serde_json::to_string(&book)?, 3600).await?; Ok(book) } 促销当天,一切看起来都很顺利。直到客服开始收到投诉:“我明明改了价格,为什么页面显示的还是旧的?” 小明查看日志,发现问题了:卖家更新了价格,数据库确实变了,但缓存里还是旧数据。用户看到的是缓存中的过期信息。 数据同时存在于两个地方,它们之间失去了同步——这就是缓存一致性问题。 问题的本质 缓存一致性问题的根源在于:数据库和缓存是两个独立的存储系统,对它们的操作无法在同一个事务中完成。 无论你以什么顺序操作,都可能出问题: 先更新数据库,再更新缓存: 时刻1: 请求A 更新数据库 price=100 时刻2: 请求B 更新数据库 price=200 时刻3: 请求B 更新缓存 price=200 时刻4: 请求A 更新缓存 price=100 // 并发导致缓存最终是旧值! 先更新缓存,再更新数据库: 时刻1: 请求A 更新缓存 price=100 时刻2: 请求A 更新数据库失败,回滚 时刻3: 缓存中 price=100,但数据库还是旧值 // 脏数据! 先删除缓存,再更新数据库: 时刻1: 请求A 删除缓存 时刻2: 请求B 读取,缓存未命中,从数据库读取旧值 时刻3: 请求B 将旧值写入缓存 时刻4: 请求A 更新数据库 // 缓存中又是旧值! 先更新数据库,再删除缓存(最常用): 时刻1: 请求A 读取,缓存未命中 时刻2: 请求A 从数据库读取 price=100 时刻3: 请求B 更新数据库 price=200 时刻4: 请求B 删除缓存 时刻5: 请求A 将 price=100 写入缓存 // 又是旧值! 看起来无论怎么做都不对?别急,让我们看看业界是如何解决的。 方案一:Cache-Aside(旁路缓存) 这是最经典、最广泛使用的方案。核心思想是:读时填充缓存,写时只删除缓存。 为什么"删除"而不是"更新”? 删除缓存的好处是: 避免并发写冲突:删除是幂等的,多次删除和一次删除效果相同 简化逻辑:不需要计算新值,让下一次读请求自然地填充正确的值 避免无效更新:如果数据更新后根本没人读,更新缓存就是浪费 实现 use redis::AsyncCommands; use sqlx::PgPool; use serde::{Deserialize, Serialize}; #[derive(Debug, Clone, Serialize, Deserialize, sqlx::FromRow)] pub struct Book { pub id: i64, pub title: String, pub price: i64, // 以分为单位 pub stock: i32, pub updated_at: chrono::DateTime<chrono::Utc>, } pub struct BookService { pool: PgPool, redis: redis::Client, } impl BookService { /// 读取书籍信息(Cache-Aside 模式) pub async fn get_book(&self, book_id: i64) -> Result<Book, Error> { let cache_key = format!("book:{}", book_id); let mut conn = self.redis.get_multiplexed_async_connection().await?; // 1. 先查缓存 let cached: Option<String> = conn.get(&cache_key).await?; if let Some(json) = cached { return Ok(serde_json::from_str(&json)?); } // 2. 缓存未命中,查数据库 let book = sqlx::query_as!(Book, "SELECT * FROM books WHERE id = $1", book_id) .fetch_one(&self.pool) .await?; // 3. 填充缓存(设置过期时间作为兜底) let json = serde_json::to_string(&book)?; conn.set_ex(&cache_key, &json, 3600).await?; Ok(book) } /// 更新书籍价格(先更新数据库,再删除缓存) pub async fn update_price(&self, book_id: i64, new_price: i64) -> Result<(), Error> { // 1. 更新数据库 sqlx::query!( "UPDATE books SET price = $1, updated_at = NOW() WHERE id = $2", new_price, book_id ) .execute(&self.pool) .await?; // 2. 删除缓存 let cache_key = format!("book:{}", book_id); let mut conn = self.redis.get_multiplexed_async_connection().await?; conn.del(&cache_key).await?; Ok(()) } } 仍然存在的问题 前面我们分析过,“先更新数据库,再删除缓存"在极端并发下仍可能出现不一致: ...