第二章 Audio Tokenizer 音频编码器
把 MAESTRO 音乐数据转成离散音频 token 数据集
Introduction:
1、下载数据集:Maestro-v3.0.0
2、音频预处理:加载 + Resample
3、量化器:EnCodec 的量化器是 RVQ
4、编码 :编码器 + 量化器(RVQ)
5、保存 index
0、环境配置
!pip install -q torch torchaudio encodec torchcodec huggingface_hub
import torch
import torchaudio
import encodec
print("PyTorch version:", torch.__version__)
print("CUDA available:", torch.cuda.is_available())
import os
os.environ["TORCH_HOME"] = "/mnt/workspace/torch_cache"
os.environ["HF_ENDPOINT"] = "https://hf-mirror.com"
!apt-get update -qq && apt-get install -y -qq jq
1、下载数据集:Maestro-v3.0.0
Maestro-v3.0.0: https://huggingface.co/datasets/ddPn08/maestro-v3.0.0/tree/main/2004
运行下面 Cell 共 72 个 .wav 文件。(7.0 GB)
下载 10 个左右的 .wav 文件可能会断开。由于是顺序下载,所以找到下载停止的文件,在 wavlist.txt 里找到这一行,删去这一行以上所有文件名。
之后重新运行以下 Cell 即可继续下载。
%%bash
total=$(wc -l < midi_list.txt)
count=0
while read file; do
count=$((count+1))
echo "Downloading $count / $total : $file"
mkdir -p maestro_2018_midi/$(dirname "$file")
wget -c --progress=bar:force \
-O maestro_2018_midi/$file \
https://hf-mirror.com/datasets/ddPn08/maestro-v3.0.0/resolve/main/$file
done < midi_list.txt
2、音频预处理:加载 + 重采样(Resample)
这里首先用 librosa.load 加载。
import os
import torch
import librosa
AUDIO_DIR = "maestro_2004_wav/2004"
TARGET_SR = 24000
audio_tensors = []
for fname in os.listdir(AUDIO_DIR):
if fname.endswith(".wav"):
path = os.path.join(AUDIO_DIR, fname)
# librosa 直接解 mp3(不依赖 torchcodec)
wav, sr = librosa.load(path, sr=TARGET_SR, mono=True)
# 转成 torch tensor: [1, T]
wav = torch.from_numpy(wav).unsqueeze(0)
audio_tensors.append(wav)
print(f"Loaded {len(audio_tensors)} audios.")
for i, wav in enumerate(audio_tensors[:5]):
duration = wav.shape[1] / TARGET_SR
print(f"Audio {i+1}: shape={wav.shape}, duration={duration:.2f}s")
输出:
Loaded 65 audios.
Audio 1: shape=torch.Size([1, 23270772]), duration=969.62s
Audio 2: shape=torch.Size([1, 6418527]), duration=267.44s
Audio 3: shape=torch.Size([1, 4669586]), duration=194.57s
Audio 4: shape=torch.Size([1, 7503739]), duration=312.66s
Audio 5: shape=torch.Size([1, 33848683]), duration=1410.36s
同样可以用 torchaudio.load 加载。下面这个 Cell 可选。
加载音频的同时,做如下两个处理:
一、转单声道(如果有立体声 → 单声道) - 用 torchaudio.load() 读出来的音频可能是: - 单声道:[1, N] - 双声道:[2, N](有些 .wav 文件可能是立体声(双声道)。)
因此需要统一输入的音频格式 同时减少计算量
wav.mean 对两个声道取平均,把 stereo 混合成 mono
二、重采样(Resample)
采样率(Sample Rate)表示:每秒采样多少个数据点
很多模型需要固定采样率训练,如果输入采样率不同,频谱分布会错位
- 16kHz 语音模型
- 24kHz 音乐模型
- 44.1kHz 音频模型
另外,采样率 控制 频率上限
最大可表达频率:最高频率 = 采样率 / 2
- 48000 Hz → 最高 24kHz
- 24000 Hz → 最高 12kHz
如果任务不需要高频,降采样可以减小数据量;降低计算成本
import os
import torch
import torchaudio
AUDIO_DIR = "maestro_2004_wav/2004"
TARGET_SR = 24000
audio_tensors = []
for fname in os.listdir(AUDIO_DIR):
if fname.endswith(".wav"):
path = os.path.join(AUDIO_DIR, fname)
try:
wav, sr = torchaudio.load(path)
# 转单声道
if wav.shape[0] > 1:
wav = wav.mean(dim=0, keepdim=True)
# 重采样
if sr != TARGET_SR:
resampler = torchaudio.transforms.Resample(sr, TARGET_SR)
wav = resampler(wav)
audio_tensors.append(wav)
except Exception as e:
print(f"Failed to load {fname}: {e}")
print(f"Loaded {len(audio_tensors)} audios.")
输出:
Loaded 65 audios.
torch.Size([1, 4404480]) 表示 Audio 1 总共有 4404480 个采样点。(每秒有 24000 个采样点)
这里讲 duration 计算方法:它就是采样点数 ÷ 采样率。
- 采样率 sample_rate = 24000(24 kHz)就是“每秒有 24000 个 sample”
- 这 4404480 个数中,每一个数 = 1 个采样点(sample)
所以:时长 duration (s) = num_samples / sample_rate = 4404480 / 24000 = 183.52(秒)
3、EnCodec 的量化器是 RVQ(Residual Vector Quantization)
EnCodec 里量化器具体干嘛?
量化器做的事就是:
把 latent 向量(最初的连续向量),用 codebook 里的向量来近似替换
from encodec import EncodecModel
model = EncodecModel.encodec_model_24khz()
quantizer = model.quantizer
codebooks = [layer.codebook for layer in quantizer.vq.layers]
print([cb.shape for cb in codebooks])
输出:
[torch.Size([1024, 128]), torch.Size([1024, 128]), torch.Size([1024, 128]), torch.Size([1024, 128]), torch.Size([1024, 128]), torch.Size([1024, 128]), torch.Size([1024, 128]), torch.Size([1024, 128]), torch.Size([1024, 128]), torch.Size([1024, 128]), torch.Size([1024, 128]), torch.Size([1024, 128]), torch.Size([1024, 128]), torch.Size([1024, 128]), torch.Size([1024, 128]), torch.Size([1024, 128]), torch.Size([1024, 128]), torch.Size([1024, 128]), torch.Size([1024, 128]), torch.Size([1024, 128]), torch.Size([1024, 128]), torch.Size([1024, 128]), torch.Size([1024, 128]), torch.Size([1024, 128]), torch.Size([1024, 128]), torch.Size([1024, 128]), torch.Size([1024, 128]), torch.Size([1024, 128]), torch.Size([1024, 128]), torch.Size([1024, 128]), torch.Size([1024, 128]), torch.Size([1024, 128])]
每一层量化器对应每一本 codebook。共 31 层量化器则有 31 本 codebook。
可以看到,每一本 codebook 的形状都是:1024 个音;每个音的向量表示是 128 维。
下面看一下随机一个 codebook 的音的向量表示具体长什么样:
from encodec import EncodecModel
model = EncodecModel.encodec_model_24khz()
quantizer = model.quantizer
layer = quantizer.vq.layers[0] # 先看第 1 个 RVQ layer
codebook = layer.codebook # shape: (num_codes, 128)
print(codebook[:1]) # 看前 1 个 128 维向量
输出:
tensor([[ 5.3395, 13.1336, -3.3514, -0.5637, 3.2171, 11.4735, -8.4687,
0.4034, 0.7212, 8.6175, 5.7454, -1.2644, 2.0950, -10.3753,
-0.2346, -12.8185, -6.0519, 6.8965, -0.8932, 0.3201, -12.9295,
-4.5874, 1.0253, -3.0017, 6.2650, 1.6609, 13.7952, -4.4926,
-9.5459, -2.4131, -9.9515, 0.5068, 2.3795, -8.9276, 2.6603,
-4.5152, 2.6780, -11.3535, -1.2518, 4.9170, 3.6355, 1.2266,
18.9035, 5.1384, 1.8919, -1.1213, -4.3447, 6.0878, 1.4327,
-4.1681, -8.1615, -8.1800, 0.3895, 5.5438, -9.7784, 2.4889,
-2.7340, -8.0559, -4.9292, -0.4302, -11.3153, -6.0190, 0.1818,
-3.4706, -11.1346, -4.3844, 2.1268, -2.7332, 0.0789, -0.6986,
-3.0472, -6.6059, -1.3161, 1.0986, 0.2661, -11.3157, -3.3526,
-1.5227, -0.0636, -2.2600, -1.9390, -7.0191, -10.3219, 7.9803,
5.6771, -1.0727, 8.7551, -7.5301, 0.9749, 0.5898, 12.7960,
3.7519, -0.7389, 6.8869, -11.4781, 7.0113, 3.0283, 20.7458,
-1.8931, 6.5462, 7.0486, 13.2628, 5.7293, 1.9618, -12.3246,
-3.3307, -0.7211, -6.1105, 1.1360, 3.0271, -13.1079, -5.0016,
-6.7679, 0.6489, -4.9507, -1.6126, 8.9500, -0.5464, 4.2155,
-4.8056, -0.6793, 7.9327, 5.2310, 0.3282, 2.5067, 2.2543,
-4.5506, 3.7425]])
接下来,多个 RVQ layer 一起看(EnCodec 是多层量化)
输出:每层第一个向量的前 8 维
for l, layer in enumerate(quantizer.vq.layers):
cb = layer.codebook
print(f"第 {l} 级量化器: shape={cb.shape}")
print(f"取这级 codebook 中的第一个向量:{cb[0][:8]}...直到 128 维"+"\n") # 每层第一个向量的前 8 维
输出:
第 0 级量化器: shape=torch.Size([1024, 128])
取这级 codebook 中的第一个向量:tensor([ 5.3395, 13.1336, -3.3514, -0.5637, 3.2171, 11.4735, -8.4687, 0.4034])...直到 128 维
第 1 级量化器: shape=torch.Size([1024, 128])
取这级 codebook 中的第一个向量:tensor([-0.5989, -0.5179, 0.0062, 0.3614, 0.4851, 0.4453, 0.7357, 0.3656])...直到 128 维
...
第 31 级量化器: shape=torch.Size([1024, 128])
取这级 codebook 中的第一个向量:tensor([ 3.2981, -0.2873, 0.8708, 1.2206, 0.6484, -1.2183, 0.2486, 0.9563])...直到 128 维
我们会发现不同 layer 的 codebook 分布是不一样的,这也是 RVQ 的精髓之一。
4、编码 Encode:encoder + quantizer(RVQ)
音频 → token(离散化)并保存 .pt 文件
1、模型加载 - 加载 24kHz 版本的 Encodec 模型 - 设置压缩比为 3 kbps(带宽越小,压缩越强 音质越差) - 移动模型到 GPU;eval() 关闭 dropout / 训练行为
2、遍历每首音频,切块处理,每次切 10 秒。
3、Encodec 模型编码音频 - 将音频张量 变形 [1, 1, T],符合 Encodec 输入格式。 - 将音频 归一化,把振幅缩放到 [-1,1] - model.encode(chunk) 对音频进行 编码,Encodec 返回每一个“10s 音频”的编码后矩阵,全是 token 编号。
4、保存元数据 为 .pt 文件。
from encodec import EncodecModel
import torch
import os
device = "cuda" if torch.cuda.is_available() else "cpu"
print(f"Using device: {device}")
model = EncodecModel.encodec_model_24khz()
model.set_target_bandwidth(3.0)
model = model.to(device)
model.eval()
CHUNK_SIZE = 24000 * 10 # 10 秒
os.makedirs("maestro_tokens_3kbps", exist_ok=True)
total_chunks = 0
for audio_idx, wav in enumerate(audio_tensors):
song_tokens = []
total_len = wav.shape[1]
num_chunks = total_len // CHUNK_SIZE
print(f"\nAudio {audio_idx + 1}/{len(audio_tensors)}")
for chunk_idx, start in enumerate(range(0, total_len, CHUNK_SIZE)):
chunk = wav[:, start:start + CHUNK_SIZE]
if chunk.shape[1] < CHUNK_SIZE:
continue
with torch.no_grad():
chunk = chunk.unsqueeze(0).float().to(device)
chunk = chunk / (chunk.abs().max() + 1e-9)
encoded = model.encode(chunk)
codes, scale = encoded[0] # [1, K, T]
codes = codes.squeeze(0).cpu() # [K, T]
song_tokens.append(codes)
total_chunks += 1
print(f"codes shape: {codes.shape}")
save_data = {
"tokens": song_tokens,
"num_chunks": len(song_tokens),
"bandwidth": 3.0,
"sample_rate": 24000,
"chunk_size": CHUNK_SIZE,
"music_id": audio_idx
}
save_path = f"maestro_tokens_3kbps/music_{audio_idx:04d}.pt"
torch.save(save_data, save_path)
print(f"Saved {save_path}")
print(f"\nTotal token chunks: {total_chunks}")
输出:
Using device: cuda
Audio 1/65
codes shape: torch.Size([4, 750])
Saved maestro_tokens_3kbps/music_0000.pt
Audio 2/65
codes shape: torch.Size([4, 750])
Saved maestro_tokens_3kbps/music_0001.pt
...
Audio 65/65
codes shape: torch.Size([4, 750])
Saved maestro_tokens_3kbps/music_0064.pt
Total token chunks: 4215
5、保存 index
import json
index = {
"num_music": len(audio_tensors),
"bandwidth": 3.0,
"sample_rate": 24000,
"chunk_size": CHUNK_SIZE,
"music": [
{"id": i, "file": f"music_{i:04d}.pt"}
for i in range(len(audio_tensors))
]
}
with open("maestro_tokens_3kbps/index.json", "w") as f:
json.dump(index, f, indent=2)
已保存 index,本章结束