speechbrain.utils.parallel 模块
并行处理工具,有助于加速某些任务,例如数据预处理。
- 作者
Sylvain de Langen 2023
摘要
类
上下文管理器,在退出时取消列表中的所有元素。这用于在引发异常时更快地中止 futures。 |
函数
使用函数映射可迭代项,通过多个进程并行处理项块,并使用 tqdm 显示进度。 |
参考
- class speechbrain.utils.parallel.CancelFuturesOnExit(future_list)[source]
基类:
object
上下文管理器,在退出时取消列表中的所有元素。.cancel() 方法用于在引发异常时更快地中止 futures。
- speechbrain.utils.parallel.parallel_map(fn: Callable[[Any], Any], source: Iterable[Any], process_count: int = 2, chunk_size: int = 8, queue_size: int = 128, executor: Executor | None = None, progress_bar: bool = True, progress_bar_kwargs: dict = {'smoothing': 0.02})[source]
使用函数映射可迭代项,通过多个进程并行处理项块,并使用 tqdm 显示进度。
处理后的元素将始终按原始的正确顺序返回。与
ProcessPoolExecutor.map
不同,元素是惰性生产和消费的。- 参数:
fn (Callable) – 对 source 列表中的每个元素调用的函数。输出是对 source 列表调用 fn(elem) 后的迭代器。
source (Iterable) – 其元素通过映射函数的迭代器。
process_count (int) – 生成的进程数。如果提供了自定义执行器,则忽略此参数。对于 CPU 密集型任务,通常没有必要超过逻辑核心数。对于 IO 密集型任务,可能需要这样做以限制 iowait 中花费的时间。
chunk_size (int) – 一次向工作进程馈送多少元素。值 8 通常是可以接受的。值过低可能会增加开销并降低 CPU 占用率。
queue_size (int) – 主进程一次等待的块数。值过低会增加队列饥饿的可能性,迫使工作进程空闲。值过高可能会导致内存使用量高,尤其是当源可迭代对象生成大对象时。
executor (Optional[Executor]) – 允许提供现有的执行器(最好是 ProcessPoolExecutor)。如果为 None(默认值),则会为此映射任务生成一个进程池,并在任务完成后关闭。
progress_bar (bool) – 是否显示 tqdm 进度条。
progress_bar_kwargs (dict) – 当
progress_bar == True
时转发给 tqdm 的关键字参数字典。允许覆盖默认值,或者例如在无法从源可迭代对象推断时指定total
。
- 生成:
由 fn 处理过的 source 中的项