Skip to content

4.2 MusicGen 开源代码分析

首先我们在 MusicGen GitHub 项目中找到 models/musicgen.py 源码。

它负责加载模型、设置生成参数、准备 conditioning、调用 LM、拼接分段生成、解码为音频。

而实现 attention、实现 transformer 、实现 loss 、实现 CFG 控制强度等等这些主要的底层计算,可以通过以下文件找到源码:

  • models/lm.py
  • modules/transformer.py
  • models/encodec.py
  • modules/conditioners.py
  • models/genmodel.py

一、import

根据 musicgen.py 这个文件 最初的import部分,我们可以看到它导入了哪些方法和函数。

from .encodec import CompressionModel
from .genmodel import BaseGenModel
from .lm import LMModel
from .builders import get_debug_compression_model, get_debug_lm_model
from .loaders import load_compression_model, load_lm_model
from ..data.audio_utils import convert_audio
from ..modules.conditioners import ConditioningAttributes, WavCondition, StyleConditioner

从而也可以知道它的源码结构大概是:

musicgen.py
   │
   ├── BaseGenModel 父类
   ├── LMModel 语言模型:自回归预测音频token。
   │      └── transformer.py
   │             └── attention
   │
   ├── CompressionModel 压缩模型,其实就是编码音频token。
   │      └── encodec implementation 音频编码器
   │
   └── Conditioners 调度器。看prompt是文字就处理文字给模型、看输入是旋律就处理旋律给模型。

二、定义 MusicGen 的核心模型类

MusicGen 继承自 BaseGenModel 底层通用生成框架,MusicGen 是对它的具体实现 + 参数封装

class MusicGen(BaseGenModel):
    def __init__(self, name: str, compression_model: CompressionModel, lm: LMModel,
                 max_duration: tp.Optional[float] = None):
        super().__init__(name, compression_model, lm, max_duration)
        self.set_generation_params(duration=15)  # default duration

这段代码定义了 MusicGen 的核心模型类,并在初始化时设置了一个默认生成时长 15 秒。

1、输入参数包括 模型名、压缩模型是哪一个、语言模型是哪一个、可选:生成音频时长

2、super().init 调用父类构造函数:把所有核心组件交给 BaseGenModel 去初始化。

父类 BaseGenModel 做什么?

  • 保存模型名
  • 保存压缩模型
  • 保存语言模型
  • 设置最大生成长度

我们后面如何使用这个函数?

model = MusicGen(
    name="musicgen-small",
    compression_model=encodec_model,
    lm=transformer_model
)

像这样,实现了一个MusicGen类,命名为model。即可使用这个MusicGen类(就是这个model)来生成音乐:

audio = model.generate(["a happy piano melody"])

当然它的内部流程是:(可以自己去找 BaseGenModel 类的定义源码,在 genmodel.py 里)

  • duration = 15 秒(默认)
  • LM 生成对应 15 秒长度的离散 token
  • CompressionModel 解码成音频
  • 返回 tensor 返回张量

三、get_pretrained 自动下载并加载对应的预训练模型(语言模型 + 音频压缩模型)

并返回一个可直接生成音乐的 MusicGen 实例。

@staticmethod
    def get_pretrained(name: str = 'facebook/musicgen-melody', device=None):
        """
        Return pretrained model, we provide four models:
        - facebook/musicgen-small (300M), text to music,
          # see: https://huggingface.co/facebook/musicgen-small
        - facebook/musicgen-medium (1.5B), text to music,
          # see: https://huggingface.co/facebook/musicgen-medium
        - facebook/musicgen-melody (1.5B) text to music and text+melody to music,
          # see: https://huggingface.co/facebook/musicgen-melody
        """
        # 中间代码略:选 GPU cuda
        # 中间代码略:if name == 'debug'有一个 debug 模式(单元测试专用)
        # 中间代码略:兼容旧模型名字

        lm = load_lm_model(name, device=device) # 加载语言模型(LM)

        compression_model = load_compression_model(name, device=device) # 加载压缩模型(音频解码器)

        if 'self_wav' in lm.condition_provider.conditioners: # 对于 musicgen-melody 这类支持 melody 输入的模型
            lm.condition_provider.conditioners['self_wav'].match_len_on_eval = True # 在推理阶段对齐长度
            lm.condition_provider.conditioners['self_wav']._use_masking = False # 不使用训练时的 mask 机制

        return MusicGen(name, compression_model, lm) # 返回最终模型

1、@staticmethod 这是一个 静态方法。

因此我们可以直接调用 而不需要先实例化类 MusicGen:

model = MusicGen.get_pretrained("facebook/musicgen-small")

2、load_lm_model() 加载语言模型(LM)

3、load_compression_model( 加载压缩模型(音频解码器)

4、特殊处理 self_wav 条件器:(对于 musicgen-melody 这类支持 melody 输入的模型)

在推理阶段对齐长度,不使用训练时的 mask 机制

目的是:让 melody 条件输入在推理时更稳定。

5、返回最终模型

return MusicGen(name, compression_model, lm)

现在我们就得到了一个:

已加载权重、已准备好推理、已放到 GPU 的模型实例。


四、设置 模型 在 生成音乐时的 采样策略和控制参数。

    def set_generation_params(self, use_sampling: bool = True, top_k: int = 250,
                              top_p: float = 0.0, temperature: float = 1.0,
                              duration: float = 30.0, cfg_coef: float = 3.0,
                              cfg_coef_beta: tp.Optional[float] = None,
                              two_step_cfg: bool = False, extend_stride: float = 18,):
        """
        Set the generation parameters for MusicGen.
        """
        # 略:assert extend_stride < self.max_duration(滑动扩展时长 小于 一次最大生成时长)

        self.extend_stride = extend_stride    # 每次扩展 多少秒
        self.duration = duration              # 总生成时长 多少秒
        self.generation_params = {            # 保存参数(就是采样策略)这些参数会在 LM 采样时使用。
            'use_sampling': use_sampling,  # 随机采样,而不是更稳定的 argmax(最大概率)
            'temp': temperature,           # 温度 控制随机性
            'top_k': top_k,                # 每一步只在概率最高的 250 个 token 里采样。
            'top_p': top_p,                # 选前面概率累计达到 0.9(就是概率90%)的 token 集合。

            'cfg_coef': cfg_coef,            # 用于 text to music 的 MusicGen
            'cfg_coef_beta': cfg_coef_beta,  # 用于 melody + text to music 的 MusicGen

            'two_step_cfg': two_step_cfg,  # 是否分两次 forward 做 cfg,更稳定些?
        }

1、滑动扩展时长 extend_stride 到底是什么?

它会有用在“所需生成 超过 一次生成最大长度”时:

MusicGen 一次最多能生成 self.max_duration 秒(例如 30 秒)。

如果你想生成更长的音乐,比如 60 秒:

模型会采用一种 滑动扩展生成 的方式。

2、cfg:分类器自由引导系数

cfg_coef=3.0 控制 文本对生成的“控制强度”

公式是:

output = uncond + cfg * (cond - uncond)
输出   = 无条件 + cfg * (条件 - 无条件)

cfg 越大,越贴近文本描述,但可能音质变差

--

cfg_coef_beta = None 控制 旋律+文本对生成的“控制强度”

用于:melody 模型(melody + text to music 的 MusicGen)

作用:同时平衡 text 条件 和 audio 条件,只有在 text+melody 模型里才用。

示例输入1:默认参数

model.set_generation_params()

则实际上内部保存:

{
 'use_sampling': True,
 'temp': 1.0,
 'top_k': 250,
 'top_p': 0.0,
 'cfg_coef': 3.0,
 'two_step_cfg': False,
 'cfg_coef_beta': None,
}

按照 这些默认参数 配置模型,生成 30 秒音乐。

示例输入2:换换参数,生成更有创造性的音乐

model.set_generation_params(
    temperature=1.5,
    top_k=500,
    duration=20
)

效果:更随机 更实验风格 生成 20 秒

这里再讲一下 CFG:

CFG 是用于 Diffusion 模型的。但是 MusicGen 作为一个完全使用 Transformer 自回归的模型,为什么用 CFG?

这就是 CFG for 自回归模型。

“classifier-free guidance for autoregressive models”

意思就是:在自回归生成模型里,用无条件输出和有条件输出做差值放大,来增强条件控制能力。让模型更听话。

cond / uncond 是啥?“有条件时的 logits 分数矩阵” 和 “无条件时的 logits 分数矩阵”

cond logits 就是 “带提示词输入的得到的 logits 分数矩阵” 有条件 logits

带文本输入:“epic orchestral battle music”

模型会倾向生成史诗交响风。

经过模型得到 logits 分数矩阵:每个 token 的概率多大的分数 cond_logits

uncond logits 无条件 logits

输入空文本:"" (什么都不写)

模型只根据“音乐本身的统计规律”生成。

得到这次的 logits 分数矩阵 uncond_logits

MusicGen 的做法是这样:

在 sampling 时:

输入带文本 → 得到 cond logits 输入空文本 → 得到 uncond logits

做:

logits = uncond + scale * (cond - uncond)

再去 softmax + sampling

  • uncond = “自然生成”
  • cond = “受文本影响的生成”
  • cond - uncond = “文本带来的改变方向”

所以:这是在“放大文本的影响”

这就叫 “CFG for 自回归模型”

这个 CFG 的值比如说 3.0,那就是 指导强度(guidance strength)是 3.0,而 (cond - uncond) 是 指导方向。

logits = uncond + scale * (cond - uncond)

这就是:根据 CFG 的值是多大,来放大“文本带来的改变方向”。


五、模型的输入预处理函数

@torch.no_grad()
    def _prepare_tokens_and_attributes(
            self,
            descriptions: tp.Sequence[tp.Optional[str]],
            prompt: tp.Optional[torch.Tensor],
            melody_wavs: tp.Optional[MelodyList] = None,
    ) -> tp.Tuple[tp.List[ConditioningAttributes], tp.Optional[torch.Tensor]]:
        """
        准备模型输入数据。

        参数:
            descriptions (字符串列表): 用作文本条件控制的字符串列表。
            prompt (torch.Tensor): 用于音频续接生成的一批波形数据。
            melody_wavs (torch.Tensor, 可选): 用作旋律条件控制的一批波形数据
                (默认值为 None)。
        """
        attributes = [
            ConditioningAttributes(text={'description': description})
            for description in descriptions]

        if melody_wavs is None:
            for attr in attributes:
                attr.wav['self_wav'] = WavCondition(
                    torch.zeros((1, 1, 1), device=self.device),
                    torch.tensor([0], device=self.device),
                    sample_rate=[self.sample_rate],
                    path=[None])
        else:
            if 'self_wav' not in self.lm.condition_provider.conditioners:
                raise RuntimeError("This model doesn't support melody conditioning. "
                                   "Use the `melody` model.")
            assert len(melody_wavs) == len(descriptions), \
                f"number of melody wavs must match number of descriptions! " \
                f"got melody len={len(melody_wavs)}, and descriptions len={len(descriptions)}"
            for attr, melody in zip(attributes, melody_wavs):
                if melody is None:
                    attr.wav['self_wav'] = WavCondition(
                        torch.zeros((1, 1, 1), device=self.device),
                        torch.tensor([0], device=self.device),
                        sample_rate=[self.sample_rate],
                        path=[None])
                else:
                    attr.wav['self_wav'] = WavCondition(
                        melody[None].to(device=self.device),
                        torch.tensor([melody.shape[-1]], device=self.device),
                        sample_rate=[self.sample_rate],
                        path=[None],
                    )

        if prompt is not None:
            if descriptions is not None:
                assert len(descriptions) == len(prompt), "Prompt and nb. descriptions doesn't match"
            prompt = prompt.to(self.device)
            prompt_tokens, scale = self.compression_model.encode(prompt)
            assert scale is None
        else:
            prompt_tokens = None
        return attributes, prompt_tokens

1、descriptions 是文本描述(Text Prompt),比如:

descriptions = [
    "A calm piano melody with soft strings",
    "Upbeat electronic dance music with strong bass"
]

第一段音乐:温柔钢琴 + 弦乐 第二段音乐:电子舞曲 + 重低音

会把description列表的这些文字转换成 text conditioning文本条件给到模型:

ConditioningAttributes(text={'description': description})

模型会用 description 作为“文本控制条件”。

2、prompt 是音频续写的开头音频(Audio Prompt)

给模型一段已有音频,让它接着往下生成

假设我们有 5 秒音频:

prompt.shape = (2, 1, 160000)

batch = 2 两段音频 ;单声道;160000 采样点(假设 32kHz ≈ 5秒),那么:

第一段音频:钢琴开头 第二段音频:鼓点开头 模型会“继续写”

如果没有开头音频?那就不续写。

prompt = None

说明:从零开始生成

3、melody_wavs 是 旋律条件(Melody Conditioning)

本质是一个 waveform 列表

它用于:提供旋律,让模型围绕这个旋律生成音乐

注意:只有 melody 模型才支持它。

比如:

melody_wavs = [
    torch.randn(32000),  # 第一段旋律 1 秒
    torch.randn(48000)   # 第二段旋律 1.5 秒
]

意思是:

第一首歌围绕 melody_wavs[0] 的旋律创作 第二首歌围绕 melody_wavs[1] 的旋律创作

注意 这个旋律是完全按照给定音频来,这是MusicGen-Melody模型。

至于MusicGen-Style模型,是参考给定音频的风格,生成相似风格的音乐,而不是完全按照给定音频的旋律来。

举一个完整调用例子:

# 按 文本 和 参考旋律 生成的例子:有 text descriptions文本描述,有“旋律参考音频” 没有“需要续写的音频”

descriptions = [
    "Epic orchestral trailer music"
]

prompt = None

melody_wavs = [
    torch.randn(32000)
]

意思是根据给定旋律,生成一段史诗管弦乐

再举一个续写例子

# 续写:有 text descriptions,audio prompt,没有“旋律参考音频”

descriptions = [
    "Lo-fi hip hop beat"
]

prompt = torch.randn(1, 1, 160000)  # 已有 5 秒音频

melody_wavs = None

意思是接着这 5 秒 lo-fi 音乐继续生成

再举一个例子:如果三个都用?

descriptions = ["Jazz piano trio"]
prompt = 已有钢琴片段
melody_wavs = 给定旋律

模型会综合“文本风格、开头音频、指定旋律”一起生成。

输入输出示例(续写) :只有文本 + audio prompt(无 melody)

输入

descriptions = [
    "A happy pop song with guitar",
    "Sad piano ballad"
]

prompt = torch.randn(2, 1, 32000)  # 2条音频,每条1秒(假设16kHz采样)

melody_wavs = None

函数内部发生什么?

第一步:构造 attributes

attributes = [
    ConditioningAttributes(text={'description': "A happy pop song with guitar"}),
    ConditioningAttributes(text={'description': "Sad piano ballad"})
]

此时 attributes 结构大概是:

[
  {
    text: {"description": "A happy pop song with guitar"},
    wav: {}
  },
  {
    text: {"description": "Sad piano ballad"},
    wav: {}
  }
]

第二步:因为 melody_wavs=None,所以给每个样本加一个“空melody”:

attr.wav['self_wav'] = WavCondition(
    torch.zeros((1, 1, 1)),
    torch.tensor([0]),
    sample_rate=[16000],
    path=[None]
)

所以现在 attributes 变成:

[
  {
    text: {"description": "A happy pop song with guitar"},
    wav: {
      "self_wav": WavCondition(
          wav = tensor([[[0.]]]),
          length = tensor([0]),
          sample_rate = [16000],
          path = [None]
      )
    }
  },
  {
    text: {"description": "Sad piano ballad"},
    wav: {
      "self_wav": WavCondition(
          wav = tensor([[[0.]]]),
          length = tensor([0]),
          sample_rate = [16000],
          path = [None]
      )
    }
  }
]

第三步:处理 prompt

prompt = prompt.to(self.device)
prompt_tokens, scale = self.compression_model.encode(prompt)

假设 encode 后:

prompt_tokens.shape = (2, 50)  # 每条音频被压缩成50个token
scale = None

最终输出

attributes, prompt_tokens

1、attributes:

长度=2 的 ConditioningAttributes 列表,每个包含:

text.description wav.self_wav (空melody占位)

2、prompt_tokens:音频张量 Tensor shape: (2, 50)


六、支持长音频生成的“分段滑动生成策略”

def _generate_tokens(self, attributes: tp.List[ConditioningAttributes],
                         prompt_tokens: tp.Optional[torch.Tensor], progress: bool = False) -> torch.Tensor:

        """
        基于音频提示信息和/或条件信息生成离散的音频令牌。

        参数:
            attributes (ConditioningAttributes 列表): 用于生成任务的条件信息(文本/旋律)。
            prompt_tokens (torch.Tensor, 可选): 用于音频续接生成的音频提示令牌。
            progress (bool, 可选): 用于显示生成过程进度的标识位,默认值为 False。
        返回值:
            torch.Tensor: 生成的音频数据,形状为 [B, C, T],其中 T 由生成参数定义。
        """

        total_gen_len = int(self.duration * self.frame_rate)
        max_prompt_len = int(min(self.duration, self.max_duration) * self.frame_rate)
        current_gen_offset: int = 0

        def _progress_callback(generated_tokens: int, tokens_to_generate: int):
            generated_tokens += current_gen_offset
            if self._progress_callback is not None:
                self._progress_callback(generated_tokens, tokens_to_generate)
            else:
                print(f'{generated_tokens: 6d} / {tokens_to_generate: 6d}', end='\r')

        if prompt_tokens is not None:
            assert max_prompt_len >= prompt_tokens.shape[-1], \
                "Prompt is longer than audio to generate"

        callback = None
        if progress:
            callback = _progress_callback

情况一:总时长 小于等于 单次最大时长

直接调用 lm.generate

情况二:总时长 大于 单次最大时长

分段生成、重叠、滑动窗口拼接

比如举一个例子:

max_duration = 30 秒
duration = 70 秒
extend_stride = 10 秒
frame_rate = 50 token/s
  • max_gen_len = 1500 tokens
  • stride_tokens = 500 tokens
  • 重叠 1000 token

每一轮输入长度 = 1500,正前进 = 500,重叠 = 1000

而 overlap 是防止音乐边界断裂。


补充:为什么直接 lm.generate(max_gen_len=4500) 会报错?

Transformer 的限制来自三个地方:

  • 位置编码长度
  • attention mask 尺寸
  • KV cache 分配大小

只要其中一个是固定的,就不能无限增长。


七、最后一个步骤:文本 + 旋律条件生成音乐

   def generate_with_chroma(self, descriptions: tp.List[str], melody_wavs: MelodyType,
                             melody_sample_rate: int, progress: bool = False,
                             return_tokens: bool = False) -> tp.Union[torch.Tensor,
                                                                      tp.Tuple[torch.Tensor, torch.Tensor]]:
        if isinstance(melody_wavs, torch.Tensor):
            if melody_wavs.dim() == 2:
                melody_wavs = melody_wavs[None]
            if melody_wavs.dim() != 3:
                raise ValueError("Melody wavs should have a shape [B, C, T].")
            melody_wavs = list(melody_wavs)
        else:
            for melody in melody_wavs:
                if melody is not None:
                    assert melody.dim() == 2, "One melody in the list has the wrong number of dims."

        melody_wavs = [
            convert_audio(wav, melody_sample_rate, self.sample_rate, self.audio_channels)
            if wav is not None else None
            for wav in melody_wavs]
        attributes, prompt_tokens = self._prepare_tokens_and_attributes(descriptions=descriptions, prompt=None,
                                                                        melody_wavs=melody_wavs)
        assert prompt_tokens is None
        tokens = self._generate_tokens(attributes, prompt_tokens, progress)
        if return_tokens:
            return self.generate_audio(tokens), tokens
        return self.generate_audio(tokens)

这一部分代码即为结合之前定义的各个函数,组成完整的生成流程。主要部分我们只需要看:

        melody_wavs = [
            convert_audio(wav, melody_sample_rate, self.sample_rate, self.audio_channels)
            if wav is not None else None
            for wav in melody_wavs]
        attributes, prompt_tokens = self._prepare_tokens_and_attributes(descriptions=descriptions, prompt=None,
                                                                        melody_wavs=melody_wavs)
        assert prompt_tokens is None
        tokens = self._generate_tokens(attributes, prompt_tokens, progress)
        if return_tokens:
            return self.generate_audio(tokens), tokens
        return self.generate_audio(tokens)

首先:重采样 + 声道转换

假设用户输入音频:

  • 44.1kHz
  • stereo

模型内部会执行:

  • 32kHz
  • mono

自动转换为这样单声道 32KHz的音频。

接下来:构造 ConditioningAttributes。 调用我们之前的 _prepare_tokens_and_attributes 函数:构造 text conditioning,构造 wav conditioning,但不生成 prompt_tokens。

(这里说明当前不支持音频续写,必须纯文本 + 旋律生成)

然后:真正生成 token。 调用我们之前的 _generate_tokens(attributes, prompt_tokens, progress) 函数:注意这里真正经过 Transformer、Attention 流程。

最后:解码为音频。 调用 generate_audio(tokens) 解码我们最终生成的音频。输出:[B, C, T]


musicgen.py 完整流程分析完毕。大家可以自行去看函数细节的源码:

  • models/lm.py(LMModel.generate 和 LMModel.forward)
  • modules/transformer.py
  • models/encodec.py
  • modules/conditioners.py
  • models/genmodel.py