speechbrain.dataio.dataio 模块
数据读写。
- 作者
Mirco Ravanelli 2020
Aku Rouhe 2020
Ju-Chieh Chou 2020
Samuele Cornell 2020
Abdel HEBA 2020
Gaëlle Laperrière 2021
Sahar Ghannay 2021
Sylvain de Langen 2022
Adel Moumen 2025
摘要
类
逐行写入 CSV 文件。 |
函数
创建附加 <eos> 标记的标签。 |
|
将指定张量上的任何填充值设置为 mask_value。 |
|
将指定张量上的任何填充值设置为 mask_value。 |
|
将一批整数 ID 转换为字符串标签。 |
|
保留语义概念和值用于评估。 |
|
获取输入文件的 md5 校验和。 |
|
为每个序列创建一个二进制掩码。 |
|
加载 CSV 并格式化字符串值。 |
|
加载 JSON 并递归格式化字符串值。 |
|
用于加载 .pkl pickle 文件的实用函数。 |
|
加载 pkl 文件。 |
|
将字符序列合并为单词序列。 |
|
将多个 csv 文件合并为一个文件。 |
|
创建开头附加 <bos> 标记的标签。 |
|
基于自定义标记的通用音频加载。 |
|
从文件路径检索音频元数据。 |
|
基于自定义标记的通用音频加载。 |
|
读取 kaldi 格式的标签。 |
|
将 SpeechBrain 风格的相对长度转换为绝对持续时间。 |
|
将输入文件列表的 md5 保存为 pickled 字典到文件中。 |
|
以 pkl 格式保存对象。 |
|
将单词序列拆分为字符序列。 |
|
将音频写入磁盘。 |
|
将数据写入标准输出。 |
|
以文本格式写入数据。 |
参考
- speechbrain.dataio.dataio.load_data_json(json_path, replacements={})[source]
加载 JSON 并递归格式化字符串值。
- 参数:
- 返回:
应用替换后的 JSON 数据。
- 返回类型:
示例
>>> json_spec = '''{ ... "ex1": {"files": ["{ROOT}/mic1/ex1.wav", "{ROOT}/mic2/ex1.wav"], "id": 1}, ... "ex2": {"files": [{"spk1": "{ROOT}/ex2.wav"}, {"spk2": "{ROOT}/ex2.wav"}], "id": 2} ... } ... ''' >>> tmpfile = getfixture('tmpdir') / "test.json" >>> with open(tmpfile, "w", encoding="utf-8") as fo: ... _ = fo.write(json_spec) >>> data = load_data_json(tmpfile, {"ROOT": "/home"}) >>> data["ex1"]["files"][0] '/home/mic1/ex1.wav' >>> data["ex2"]["files"][1]["spk2"] '/home/ex2.wav'
- speechbrain.dataio.dataio.load_data_csv(csv_path, replacements={})[source]
加载 CSV 并格式化字符串值。
使用 SpeechBrain 传统的 CSV 数据格式,其中 CSV 必须包含一个 ‘ID’ 字段。如果存在名为 duration 的字段,则将其解释为浮点数。其余字段保持原样 (传统的 _format 和 _opts 字段不会以任何特殊方式用于加载数据)。
支持使用 $to_replace 进行类似 Bash 的字符串替换。
- 参数:
- 返回:
应用替换后的 CSV 数据。
- 返回类型:
示例
>>> csv_spec = '''ID,duration,wav_path ... utt1,1.45,$data_folder/utt1.wav ... utt2,2.0,$data_folder/utt2.wav ... ''' >>> tmpfile = getfixture("tmpdir") / "test.csv" >>> with open(tmpfile, "w", encoding="utf-8") as fo: ... _ = fo.write(csv_spec) >>> data = load_data_csv(tmpfile, {"data_folder": "/home"}) >>> data["utt1"]["wav_path"] '/home/utt1.wav'
- speechbrain.dataio.dataio.read_audio_info(path, backend=None) AudioMetaData [source]
从文件路径检索音频元数据。其行为与 torchaudio.info 相同,但尝试修复在某些 torchaudio 版本和编解码器组合下可能损坏的元数据(例如帧数)。
请注意,在某些情况下,这可能导致完全的文件遍历!
- 参数:
- 引发:
ValueError – 如果
backend
不是允许的值之一。必须是 [None, ‘ffmpeg’, ‘sox’, ‘soundfile’] 之一。- 返回:
与
torchaudio.info
返回的值相同,但如果num_frames
原本是== 0
,则可能会最终被修正。- 返回类型:
torchaudio.backend.common.AudioMetaData
注意
某些编解码器,例如 MP3,需要完整的文件遍历才能检索准确的长度信息。在这种情况下,你也可以直接读取整个音频文件,以避免处理时间加倍。
- speechbrain.dataio.dataio.read_audio(waveforms_obj, backend=None)[source]
基于自定义标记的通用音频加载。
预期的用例是与 JSON 指定的数据集结合使用。
参数可能只是一个文件路径:
read_audio("/path/to/wav1.wav")
或者,你可以在字典中指定更多选项,例如:``` # 从采样点 8000 到 15999 加载文件 read_audio({
“file”: “/path/to/wav2.wav”, “start”: 8000, “stop”: 16000
})
支持哪些编解码器取决于你的 torchaudio 后端。有关详细信息,请参阅
torchaudio.load
文档。- 参数 waveforms_obj:
音频路径或包含所需配置的字典。
字典变体的键: -
"file"
(str): 音频文件路径。 -"start"
(int, 可选):要加载的第一个采样点。如果未指定,则从第一帧开始加载。 -"stop"
(int, 可选):要加载的最后一个采样点(不包含)。如果未指定或等于 start,则从start
加载到文件末尾。如果stop
超出文件的采样点数,也不会失败,但会返回较少的帧。- 类型 waveforms_obj:
str, dict
- 参数 backend:
用于加载音频文件的音频后端。必须是 ‘ffmpeg’、‘sox’、‘soundfile’ 或 None 之一。如果为 None,则使用 torchaudio 的默认后端。
- 类型 backend:
str, 可选
- 返回:
1 通道:形状为
(samples, )
的音频张量。>=2 通道:形状为(samples, channels)
的音频张量。- 返回类型:
torch.Tensor
- 引发 ValueError:
如果
backend
不是允许的值之一。必须是 [None, ‘ffmpeg’, ‘sox’, ‘soundfile’] 之一。
示例
>>> dummywav = torch.rand(16000) >>> import os >>> tmpfile = str(getfixture('tmpdir') / "wave.wav") >>> write_audio(tmpfile, dummywav, 16000) >>> asr_example = { "wav": tmpfile, "spk_id": "foo", "words": "foo bar"} >>> loaded = read_audio(asr_example["wav"]) >>> loaded.allclose(dummywav.squeeze(0),atol=1e-4) # replace with eq with sox_io backend True
- speechbrain.dataio.dataio.read_audio_multichannel(waveforms_obj, backend=None)[source]
基于自定义标记的通用音频加载。
预期的用例是与 JSON 指定的数据集结合使用。
自定义标记
标注可以只是一个文件路径:“/path/to/wav1.wav”
可以指定多个(可能是多通道)文件,只要它们的长度相同:{“files”: [
“/path/to/wav1.wav”, “/path/to/wav2.wav” ]
}
或者你可以更简洁地指定单个文件:{“files”: “/path/to/wav2.wav”}
还可以指定偏移采样点数和停止采样点数,以仅读取文件中的一个片段。{“files”: [
“/path/to/wav1.wav”, “/path/to/wav2.wav” ]
“start”: 8000 “stop”: 16000 }
- 参数:
- 引发:
ValueError – 如果
backend
不是允许的值之一。必须是 [None, ‘ffmpeg’, ‘sox’, ‘soundfile’] 之一。- 返回:
形状为 (samples, ) 的音频张量。
- 返回类型:
torch.Tensor
示例
>>> dummywav = torch.rand(16000, 2) >>> import os >>> tmpfile = str(getfixture('tmpdir') / "wave.wav") >>> write_audio(tmpfile, dummywav, 16000) >>> asr_example = { "wav": tmpfile, "spk_id": "foo", "words": "foo bar"} >>> loaded = read_audio(asr_example["wav"]) >>> loaded.allclose(dummywav.squeeze(0),atol=1e-4) # replace with eq with sox_io backend True
- speechbrain.dataio.dataio.write_audio(filepath, audio, samplerate)[source]
将音频写入磁盘。它基本上是一个包装器,用于支持以 SpeechBrain 格式(音频,通道)保存音频信号。
- 参数:
filepath (path) – 保存音频文件的路径。
audio (torch.Tensor) – 预期 SpeechBrain 格式(信号,通道)的音频文件。
samplerate (int) – 采样率(例如,16000)。
示例
>>> import os >>> tmpfile = str(getfixture('tmpdir') / "wave.wav") >>> dummywav = torch.rand(16000, 2) >>> write_audio(tmpfile, dummywav, 16000) >>> loaded = read_audio(tmpfile) >>> loaded.allclose(dummywav,atol=1e-4) # replace with eq with sox_io backend True
- speechbrain.dataio.dataio.convert_index_to_lab(batch, ind2lab)[source]
将一批整数 ID 转换为字符串标签。
- 参数:
- 返回:
列表的列表,大小与批次相同,包含来自 ind2lab 的标签。
- 返回类型:
示例
>>> ind2lab = {1: "h", 2: "e", 3: "l", 4: "o"} >>> out = convert_index_to_lab([[4,1], [1,2,3,3,4]], ind2lab) >>> for seq in out: ... print("".join(seq)) oh hello
- speechbrain.dataio.dataio.relative_time_to_absolute(batch, relative_lens, rate)[source]
将 SpeechBrain 风格的相对长度转换为绝对持续时间。
在批次级别上操作。
- 参数:
batch (torch.Tensor) – 用于确定持续时间的序列。
relative_lens (torch.Tensor) – 批次中每个序列的相对长度。批次中最长的序列必须具有相对长度 1.0。
rate (float) – 序列元素在真实世界时间中出现的速率。如果批次是原始 wavs (推荐),则是采样率;如果批次是特征,则是 1/frame_shift。单位必须是 1/s。
- 返回:
每个序列的持续时间(秒)。
- 返回类型:
torch.Tensor
示例
>>> batch = torch.ones(2, 16000) >>> relative_lens = torch.tensor([3./4., 1.0]) >>> rate = 16000 >>> print(relative_time_to_absolute(batch, relative_lens, rate)) tensor([0.7500, 1.0000])
- class speechbrain.dataio.dataio.IterativeCSVWriter(outstream, data_fields, defaults={})[source]
基类:
object
逐行写入 CSV 文件。
- 参数:
示例
>>> import io >>> f = io.StringIO() >>> writer = IterativeCSVWriter(f, ["phn"]) >>> print(f.getvalue()) ID,duration,phn,phn_format,phn_opts >>> writer.write("UTT1",2.5,"sil hh ee ll ll oo sil","string","") >>> print(f.getvalue()) ID,duration,phn,phn_format,phn_opts UTT1,2.5,sil hh ee ll ll oo sil,string, >>> writer.write(ID="UTT2",phn="sil ww oo rr ll dd sil",phn_format="string") >>> print(f.getvalue()) ID,duration,phn,phn_format,phn_opts UTT1,2.5,sil hh ee ll ll oo sil,string, UTT2,,sil ww oo rr ll dd sil,string, >>> writer.set_default('phn_format', 'string') >>> writer.write_batch(ID=["UTT3","UTT4"],phn=["ff oo oo", "bb aa rr"]) >>> print(f.getvalue()) ID,duration,phn,phn_format,phn_opts UTT1,2.5,sil hh ee ll ll oo sil,string, UTT2,,sil ww oo rr ll dd sil,string, UTT3,,ff oo oo,string, UTT4,,bb aa rr,string,
- speechbrain.dataio.dataio.write_txt_file(data, filename, sampling_rate=None)[source]
以文本格式写入数据。
- 参数:
data (str, list, torch.Tensor, numpy.ndarray) – 要写入文本文件的数据。
filename (str) – 写入数据的文件路径。
sampling_rate (None) – 未使用,仅用于接口兼容性。
示例
>>> tmpdir = getfixture('tmpdir') >>> signal=torch.tensor([1,2,3,4]) >>> write_txt_file(signal, tmpdir / 'example.txt')
- speechbrain.dataio.dataio.write_stdout(data, filename=None, sampling_rate=None)[source]
将数据写入标准输出。
- 参数:
data (str, list, torch.Tensor, numpy.ndarray) – 要写入文本文件的数据。
filename (None) – 未使用,仅用于兼容性。
sampling_rate (None) – 未使用,仅用于兼容性。
示例
>>> tmpdir = getfixture('tmpdir') >>> signal = torch.tensor([[1,2,3,4]]) >>> write_stdout(signal, tmpdir / 'example.txt') [1, 2, 3, 4]
- speechbrain.dataio.dataio.length_to_mask(length, max_len=None, dtype=None, device=None)[source]
为每个序列创建一个二进制掩码。
参考: https://discuss.pytorch.org/t/how-to-generate-variable-length-mask/23397/3
- 参数:
length (torch.LongTensor) – 包含批次中每个序列长度的张量。必须是一维的。
max_len (int) – 掩码的最大长度,也是第二维的大小。
dtype (torch.dtype, default: None) – 生成掩码的数据类型。
device (torch.device, default: None) – 放置掩码变量的设备。
- 返回:
mask – 二进制掩码。
- 返回类型:
tensor
示例
>>> length=torch.Tensor([1,2,3]) >>> mask=length_to_mask(length) >>> mask tensor([[1., 0., 0.], [1., 1., 0.], [1., 1., 1.]])
- speechbrain.dataio.dataio.read_kaldi_lab(kaldi_ali, kaldi_lab_opts)[source]
读取 kaldi 格式的标签。
使用 kaldi IO。
- 参数:
- 返回:
lab – 包含标签的字典。
- 返回类型:
注意
这依赖于 kaldi-io-for-python。请单独安装。参见:https://github.com/vesis84/kaldi-io-for-python
示例
此示例需要 kaldi 文件。
` lab_folder = '/home/kaldi/egs/TIMIT/s5/exp/dnn4_pretrain-dbn_dnn_ali' read_kaldi_lab(lab_folder, 'ali-to-pdf') `
- speechbrain.dataio.dataio.get_md5(file)[source]
获取输入文件的 md5 校验和。
- 参数:
file (str) – 计算校验和的文件路径。
- 返回:
给定文件路径的校验和。
- 返回类型:
md5
示例
>>> get_md5('tests/samples/single-mic/example1.wav') 'c482d0081ca35302d30d12f1136c34e5'
- speechbrain.dataio.dataio.save_md5(files, out_file)[source]
将输入文件列表的 md5 保存为 pickled 字典到文件中。
示例
>>> files = ['tests/samples/single-mic/example1.wav'] >>> tmpdir = getfixture('tmpdir') >>> save_md5(files, tmpdir / "md5.pkl")
- speechbrain.dataio.dataio.save_pkl(obj, file)[source]
以 pkl 格式保存对象。
示例
>>> tmpfile = getfixture('tmpdir') / "example.pkl" >>> save_pkl([1, 2, 3, 4, 5], tmpfile) >>> load_pkl(tmpfile) [1, 2, 3, 4, 5]
- speechbrain.dataio.dataio.load_pkl(file)[source]
加载 pkl 文件。
示例请参见
save_pkl
。- 参数:
file (str) – 输入 pkl 文件的路径。
- 返回类型:
加载的对象。
- speechbrain.dataio.dataio.prepend_bos_token(label, bos_index)[source]
创建开头附加 <bos> 标记的标签。
- 参数:
label (torch.IntTensor) – 包含原始标签的张量。大小必须为:[batch_size, max_length]。
bos_index (int) – <bos> 标记的索引。
- 返回:
new_label – 开头带有 <bos> 的新标签。
- 返回类型:
tensor
示例
>>> label=torch.LongTensor([[1,0,0], [2,3,0], [4,5,6]]) >>> new_label=prepend_bos_token(label, bos_index=7) >>> new_label tensor([[7, 1, 0, 0], [7, 2, 3, 0], [7, 4, 5, 6]])
- speechbrain.dataio.dataio.append_eos_token(label, length, eos_index)[source]
创建附加 <eos> 标记的标签。
- 参数:
label (torch.IntTensor) – 包含原始标签的张量。大小必须为:[batch_size, max_length]。
length (torch.LongTensor) – 包含每个标签序列原始长度的张量。必须是一维的。
eos_index (int) – <eos> 标记的索引。
- 返回:
new_label – 末尾附加了 <eos> 的新标签。
- 返回类型:
tensor
示例
>>> label=torch.IntTensor([[1,0,0], [2,3,0], [4,5,6]]) >>> length=torch.LongTensor([1,2,3]) >>> new_label=append_eos_token(label, length, eos_index=7) >>> new_label tensor([[1, 7, 0, 0], [2, 3, 7, 0], [4, 5, 6, 7]], dtype=torch.int32)
- speechbrain.dataio.dataio.merge_char(sequences, space='_')[source]
将字符序列合并为单词序列。
- 参数:
sequences (list) – 每个项包含一个列表,该列表包含一个字符序列。
space (string) – 表示空格的标记。默认值:_
- 返回类型:
包含每个句子的词序列的列表。
示例
>>> sequences = [["a", "b", "_", "c", "_", "d", "e"], ["e", "f", "g", "_", "h", "i"]] >>> results = merge_char(sequences) >>> results [['ab', 'c', 'de'], ['efg', 'hi']]
- speechbrain.dataio.dataio.merge_csvs(data_folder, csv_lst, merged_csv)[source]
将多个 csv 文件合并为一个文件。
- 参数:
data_folder (string) – 存储待合并和合并后 csv 文件的文件夹。
csv_lst (list) – 待合并的 csv 文件名列表。
merged_csv (string) – 写入合并后 csv 文件的文件名。
示例
>>> tmpdir = getfixture('tmpdir') >>> os.symlink(os.path.realpath("tests/samples/annotation/speech.csv"), tmpdir / "speech.csv") >>> merge_csvs(tmpdir, ... ["speech.csv", "speech.csv"], ... "test_csv_merge.csv")
- speechbrain.dataio.dataio.split_word(sequences, space='_')[source]
将单词序列拆分为字符序列。
- 参数:
sequences (list) – 每个项包含一个列表,该列表包含一个词序列。
space (string) – 表示空格的标记。默认值:_
- 返回类型:
包含每个句子的词序列的列表。
示例
>>> sequences = [['ab', 'c', 'de'], ['efg', 'hi']] >>> results = split_word(sequences) >>> results [['a', 'b', '_', 'c', '_', 'd', 'e'], ['e', 'f', 'g', '_', 'h', 'i']]
- speechbrain.dataio.dataio.clean_padding_(tensor, length, len_dim=1, mask_value=0.0)[source]
将指定张量上的任何填充值设置为 mask_value。
例如,这可以用于在训练期间将自动编码器超出指定长度的输出归零。
这是一个就地操作。
- 参数:
tensor (torch.Tensor) – 任意维度的张量。
length (torch.Tensor) – 一维长度张量。
len_dim (int) – 表示长度的维度。
mask_value (mixed) – 要分配给填充位置的值。
示例
>>> import torch >>> x = torch.arange(5).unsqueeze(0).repeat(3, 1) >>> x = x + torch.arange(3).unsqueeze(-1) >>> x tensor([[0, 1, 2, 3, 4], [1, 2, 3, 4, 5], [2, 3, 4, 5, 6]]) >>> length = torch.tensor([0.4, 1.0, 0.6]) >>> clean_padding_(x, length=length, mask_value=10.) >>> x tensor([[ 0, 1, 10, 10, 10], [ 1, 2, 3, 4, 5], [ 2, 3, 4, 10, 10]]) >>> x = torch.arange(5)[None, :, None].repeat(3, 1, 2) >>> x = x + torch.arange(3)[:, None, None] >>> x = x * torch.arange(1, 3)[None, None, :] >>> x = x.transpose(1, 2) >>> x tensor([[[ 0, 1, 2, 3, 4], [ 0, 2, 4, 6, 8]], [[ 1, 2, 3, 4, 5], [ 2, 4, 6, 8, 10]], [[ 2, 3, 4, 5, 6], [ 4, 6, 8, 10, 12]]]) >>> clean_padding_(x, length=length, mask_value=10., len_dim=2) >>> x tensor([[[ 0, 1, 10, 10, 10], [ 0, 2, 10, 10, 10]], [[ 1, 2, 3, 4, 5], [ 2, 4, 6, 8, 10]], [[ 2, 3, 4, 10, 10], [ 4, 6, 8, 10, 10]]])
- speechbrain.dataio.dataio.clean_padding(tensor, length, len_dim=1, mask_value=0.0)[source]
将指定张量上的任何填充值设置为 mask_value。
例如,这可以用于在训练期间将自动编码器超出指定长度的输出归零。
此版本操作不会修改原始张量。
- 参数:
tensor (torch.Tensor) – 任意维度的张量。
length (torch.Tensor) – 一维长度张量。
len_dim (int) – 表示长度的维度。
mask_value (mixed) – 要分配给填充位置的值。
- 返回:
result – 填充更新后的张量。
- 返回类型:
torch.Tensor
示例
>>> import torch >>> x = torch.arange(5).unsqueeze(0).repeat(3, 1) >>> x = x + torch.arange(3).unsqueeze(-1) >>> x tensor([[0, 1, 2, 3, 4], [1, 2, 3, 4, 5], [2, 3, 4, 5, 6]]) >>> length = torch.tensor([0.4, 1.0, 0.6]) >>> x_p = clean_padding(x, length=length, mask_value=10.) >>> x_p tensor([[ 0, 1, 10, 10, 10], [ 1, 2, 3, 4, 5], [ 2, 3, 4, 10, 10]]) >>> x = torch.arange(5)[None, :, None].repeat(3, 1, 2) >>> x = x + torch.arange(3)[:, None, None] >>> x = x * torch.arange(1, 3)[None, None, :] >>> x = x.transpose(1, 2) >>> x tensor([[[ 0, 1, 2, 3, 4], [ 0, 2, 4, 6, 8]], [[ 1, 2, 3, 4, 5], [ 2, 4, 6, 8, 10]], [[ 2, 3, 4, 5, 6], [ 4, 6, 8, 10, 12]]]) >>> x_p = clean_padding(x, length=length, mask_value=10., len_dim=2) >>> x_p tensor([[[ 0, 1, 10, 10, 10], [ 0, 2, 10, 10, 10]], [[ 1, 2, 3, 4, 5], [ 2, 4, 6, 8, 10]], [[ 2, 3, 4, 10, 10], [ 4, 6, 8, 10, 10]]])
- speechbrain.dataio.dataio.extract_concepts_values(sequences, keep_values, tag_in, tag_out, space)[source]
保留语义概念和值用于评估。
- 参数:
- 返回类型:
包含每个句子的概念和值序列的列表。
示例
>>> sequences = [['<response>','_','n','o','_','>','_','<localisation-ville>','_','L','e','_','M','a','n','s','_','>'], ['<response>','_','s','i','_','>'],['v','a','_','b','e','n','e']] >>> results = extract_concepts_values(sequences, True, '<', '>', '_') >>> results [['<response> no', '<localisation-ville> Le Mans'], ['<response> si'], ['']]