Rust 并行库 crossbeam 的 Channel 示例

news/2025/2/23 15:46:43

示例1

一个不完整的示例:

rust">let (tx, rx) = channel::unbounded::<Task>();
let mut handlers = vec![];

for _ in 0..number {
    let rx = rx.clone();
    let handle = thread::spawn(move || {
        while let Some(task) = rx.recv() {
            task.call_box();
        }
    });

    handlers.push(handle);
}

该例子中,rx 可以被多个线程使用,是线程安全的。这就是所谓的 MPMC 模式。设想 channel 中有 10 个数据,MPMC 模式允许10个线程同时利用 rx 从 channel 中读取数据。

Rust 自带的 channel 是 MPSC 模式的,一次仅允许一个线程从 channel 读取数据。显然 crossbeam 效率更高。

示例2

rust">use crossbeam::channel;
use crossbeam::thread;
use std::thread::sleep;
use std::time::Duration;

// 定义Task结构体
struct Task {
    data: usize, // 假设任务包含一个数据字段
    call_box: Box<dyn FnMut()>, // 假设任务包含一个可调用对象的装箱指针
}

impl Task {
    fn new(data: usize, call_box: impl FnMut() + 'static) -> Self {
        Task {
            data,
            call_box: Box::new(call_box),
        }
    }

    // 实现call_box方法
    fn call_box(&mut self) {
        (self.call_box)();
    }
}

fn main() {
    const NUMBER_OF_WORKERS: usize = 4; // 假设有4个工作线程
    let (tx, rx) = channel::unbounded::<Task>();
    let mut handlers = vec![];

    // 启动工作线程
    for _ in 0..NUMBER_OF_WORKERS {
        let rx = rx.clone();
        let handle = thread::spawn(move || {
            while let Some(task) = rx.recv() {
                task.call_box(); // 执行任务
            }
        });
        handlers.push(handle);
    }

    // 发送任务到通道
    for i in 0..10 { // 假设发送10个任务
        let task = Task::new(i, || {
            println!("Executing task with data: {}", i);
            sleep(Duration::from_secs(1)); // 模拟耗时操作
            println!("Finished task with data: {}", i);
        });
        tx.send(task).unwrap();
    }

    // 关闭发送通道
    drop(tx);

    // 等待所有工作线程完成
    for handle in handlers {
        handle.join().unwrap();
    }

    println!("All tasks are processed.");
}

在这个程序中,我们定义了一个Task结构体,它包含一个data字段和一个call_box字段,后者是一个装箱的可调用对象。我们实现了call_box方法,它调用这个装箱的可调用对象。

main函数中,我们创建了一个无界通道,用于在工作线程和主线程之间传递Task实例。我们启动了NUMBER_OF_WORKERS个工作线程,它们不断地从通道接收Task实例并调用call_box方法执行它们。

然后,主线程创建了一些Task实例,并通过通道发送它们给工作线程。一旦所有任务都被发送,主线程通过drop(tx)关闭了发送通道,这样工作线程在尝试接收任务时,如果没有更多任务可用,将会得到一个None,从而退出循环。

最后,主线程等待所有工作线程完成,并打印出消息表示所有任务都已经处理完毕。

请注意,为了简化示例,我使用了Box<dyn FnMut()>来允许Task存储任何可调用对象的装箱指针。这意味着任务中的可调用对象必须能够单独编译成一个独立的、无状态的函数,这样才能安全地在多个线程之间共享。在实际应用中,你可能需要根据你的具体需求调整Task结构体的设计和使用方式。


http://www.niftyadmin.cn/n/5428365.html

相关文章

郭炜老师mooc第十一章数据分析和展示(numpy,pandas, matplotlib)

多维数组库numpy numpy创建数组的常用函数 # numpy数组import numpy as np #以后numpy简写为np print(np.array([1,2,3])) #>>[1 2 3] print(np.arange(1,9,2)) #>>[1 3 5 7] 不包括9 print(np.linspace(1,10,4)) #>>[ 1. 4. 7. 10.] # linespace(x,y,n)&…

Java初阶数据结构队列的实现

1.队列的概念 1.队列就是相当于排队打饭 2.在排队的时候就有一个队头一个队尾。 3.从队尾进对头出 4.所以他的特点就是先进先出 所以我们可以用链表来实现 单链表实现要队尾进队头出{要有last 尾插头删} 双向链表实现效率高&#xff1a;不管从哪个地方当作队列都是可以的&…

HttpResponse响应模块设计与实现(http模块三)

目录 类功能 类定义 类实现 编译测试 类功能 类定义 // HttpResponse响应模块功能设计 class HttpResponse { private:int _statu;bool _redirect_flag; // 重定向标志std::string _body;std::string _redirect_url; // 重定向地址std::unordered_map<std::string, std…

前端代码整洁与规范之CSS篇

一、代码整洁 1. 命名规范 CSS 类名的命名应该简洁清晰&#xff0c;能够准确描述元素的作用。避免使用无意义的名称&#xff0c;例如“a”、“b”等&#xff0c;而应该使用有意义的英文单词或单词缩写。同时&#xff0c;也要避免使用驼峰命名法和下划线命名法混杂使用&#x…

Linux开发:父子进程间通过匿名管道传输数据

匿名管道是Linux用于父子进程间传输数据的一种方式,其主要的特点是: 1.管道在父进程中创建 2.管道一共有两个文件描述符,fd[0]用于读取,fd[1]用于写入 3.父进程fork,创建子进程 4.子进程继承了父进程创建的两个管道文件描述符 5.根据需要,父子进程各关闭一个文件描述…

MySQL基础---SQL语句2(WHERE、AND、OR、ORDER BY、COUNT)

1. WHERE 子句 1. 语法 WHERE 子句用于限定选择的标准 在 slelece、update、delete 语句中&#xff0c;皆可使用 WHERE 子句来限定选择的标准 -- 查询语句 select 列名称 form 表名称 where 列 运算符 值-- 更新语句 update 列名称 form 列新值 where 列 运算符 值-- 删除语句…

Python环境搭建 -- Python与PyCharm安装

一、Python安装 我们先找到Python的官方网站&#xff0c;在浏览器中搜索Python即可&#xff0c;然后进入Python官网 点击Downloads&#xff0c;选择对应匹配的操作系统 点进去之后&#xff0c;Python的版本分为稳定的版本和前置版本&#xff0c;前置的版本就是还没有发行的版本…

AI在金融服务行业的现状及发展趋势

在金融服务行业中&#xff0c;AI的融入和应用已经成为推动该行业发展的关键力量。英伟达发布的第四届年度金融服务行业人工智能现状报告&#xff0c;揭示了2024年AI在金融领域的新兴趋势和应用前景。报告中指出&#xff0c;超过91%的金融服务公司正在评估或已经将AI技术应用到企…