speechbrain.dataio.sampler 模块

PyTorch 兼容采样器。

这些决定了遍历数据集的顺序。

作者
  • Aku Rouhe 2020

  • Samuele Cornell 2020

  • Ralf Leibold 2020

  • Artem Ploujnikov 2021

  • Andreas Nautsch 2021, 2023

  • Adel Moumen 2023

摘要

BalancingDataSampler

一种数据采样器,从数据集中获取单个键,并确保该键下的分布大致均匀

ConcatDatasetBatchSampler

该采样器专为标准的 Pytorch ConcatDataset 设计。

DistributedSamplerWrapper

该包装器允许将任何采样器(例如 batch 采样器)与分布式数据并行 (DDP) 正确配合使用。

DynamicBatchSampler

此 BatchSampler 通过按示例长度分组来对示例进行批处理。

ReproducibleRandomSampler

RandomSampler 的修改版,总是返回相同的数值。

ReproducibleWeightedRandomSampler

WeightedRandomSampler 的可重现修改版。

参考

class speechbrain.dataio.sampler.ReproducibleRandomSampler(data_source, seed=563375142, epoch=0, **kwargs)[源代码]

基类: RandomSampler

RandomSampler 的修改版,总是返回相同的数值。

另请参阅 torch.utils.data.RandomSampler。其行为和参数基本相同,但增加了“seed”和“epoch”参数,且不支持“generator”。

注意

在每个 epoch 之前调用 set_epoch。否则,采样器在每个 epoch 会产生相同的索引序列。

参数:
  • data_source (Dataset) – 用于采样索引的数据源。

  • seed (int) – 用于随机数生成器的基础种子。建议使用一个 0 和 1 位混合良好的值。

  • epoch (int) – 开始的 epoch。

  • **kwargs (dict) – 传递给父类的参数。

示例

>>> import torch
>>> from speechbrain.utils.checkpoints import Checkpointer
>>> from speechbrain.dataio.dataloader import SaveableDataLoader
>>> # An example "dataset"
>>> dataset = torch.arange(10).unsqueeze(1)
>>> # Create the random sampler:
>>> sampler = ReproducibleRandomSampler(dataset)
>>> dataloader = SaveableDataLoader(dataset, sampler = sampler,
...     num_workers = 3)
>>> # Setup the checkpointer.
>>> # Note that the sampler doesn't need to be saved itself.
>>> tmpdir = getfixture('tmpdir')
>>> checkpointer = Checkpointer(tmpdir, {"dataloader": dataloader})
>>> # Iterate:
>>> subset = []
>>> for i, data_point in enumerate(dataloader):
...     # Say you save a checkpoint on the fourth batch:
...     if i == 3:
...         _ = checkpointer.save_checkpoint(end_of_epoch = False)
...     # So let's save the numbers you would get if you continue
...     if i >= 4:
...         subset.append(data_point.item())
>>> # What if instead you had to restart the experiment?
>>> new_sampler = ReproducibleRandomSampler(dataset)
>>> new_dataloader = SaveableDataLoader(dataset, sampler = new_sampler,
...        num_workers = 3)
>>> new_checkpointer = Checkpointer(tmpdir, {"dataloader": new_dataloader})
>>> _ = new_checkpointer.recover_if_possible()
>>> # You'll get the same random order again:
>>> new_subset = [data_point.item() for data_point in new_dataloader]
>>> assert subset == new_subset
set_epoch(epoch)[源代码]

您也可以直接访问 self.epoch,但我们保留此接口以反映 torch.utils.data.distributed.DistributedSampler 的接口。

class speechbrain.dataio.sampler.ReproducibleWeightedRandomSampler(weights, num_samples, replacement, seed=129491412, epoch=0, **kwargs)[源代码]

基类: WeightedRandomSampler

WeightedRandomSampler 的可重现修改版。

另请参阅 torch.utils.data.WeightedRandomSampler。其行为和参数相同,但增加了“seed”和“epoch”参数,且不支持“generator”。

注意

在每个 epoch 之前调用 set_epoch。否则,采样器在每个 epoch 会产生相同的索引序列。

参数:
  • weights (float 序列) – 每个索引的权重。无需求和为一。

  • num_samples (int) – 抽取的样本数量

  • replacement (bool) – 是否进行替换抽取(在一个 epoch 的 num_samples 内)。

  • seed (int) – 用于随机数生成器的基础种子。建议使用一个 0 和 1 位混合良好的值。

  • epoch (int) – 开始的 epoch。

  • **kwargs (dict) – 传递给父类的参数。

示例

>>> a = ReproducibleWeightedRandomSampler([0.1, 0.9, 0.4, 0.7, 3.0, 0.6], 5, replacement=True)
>>> b = ReproducibleWeightedRandomSampler([0.1, 0.9, 0.4, 0.7, 3.0, 0.6], 5, replacement=True)
>>> list(a)
[3, 1, 4, 4, 4]
>>> list(b)
[3, 1, 4, 4, 4]
>>> a.set_epoch(1)
>>> list(a)
[4, 5, 4, 4, 3]
>>> b.set_epoch(1)
>>> list(b)
[4, 5, 4, 4, 3]
set_epoch(epoch)[源代码]

您也可以直接访问 self.epoch,但我们保留此接口以反映 torch.utils.data.distributed.DistributedSampler 的接口。

class speechbrain.dataio.sampler.ConcatDatasetBatchSampler(samplers, batch_sizes: tuple | list, epoch=0)[源代码]

基类: Sampler

该采样器专为标准的 Pytorch ConcatDataset 设计。

它用于从不同连接的数据集中检索元素,并将它们以由 batch_sizes 指定的比例放入同一批次中,例如 8, 16 意味着每个批次将包含 24 个元素,其中前 8 个元素属于 ConcatDataset 对象中的第一个数据集,后 16 个元素属于第二个数据集。支持超过两个数据集,在这种情况下,您需要提供 3 个批次大小。

注意

从数据集中抽取批次,直到最小长度的数据集耗尽为止。因此,训练 epoch 中的示例数量由长度最小的数据集决定。

参数:
  • samplers (listtuple) – pytorch 采样器的列表或元组

  • batch_sizes (list) – 批次大小。

  • epoch (int) – 开始的 epoch。

示例

>>> import torch
>>> from speechbrain.dataio.sampler import ConcatDatasetBatchSampler, ReproducibleRandomSampler
>>> from speechbrain.dataio.sampler import ReproducibleRandomSampler
>>> from speechbrain.dataio.dataloader import SaveableDataLoader
>>> # example "datasets"
>>> dataset1 = torch.arange(0, 10).unsqueeze(1)
>>> dataset2 = torch.arange(20, 40).unsqueeze(1)
>>> tot_dataset = torch.utils.data.ConcatDataset([dataset1, dataset2])
>>> sampler1 = ReproducibleRandomSampler(dataset1)
>>> sampler2 = ReproducibleRandomSampler(dataset2)
>>> tot_sampler = ConcatDatasetBatchSampler([sampler1, sampler2], [2, 4])
>>> dataloader = SaveableDataLoader(tot_dataset, batch_sampler = tot_sampler,
...     num_workers = 3)
>>> for data_point in dataloader:
...      assert len(data_point) == 6
...      for i in range(2):
...         assert data_point[i] in [x for x in range(0, 10)]
...      for i in range(2, 4):
...         assert data_point[i] in [x for x in range(10, 40)]
set_epoch(epoch)[源代码]

您也可以直接访问 self.epoch,但我们保留此接口以反映 torch.utils.data.distributed.DistributedSampler 的接口。

class speechbrain.dataio.sampler.DynamicBatchSampler(dataset, max_batch_length: int, num_buckets: int | None = None, length_func=<function DynamicBatchSampler.<lambda>>, shuffle: bool = True, batch_ordering: str = 'random', max_batch_ex: int | None = None, bucket_boundaries: ~typing.List[int] = [], lengths_list: list[int] | None = None, seed: int = 42, epoch: int = 0, drop_last: bool = False, verbose: bool = False)[源代码]

基类: Sampler

此 BatchSampler 通过按示例长度分组来对示例进行批处理。

批次中的每个示例具有大致相同的长度,从而最大程度地减少填充。这使得在示例长度可能差异很大的数据集(例如 Librispeech)上进行训练更快。灵感来自: https://tensorflowcn.cn/api_docs/python/tf/data/experimental/bucket_by_sequence_length

动态批处理通过指定 max_batch_length 进行,它是批次中示例长度之和的上限:例如,如果 ex1 长度为 4,ex2 长度为 5,并且 max_batch_length 设置为 6,则 ex1 和 ex2 将单独放置在两个不同的批次中。

每个示例的长度可以通过两种方式获取。如果输入数据集是 DynamicItemDataset,可以通过指定 length_func 获取。默认为标注中存在“duration”条目。每个示例的长度也可以在实例化此类时通过指定一个包含每个示例长度的列表并将其传递给 lengths_list 来传递。

通过定义一组可能的离散间隔(buckets)来将示例分组。长度落在这些间隔内的示例可以进行批处理。

可以使用参数 num_buckets 指定 bucket 的数量。通常存在一个此参数值的最佳范围。

如果 num_buckets == 1,所有示例都可以进行批处理。您具有最大的随机性,但训练速度会较慢,因为长示例和短示例可以混合批处理,导致大量值成为填充。随着 bucket 数量的增加,只有长度相似的示例才能进行分组。这权衡了速度与随机性。TLDR:数字越小 -> 随机性越好,数字越大 -> 训练速度越快。请注意:如果设置得太高,训练速度会降低。如果 num_buckets -> 数据集中的示例数量,批次大小将很小,影响训练速度并可能影响性能。

还可以通过向 bucket_boundaries 参数传递列表来指定 buckets,而不是指定 left_bucket_length 和 bucket_length_multiplier。

示例

>>> import torch
>>> import speechbrain as sb
>>> from speechbrain.dataio.sampler import DynamicBatchSampler
>>> from speechbrain.dataio.dataset import DynamicItemDataset
>>> from speechbrain.dataio.dataloader import SaveableDataLoader
>>> from speechbrain.dataio.batch import PaddedBatch
>>> import numpy as np
>>> item_lengths = sorted([np.random.randint(10, 100) for x in range(20)])
>>> dataset = {"ex_{}".format(x) : {"wav" :torch.randn(x)} for x in item_lengths}
>>> dataset = DynamicItemDataset(dataset)
>>> dataset.set_output_keys(["wav"])
>>> length_func = lambda x : len(x) # trivial in this example
>>> bsampler = DynamicBatchSampler(dataset, 20, 4, length_func, shuffle=False, batch_ordering='descending')
>>> dataloader = SaveableDataLoader(dataset, batch_sampler=bsampler, collate_fn=PaddedBatch)
>>> for i, b in enumerate(dataloader):
...     data, length = b["wav"]
>>> assert data.shape[-1] == max(item_lengths)
参数:
  • dataset (torch.utils.data.Dataset) – 从中采样元素的 Pytorch Dataset。

  • max_batch_length (int) – 批次中示例长度之和的上限。应根据您的 GPU 内存选择。

  • num_buckets (int) – 用于将示例分组的离散 bucket 数量。如果 num_buckets == 1,所有示例都可以进行批处理。随着 bucket 数量的增加,只有长度相似的示例才能进行分组。这权衡了速度与随机性。数字越小 -> 随机性越好,数字越大 -> 训练速度越快。但是,如果设置得太高,训练速度会降低。如果 num_buckets -> 数据集中的示例数量,批次大小将很小,影响训练速度并可能影响性能。注意:您必须手动指定 bucket_boundaries 或 bucket 数量。

  • length_func (callable) – 用于从数据集中获取每个示例长度的函数。此参数仅当数据集是 Speechbrain DynamicItemDataset 对象时可用。可以是任何 callable:例如 lambda x: x[“duration”]*16000 如果标注中的 duration key 是秒,并且文件采样率为 16kHz,则返回样本数量。

  • shuffle (bool) – 每个 epoch 之间是否打乱示例。

  • batch_ordering (string) – 如果为 random,则随机排列批次;否则按长度 ascendingdescending 排序。

  • max_batch_ex (int) – 如果设置,它会限制批次中示例的最大数量,当示例数量超过此处指定的值时,会取代 max_batch_length。例如,您有很多短示例,其批次大小会过大,您可以使用此参数限制这些短示例的批次大小。

  • bucket_boundaries (list) – 通过手动指定 bucket 的右边界来覆盖 bucket_length_multiplier 和 left_bucket_length。

  • lengths_list (list) – 通过传递一个包含数据集中每个示例长度的列表来覆盖 length_func。当数据集是纯粹的 Pytorch Dataset 对象而不是 DynamicItemDataset 对象时,必须设置此参数,因为 length_func 不能用于 Pytorch Dataset。

  • seed (int) – 随机种子。

  • epoch (int) – 开始的 epoch。

  • drop_last (bool) – 如果为 True,采样器将丢弃未分组的最后一个示例。

  • verbose (bool) – 如果为 True,则在第一个 epoch 也记录每个批次的统计信息。

get_durations(batch)[源代码]

获取批次中元素的持续时间。

set_epoch(epoch)[源代码]

您也可以直接访问 self.epoch,但我们保留此接口以反映 torch.utils.data.distributed.DistributedSampler 的接口。

class speechbrain.dataio.sampler.DistributedSamplerWrapper(sampler, *args, **kwargs)[源代码]

基类: DistributedSampler

该包装器允许将任何采样器(例如 batch 采样器)与分布式数据并行 (DDP) 正确配合使用。

盲目地将采样器传递给每个 DDP 进程将导致每个进程都能够访问数据集中的所有数据,而不是仅访问每个进程独有的数据集子集。此包装器可防止这种情况,并允许每个进程仅使用原始数据的子集。

注意

在 Brain 类中使用 DDP 训练时,它会自动应用于任何采样器。

set_epoch(epoch)[源代码]

将 set_epoch() 传递给 DistributedSampler 和包装器。

class speechbrain.dataio.sampler.BalancingDataSampler(dataset, key, num_samples=None, replacement=True, seed=563375142, epoch=0, **kwargs)[源代码]

基类: ReproducibleWeightedRandomSampler

一种数据采样器,从数据集中获取单个键,并确保该键下的分布大致均匀

参数:
  • dataset (DynamicItemDataset) – 将从中抽取样本的数据集

  • key (str) – 将从中获取样本的键

  • num_samples (int) – 抽取的样本数量

  • replacement (bool) – 是否进行替换抽取(在一个 epoch 的 num_samples 内)。

  • seed (int) – 用于随机数生成器的基础种子。建议使用一个 0 和 1 位混合良好的值。

  • epoch (int) – 开始的 epoch。

  • **kwargs (dict) – 传递给父类的参数。

示例

>>> from speechbrain.dataio.sampler import BalancingDataSampler
>>> from speechbrain.dataio.dataset import DynamicItemDataset
>>> sample_data = {
...   1: {"category": "A",
...       "text": "This is a test"},
...   2: {"category": "A",
...       "text": "This is a second test"},
...   3: {"category": "B",
...       "text": "This is a third test"}
...  }
>>> dataset = DynamicItemDataset(data=sample_data)
>>> sampler = BalancingDataSampler(
...     dataset=dataset,
...     key="category",
...     num_samples=10
... )
>>> sampler.weights
tensor([0.5000, 0.5000, 1.0000], dtype=torch.float64)
>>> it = iter(sampler)
>>> [next(it) for _ in range(10)]
[2, 2, 1, 2, 2, 0, 1, 1, 1, 2]