Tokio:应该使用哪种 mutex?
编辑
              
              177
            
            
          2025-03-15
          基本概念
互斥锁(Mutex)是一种同步原语,用于保护共享数据,确保在任何时刻只有一个线程可以访问这些数据。
两种互斥锁的主要区别
1. 阻塞行为
- 
标准库互斥锁( std::sync::Mutex):- 当你调用 .lock()时,如果锁已被占用,当前线程会阻塞直到获得锁
- 阻塞意味着线程会暂停执行,操作系统会将 CPU 时间分配给其他线程
- 这会阻塞整个线程,包括该线程上的所有其他任务
 
- 当你调用 
- 
异步互斥锁( tokio::sync::Mutex):- 当你调用 .lock().await时,如果锁已被占用,当前任务会挂起而不是阻塞
- 挂起意味着当前任务暂停执行,但线程可以继续执行其他任务
- 这只会挂起当前异步任务,不会阻塞整个线程
 
- 当你调用 
2. 跨越.await 点
- 
标准库互斥锁: - 不能安全地跨越 .await点持有锁
- 如果你在持有锁的同时执行 .await操作,可能导致死锁
 
- 不能安全地跨越 
- 
异步互斥锁: - 专门设计为可以跨越 .await点持有锁
- 允许你在持有锁的同时执行其他异步操作
 
- 专门设计为可以跨越 
3. 性能开销
- 
标准库互斥锁: - 通常性能开销较小
- 适合短时间持有的场景
 
- 
异步互斥锁: - 由于支持异步操作,性能开销较大
- 适合需要在持有锁的同时执行异步操作的场景
 
示例说明
示例 1:标准库互斥锁的基本使用
use std::sync::{Arc, Mutex};
use std::thread;
fn main() {
    // 创建一个被Arc包装的Mutex
    let counter = Arc::new(Mutex::new(0));
    let mut handles = vec![];
    for _ in 0..10 {
        // 克隆Arc以在多个线程间共享
        let counter = Arc::clone(&counter);
        
        // 创建新线程
        let handle = thread::spawn(move || {
            // 获取锁 - 如果锁被占用,这里会阻塞线程
            let mut num = counter.lock().unwrap();
            
            // 修改受保护的数据
            *num += 1;
            
            // 锁在这里自动释放(当num离开作用域时)
        });
        
        handles.push(handle);
    }
    // 等待所有线程完成
    for handle in handles {
        handle.join().unwrap();
    }
    // 打印最终结果
    println!("最终计数: {}", *counter.lock().unwrap());
}
示例 2:异步互斥锁的基本使用
use tokio::sync::Mutex;
use std::sync::Arc;
#[tokio::main]
async fn main() {
    // 创建一个被Arc包装的异步Mutex
    let counter = Arc::new(Mutex::new(0));
    let mut handles = vec![];
    for _ in 0..10 {
        // 克隆Arc以在多个任务间共享
        let counter = Arc::clone(&counter);
        
        // 创建新的异步任务
        let handle = tokio::spawn(async move {
            // 获取锁 - 如果锁被占用,这里会挂起当前任务,但不会阻塞线程
            let mut num = counter.lock().await;
            
            // 修改受保护的数据
            *num += 1;
            
            // 锁在这里自动释放(当num离开作用域时)
        });
        
        handles.push(handle);
    }
    // 等待所有任务完成
    for handle in handles {
        handle.await.unwrap();
    }
    // 打印最终结果
    println!("最终计数: {}", *counter.lock().await);
}
示例 3:展示跨越.await 点的区别
// 使用标准库Mutex - 可能导致死锁!
use std::sync::{Arc, Mutex};
#[tokio::main]
async fn main() {
    let data = Arc::new(Mutex::new(0));
    
    tokio::spawn(async move {
        // 获取锁
        let mut lock = data.lock().unwrap();
        
        // 危险! 在持有锁的同时执行异步操作
        // 如果另一个任务也尝试获取这个锁并在此之前获得了执行权
        // 那么当前任务恢复执行时可能无法继续(死锁)
        tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
        
        // 修改数据
        *lock += 1;
    });
}
// 使用异步Mutex - 安全地跨越.await点
use tokio::sync::Mutex;
use std::sync::Arc;
#[tokio::main]
async fn main() {
    let data = Arc::new(Mutex::new(0));
    
    tokio::spawn(async move {
        // 获取锁
        let mut lock = data.lock().await;
        
        // 安全! 异步Mutex设计为可以跨越.await点
        tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
        
        // 修改数据
        *lock += 1;
    });
}
何时使用哪种互斥锁
使用标准库互斥锁的场景:
- 保护纯内存数据(非 IO 资源)
- 锁持有时间短
- 不需要在持有锁时执行异步操作
- 对性能要求高
使用异步互斥锁的场景:
- 需要在持有锁的同时执行异步操作
- 保护 IO 资源(如数据库连接)
- 锁可能被长时间持有
- 不希望阻塞整个线程
最佳实践
- 
默认选择标准库互斥锁:除非有特殊需求,优先使用标准库互斥锁,因为它性能更好。 
- 
包装模式:将互斥锁包装在结构体中,提供方法来操作内部数据: 
struct Counter {
    count: Arc<Mutex<i32>>,
}
impl Counter {
    fn new() -> Self {
        Self {
            count: Arc::new(Mutex::new(0)),
        }
    }
    
    fn increment(&self) {
        let mut count = self.count.lock().unwrap();
        *count += 1;
    }
    
    fn get_count(&self) -> i32 {
        *self.count.lock().unwrap()
    }
}
- 对于 IO 资源,考虑使用专门的管理任务:
use tokio::sync::mpsc;
#[tokio::main]
async fn main() {
    // 创建通道
    let (tx, mut rx) = mpsc::channel(100);
    
    // 启动管理任务
    tokio::spawn(async move {
        // 假设这是一个数据库连接
        let db_connection = establish_connection().await;
        
        // 处理请求
        while let Some(request) = rx.recv().await {
            // 使用连接处理请求
            process_request(&db_connection, request).await;
        }
    });
    
    // 发送请求
    tx.send(Request::new()).await.unwrap();
}
- 1
- 0
- 
              
              
  分享
