用户刚改完昵称,刷新页面发现还是旧的。这不是 Bug,这是主从延迟。读写分离带来的第一个一致性问题,你准备好了吗?

前情回顾

上一篇我们看到,小明的二手书平台在单机时代过得很幸福。一个 PostgreSQL 实例,一个事务搞定一切。

但好景不长。日活破万后,数据库 CPU 飙到 100%。小明不得不做出改变。

读写分离:看起来很美

小明的第一反应是:加机器。

分析了一下业务,发现 读请求是写请求的 10 倍。用户大部分时间在浏览书籍、查看订单,只有少数时间在下单、修改资料。

于是小明做了一个经典的架构升级:读写分离

┌─────────────────────────────────────────────────────────────────┐
│                           用户请求                               │
└─────────────────────────────┬───────────────────────────────────┘
┌─────────────────────────────────────────────────────────────────┐
│                         Rust + Axum                              │
│                                                                  │
│              写请求                        读请求                 │
│                │                             │                   │
└────────────────┼─────────────────────────────┼───────────────────┘
                 │                             │
                 ▼                             ▼
┌────────────────────────┐        ┌────────────────────────┐
│      PostgreSQL        │        │      PostgreSQL        │
│        主库            │ ─────→ │        从库            │
│       (写入)           │  复制   │       (只读)           │
└────────────────────────┘        └────────────────────────┘

看起来很完美

  • 写请求走主库,保证数据安全
  • 读请求走从库,分担压力
  • 主库的数据自动同步到从库

小明部署上线,CPU 立刻降到 30%。问题解决了?

Bug 还是 Feature?

上线第二天,客服收到投诉:

“我刚改了昵称,刷新页面还是旧的!你们系统有 Bug 吧?”

小明查了半天代码,没找到问题。数据库里的昵称明明是新的啊?

然后他意识到了:用户的写请求走了主库,但刷新页面的读请求走了从库。

主从复制有延迟。可能是 10 毫秒,可能是 1 秒,极端情况下可能是几秒。

时间线:
───────────────────────────────────────────────────────────────→

T0: 用户修改昵称
T1: 主库更新成功,返回"修改成功"
    │ ← 主从复制中...(延迟 500ms)
T2: 用户刷新页面,从从库读取
T3: 返回旧昵称!(从库还没同步完)

这不是 Bug,这是分布式系统的常态。

核心问题:主从复制是异步的,写入主库后,数据不会立即出现在从库。在这个时间窗口内,读从库会得到旧数据。

方案全景:五种选择

面对主从延迟,有五种解决方案:

┌─────────────────────────────────────────────────────────────────┐
│                     读写分离一致性方案                           │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│  方案1:强制读主          → 最简单,但失去读写分离的意义         │
│  方案2:写后读主(时间窗口)→ 平衡一致性和性能                   │
│  方案3:写后读主(会话级) → 同一请求内保证一致                  │
│  方案4:同步复制          → 数据库层面保证,但牺牲写性能         │
│  方案5:业务层容忍        → 零成本,但用户体验差                 │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘

让我们逐一分析。

方案一:强制读主

最简单粗暴的方案:所有读请求都走主库。

pub struct PrimaryOnlyRepo {
    primary: PgPool,
    _replica: PgPool,  // 从库闲置
}

impl PrimaryOnlyRepo {
    pub async fn get_user(&self, user_id: i64) -> Result<User> {
        // 直接读主库,保证强一致
        sqlx::query_as!(
            User,
            r#"SELECT * FROM users WHERE id = $1"#,
            user_id
        )
        .fetch_one(&self.primary)
        .await
        .map_err(Into::into)
    }
}

优点

  • 实现最简单
  • 一致性最强
  • 不用担心主从延迟

缺点

  • 主库压力没有减轻
  • 读写分离形同虚设
  • 浪费了从库资源

适用场景:读写比接近 1:1,或者主库性能足够。

结论:这不是解决方案,这是放弃治疗。

方案二:写后读主(时间窗口)

核心思想:用户写入后的一段时间内,强制从主库读取。

use dashmap::DashMap;
use std::time::{Duration, Instant};

pub struct TimeWindowRouter {
    primary: PgPool,
    replica: PgPool,
    // 记录最近写入的 key 和时间
    recent_writes: DashMap<String, Instant>,
    // 写后强制读主的时间窗口
    window: Duration,
}

impl TimeWindowRouter {
    pub fn new(primary: PgPool, replica: PgPool, window_secs: u64) -> Self {
        Self {
            primary,
            replica,
            recent_writes: DashMap::new(),
            window: Duration::from_secs(window_secs),
        }
    }

    /// 标记写入
    pub fn mark_write(&self, key: &str) {
        self.recent_writes.insert(key.to_string(), Instant::now());
    }

    /// 获取读连接:判断走主还是从
    pub fn read_pool(&self, key: &str) -> &PgPool {
        if let Some(write_time) = self.recent_writes.get(key) {
            if write_time.elapsed() < self.window {
                return &self.primary;  // 刚写过,读主库
            }
        }
        &self.replica  // 否则读从库
    }

    /// 定期清理过期记录
    pub fn cleanup(&self) {
        self.recent_writes.retain(|_, v| v.elapsed() < self.window * 2);
    }
}

使用示例:

pub async fn update_user_nickname(
    router: &TimeWindowRouter,
    user_id: i64,
    nickname: &str,
) -> Result<()> {
    let key = format!("user:{}", user_id);

    // 写入主库
    sqlx::query!(
        r#"UPDATE users SET nickname = $1 WHERE id = $2"#,
        nickname,
        user_id
    )
    .execute(&router.primary)
    .await?;

    // 标记刚写入
    router.mark_write(&key);

    Ok(())
}

pub async fn get_user(router: &TimeWindowRouter, user_id: i64) -> Result<User> {
    let key = format!("user:{}", user_id);

    // 自动判断走主还是从
    let pool = router.read_pool(&key);

    sqlx::query_as!(
        User,
        r#"SELECT * FROM users WHERE id = $1"#,
        user_id
    )
    .fetch_one(pool)
    .await
    .map_err(Into::into)
}

优点

  • 兼顾一致性和性能
  • 大部分读请求仍然走从库
  • 只有刚写入的数据走主库

缺点

  • 需要维护写入记录
  • 时间窗口难以精确设置
  • 多实例部署时需要共享状态(如 Redis)

时间窗口设置建议

主从延迟建议窗口
< 100ms1-2 秒
100ms - 1s3-5 秒
> 1s考虑同步复制

适用场景:主从延迟可控,需要平衡一致性和性能。

方案三:写后读主(会话级)

核心思想:同一个 HTTP 请求内,写过的数据从主库读。

use std::collections::HashSet;
use std::sync::Arc;
use tokio::sync::RwLock;

/// 会话级上下文:追踪当前请求内写过的表
#[derive(Default)]
pub struct SessionContext {
    wrote_tables: RwLock<HashSet<String>>,
}

impl SessionContext {
    pub async fn mark_write(&self, table: &str) {
        self.wrote_tables.write().await.insert(table.to_string());
    }

    pub async fn should_read_primary(&self, table: &str) -> bool {
        self.wrote_tables.read().await.contains(table)
    }
}

结合 Axum 中间件使用:

use axum::{
    extract::Extension,
    middleware::{self, Next},
    response::Response,
};

/// 中间件:每个请求创建新的 SessionContext
pub async fn session_middleware<B>(
    mut req: axum::http::Request<B>,
    next: Next<B>,
) -> Response {
    req.extensions_mut()
        .insert(Arc::new(SessionContext::default()));
    next.run(req).await
}

/// Handler 示例
pub async fn update_and_get_user(
    Extension(ctx): Extension<Arc<SessionContext>>,
    Extension(primary): Extension<PgPool>,
    Extension(replica): Extension<PgPool>,
    user_id: i64,
    nickname: String,
) -> Result<User> {
    // 写入主库
    sqlx::query!(
        r#"UPDATE users SET nickname = $1 WHERE id = $2"#,
        nickname,
        user_id
    )
    .execute(&primary)
    .await?;

    // 标记写入了 users 表
    ctx.mark_write("users").await;

    // 读取时判断
    let pool = if ctx.should_read_primary("users").await {
        &primary
    } else {
        &replica
    };

    sqlx::query_as!(
        User,
        r#"SELECT * FROM users WHERE id = $1"#,
        user_id
    )
    .fetch_one(pool)
    .await
    .map_err(Into::into)
}

优点

  • 精确控制,只影响当前请求
  • 不需要持久化写入记录
  • 请求结束自动清理

缺点

  • 只解决同一请求内的问题
  • 用户刷新页面(新请求)仍可能读到旧数据
  • 需要改造所有数据访问代码

适用场景:单次请求内有"写后读"的场景,如更新后返回最新数据。

方案四:同步复制

核心思想:让数据库保证写入主库后,从库也同步完成。

这是 PostgreSQL 的原生能力:

-- postgresql.conf 配置
-- 同步复制:至少等待一个从库确认
synchronous_commit = on
synchronous_standby_names = 'replica1'

在 Rust 中,你不需要改任何代码:

// 当 synchronous_commit = on 时
// 这条写入会等待从库确认后才返回
sqlx::query!(
    r#"UPDATE users SET nickname = $1 WHERE id = $2"#,
    nickname,
    user_id
)
.execute(pool)
.await?;

// 此时从库已经有最新数据了

对于不重要的写入,可以临时关闭同步等待

pub async fn log_user_action(pool: &PgPool, user_id: i64, action: &str) -> Result<()> {
    // 本次事务不等待同步复制
    sqlx::query("SET LOCAL synchronous_commit = off")
        .execute(pool)
        .await?;

    sqlx::query!(
        r#"INSERT INTO user_logs (user_id, action, created_at) VALUES ($1, $2, NOW())"#,
        user_id,
        action
    )
    .execute(pool)
    .await?;

    Ok(())
}

优点

  • 应用层最简单,无需改代码
  • 强一致,从库保证有最新数据
  • 数据库原生支持,稳定可靠

缺点

  • 写入延迟增加(要等从库确认)
  • 从库故障会影响主库写入
  • 不适合写密集场景

同步模式对比

模式配置一致性性能可用性
异步复制synchronous_commit = off
本地提交synchronous_commit = local
同步复制synchronous_commit = on
远程写入synchronous_commit = remote_write较强

适用场景:金融交易、对一致性要求极高的场景。

方案五:业务层容忍

最简单的方案:什么都不做,在 UI 上告诉用户。

pub async fn update_user_nickname(
    pool: &PgPool,
    user_id: i64,
    nickname: &str,
) -> Result<UpdateResponse> {
    sqlx::query!(
        r#"UPDATE users SET nickname = $1 WHERE id = $2"#,
        nickname,
        user_id
    )
    .execute(pool)
    .await?;

    Ok(UpdateResponse {
        success: true,
        message: "修改成功!数据同步可能需要几秒钟。".to_string(),
    })
}

优点

  • 零开发成本
  • 不影响系统性能
  • 符合最终一致性的哲学

缺点

  • 用户体验差
  • 只适合非关键业务
  • 可能引起用户困惑

适用场景:非关键业务,如浏览历史、阅读记录。

方案对比与选型

方案一致性性能影响实现复杂度适用场景
强制读主不推荐
写后读主(时间窗口)较强通用推荐
写后读主(会话级)较强单请求内读写
同步复制金融场景
业务层容忍非关键业务

决策流程

                    ┌─────────────────────┐
                    │   一致性要求高吗?   │
                    └──────────┬──────────┘
              ┌────────────────┴────────────────┐
              │                                  │
              ▼                                  ▼
         ┌────────┐                        ┌────────┐
         │   是   │                        │   否   │
         └────┬───┘                        └────┬───┘
              │                                  │
              ▼                                  ▼
    ┌─────────────────────┐            ┌─────────────────────┐
    │  DBA 能配置同步复制? │            │    业务层容忍       │
    └──────────┬──────────┘            │  (提示用户延迟)    │
               │                        └─────────────────────┘
    ┌──────────┴──────────┐
    │                      │
    ▼                      ▼
┌────────┐           ┌────────┐
│  能    │           │  不能  │
└────┬───┘           └────┬───┘
     │                     │
     ▼                     ▼
┌─────────────┐    ┌──────────────────┐
│  同步复制   │    │  写后读主         │
│  (推荐)   │    │  时间窗口 3-5 秒  │
└─────────────┘    └──────────────────┘

完整实现:多实例友好的写后读主

在生产环境中,你的服务通常有多个实例。写入记录需要共享,这时候用 Redis:

use redis::AsyncCommands;

pub struct RedisBackedRouter {
    primary: PgPool,
    replica: PgPool,
    redis: redis::Client,
    window_secs: u64,
}

impl RedisBackedRouter {
    pub async fn mark_write(&self, key: &str) -> Result<()> {
        let mut conn = self.redis.get_multiplexed_async_connection().await?;
        let redis_key = format!("write_marker:{}", key);

        // 设置标记,自动过期
        conn.set_ex::<_, _, ()>(&redis_key, "1", self.window_secs).await?;

        Ok(())
    }

    pub async fn read_pool(&self, key: &str) -> Result<&PgPool> {
        let mut conn = self.redis.get_multiplexed_async_connection().await?;
        let redis_key = format!("write_marker:{}", key);

        let exists: bool = conn.exists(&redis_key).await?;

        if exists {
            Ok(&self.primary)
        } else {
            Ok(&self.replica)
        }
    }
}

使用示例

pub struct UserService {
    router: RedisBackedRouter,
}

impl UserService {
    pub async fn update_nickname(&self, user_id: i64, nickname: &str) -> Result<()> {
        let key = format!("user:{}", user_id);

        sqlx::query!(
            r#"UPDATE users SET nickname = $1 WHERE id = $2"#,
            nickname,
            user_id
        )
        .execute(&self.router.primary)
        .await?;

        self.router.mark_write(&key).await?;

        Ok(())
    }

    pub async fn get_user(&self, user_id: i64) -> Result<User> {
        let key = format!("user:{}", user_id);
        let pool = self.router.read_pool(&key).await?;

        sqlx::query_as!(
            User,
            r#"SELECT * FROM users WHERE id = $1"#,
            user_id
        )
        .fetch_one(pool)
        .await
        .map_err(Into::into)
    }
}

监控主从延迟

无论选择哪种方案,监控主从延迟都是必须的:

use prometheus::{Histogram, register_histogram};

lazy_static::lazy_static! {
    static ref REPLICATION_LAG: Histogram = register_histogram!(
        "pg_replication_lag_seconds",
        "PostgreSQL replication lag in seconds"
    ).unwrap();
}

pub async fn check_replication_lag(primary: &PgPool, replica: &PgPool) -> Result<f64> {
    // 方法:写入时间戳到主库,从从库读取,计算差值
    let now = chrono::Utc::now();

    sqlx::query!(
        r#"INSERT INTO heartbeat (id, ts) VALUES (1, $1)
           ON CONFLICT (id) DO UPDATE SET ts = $1"#,
        now
    )
    .execute(primary)
    .await?;

    // 等待一小段时间让复制有机会完成
    tokio::time::sleep(std::time::Duration::from_millis(100)).await;

    let replica_ts: chrono::DateTime<chrono::Utc> = sqlx::query_scalar!(
        r#"SELECT ts FROM heartbeat WHERE id = 1"#
    )
    .fetch_one(replica)
    .await?
    .ok_or_else(|| anyhow::anyhow!("Heartbeat timestamp not found"))?;

    let lag = (now - replica_ts).num_milliseconds() as f64 / 1000.0;

    REPLICATION_LAG.observe(lag);

    Ok(lag)
}

告警阈值建议

延迟级别动作
< 100ms正常
100ms - 1s警告观察
> 1s严重检查从库状态
> 5s紧急考虑切换或降级

常见问题

Q:如果从库挂了怎么办?

A:自动 fallback 到主库。

impl RedisBackedRouter {
    pub async fn read_pool_with_fallback(&self, key: &str) -> &PgPool {
        // 先尝试从从库读
        match self.check_replica_health().await {
            Ok(true) => {
                // 从库健康,走正常逻辑
                self.read_pool(key).await.unwrap_or(&self.primary)
            }
            _ => {
                // 从库不健康,fallback 到主库
                &self.primary
            }
        }
    }

    async fn check_replica_health(&self) -> Result<bool> {
        let result = sqlx::query("SELECT 1")
            .execute(&self.replica)
            .await;

        Ok(result.is_ok())
    }
}

Q:多个从库怎么负载均衡?

A:用连接池或中间件。

// 简单的轮询
pub struct MultiReplicaRouter {
    primary: PgPool,
    replicas: Vec<PgPool>,
    counter: AtomicUsize,
}

impl MultiReplicaRouter {
    pub fn next_replica(&self) -> &PgPool {
        let idx = self.counter.fetch_add(1, Ordering::Relaxed) % self.replicas.len();
        &self.replicas[idx]
    }
}

Q:写后读主会不会增加主库压力?

A:会,但通常可控。

假设写请求占 10%(读占 90%),时间窗口 3 秒,平均请求间隔 10 秒:

  • 用户写入后 3 秒内的读请求会走主库
  • 假设 30% 的读请求命中这个窗口
  • 额外主库负载:90% × 30% ≈ 27% 的请求从从库转到主库

实际场景下这个数字会更低,因为:

  1. 大部分用户只是浏览,不写入
  2. 写入后用户不一定立即刷新
  3. 可以只对关键业务(如用户资料)启用写后读主,其他业务仍走从库

小明的选择

经过权衡,小明选择了方案二:写后读主(时间窗口)

  • 时间窗口设为 3 秒
  • 用 Redis 存储写入标记
  • 监控主从延迟,延迟超过 1 秒告警

用户投诉没有了,主库 CPU 保持在 40%,从库分担了大部分读压力。

但新的问题来了。

小明发现,书籍列表页面仍然很慢。每次请求都要查数据库,即使数据根本没变。

“要不要加个缓存?”

小明陷入了思考。他不知道的是,缓存会带来另一个一致性问题——缓存与数据库不一致

下一篇,我们来聊这个坑。


上一篇:单机时代:ACID 的庇护 下一篇:引入缓存:多存储一致性

本系列:

  1. 单机时代:ACID 的庇护
  2. 读写分离:副本一致性(本篇)
  3. 引入缓存:多存储一致性
  4. 数据分片:跨分片事务一致性
  5. 服务拆分:跨服务事务一致性
  6. 消息驱动:最终一致性
  7. 总结:演进全景与选型指南