定时任务怎么实现?Top-K 怎么高效计算?答案都指向同一个数据结构:堆。它是任务调度器、排行榜、事件驱动系统的核心。

堆是什么?

堆是一种特殊的完全二叉树,满足:

最小堆:父节点 <= 所有子节点
最大堆:父节点 >= 所有子节点

最小堆示例:
        1
       / \
      3   2
     / \
    5   4

数组表示:[1, 3, 2, 5, 4]
父节点 i 的子节点:2i+1, 2i+2
子节点 i 的父节点:(i-1)/2

核心操作

操作复杂度说明
peekO(1)查看堆顶(最小/最大元素)
pushO(log n)插入元素
popO(log n)删除堆顶
heapifyO(n)从数组构建堆

场景一:延迟任务

订单 30 分钟未支付自动取消,怎么实现?

朴素方案:定时扫描

// 每分钟扫描一次
async fn scan_expired_orders() {
    loop {
        let now = Utc::now();
        let expired = db
            .query("SELECT * FROM orders WHERE status = 'pending' AND expire_at < ?", now)
            .await;

        for order in expired {
            cancel_order(order.id).await;
        }

        sleep(Duration::from_secs(60)).await;
    }
}

问题

1. 精度差:最多延迟 1 分钟执行
2. 数据库压力:每分钟全表扫描
3. 处理延迟:如果订单多,一次扫描可能处理不完

堆方案:延迟队列

use std::collections::BinaryHeap;
use std::cmp::{Ordering, Reverse};
use chrono::Utc;
use std::time::Duration;

#[derive(Eq, PartialEq)]
struct DelayedTask {
    execute_at: i64,  // 执行时间戳
    order_id: String,
}

impl Ord for DelayedTask {
    fn cmp(&self, other: &Self) -> Ordering {
        // 最小堆:最早执行的在堆顶
        other.execute_at.cmp(&self.execute_at)
    }
}

impl PartialOrd for DelayedTask {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

struct DelayQueue {
    heap: BinaryHeap<DelayedTask>,
}

impl DelayQueue {
    fn new() -> Self {
        Self {
            heap: BinaryHeap::new(),
        }
    }

    fn push(&mut self, task: DelayedTask) {
        self.heap.push(task);
    }

    fn pop_ready(&mut self) -> Vec<DelayedTask> {
        let now = Utc::now().timestamp();
        let mut ready = Vec::new();

        while let Some(task) = self.heap.peek() {
            if task.execute_at <= now {
                ready.push(self.heap.pop().unwrap());
            } else {
                break;
            }
        }

        ready
    }

    fn next_delay(&self) -> Option<Duration> {
        self.heap.peek().map(|task| {
            let now = Utc::now().timestamp();
            let wait = (task.execute_at - now).max(0) as u64;
            Duration::from_secs(wait)
        })
    }
}

使用方式

use std::sync::Arc;
use tokio::sync::Mutex;
use tokio::time::sleep;

async fn delay_queue_worker(queue: Arc<Mutex<DelayQueue>>) {
    loop {
        let delay = {
            let q = queue.lock().await;
            q.next_delay()
        };

        // 等待到下一个任务的执行时间
        if let Some(d) = delay {
            sleep(d).await;
        } else {
            sleep(Duration::from_secs(1)).await;
            continue;
        }

        // 处理到期任务
        let tasks = {
            let mut q = queue.lock().await;
            q.pop_ready()
        };

        for task in tasks {
            cancel_order(&task.order_id).await;
        }
    }
}

优势

指标定时扫描延迟队列
精度分钟级毫秒级
CPU持续消耗按需唤醒
数据库每分钟扫描无查询
复杂度O(n)O(log n)

场景二:Top-K 问题

找出访问量最高的 100 个 URL,数据量 10 亿。

朴素方案:全排序

fn top_k_naive(urls: &[(String, u64)], k: usize) -> Vec<(String, u64)> {
    let mut sorted = urls.to_vec();
    sorted.sort_by(|a, b| b.1.cmp(&a.1));  // 按访问量降序
    sorted.truncate(k);
    sorted
}

问题:O(n log n),10 亿数据要排序几分钟。

堆方案:最小堆

思路:
  维护一个大小为 K 的最小堆
  遍历所有元素,如果比堆顶大,替换堆顶
  最后堆里就是 Top-K

为什么用最小堆?
  堆顶是 K 个元素中最小的
  新元素比堆顶大 → 应该进入 Top-K → 替换堆顶
  新元素比堆顶小 → 不够格 → 跳过
use std::collections::BinaryHeap;
use std::cmp::Reverse;

fn top_k_heap(urls: impl Iterator<Item = (String, u64)>, k: usize) -> Vec<(String, u64)> {
    // 最小堆(用 Reverse 包装)
    let mut heap: BinaryHeap<Reverse<(u64, String)>> = BinaryHeap::with_capacity(k + 1);

    for (url, count) in urls {
        if heap.len() < k {
            heap.push(Reverse((count, url)));
        } else if let Some(Reverse((min_count, _))) = heap.peek() {
            if count > *min_count {
                heap.pop();
                heap.push(Reverse((count, url)));
            }
        }
    }

    heap.into_iter()
        .map(|Reverse((count, url))| (url, count))
        .collect()
}

复杂度

方案时间空间
全排序O(n log n)O(n)
最小堆O(n log k)O(k)

K=100,n=10 亿:

全排序:10 亿 × log(10 亿) ≈ 300 亿次操作
最小堆:10 亿 × log(100) ≈ 70 亿次操作

快 4 倍,且内存只需要存 100 个元素

流式 Top-K

数据持续到来,实时更新 Top-K:

use std::collections::{BinaryHeap, HashMap};
use std::cmp::Reverse;

struct StreamingTopK {
    heap: BinaryHeap<Reverse<(u64, String)>>,
    k: usize,
    // 用于快速查找元素是否在堆中
    in_heap: HashMap<String, u64>,
}

impl StreamingTopK {
    fn new(k: usize) -> Self {
        Self {
            heap: BinaryHeap::with_capacity(k + 1),
            k,
            in_heap: HashMap::new(),
        }
    }

    fn rebuild_heap(&mut self) {
        let items: Vec<_> = self.in_heap.iter()
            .map(|(url, &count)| Reverse((count, url.clone())))
            .collect();
        self.heap = BinaryHeap::from(items);
    }

    fn update(&mut self, url: String, count: u64) {
        // 如果已在堆中,需要更新
        if let Some(&old_count) = self.in_heap.get(&url) {
            if count > old_count {
                // 需要重建堆(或使用支持更新的数据结构)
                self.in_heap.insert(url.clone(), count);
                self.rebuild_heap();
            }
            return;
        }

        // 不在堆中
        if self.heap.len() < self.k {
            self.heap.push(Reverse((count, url.clone())));
            self.in_heap.insert(url, count);
        } else if let Some(Reverse((min_count, _))) = self.heap.peek() {
            if count > *min_count {
                let Reverse((_, old_url)) = self.heap.pop().unwrap();
                self.in_heap.remove(&old_url);
                self.heap.push(Reverse((count, url.clone())));
                self.in_heap.insert(url, count);
            }
        }
    }
}

场景三:合并有序流

K 个有序的数据流,合并成一个有序流。

场景:
  - 多路归并排序
  - 合并多个数据库的查询结果
  - 日志聚合(多个 Pod 的日志按时间排序)
use std::collections::BinaryHeap;
use std::cmp::Reverse;

struct MergeIterator<I> {
    heap: BinaryHeap<Reverse<(i64, usize)>>,  // (值, 流索引)
    streams: Vec<I>,
}

impl<I: Iterator<Item = i64>> MergeIterator<I> {
    fn new(streams: Vec<I>) -> Self {
        let mut iter = Self {
            heap: BinaryHeap::new(),
            streams,
        };
        // 初始化:从每个流取第一个元素
        for (idx, stream) in iter.streams.iter_mut().enumerate() {
            if let Some(value) = stream.next() {
                iter.heap.push(Reverse((value, idx)));
            }
        }
        iter
    }
}

impl<I: Iterator<Item = i64>> Iterator for MergeIterator<I> {
    type Item = i64;

    fn next(&mut self) -> Option<i64> {
        // 取出最小的
        let Reverse((value, stream_idx)) = self.heap.pop()?;

        // 从该流补充下一个元素
        if let Some(next_value) = self.streams[stream_idx].next() {
            self.heap.push(Reverse((next_value, stream_idx)));
        }

        Some(value)
    }
}

复杂度:每次取最小值 O(log k),总共 n 个元素,O(n log k)。

场景四:事件驱动调度

Tokio 的定时器就是用堆实现的:

Timer Wheel 简化版:

1. 维护一个最小堆,存储所有定时器
2. 事件循环:
   a. 查看堆顶的到期时间
   b. epoll_wait 等待到那个时间(或有 IO 事件)
   c. 处理到期的定时器
   d. 重复
use std::collections::BinaryHeap;
use std::cmp::Reverse;
use std::time::{Duration, Instant};

type TaskId = u64;

struct TimerHeap {
    heap: BinaryHeap<Reverse<(Instant, TaskId)>>,
}

impl TimerHeap {
    fn new() -> Self {
        Self {
            heap: BinaryHeap::new(),
        }
    }

    fn schedule(&mut self, task_id: TaskId, deadline: Instant) {
        self.heap.push(Reverse((deadline, task_id)));
    }

    fn next_timeout(&self) -> Option<Duration> {
        self.heap.peek().map(|Reverse((deadline, _))| {
            deadline.saturating_duration_since(Instant::now())
        })
    }

    fn poll_expired(&mut self) -> Vec<TaskId> {
        let now = Instant::now();
        let mut expired = Vec::new();

        while let Some(Reverse((deadline, _))) = self.heap.peek() {
            if *deadline <= now {
                let Reverse((_, task_id)) = self.heap.pop().unwrap();
                expired.push(task_id);
            } else {
                break;
            }
        }

        expired
    }
}

堆的变体

1. 索引堆

支持根据 key 更新优先级:

use std::collections::HashMap;
use std::hash::Hash;

struct IndexedHeap<K, V> {
    heap: Vec<(K, V)>,
    index: HashMap<K, usize>,  // key → 在堆中的位置
}

impl<K: Hash + Eq + Clone, V: Ord> IndexedHeap<K, V> {
    fn new() -> Self {
        Self {
            heap: Vec::new(),
            index: HashMap::new(),
        }
    }

    fn push(&mut self, key: K, value: V) {
        let pos = self.heap.len();
        self.index.insert(key.clone(), pos);
        self.heap.push((key, value));
        self.sift_up(pos);
    }

    fn update(&mut self, key: K, new_value: V) {
        if let Some(&pos) = self.index.get(&key) {
            let old_value = &self.heap[pos].1;
            if new_value > *old_value {
                self.heap[pos].1 = new_value;
                self.sift_up(pos);
            } else {
                self.heap[pos].1 = new_value;
                self.sift_down(pos);
            }
        } else {
            self.push(key, new_value);
        }
    }

    fn sift_up(&mut self, mut pos: usize) {
        while pos > 0 {
            let parent = (pos - 1) / 2;
            if self.heap[pos].1 > self.heap[parent].1 {
                self.index.insert(self.heap[parent].0.clone(), pos);
                self.index.insert(self.heap[pos].0.clone(), parent);
                self.heap.swap(pos, parent);
                pos = parent;
            } else {
                break;
            }
        }
    }

    fn sift_down(&mut self, mut pos: usize) {
        let len = self.heap.len();
        loop {
            let left = 2 * pos + 1;
            let right = 2 * pos + 2;
            let mut largest = pos;

            if left < len && self.heap[left].1 > self.heap[largest].1 {
                largest = left;
            }
            if right < len && self.heap[right].1 > self.heap[largest].1 {
                largest = right;
            }

            if largest != pos {
                self.index.insert(self.heap[largest].0.clone(), pos);
                self.index.insert(self.heap[pos].0.clone(), largest);
                self.heap.swap(pos, largest);
                pos = largest;
            } else {
                break;
            }
        }
    }
}

适用场景:Dijkstra 最短路径、动态优先级调度。

2. 双端堆

同时支持取最大和最小:

// 可以用两个堆模拟
struct MinMaxHeap<T> {
    min_heap: BinaryHeap<Reverse<T>>,
    max_heap: BinaryHeap<T>,
    // 需要处理两个堆的同步...
}

实际应用:滑动窗口中位数。

3. 斐波那契堆

复杂度对比:
           | 二叉堆    | 斐波那契堆
push       | O(log n)  | O(1) 均摊
peek       | O(1)      | O(1)
pop        | O(log n)  | O(log n) 均摊
decrease   | O(log n)  | O(1) 均摊

理论上更优,但实际中常数因子大,很少使用

实战:优先级任务队列

use std::collections::BinaryHeap;
use std::cmp::Ordering;
use chrono::Utc;

#[derive(Eq, PartialEq)]
struct Task {
    priority: u8,      // 0 = 最高优先级
    created_at: i64,   // 创建时间
    id: String,
    payload: String,
}

impl Ord for Task {
    fn cmp(&self, other: &Self) -> Ordering {
        // 先比较优先级(小的优先)
        // 优先级相同,比较创建时间(早的优先)
        match self.priority.cmp(&other.priority) {
            Ordering::Equal => other.created_at.cmp(&self.created_at),
            ord => ord.reverse(),  // BinaryHeap 是最大堆,所以反转
        }
    }
}

impl PartialOrd for Task {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

struct PriorityTaskQueue {
    heap: BinaryHeap<Task>,
}

impl PriorityTaskQueue {
    fn new() -> Self {
        Self {
            heap: BinaryHeap::new(),
        }
    }

    fn enqueue(&mut self, priority: u8, payload: String) -> String {
        let id = uuid::Uuid::new_v4().to_string();
        let task = Task {
            priority,
            created_at: Utc::now().timestamp(),
            id: id.clone(),
            payload,
        };
        self.heap.push(task);
        id
    }

    fn dequeue(&mut self) -> Option<Task> {
        self.heap.pop()
    }
}

常见陷阱

1. BinaryHeap 是最大堆

// Rust 的 BinaryHeap 默认是最大堆
let mut heap = BinaryHeap::new();
heap.push(3);
heap.push(1);
heap.push(2);
assert_eq!(heap.pop(), Some(3));  // 最大的先出

// 要最小堆,用 Reverse 包装
use std::cmp::Reverse;
let mut min_heap = BinaryHeap::new();
min_heap.push(Reverse(3));
min_heap.push(Reverse(1));
assert_eq!(min_heap.pop(), Some(Reverse(1)));  // 最小的先出

2. 自定义类型要实现 Ord

#[derive(Eq, PartialEq)]
struct Item {
    value: i32,
}

// 错误:没有实现 Ord,不能放入 BinaryHeap
// let mut heap: BinaryHeap<Item> = BinaryHeap::new();

// 正确:实现 Ord
impl Ord for Item {
    fn cmp(&self, other: &Self) -> Ordering {
        self.value.cmp(&other.value)
    }
}

impl PartialOrd for Item {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

3. 堆不支持随机删除

// 标准 BinaryHeap 不支持删除任意元素
// 需要懒删除:标记为删除,pop 时跳过

use std::collections::{BinaryHeap, HashSet};
use std::hash::Hash;

struct LazyHeap<T: Eq + Hash> {
    heap: BinaryHeap<T>,
    deleted: HashSet<T>,
}

impl<T: Ord + Eq + Hash + Clone> LazyHeap<T> {
    fn new() -> Self {
        Self {
            heap: BinaryHeap::new(),
            deleted: HashSet::new(),
        }
    }

    fn push(&mut self, item: T) {
        self.heap.push(item);
    }

    fn remove(&mut self, item: &T) {
        self.deleted.insert(item.clone());
    }

    fn pop(&mut self) -> Option<T> {
        while let Some(item) = self.heap.pop() {
            if !self.deleted.remove(&item) {
                return Some(item);
            }
        }
        None
    }
}

总结

场景堆的作用复杂度
延迟任务按执行时间排序O(log n) 插入/删除
Top-K维护最小的 K 个O(n log k)
多路归并取 K 路中最小O(n log k)
优先级队列按优先级出队O(log n)
定时器找最近到期的O(1) 查看,O(log n) 删除

核心认知

堆的本质是"高效找极值"。任何需要反复获取最大/最小元素的场景,都应该想到堆。它不能做排序(那是排序算法的事),但能高效维护动态数据的极值。


上一篇:一致性哈希:分布式系统的路由表

下一篇:限流算法:保护系统的三道防线

本系列:

  1. 算法是业务武器
  2. 排序与二分:被低估的基础功
  3. 哈希表:缓存设计的基石
  4. 布隆过滤器:用 1% 误判换 90% 内存
  5. 一致性哈希:分布式系统的路由表
  6. 堆与优先队列:调度器的核心(本篇)
  7. 限流算法:保护系统的三道防线
  8. 跳表:Redis 排行榜的秘密
  9. 树结构:层级数据的优雅解法
  10. 字符串匹配:敏感词过滤的正确姿势
  11. 实战选型:没有银弹,只有场景