从单机到分布式:一致性实战(六)消息驱动:最终一致性
从这里开始 上一章,小明使用本地消息表解决了跨服务事务问题。随着系统演进,消息队列已经成为服务间通信的核心基础设施。 但新的问题接踵而至。一天早上,运营同学报告了一个奇怪的现象:“有个用户的订单被重复扣款了三次!” 小明查看日志,发现消息确实发送了三次。原因是消息发送后,在收到 broker 确认之前网络超时了,发送端以为失败了就重试。 “那我加个去重逻辑?“小明想。但很快又发现另一个问题:有些消息根本没发出去就丢了。 消息队列不是银弹,它本身也有一致性问题需要解决。 消息投递的三种语义 消息队列的投递保证分为三种: At-Most-Once(最多一次) 消息发送后不等待确认,可能丢失,但不会重复。 // At-Most-Once:发了就不管 async fn send_at_most_once(producer: &KafkaProducer, message: &Message) { // 发送后立即返回,不等待确认 producer.send_fire_and_forget(message).await; // 消息可能丢失,但代码继续执行 } 适用场景:日志收集、监控数据等允许丢失的场景 At-Least-Once(至少一次) 消息发送后等待确认,失败则重试,保证不丢但可能重复。 // At-Least-Once:确认收到才算成功 async fn send_at_least_once( producer: &KafkaProducer, message: &Message, ) -> Result<(), Error> { loop { match producer.send(message).await { Ok(()) => return Ok(()), Err(e) if e.is_retriable() => { tracing::warn!("Send failed, retrying: {}", e); tokio::time::sleep(Duration::from_millis(100)).await; // 继续重试,可能导致重复 } Err(e) => return Err(e), } } } 这是最常用的语义,因为重复可以通过消费端幂等来解决,但丢失无法恢复。 Exactly-Once(恰好一次) 消息既不丢失也不重复,是最理想的状态。 但真正的 Exactly-Once 在分布式系统中几乎不可能实现。所谓的 “Exactly-Once” 实际上是 “At-Least-Once + 幂等消费”。 // "Exactly-Once" = At-Least-Once 发送 + 幂等消费 async fn send_exactly_once( producer: &KafkaProducer, message: &Message, ) -> Result<(), Error> { // 消息必须有唯一 ID let message_id = message.id; // At-Least-Once 发送 producer.send_with_retry(message).await?; Ok(()) } async fn consume_exactly_once( pool: &PgPool, message: &Message, ) -> Result<(), Error> { // 幂等消费(见下文) todo!() } 生产端:确保消息发出 问题:本地事务与消息发送的一致性 // 这段代码有问题 async fn create_order_wrong(pool: &PgPool, producer: &KafkaProducer, order: Order) -> Result<(), Error> { // 1. 写数据库 sqlx::query!("INSERT INTO orders (id, user_id) VALUES ($1, $2)", order.id, order.user_id) .execute(pool) .await?; // 2. 发消息 // 如果这里失败了,订单已创建但消息没发出 producer.send(&OrderCreatedEvent::from(&order)).await?; Ok(()) } 解决方案一:本地消息表(Transactional Outbox) 我们在上一章已经详细介绍过,这里再强调关键点: ...