Open In Colab 以在 GitHub 上执行或查看/下载此笔记本

大型数据集和共享文件系统的数据加载

您是否有存储在共享文件系统中的大型数据集,并且希望将其用于训练神经网络?这个数据集是否大到甚至无法放入计算节点的本地 SSD?如果是这样,本教程将引导您完成从共享文件系统读取大文件所需的所有步骤。

在许多计算集群中,主要的数据存储是网络文件系统 (NFS),例如 LustreNFS 可以同时为许多用户提供服务,并从单个文件提供高数据吞吐量。然而,打开或列出许多不同的文件很慢 - 这样做可能会使整个系统对每个人都变慢,而不仅仅是违规用户。语音数据集通常包含许多非常小的录音。一次又一次地读取每个文件正是可能导致 NFS 变慢的数据 IO 类型。

一种解决方案是将数据集复制到计算节点的本地 SSD 中。这可以通过将数据集压缩到单个文件(例如 dataset.tar.gz)、将其复制到本地节点,最后解压缩(untarring)文件来相对高效地完成。从本地 SSD 读取文件非常高效,并且不会损害共享文件系统的性能。在这种情况下,标准的 SpeechBrain 数据 IO 工作良好,请参见本教程。然而,可能存在超出本地 SSD 大小的巨大数据集。

一种可能的解决方法是将数据保存在共享文件系统中,并将小型录音捆绑到更大的归档文件中,这些归档文件通常称为分片(shards)。从分片加载数据避免了打开过多的文件,因此速度很快。

从分片读取数据时,无法再对数据集进行随机访问。数据是从中按顺序读取的。这需要在准备实验时多加注意。

上面提到的分片 IO 情况在学术计算集群设置中很典型。流式数据 IO 也可以在更大的规模下与专用数据服务器一起使用。

在本教程中,我们将使用 WebDataset 库。WebDataset 的替代方案和使用场景由 WebDataset 开发人员在这个 PyTorch 提案中阐述。

什么是 WebDataset?

WebDataset 是一个与 PyTorch 兼容良好的分片(流式)数据 IO 库。WebDataset 使用标准的 TAR 归档文件作为分片格式,遵循一个简单的约定:所有连续的、具有相同基础文件名的文件属于同一个示例。因此,列出 data-archive/shard-0000.tar 的内容可能看起来像

> tar -t data-archives/shard-0000.tar
spk1-utt1.wav
spk1-utt1.txt
spk1-utt1.json
spk1-utt2.wav
spk1-utt2.txt
spk1-utt2.json
spk2-utt1wav
spk2-utt1.txt
spk2-utt1.json
...

在 Python 端,数据集接口是一个 IterableDataset,它有一组可以链式调用来构建数据流程的方法,例如

import webdataset as wds  # Note the typical import shorthand
dataset = (
      wds.WebDataset("data-archives/shard-00{00...24}.tar")  # 25 shards
      .decode()  # Automagically decode files
      .shuffle(size=1000)  # Shuffle on-the-fly in a buffer
      .batch(batchsize=10)  # Create batches
)

请注意,WebDataset(至少在撰写本文时)是一个快速发展的库。它也在考虑纳入 PyTorch 核心。再次强调,请此处此处阅读更多信息。

安装依赖

%%capture
# Installing SpeechBrain via pip
BRANCH = 'develop'
!python -m pip install git+https://github.com/speechbrain/speechbrain.git@$BRANCH
%%capture
!pip install "webdataset<0.2"
import speechbrain as sb
import webdataset as wds
import torch
import glob
import pathlib
import random

创建 TAR 分片

WebDataset 中的数据准备过程是迭代数据集中的每个示例并将其分割成 TAR 分片。TAR 文件是标准格式,因此您可以使用任何标准工具创建它们。WebDataset 提供了一些辅助工具,可以使这个过程更容易一些。

  • Tarp,一个基于 Go 的工具,可以将 TAR 流分割成多个分片,并执行一些其他的流处理任务。请参阅GitHub 页面。这是一个单独的工具,需要单独安装,但理论上 Go 可能比 Python 更快。

  • wds.ShardWriter,一个 Python 类,可以将 WebDataset 风格的 dict 写入 TAR 归档文件,并按照给定大小分割成多个分片。这是我们在此处将采用的方法。

下载一些数据

在本教程中,我们将使用 Mini Librispeech 的开发集(但我们会将其视为正常的训练数据)。

%%capture
!wget https://www.openslr.org/resources/31/dev-clean-2.tar.gz
!tar -xvzf dev-clean-2.tar.gz
!rm dev-clean-2.tar.gz

迭代数据

这一步当然会因数据集而异。在 Mini Librispeech 中,数据按说话人和文档组织。我们将首先读取所有转录文本,然后将其打乱,以便连续的示例不是来自同一说话人和文档。

DATAROOT = pathlib.Path("LibriSpeech/dev-clean-2")
SHARDSDIR = pathlib.Path("DATA-SHARDS")
SHARDSDIR.mkdir(exist_ok=True, parents=True)

# 1. Gather texts
# Note that here uttid encodes speaker and document IDs, so we don't need to
# keep track of them separately
texts = {}
for textf in DATAROOT.glob("*/*/*.trans.txt"):
    with open(textf) as fi:
        for line in fi:
            uttid, text = line.split(" ", maxsplit=1)
            texts[uttid] = text
            print(uttid, text)

# 2. Shuffle uttids
uttids = list(texts.keys())
random.shuffle(uttids)
print(uttids)
# 3. Create TARs
# In this example, we are only storing 100 examples / shard, because the full
# development set could probably fit in a normal shard. In practical setups
# use bigger values.
# maxcount sets the max number of examples, and maxsize
# sets the maximum size in bytes.

# 3A. Iterate over the shuffled uttids
# 3B. For each uttid, create an example dict
#   The example dict is written into a TAR stream. The special __key__
#   entry becomes the basename for this example's files, and the other
#   entries in the dict become files with different extensions.
#   E.G. with uttid "3536-23268-0007" this will write the files:
#     3536-23268-0007.audio.pth, 3536-23268-0007.text
#   There are default handlers for many extensions
#     See https://github.com/webdataset/webdataset/blob/6ee2279795b3f667bb7a5868af596990cc6efee3/webdataset/writer.py#L97

with wds.ShardWriter(f"{SHARDSDIR}/shard-%06d.tar", maxcount = 100) as writer:
    for uttid in uttids:
        spk, doc, _ = uttid.split("-")
        audio_fpath = (DATAROOT / spk / doc / uttid).with_suffix(".flac")
        audio_tensor = sb.dataio.dataio.read_audio(str(audio_fpath))
        example = {
            "__key__": uttid,
            "audio.pth": audio_tensor,
            "text": texts[uttid]
        }
        writer.write(example)
! cd DATA-SHARDS/
# Now we can load these shards.
# This uses the SpeechBrain batch class, but batching itself is done by
# WebDataset
dataset = (
      wds.WebDataset(str(SHARDSDIR)+"/shard-0000{00..10}.tar")
      .decode()
      .shuffle(100)
      .batched(batchsize=10,
               collation_fn=sb.dataio.batch.PaddedBatch)
)
batch = next(iter(dataset))
print(batch.text)
print(batch["audio.pth"])  # Because of the audio.pth name, attribute access doesn't work
print("How much of batch is padding [%]:",
      sb.dataio.iterators.padding_ratio(batch["audio.pth"].lengths).item()*100)

WebDataset 与 SpeechBrain

SpeechBrain 与任何 PyTorch 数据加载兼容,因此 WebDataset 可以直接使用而无需任何扩展(就像我们目前所做的那样)。然而,仍然存在三个问题

  1. 分片中的数据通常不是排序的(或者甚至是故意打乱的)。连续的语句长度会非常不同,需要大量填充。

  2. SaveableDataLoader 中的 epoch 内检查点机制不适用于 IterableDatasets。

  3. 在使用分布式数据并行时,很难实现精确的 epoch。(这个问题并非 WebDataset 或 SpeechBrain 特有。)

这些问题可以通过以下策略和扩展来解决

  1. SpeechBrain 实现了一个即时动态批处理和分桶迭代器。它与 webdataset.WebDataset 协同工作。

  • 分桶将长度相似的语句放在同一个批次中,减少填充量。

  • 动态批处理与分桶同时实现是很自然的,旨在产生总元素数量相似的批次。包含短语句的批次具有更大的批次大小,而包含长语句的批次具有较小的批次大小。

  • 流式数据加载需要即时操作。

  1. 不关心精确的 epoch。而是测量更新次数,并设置一个名义 epoch 长度(例如,一个 epoch = 2500 次更新)。

  2. 不关心精确的重启:当实验重启时,数据加载不会从上次停止的示例继续,而是从随机分配的分片重新开始。

训练数据加载流程中的一些更改

  • 首先,在加载流程中使用 .rename 以获得更合理命名的批次元素。这也会解决之前提到的 audio.pth 无法通过典型属性风格访问的问题。

  • 然后添加一个 .repeat,以便使用无限数据流。

  • 最后,主要更改是使用 sb.dataio.iterators.dynamic_bucketed_batch 作为批处理方法

    • 可以使用 .then 方法使用通用迭代器

    • 请参阅文档以了解参数。

    • 由于这也会涉及打乱操作,因此不再使用 WebDataset 的打乱功能。

dataset = (
      wds.WebDataset(str(SHARDSDIR)+"/shard-0000{00..10}.tar")
      .decode()
      .rename(id="__key__", signal="audio.pth", text="text")  # Mention all, even text.
      .repeat()
      .then(sb.dataio.iterators.dynamic_bucketed_batch,
            len_key = "signal",  # Which batch element's length to consider
            sampler_kwargs={
                "target_batch_numel":16000*45.,  # Add examples till they total 45 seconds
                "max_batch_numel":   16000*60.   # ... but so that they don't go over 60 seconds
            }
      )
)

batch = next(iter(dataset))
print("Batch size:", len(batch))
print("How much of batch is padding [%]:",
      sb.dataio.iterators.padding_ratio(batch.signal.lengths).item()*100)

更复杂的数据加载流程

  • 您可以使用 .map() 实现任意处理。

text_mapping = {"<PADDING>": 0}
index = 1
for example in wds.WebDataset(str(SHARDSDIR)+"/shard-0000{00..10}.tar").decode():
    for word in example["text"].split():
        if word not in text_mapping:
            text_mapping[word] = index
            index += 1

def text_to_index(sample):
    """Adds text_vec entry, a LongTensor for text"""
    sample["text_vec"] = torch.LongTensor(
        [text_mapping[word] for word in sample["text"].split()]
    )
    return sample
dataset = (
      wds.WebDataset(str(SHARDSDIR)+"/shard-0000{00..10}.tar")
      .decode()
      .rename(id="__key__", signal="audio.pth", text="text")
      .map(text_to_index)
      .repeat()
      .then(sb.dataio.iterators.dynamic_bucketed_batch,
            len_key = "signal",  # Which batch element's length to consider
            sampler_kwargs={
                "target_batch_numel":16000*45.,  # Add examples till they total 45 seconds
                "max_batch_numel":   16000*60.   # ... but so that they don't go over 60 seconds
            }
      )
)
batch = next(iter(dataset))
print(batch.text[0])
print(batch.text_vec.data[0])

如何处理 DataLoader

  • 由于我们的数据集返回的是批次(而不是单个示例),DataLoader 应设置 batch_size=None

    • 如果您的数据集来自 WebDataset,Brain 类(以及底层 sb.dataio.dataloader.make_dataloader)将自动设置此项。

  • 为了达到名义上的 epoch,SpeechBrain 提供了 sb.dataio.dataloader.LoopedLoader

    • 如果在 train_loader_kwargs 中指定了 looped_nominal_epoch(调用 .fit() 时),Brain 类(以及底层 sb.dataio.dataloader.make_dataloader)将使用此项。

    • Brain 类还会自动将此项添加到检查点器中,以便将其保存在检查点中(并且它也适用于 epoch 内检查点)。

dataloader = sb.dataio.dataloader.make_dataloader(dataset, looped_nominal_epoch=5)
for epoch in range(1,6):
    print("Epoch", epoch)
    for ind, batch in enumerate(dataloader, start=1):
        print("\tBatch", ind, ": batch size", len(batch))

引用 SpeechBrain

如果您在研究或商业中使用 SpeechBrain,请使用以下 BibTeX 条目引用它

@misc{speechbrainV1,
  title={Open-Source Conversational AI with {SpeechBrain} 1.0},
  author={Mirco Ravanelli and Titouan Parcollet and Adel Moumen and Sylvain de Langen and Cem Subakan and Peter Plantinga and Yingzhi Wang and Pooneh Mousavi and Luca Della Libera and Artem Ploujnikov and Francesco Paissan and Davide Borra and Salah Zaiem and Zeyu Zhao and Shucong Zhang and Georgios Karakasidis and Sung-Lin Yeh and Pierre Champion and Aku Rouhe and Rudolf Braun and Florian Mai and Juan Zuluaga-Gomez and Seyed Mahed Mousavi and Andreas Nautsch and Xuechen Liu and Sangeet Sagar and Jarod Duret and Salima Mdhaffar and Gaelle Laperriere and Mickael Rouvier and Renato De Mori and Yannick Esteve},
  year={2024},
  eprint={2407.00463},
  archivePrefix={arXiv},
  primaryClass={cs.LG},
  url={https://arxiv.org/abs/2407.00463},
}
@misc{speechbrain,
  title={{SpeechBrain}: A General-Purpose Speech Toolkit},
  author={Mirco Ravanelli and Titouan Parcollet and Peter Plantinga and Aku Rouhe and Samuele Cornell and Loren Lugosch and Cem Subakan and Nauman Dawalatabad and Abdelwahab Heba and Jianyuan Zhong and Ju-Chieh Chou and Sung-Lin Yeh and Szu-Wei Fu and Chien-Feng Liao and Elena Rastorgueva and François Grondin and William Aris and Hwidong Na and Yan Gao and Renato De Mori and Yoshua Bengio},
  year={2021},
  eprint={2106.04624},
  archivePrefix={arXiv},
  primaryClass={eess.AS},
  note={arXiv:2106.04624}
}