3.4.1. 基础结构演进

3.4.1.1. Shared-Bottom

Shared-Bottom 模型作为多目标建模的奠基性架构,采用“共享地基+独立塔楼”的设计范式。其核心结构包含两个关键组件:

  • 共享底层(Shared Bottom):所有任务共用同一组特征转换层,负责学习跨任务的通用特征表示;

  • 任务特定塔(Task-Specific Towers):每个任务拥有独立的顶层网络,基于共享表示学习任务特定决策边界。

../../_images/shared-bottom.png

图3.4.1 Shared-Bottom

这种架构的数学表达可描述为:

(3.4.1)\[\hat{y}_t = f_t(W_t \cdot g(W_s \mathbf{x}))\]

其中 \(\mathbf{W}_s\) 为共享层参数,\(g(\cdot)\) 为共享特征提取函数,\(f_t(\cdot)\) 为任务 \(t\) 的预测函数。其设计哲学建立在任务同质性假设上:不同任务共享相同的底层特征空间,仅需在顶层进行任务适配。

Shared-Bottom 模型在效率与泛化之间实现了良好的平衡,其核心优势主要体现在以下几点。

  1. 在参数效率方面,共享层占据了模型大部分的参数量这显著降低了模型的总参数量。

  2. 共享层具有正则化效应,它如同一个天然的正则化器,通过强制任务共用特征表示,有效防止了单个任务出现过拟合的情况。

  3. 在知识迁移方面,当任务之间存在潜在的相关性时,例如视频的点击率与完播率,共享层能够学习到通用的模式,从而提升小样本任务的泛化能力。

然而,Shared-Bottom 模型也存在一个致命缺陷,即负迁移现象。当任务之间存在本质上的冲突时,该模型的硬共享机制会引发负迁移问题。从机制本质上来看,共享层的梯度更新方向是由所有任务共同决定的,一旦任务目标之间出现冲突,参数优化就会陷入方向性的矛盾。

在一些典型场景中,例如电商平台同时优化“点击率”与“客单价”时,低价商品可能会推动点击率的提升,但同时却抑制了客单价的增长;又如内容平台在平衡“内容消费深度”与“广告曝光量”时,深度阅读行为往往与广告点击行为呈负相关。

从数学角度来解释,假设任务\(i\)与任务\(j\)的损失梯度分别为\(\nabla L_i\)\(\nabla L_j\),当\(\nabla L_i \cdot \nabla L_j\) < 0时,共享层参数更新就会产生内在的冲突。这种冲突使得模型在处理矛盾任务时呈现出“零和博弈”的特性,即提升某一目标的性能往往需要以牺牲另一目标为代价,我们一般也称这类问题为跷跷板问题。

Shared-Bottom代码实践

shared-bottom模型构建代码如下,先组装输入到shared-bottom网络中的特征dnn_inputs, 经过一个shared-bottom DNN网络,遍历创建各个任务独立的DNN塔,最后输出多个塔的预估值用于计算Loss。

def build_shared_bottom_model(
        feature_columns,
        task_name_list,
        share_dnn_units=[128, 64],
        task_tower_dnn_units=[128, 64],
        ):
    input_layer_dict = build_input_layer(feature_columns)
    # 构建特征嵌入表字典
    group_embedding_feature_dict = build_group_feature_embedding_table_dict(feature_columns, input_layer_dict, prefix="embedding/")

    # 连接不同组的嵌入向量作为各个网络的输入
    dnn_inputs = concat_group_embedding(group_embedding_feature_dict, 'dnn')

    # 构建DNN层
    shared_bottom_feature = DNNs(share_dnn_units)(dnn_inputs)

    task_output_list = []
    for task_name in task_name_list:
        task_output_logit = DNNs(task_tower_dnn_units + [1])(shared_bottom_feature)
        task_output_prob = PredictLayer(name="task_" + task_name)(task_output_logit)
        task_output_list.append(task_output_prob)

    # 构建模型
    model = tf.keras.Model(inputs=list(input_layer_dict.values()), outputs=task_output_list)
    return model

完整训练流程如下

1. 导入需要的包

import sys
import funrec
from funrec.utils import build_metrics_table

config = funrec.load_config('shared_bottom')

2. 数据和特征准备

train_data, test_data = funrec.load_data(config.data)
feature_columns, processed_data = funrec.prepare_features(config.features, train_data, test_data)

3. 模型构建及训练

model = funrec.train_model(config.training, feature_columns, processed_data)

4. 模型评估

metrics = funrec.evaluate_model(model, processed_data, config.evaluation, feature_columns)
print(build_metrics_table(metrics))
+----------------+---------------+-----------------+-------------+-----------------+----------------+------------------+--------------+------------------------+-----------------------+-------------------------+
|   auc_is_click |   auc_is_like |   auc_long_view |   auc_macro |   gauc_is_click |   gauc_is_like |   gauc_long_view |   gauc_macro |   valid_users_is_click |   valid_users_is_like |   valid_users_long_view |
+================+===============+=================+=============+=================+================+==================+==============+========================+=======================+=========================+
|         0.5965 |        0.4325 |           0.438 |       0.489 |          0.5754 |         0.4409 |           0.4469 |       0.4877 |                    928 |                   530 |                     925 |
+----------------+---------------+-----------------+-------------+-----------------+----------------+------------------+--------------+------------------------+-----------------------+-------------------------+

3.4.1.2. MMOE

../../_images/mmoe.png

图3.4.2 MMOE模型结构

针对Shared-Bottom对相关性低的多个任务出现负迁移的现象,OMOE将底层共享的一个Shared-Bottom模块拆分成了多个Expert,最终OMOE的输出为多个Expert的加权和,本质可以看成是专家网络和全局门控的双层结构。

虽然OMOE通过底层多专家融合的方式提升了特征表征的多样性,从最终的实验结果看,确实可以一定程度上缓解低相关性任务的负迁移问题,但没有彻底解决多任务冲突的问题。因为不同任务反向传播的梯度还是会直接影响底层专家网络的学习。

为了进一步缓解多任务冲突,MMOE为每个任务配备专属门控网络,实现了门控从“全局共享”升级为“任务自适应”的方式。MMoE的数学表达式可以表示为:

(3.4.2)\[\begin{split}\begin{aligned} \mathbf{e}_k &= f_k(\mathbf{x}) \\ g_t(\mathbf{x}) &= \text{softmax}(\mathbf{W}_t \mathbf{x}) \\ \mathbf{h}_t &= \sum_{k=1}^K g_{t,k} \cdot \mathbf{e}_k \\ \hat{y}_t &= f_t(\mathbf{h}_t) \end{aligned}\end{split}\]

其中, - \(\mathbf{x}\)表示底层的特征输入

  • \(\mathbf{e}_k\)表示第k个专家网络的输出

  • \(g_t(\mathbf{x})\)表示第\(t\)个任务融合专家网络的门控向量

  • \(\mathbf{h}_t\)表示第\(t\)个任务融合专家网络的输出

  • \(\hat{y}_t\)表示第\(t\)个任务的预测结果

差异化特征融合门控网络\(g_t\)根据任务特性选择专家组合,例如在电商场景,CTR任务门控加权“即时兴趣”“价格敏感”专家,CVR任务门控:侧重“消费能力”“品牌忠诚”专家。

当任务\(i\)\(j\)冲突时,如果模型学的还不错,那么这两个任务对所有Expert的融合权重是有差异的。例如出现了某个\(e_m\)专家,对于任务\(i\)来说非常大,但是对于任务\(j\)来说非常小,那么最终\(e_m\)参数的更新就会被任务\(i\)来主导,起到了一定的梯度隔离的效果。

MMoE代码实践

MMoE模型构建代码如下,先组装输入到MoE网络中的特征dnn_inputs, 然后为每个任务创建一个门控网络输出最终融合Expert的门控向量。最后为每个任务都创建一个任务塔,并且不同任务塔的输入都是对应任务的门控向量和多个Expert融合后的向量。

def build_mmoe_model(
        feature_columns,
        task_name_list,
        expert_nums=4,
        expert_dnn_units=[128, 64],
        gate_dnn_units=[128, 64],
        task_tower_dnn_units=[128, 64],
        ):
    input_layer_dict = build_input_layer(feature_columns)
    # 构建特征嵌入表字典
    group_embedding_feature_dict = build_group_feature_embedding_table_dict(feature_columns, input_layer_dict, prefix="embedding/")

    # 连接不同组的嵌入向量作为各个网络的输入
    dnn_inputs = concat_group_embedding(group_embedding_feature_dict, 'dnn')

    # 创建多个专家
    expert_output_list = []
    for i in range(expert_nums):
        expert_output = DNNs(expert_dnn_units, name=f"expert_{str(i)}")(dnn_inputs)
        expert_output_list.append(expert_output)
    expert_concat = tf.keras.layers.Lambda(lambda x: tf.stack(x, axis=1))(expert_output_list) # (None, expert_num, dims)

    # 定义不同任务的专家门控网络
    task_tower_input_list = []
    for i in range(len(task_name_list)):
        gate_output = DNNs(gate_dnn_units, name=f"task_{str(i)}_gates")(dnn_inputs)
        gate_output = tf.keras.layers.Dense(expert_nums, use_bias=False, activation='softmax', name=f"task_{i}_softmax")(gate_output)
        gate_output = tf.keras.layers.Lambda(lambda x: tf.expand_dims(x, axis=-1))(gate_output) # (None,expert_num, 1)
        gate_expert_output = tf.keras.layers.Lambda(lambda x: x[0] * x[1])([gate_output, expert_concat])
        gate_expert_output = tf.keras.layers.Lambda(lambda x: tf.reduce_sum(x, axis=1, keepdims=False))(gate_expert_output)
        task_tower_input_list.append(gate_expert_output)

    # 不同任务融合多个专家的门控
    task_output_list = []
    for i in range(len(task_name_list)):
        task_output_logit = DNNs(task_tower_dnn_units + [1])(task_tower_input_list[i])
        task_output_prob = PredictLayer(name="task_" + task_name_list[i])(task_output_logit)
        task_output_list.append(task_output_prob)

    # 构建模型
    model = tf.keras.Model(inputs=list(input_layer_dict.values()), outputs=task_output_list)
    return model

完整训练流程如下

1. 导入需要的包,这一步在前面Shared-Bottom的过程已经导入过了

2. 数据和特征准备,由于和Shared-Bottom一样,这里不再赘述

config = funrec.load_config('mmoe')
train_data, test_data = funrec.load_data(config.data)
feature_columns, processed_data = funrec.prepare_features(config.features, train_data, test_data)

3. 构建模型

model = funrec.train_model(config.training, feature_columns, processed_data)

4. 模型效果评估

metrics = funrec.evaluate_model(model, processed_data, config.evaluation, feature_columns)
print(build_metrics_table(metrics))
+----------------+---------------+-----------------+-------------+-----------------+----------------+------------------+--------------+------------------------+-----------------------+-------------------------+
|   auc_is_click |   auc_is_like |   auc_long_view |   auc_macro |   gauc_is_click |   gauc_is_like |   gauc_long_view |   gauc_macro |   valid_users_is_click |   valid_users_is_like |   valid_users_long_view |
+================+===============+=================+=============+=================+================+==================+==============+========================+=======================+=========================+
|         0.6019 |        0.4164 |           0.438 |      0.4855 |          0.5722 |         0.4436 |           0.4456 |       0.4871 |                    928 |                   530 |                     925 |
+----------------+---------------+-----------------+-------------+-----------------+----------------+------------------+--------------+------------------------+-----------------------+-------------------------+

3.4.1.3. PLE

MMoE 通过为每个任务配备专属门控网络,在一定程度上缓解了多任务冲突问题。专属门控网络能够根据任务特性选择专家组合,从而使得不同任务可以关注不同的特征表示。但其架构仍存在一个根本性局限:所有专家对所有任务门控可见。这种“软隔离”设计在实践中仍面临两大挑战:

  1. 负迁移未根除:

    • 干扰路径未切断:即使某个专家(如\(e_m\))被任务\(i\)的门控高度加权而被任务\(j\)的门控忽略,任务\(j\)的梯度在反向传播时仍会流经\(e_m\)(因为\(e_m\)是任务\(j\)门控的可选项)。当任务冲突强烈时,这种“潜在通路”仍可能导致共享表征被污染。

    • 专家角色模糊:MMoE缺乏机制强制专家明确分工。一个专家可能同时承载共享信息和多个任务的特定信息,成为冲突的“重灾区”。尤其在任务相关性低时,这种耦合会加剧负迁移。

  2. 门控决策负担重:

    • 每个任务的门控需要在所有\(K\)个专家上进行权重分配。当专家数量增加(通常需扩大\(K\)以提升模型能力)时,门控网络面临高维决策问题,易导致训练不稳定或陷入次优解。

    • 门控需要“费力”地从包含混杂信息(共享+所有任务特定) 的专家池中筛选有用信息,增加了学习难度。

为解决上述问题,CGC结构被提出,其核心思想是通过硬性结构约束,显式分离共享知识与任务特定知识:

../../_images/cgc.png

图3.4.3 CGC网络结构

专家职责强制分离

  • 共享专家 (C-Experts):一组专家仅负责学习所有任务的共性知识。设其数量为\(M\),输出为\({\mathbf{c}_1, \mathbf{c}_2, ..., \mathbf{c}_M}\)

  • 任务专家 (T-Experts):每个任务\(t\)拥有自己专属的专家组,仅负责学习该任务特有的知识或模式。设任务\(t\)的专属专家数量为\(N_t\),输出为\({\mathbf{t}_t^1, \mathbf{t}_t^2, ..., \mathbf{t}_t^{N_t}}\)

任务专属门控的输入限制

  • 任务\(t\)的门控\(g_t\)的输入被严格限制为:共享专家输出 (\({\mathbf{c}k}{k=1}^M\)) + 本任务专属专家输出 (\({\mathbf{t}t^j}{j=1}^{N_t}\))。

  • 物理切断干扰路径:任务\(t\)的门控完全无法访问其他任务\(s (s \neq t)\)的专属专家\({\mathbf{t}_s^j}\)。同样,任务\(s\)的梯度绝不会更新任务\(t\)的专属专家参数。

CGC门控的计算如下:

(3.4.3)\[g_t(\mathbf{x}) = \text{softmax}\Big(\mathbf{W}_t \cdot \mathbf{x} + \mathbf{b}_t\Big) \quad\]
(3.4.4)\[\mathbf{h}_t = \sum_{k=1}^{M} g_{t,k} \cdot \mathbf{c}_k + \sum_{j=1}^{N_t} g_{t, M+j} \cdot \mathbf{t}_t^j\]
(3.4.5)\[\hat{y}_t = f_t(\mathbf{h}_t)\]

其中:

  • \(\mathbf{W}_t, \mathbf{b}_t\):任务\(t\)门控的参数。

  • \(g_{t,k}\):分配给第\(k\)个共享专家的权重。

  • \(g_{t, M+j}\):分配给任务\(t\)\(j\)个专属专家的权重。

CGC核心代码为:

def cgc_net(
        input_list,
        task_num,
        task_expert_num,
        shared_expert_num,
        task_expert_dnn_units,
        shared_expert_dnn_units,
        task_gate_dnn_units,
        shared_gate_dnn_units,
        leval_name=None,
        is_last=False):
    """cgc结构
    input_list, 每个task都有一个输入,这些task的输入都是共享的,为了方便处理,给每个task都复制了一份
    is_last主要是判断是否是最后一层cgc,如果是的话,就不需要把share部分添加到输出中了
    """

    # 创建每个任务的task expert
    task_expert_list = []
    for i in range(task_num):
        task_i_expert_list = []
        for j in range(task_expert_num):
            expert_dnn = DNNs(task_expert_dnn_units, name=f"{leval_name}_task_{str(i)}_expert_{str(j)}")(input_list[i])
            task_i_expert_list.append(expert_dnn)
        task_expert_list.append(task_i_expert_list)

    # 创建所有任务的shared expert
    shared_expert_list = []
    for i in range(shared_expert_num):
        expert_dnn = DNNs(shared_expert_dnn_units, name=f"{leval_name}_shared_expert_{str(i)}")(input_list[-1])
        shared_expert_list.append(expert_dnn)

    # 创建每个任务的融合门控
    task_gate_list = []
    fusion_expert_num = task_expert_num + shared_expert_num
    for i in range(task_num):
        gate_dnn = DNNs(task_gate_dnn_units, name=f"{leval_name}_task_{str(i)}_gate")(input_list[i])
        gate_dnn = tf.keras.layers.Dense(fusion_expert_num, use_bias=False, activation='softmax')(gate_dnn)
        gate_dnn = tf.keras.layers.Lambda(lambda x: tf.expand_dims(x, axis=-1))(gate_dnn) # (None, gate_num, 1)
        task_gate_list.append(gate_dnn)

    # cgc输出结果
    cgc_output_list = []
    for i in range(task_num):
        cur_experts = task_expert_list[i] + shared_expert_list
        expert_concat = tf.keras.layers.Lambda(lambda x: tf.stack(x, axis=1))(cur_experts) # None, gate_num, dim
        cur_gate = task_gate_list[i]
        task_gate_fusion_dnn = tf.keras.layers.Lambda(lambda x: tf.reduce_sum(x[0] * x[1], axis=1, keepdims=False))([cur_gate, expert_concat]) # None, dim
        cgc_output_list.append(task_gate_fusion_dnn)

    # 如果不是最后一层还需要更新shared expert下一层的输入,也就是当前层需要融合所有task和shared expert
    if not is_last:
        cur_experts = list(itertools.chain.from_iterable(task_expert_list)) + shared_expert_list
        cur_expert_num = len(cur_experts)
        expert_concat = tf.keras.layers.Lambda(lambda x: tf.stack(x, axis=1))(cur_experts) # None, cur_expert_num, dim
        shared_gate_dnn = DNNs(shared_gate_dnn_units, name=f"{leval_name}_shared_gate")(input_list[-1])
        shared_gate_dnn = tf.keras.layers.Dense(cur_expert_num, use_bias=False, activation='softmax')(shared_gate_dnn) # None, cur_expert_num
        shared_gate = tf.keras.layers.Lambda(lambda x: tf.expand_dims(x, -1))(shared_gate_dnn) # None, cur_expert_num, 1
        shared_gate_fusion_output = tf.keras.layers.Lambda(lambda x: tf.reduce_sum(x[0] * x[1], axis=1, keepdims=False))([shared_gate, expert_concat])
        cgc_output_list.append(shared_gate_fusion_output)
    return cgc_output_list

CGC解决了知识分离的核心问题,但其本质是单层结构,表征学习深度有限。受深度神经网络逐层抽象特征的启发,PLE (Progressive Layered Extraction) 将多个CGC单元纵向堆叠,形成深层架构,实现渐进式知识提取与融合。

../../_images/ple.png

图3.4.4 PLE网络结构

  • 第1层 (输入层 CGC):

    • 输入:原始特征 \(\mathbf{x}\)

    • 结构:一个标准的CGC模块(包含 \(M^{(1)}\)个C-Experts, 每个任务\(t\)\(N_t^{(1)}\)个T-Experts, 以及对应的门控 \(g_t^{(1)}\))。

    • 输出:每个任务获得初步融合表示 \(\mathbf{h}_t^{(1)}\)(或更常见的是,该层所有专家(C+T)的输出被拼接/收集起来作为下一层输入)。

  • \(l\)层 (\(l \geq 2\)) CGC:

    • 输入关键点:第\(l\)层的输入是第\(l-1\)层所有专家(包括所有C-Experts和所有任务的T-Experts)的输出。设第\(l-1\)层总专家数为 \(E^{(l-1)}\),则输入向量维度相应增加。

    • 结构:一个新的CGC模块(包含 \(M^{(l)}\)个C-Experts, 每个任务\(t\)\(N_t^{(l)}\)个T-Experts, 以及门控 \(g_t^{(l)}\))。

    • 处理:在本层输入特征(即上一层更丰富的表征)上,再次进行显式的知识分离(新的C-Experts学习更深层共享模式,新的T-Experts学习更深层任务特定模式)和融合(通过新的门控)。

    • 输出:任务\(t\)的当前层表示 \(\mathbf{h}_t^{(l)}\) 或收集所有专家输出。

  • 输出层 (第\(L\)层):

    • 最后一层(\(L\))各任务的门控输出 \(\mathbf{h}_t^{(L)}\) 送入各自的任务专属塔网络(Tower)\(f_t\),得到最终预测 \(\hat{y}_t = f_t(\mathbf{h}_t^{(L)})\)

PLE模型的核心代码:

def build_ple_model(
        feature_columns,
        task_name_list,
        ple_level_nums=1,
        task_expert_num=4,
        shared_expert_num=2,
        task_expert_dnn_units=[128, 64],
        shared_expert_dnn_units=[128, 64],
        task_gate_dnn_units=[128, 64],
        shared_gate_dnn_units=[128, 64],
        task_tower_dnn_units=[128, 64],
        ):
    input_layer_dict = build_input_layer(feature_columns)
    # 构建特征嵌入表字典
    group_embedding_feature_dict = build_group_feature_embedding_table_dict(feature_columns, input_layer_dict, prefix="embedding/")

    # 连接不同组的嵌入向量作为各个网络的输入
    dnn_inputs = concat_group_embedding(group_embedding_feature_dict, 'dnn')

    task_num = len(task_name_list)
    ple_input_list = [dnn_inputs] * (task_num + 1)
    for i in range(ple_level_nums):
        if i == ple_level_nums - 1:
            cgc_output_list = cgc_net(ple_input_list,
                                        task_num,
                                        task_expert_num,
                                        shared_expert_num,
                                        task_expert_dnn_units,
                                        shared_expert_dnn_units,
                                        task_gate_dnn_units,
                                        shared_gate_dnn_units,
                                        leval_name=f"cgc_level_{str(i)}", is_last=True)
        else:
            cgc_output_list = cgc_net(ple_input_list,
                                        task_num,
                                        task_expert_num,
                                        shared_expert_num,
                                        task_expert_dnn_units,
                                        shared_expert_dnn_units,
                                        task_gate_dnn_units,
                                        shared_gate_dnn_units,
                                        leval_name=f"cgc_level_{str(i)}",
                                        is_last=False)
            ple_input_list = cgc_output_list

    # 不同任务融合多个专家的门控
    task_output_list = []
    for i in range(task_num):
        task_output_logit = DNNs(task_tower_dnn_units + [1])(cgc_output_list[i])
        task_output_prob = PredictLayer(name="task_" + task_name_list[i])(task_output_logit)
        task_output_list.append(task_output_prob)

    # 构建模型
    model = tf.keras.Model(inputs=list(input_layer_dict.values()), outputs=task_output_list)
    return model

完整训练流程如下

1. 导入需要的包,这一步在前面Shared-Bottom的过程已经导入过了

2. 数据和特征准备,由于和Shared-Bottom一样,这里不再赘述

config = funrec.load_config('ple')
train_data, test_data = funrec.load_data(config.data)
feature_columns, processed_data = funrec.prepare_features(config.features, train_data, test_data)

3. 构建模型

model = funrec.train_model(config.training, feature_columns, processed_data)

4. 模型效果评估

metrics = funrec.evaluate_model(model, processed_data, config.evaluation, feature_columns)
print(build_metrics_table(metrics))
+----------------+---------------+-----------------+-------------+-----------------+----------------+------------------+--------------+------------------------+-----------------------+-------------------------+
|   auc_is_click |   auc_is_like |   auc_long_view |   auc_macro |   gauc_is_click |   gauc_is_like |   gauc_long_view |   gauc_macro |   valid_users_is_click |   valid_users_is_like |   valid_users_long_view |
+================+===============+=================+=============+=================+================+==================+==============+========================+=======================+=========================+
|         0.5971 |        0.4375 |          0.4715 |       0.502 |          0.5732 |         0.4433 |           0.4685 |        0.495 |                    928 |                   530 |                     925 |
+----------------+---------------+-----------------+-------------+-----------------+----------------+------------------+--------------+------------------------+-----------------------+-------------------------+