从单机到分布式:一致性实战(六)消息驱动:最终一致性

从这里开始 上一章,小明使用本地消息表解决了跨服务事务问题。随着系统演进,消息队列已经成为服务间通信的核心基础设施。 但新的问题接踵而至。一天早上,运营同学报告了一个奇怪的现象:“有个用户的订单被重复扣款了三次!” 小明查看日志,发现消息确实发送了三次。原因是消息发送后,在收到 broker 确认之前网络超时了,发送端以为失败了就重试。 “那我加个去重逻辑?“小明想。但很快又发现另一个问题:有些消息根本没发出去就丢了。 消息队列不是银弹,它本身也有一致性问题需要解决。 消息投递的三种语义 消息队列的投递保证分为三种: At-Most-Once(最多一次) 消息发送后不等待确认,可能丢失,但不会重复。 // At-Most-Once:发了就不管 async fn send_at_most_once(producer: &KafkaProducer, message: &Message) { // 发送后立即返回,不等待确认 producer.send_fire_and_forget(message).await; // 消息可能丢失,但代码继续执行 } 适用场景:日志收集、监控数据等允许丢失的场景 At-Least-Once(至少一次) 消息发送后等待确认,失败则重试,保证不丢但可能重复。 // At-Least-Once:确认收到才算成功 async fn send_at_least_once( producer: &KafkaProducer, message: &Message, ) -> Result<(), Error> { loop { match producer.send(message).await { Ok(()) => return Ok(()), Err(e) if e.is_retriable() => { tracing::warn!("Send failed, retrying: {}", e); tokio::time::sleep(Duration::from_millis(100)).await; // 继续重试,可能导致重复 } Err(e) => return Err(e), } } } 这是最常用的语义,因为重复可以通过消费端幂等来解决,但丢失无法恢复。 Exactly-Once(恰好一次) 消息既不丢失也不重复,是最理想的状态。 但真正的 Exactly-Once 在分布式系统中几乎不可能实现。所谓的 “Exactly-Once” 实际上是 “At-Least-Once + 幂等消费”。 // "Exactly-Once" = At-Least-Once 发送 + 幂等消费 async fn send_exactly_once( producer: &KafkaProducer, message: &Message, ) -> Result<(), Error> { // 消息必须有唯一 ID let message_id = message.id; // At-Least-Once 发送 producer.send_with_retry(message).await?; Ok(()) } async fn consume_exactly_once( pool: &PgPool, message: &Message, ) -> Result<(), Error> { // 幂等消费(见下文) todo!() } 生产端:确保消息发出 问题:本地事务与消息发送的一致性 // 这段代码有问题 async fn create_order_wrong(pool: &PgPool, producer: &KafkaProducer, order: Order) -> Result<(), Error> { // 1. 写数据库 sqlx::query!("INSERT INTO orders (id, user_id) VALUES ($1, $2)", order.id, order.user_id) .execute(pool) .await?; // 2. 发消息 // 如果这里失败了,订单已创建但消息没发出 producer.send(&OrderCreatedEvent::from(&order)).await?; Ok(()) } 解决方案一:本地消息表(Transactional Outbox) 我们在上一章已经详细介绍过,这里再强调关键点: ...

December 11, 2025 · 9 min · 1838 words · Nanlong

从单机到分布式:一致性实战(五)服务拆分:跨服务事务一致性

从这里开始 分库分表解决了数据量问题后,小明的二手书平台继续快速发展。团队也从最初的 3 人扩展到了 20 人。 单体应用的问题开始显现: 代码库越来越大,改一个小功能都要担心影响其他模块 每次发布都是全量部署,风险高 不同模块的技术栈被绑定在一起,无法独立演进 技术负责人建议:“是时候拆分服务了。” 团队经过讨论,决定把系统拆分成多个服务: 用户服务:管理用户账户和余额 库存服务:管理书籍库存 订单服务:处理订单逻辑 拆分后,架构清晰了很多。但第一个需求就让小明犯了难: 用户下单时,需要:1)从用户账户扣款,2)扣减库存,3)创建订单 在单体应用时代,这三个操作可以放在同一个数据库事务里: // 单体时代:一个事务搞定 async fn place_order(pool: &PgPool, order: OrderRequest) -> Result<Order, Error> { let mut tx = pool.begin().await?; // 1. 扣款 sqlx::query!("UPDATE users SET balance = balance - $1 WHERE id = $2", order.amount, order.user_id) .execute(&mut *tx).await?; // 2. 扣库存 sqlx::query!("UPDATE books SET stock = stock - 1 WHERE id = $1", order.book_id) .execute(&mut *tx).await?; // 3. 创建订单 let order = sqlx::query_as!(Order, "INSERT INTO orders (user_id, book_id, amount) VALUES ($1, $2, $3) RETURNING *", order.user_id, order.book_id, order.amount ).fetch_one(&mut *tx).await?; tx.commit().await?; Ok(order) } 但现在,用户余额在用户服务的数据库,库存在库存服务的数据库,订单在订单服务的数据库。三个操作跨越三个独立的数据库,无法在同一个事务中完成。 如果扣款成功了,但扣库存失败了,怎么办?钱扣了,货没发,用户肯定要投诉。 这就是跨服务事务一致性问题。 问题的本质 分布式事务难在哪里?让我们看一个简化的场景: 订单服务 库存服务 │ │ │──── 1. 扣库存请求 ──────>│ │ │ (扣库存成功) │<─── 2. 扣库存响应 ───────│ │ │ │ (创建订单...) │ │ (本地事务失败!) │ │ │ │ 现在怎么办? │ │ 库存已经扣了,但订单没创建│ 问题的根源是: 没有全局事务协调者:每个服务只能控制自己的本地事务 网络不可靠:调用可能超时、失败、或成功但响应丢失 部分失败:一个操作成功,另一个失败 传统解决方案是两阶段提交(2PC),但它有致命缺陷: ...

December 11, 2025 · 12 min · 2405 words · Nanlong

从单机到分布式:一致性实战(四)数据分片:跨分片事务一致性

从这里开始 经过缓存优化后,小明的二手书平台性能有了很大提升。但新的问题出现了:数据量太大,单个数据库快撑不住了。 订单表已经有上亿条数据,每次查询都很慢,添加索引也无济于事。磁盘空间也快满了,单机存储已经到达瓶颈。 DBA 看了监控后说:“是时候分库分表了。” 小明决定按 user_id 将订单数据分到 4 个数据库分片中: user_id % 4 = 0 → shard_0 user_id % 4 = 1 → shard_1 user_id % 4 = 2 → shard_2 user_id % 4 = 3 → shard_3 分片后,单个用户的订单查询飞快了。但很快遇到了新问题: 问题一:跨分片查询 “给我查最近 7 天所有用户的订单总额。“运营说。 这意味着要查询所有 4 个分片,然后合并结果。 问题二:跨分片事务 “用户 A(在 shard_0)想把一本书转让给用户 B(在 shard_1)。” 这涉及两个分片的数据修改,如何保证原子性? 问题三:分片键变更 “用户要改手机号,但我们是按手机号分片的…” 分片键变更意味着数据要迁移到另一个分片。 这就是数据分片带来的一致性挑战。 分片基础 分片路由 首先实现一个分片路由器: use sqlx::PgPool; use std::collections::HashMap; use std::sync::Arc; /// 分片路由器 pub struct ShardRouter { shards: Vec<Arc<PgPool>>, shard_count: usize, } impl ShardRouter { pub fn new(shard_pools: Vec<PgPool>) -> Self { let shard_count = shard_pools.len(); Self { shards: shard_pools.into_iter().map(Arc::new).collect(), shard_count, } } /// 根据分片键获取分片 pub fn get_shard(&self, shard_key: i64) -> &PgPool { let shard_index = (shard_key as usize) % self.shard_count; &self.shards[shard_index] } /// 获取所有分片(用于跨分片查询) pub fn all_shards(&self) -> &[Arc<PgPool>] { &self.shards } /// 根据分片键计算分片索引 pub fn shard_index(&self, shard_key: i64) -> usize { (shard_key as usize) % self.shard_count } } 基本的分片读写 pub struct ShardedOrderRepository { router: ShardRouter, } impl ShardedOrderRepository { /// 写入订单(路由到对应分片) pub async fn create_order(&self, order: &Order) -> Result<(), Error> { let pool = self.router.get_shard(order.user_id); sqlx::query!( r#" INSERT INTO orders (id, user_id, book_id, amount, status, created_at) VALUES ($1, $2, $3, $4, $5, NOW()) "#, order.id, order.user_id, order.book_id, order.amount, order.status, ) .execute(pool) .await?; Ok(()) } /// 查询用户订单(单分片查询) pub async fn get_user_orders(&self, user_id: i64) -> Result<Vec<Order>, Error> { let pool = self.router.get_shard(user_id); let orders = sqlx::query_as!(Order, "SELECT * FROM orders WHERE user_id = $1 ORDER BY created_at DESC", user_id ) .fetch_all(pool) .await?; Ok(orders) } } 问题一:跨分片查询 当查询条件不包含分片键时,需要查询所有分片。 ...

December 11, 2025 · 13 min · 2588 words · Nanlong

从单机到分布式:一致性实战(三)引入缓存:多存储一致性

从这里开始 小明的二手书平台在读写分离上线后,数据库终于稳定了。但好景不长,运营同学兴奋地告诉他:“我们要搞一次促销活动!” 活动当天,流量暴涨。虽然有了从库分担读请求,但热门书籍的详情页依然让数据库不堪重负。监控显示,相同书籍的查询每秒重复了上万次。 “这些数据明明很少变化,为什么每次都要查数据库?“小明决定引入 Redis 缓存。 代码很快写好了: async fn get_book(pool: &PgPool, redis: &RedisClient, book_id: i64) -> Result<Book> { // 先查缓存 if let Some(cached) = redis.get(&format!("book:{}", book_id)).await? { return Ok(serde_json::from_str(&cached)?); } // 缓存未命中,查数据库 let book = sqlx::query_as!(Book, "SELECT * FROM books WHERE id = $1", book_id) .fetch_one(pool) .await?; // 写入缓存 redis.set_ex(&format!("book:{}", book_id), &serde_json::to_string(&book)?, 3600).await?; Ok(book) } 促销当天,一切看起来都很顺利。直到客服开始收到投诉:“我明明改了价格,为什么页面显示的还是旧的?” 小明查看日志,发现问题了:卖家更新了价格,数据库确实变了,但缓存里还是旧数据。用户看到的是缓存中的过期信息。 数据同时存在于两个地方,它们之间失去了同步——这就是缓存一致性问题。 问题的本质 缓存一致性问题的根源在于:数据库和缓存是两个独立的存储系统,对它们的操作无法在同一个事务中完成。 无论你以什么顺序操作,都可能出问题: 先更新数据库,再更新缓存: 时刻1: 请求A 更新数据库 price=100 时刻2: 请求B 更新数据库 price=200 时刻3: 请求B 更新缓存 price=200 时刻4: 请求A 更新缓存 price=100 // 并发导致缓存最终是旧值! 先更新缓存,再更新数据库: 时刻1: 请求A 更新缓存 price=100 时刻2: 请求A 更新数据库失败,回滚 时刻3: 缓存中 price=100,但数据库还是旧值 // 脏数据! 先删除缓存,再更新数据库: 时刻1: 请求A 删除缓存 时刻2: 请求B 读取,缓存未命中,从数据库读取旧值 时刻3: 请求B 将旧值写入缓存 时刻4: 请求A 更新数据库 // 缓存中又是旧值! 先更新数据库,再删除缓存(最常用): 时刻1: 请求A 读取,缓存未命中 时刻2: 请求A 从数据库读取 price=100 时刻3: 请求B 更新数据库 price=200 时刻4: 请求B 删除缓存 时刻5: 请求A 将 price=100 写入缓存 // 又是旧值! 看起来无论怎么做都不对?别急,让我们看看业界是如何解决的。 方案一:Cache-Aside(旁路缓存) 这是最经典、最广泛使用的方案。核心思想是:读时填充缓存,写时只删除缓存。 为什么"删除"而不是"更新”? 删除缓存的好处是: 避免并发写冲突:删除是幂等的,多次删除和一次删除效果相同 简化逻辑:不需要计算新值,让下一次读请求自然地填充正确的值 避免无效更新:如果数据更新后根本没人读,更新缓存就是浪费 实现 use redis::AsyncCommands; use sqlx::PgPool; use serde::{Deserialize, Serialize}; #[derive(Debug, Clone, Serialize, Deserialize, sqlx::FromRow)] pub struct Book { pub id: i64, pub title: String, pub price: i64, // 以分为单位 pub stock: i32, pub updated_at: chrono::DateTime<chrono::Utc>, } pub struct BookService { pool: PgPool, redis: redis::Client, } impl BookService { /// 读取书籍信息(Cache-Aside 模式) pub async fn get_book(&self, book_id: i64) -> Result<Book, Error> { let cache_key = format!("book:{}", book_id); let mut conn = self.redis.get_multiplexed_async_connection().await?; // 1. 先查缓存 let cached: Option<String> = conn.get(&cache_key).await?; if let Some(json) = cached { return Ok(serde_json::from_str(&json)?); } // 2. 缓存未命中,查数据库 let book = sqlx::query_as!(Book, "SELECT * FROM books WHERE id = $1", book_id) .fetch_one(&self.pool) .await?; // 3. 填充缓存(设置过期时间作为兜底) let json = serde_json::to_string(&book)?; conn.set_ex(&cache_key, &json, 3600).await?; Ok(book) } /// 更新书籍价格(先更新数据库,再删除缓存) pub async fn update_price(&self, book_id: i64, new_price: i64) -> Result<(), Error> { // 1. 更新数据库 sqlx::query!( "UPDATE books SET price = $1, updated_at = NOW() WHERE id = $2", new_price, book_id ) .execute(&self.pool) .await?; // 2. 删除缓存 let cache_key = format!("book:{}", book_id); let mut conn = self.redis.get_multiplexed_async_connection().await?; conn.del(&cache_key).await?; Ok(()) } } 仍然存在的问题 前面我们分析过,“先更新数据库,再删除缓存"在极端并发下仍可能出现不一致: ...

December 11, 2025 · 9 min · 1915 words · Nanlong

从单机到分布式:一致性实战(二)读写分离:副本一致性

用户刚改完昵称,刷新页面发现还是旧的。这不是 Bug,这是主从延迟。读写分离带来的第一个一致性问题,你准备好了吗? ...

December 11, 2025 · 7 min · 1291 words · Nanlong

从单机到分布式:一致性实战(一)单机时代:ACID 的庇护

当你的系统只有一台机器、一个数据库时,一致性问题几乎不存在。数据库事务是你的保护伞。但这种幸福,注定是短暂的。 ...

December 11, 2025 · 5 min · 885 words · Nanlong

模式不是套路(十一):终篇——反模式与断舍离

“模式不是目的,而是手段。当手段成为目的,代码便开始腐烂。” 走完了这趟设计模式的旅程,我们学会了用 Rust 的特性让模式返璞归真。但还有一个更重要的问题:什么时候不该用模式? 这篇终章,我们来聊聊设计模式的阴暗面——过度设计、模式滥用,以及断舍离的智慧。 过度设计的七宗罪 第一罪:未来幻想症 // ❌ 为"可能的需求"做准备 trait PaymentProcessor { fn process(&self, amount: f64) -> Result<(), Error>; fn refund(&self, transaction_id: &str) -> Result<(), Error>; fn partial_refund(&self, transaction_id: &str, amount: f64) -> Result<(), Error>; fn recurring(&self, schedule: &Schedule) -> Result<(), Error>; fn batch_process(&self, payments: Vec<Payment>) -> Result<(), Error>; fn validate_card(&self, card: &Card) -> Result<bool, Error>; fn tokenize(&self, card: &Card) -> Result<Token, Error>; fn dispute(&self, transaction_id: &str, reason: &str) -> Result<(), Error>; // 还有 20 个方法... } // ✅ 只实现当前需要的 trait PaymentProcessor { fn process(&self, amount: f64) -> Result<(), Error>; } // 需要时再扩展 trait RefundablePayment: PaymentProcessor { fn refund(&self, transaction_id: &str) -> Result<(), Error>; } 症状:接口定义了大量"将来可能用到"的方法,但实现者只用其中两三个。 Rust 的解药:trait 组合。需要什么就定义什么,通过 trait 继承和组合按需扩展。 第二罪:抽象层叠床架屋 // ❌ 抽象套抽象 trait Repository<T> { fn save(&self, entity: T) -> Result<(), Error>; } trait UserRepository: Repository<User> { fn find_by_email(&self, email: &str) -> Result<Option<User>, Error>; } trait UserRepositoryFactory { fn create(&self) -> Box<dyn UserRepository>; } trait UserRepositoryFactoryProvider { fn get_factory(&self) -> Box<dyn UserRepositoryFactory>; } // ✅ 直接了当 struct UserRepository { pool: PgPool, } impl UserRepository { pub fn save(&self, user: &User) -> Result<(), Error> { // 直接实现 } pub fn find_by_email(&self, email: &str) -> Result<Option<User>, Error> { // 直接实现 } } 症状:为了"灵活性"层层包装,但整个系统只有一种实现。 ...

December 10, 2025 · 7 min · 1313 words · Nanlong

模式不是套路(十):规则引擎——让业务人员写代码

促销规则天天变,每次改都要发版?解释器模式帮你构建 DSL,让业务人员自己配置规则。从简单条件到复杂表达式,看规则引擎如何炼成。 ...

December 10, 2025 · 12 min · 2504 words · Nanlong

模式不是套路(九):树形世界——递归结构的驯服术

文件系统、组织架构、商品分类、权限树……树形结构无处不在,但处理起来总是一团乱麻。组合、访问者、迭代器、享元四种模式,让你优雅地驯服递归之兽。 ...

December 10, 2025 · 12 min · 2394 words · Nanlong

模式不是套路(八):框架设计——把控制权还给用户

写框架不是写业务代码——你定义骨架,用户填充细节。模板方法和钩子模式,让框架既有约束力又有灵活性,用户用得舒服,你也好维护。 ...

December 10, 2025 · 12 min · 2433 words · Nanlong