3.4.1. 基础结构演进¶
3.4.1.2. MMOE¶

图3.4.2 MMOE模型结构¶
针对Shared-Bottom对相关性低的多个任务出现负迁移的现象,OMOE将底层共享的一个Shared-Bottom模块拆分成了多个Expert,最终OMOE的输出为多个Expert的加权和,本质可以看成是专家网络和全局门控的双层结构。
虽然OMOE通过底层多专家融合的方式提升了特征表征的多样性,从最终的实验结果看,确实可以一定程度上缓解低相关性任务的负迁移问题,但没有彻底解决多任务冲突的问题。因为不同任务反向传播的梯度还是会直接影响底层专家网络的学习。
为了进一步缓解多任务冲突,MMOE为每个任务配备专属门控网络,实现了门控从“全局共享”升级为“任务自适应”的方式。MMoE的数学表达式可以表示为:
其中, - \(\mathbf{x}\)表示底层的特征输入
\(\mathbf{e}_k\)表示第k个专家网络的输出
\(g_t(\mathbf{x})\)表示第\(t\)个任务融合专家网络的门控向量
\(\mathbf{h}_t\)表示第\(t\)个任务融合专家网络的输出
\(\hat{y}_t\)表示第\(t\)个任务的预测结果
差异化特征融合门控网络\(g_t\)根据任务特性选择专家组合,例如在电商场景,CTR任务门控加权“即时兴趣”“价格敏感”专家,CVR任务门控:侧重“消费能力”“品牌忠诚”专家。
当任务\(i\)与\(j\)冲突时,如果模型学的还不错,那么这两个任务对所有Expert的融合权重是有差异的。例如出现了某个\(e_m\)专家,对于任务\(i\)来说非常大,但是对于任务\(j\)来说非常小,那么最终\(e_m\)参数的更新就会被任务\(i\)来主导,起到了一定的梯度隔离的效果。
MMoE代码实践
MMoE模型构建代码如下,先组装输入到MoE网络中的特征dnn_inputs, 然后为每个任务创建一个门控网络输出最终融合Expert的门控向量。最后为每个任务都创建一个任务塔,并且不同任务塔的输入都是对应任务的门控向量和多个Expert融合后的向量。
def build_mmoe_model(
feature_columns,
task_name_list,
expert_nums=4,
expert_dnn_units=[128, 64],
gate_dnn_units=[128, 64],
task_tower_dnn_units=[128, 64],
):
input_layer_dict = build_input_layer(feature_columns)
# 构建特征嵌入表字典
group_embedding_feature_dict = build_group_feature_embedding_table_dict(feature_columns, input_layer_dict, prefix="embedding/")
# 连接不同组的嵌入向量作为各个网络的输入
dnn_inputs = concat_group_embedding(group_embedding_feature_dict, 'dnn')
# 创建多个专家
expert_output_list = []
for i in range(expert_nums):
expert_output = DNNs(expert_dnn_units, name=f"expert_{str(i)}")(dnn_inputs)
expert_output_list.append(expert_output)
expert_concat = tf.keras.layers.Lambda(lambda x: tf.stack(x, axis=1))(expert_output_list) # (None, expert_num, dims)
# 定义不同任务的专家门控网络
task_tower_input_list = []
for i in range(len(task_name_list)):
gate_output = DNNs(gate_dnn_units, name=f"task_{str(i)}_gates")(dnn_inputs)
gate_output = tf.keras.layers.Dense(expert_nums, use_bias=False, activation='softmax', name=f"task_{i}_softmax")(gate_output)
gate_output = tf.keras.layers.Lambda(lambda x: tf.expand_dims(x, axis=-1))(gate_output) # (None,expert_num, 1)
gate_expert_output = tf.keras.layers.Lambda(lambda x: x[0] * x[1])([gate_output, expert_concat])
gate_expert_output = tf.keras.layers.Lambda(lambda x: tf.reduce_sum(x, axis=1, keepdims=False))(gate_expert_output)
task_tower_input_list.append(gate_expert_output)
# 不同任务融合多个专家的门控
task_output_list = []
for i in range(len(task_name_list)):
task_output_logit = DNNs(task_tower_dnn_units + [1])(task_tower_input_list[i])
task_output_prob = PredictLayer(name="task_" + task_name_list[i])(task_output_logit)
task_output_list.append(task_output_prob)
# 构建模型
model = tf.keras.Model(inputs=list(input_layer_dict.values()), outputs=task_output_list)
return model
完整训练流程如下
1. 导入需要的包,这一步在前面Shared-Bottom的过程已经导入过了
2. 数据和特征准备,由于和Shared-Bottom一样,这里不再赘述
config = funrec.load_config('mmoe')
train_data, test_data = funrec.load_data(config.data)
feature_columns, processed_data = funrec.prepare_features(config.features, train_data, test_data)
3. 构建模型
model = funrec.train_model(config.training, feature_columns, processed_data)
4. 模型效果评估
metrics = funrec.evaluate_model(model, processed_data, config.evaluation, feature_columns)
print(build_metrics_table(metrics))
+----------------+---------------+-----------------+-------------+-----------------+----------------+------------------+--------------+------------------------+-----------------------+-------------------------+
| auc_is_click | auc_is_like | auc_long_view | auc_macro | gauc_is_click | gauc_is_like | gauc_long_view | gauc_macro | valid_users_is_click | valid_users_is_like | valid_users_long_view |
+================+===============+=================+=============+=================+================+==================+==============+========================+=======================+=========================+
| 0.6019 | 0.4164 | 0.438 | 0.4855 | 0.5722 | 0.4436 | 0.4456 | 0.4871 | 928 | 530 | 925 |
+----------------+---------------+-----------------+-------------+-----------------+----------------+------------------+--------------+------------------------+-----------------------+-------------------------+
3.4.1.3. PLE¶
MMoE 通过为每个任务配备专属门控网络,在一定程度上缓解了多任务冲突问题。专属门控网络能够根据任务特性选择专家组合,从而使得不同任务可以关注不同的特征表示。但其架构仍存在一个根本性局限:所有专家对所有任务门控可见。这种“软隔离”设计在实践中仍面临两大挑战:
负迁移未根除:
干扰路径未切断:即使某个专家(如\(e_m\))被任务\(i\)的门控高度加权而被任务\(j\)的门控忽略,任务\(j\)的梯度在反向传播时仍会流经\(e_m\)(因为\(e_m\)是任务\(j\)门控的可选项)。当任务冲突强烈时,这种“潜在通路”仍可能导致共享表征被污染。
专家角色模糊:MMoE缺乏机制强制专家明确分工。一个专家可能同时承载共享信息和多个任务的特定信息,成为冲突的“重灾区”。尤其在任务相关性低时,这种耦合会加剧负迁移。
门控决策负担重:
每个任务的门控需要在所有\(K\)个专家上进行权重分配。当专家数量增加(通常需扩大\(K\)以提升模型能力)时,门控网络面临高维决策问题,易导致训练不稳定或陷入次优解。
门控需要“费力”地从包含混杂信息(共享+所有任务特定) 的专家池中筛选有用信息,增加了学习难度。
为解决上述问题,CGC结构被提出,其核心思想是通过硬性结构约束,显式分离共享知识与任务特定知识:

图3.4.3 CGC网络结构¶
专家职责强制分离:
共享专家 (C-Experts):一组专家仅负责学习所有任务的共性知识。设其数量为\(M\),输出为\({\mathbf{c}_1, \mathbf{c}_2, ..., \mathbf{c}_M}\)。
任务专家 (T-Experts):每个任务\(t\)拥有自己专属的专家组,仅负责学习该任务特有的知识或模式。设任务\(t\)的专属专家数量为\(N_t\),输出为\({\mathbf{t}_t^1, \mathbf{t}_t^2, ..., \mathbf{t}_t^{N_t}}\)。
任务专属门控的输入限制:
任务\(t\)的门控\(g_t\)的输入被严格限制为:共享专家输出 (\({\mathbf{c}k}{k=1}^M\)) + 本任务专属专家输出 (\({\mathbf{t}t^j}{j=1}^{N_t}\))。
物理切断干扰路径:任务\(t\)的门控完全无法访问其他任务\(s (s \neq t)\)的专属专家\({\mathbf{t}_s^j}\)。同样,任务\(s\)的梯度绝不会更新任务\(t\)的专属专家参数。
CGC门控的计算如下:
其中:
\(\mathbf{W}_t, \mathbf{b}_t\):任务\(t\)门控的参数。
\(g_{t,k}\):分配给第\(k\)个共享专家的权重。
\(g_{t, M+j}\):分配给任务\(t\)第\(j\)个专属专家的权重。
CGC核心代码为:
def cgc_net(
input_list,
task_num,
task_expert_num,
shared_expert_num,
task_expert_dnn_units,
shared_expert_dnn_units,
task_gate_dnn_units,
shared_gate_dnn_units,
leval_name=None,
is_last=False):
"""cgc结构
input_list, 每个task都有一个输入,这些task的输入都是共享的,为了方便处理,给每个task都复制了一份
is_last主要是判断是否是最后一层cgc,如果是的话,就不需要把share部分添加到输出中了
"""
# 创建每个任务的task expert
task_expert_list = []
for i in range(task_num):
task_i_expert_list = []
for j in range(task_expert_num):
expert_dnn = DNNs(task_expert_dnn_units, name=f"{leval_name}_task_{str(i)}_expert_{str(j)}")(input_list[i])
task_i_expert_list.append(expert_dnn)
task_expert_list.append(task_i_expert_list)
# 创建所有任务的shared expert
shared_expert_list = []
for i in range(shared_expert_num):
expert_dnn = DNNs(shared_expert_dnn_units, name=f"{leval_name}_shared_expert_{str(i)}")(input_list[-1])
shared_expert_list.append(expert_dnn)
# 创建每个任务的融合门控
task_gate_list = []
fusion_expert_num = task_expert_num + shared_expert_num
for i in range(task_num):
gate_dnn = DNNs(task_gate_dnn_units, name=f"{leval_name}_task_{str(i)}_gate")(input_list[i])
gate_dnn = tf.keras.layers.Dense(fusion_expert_num, use_bias=False, activation='softmax')(gate_dnn)
gate_dnn = tf.keras.layers.Lambda(lambda x: tf.expand_dims(x, axis=-1))(gate_dnn) # (None, gate_num, 1)
task_gate_list.append(gate_dnn)
# cgc输出结果
cgc_output_list = []
for i in range(task_num):
cur_experts = task_expert_list[i] + shared_expert_list
expert_concat = tf.keras.layers.Lambda(lambda x: tf.stack(x, axis=1))(cur_experts) # None, gate_num, dim
cur_gate = task_gate_list[i]
task_gate_fusion_dnn = tf.keras.layers.Lambda(lambda x: tf.reduce_sum(x[0] * x[1], axis=1, keepdims=False))([cur_gate, expert_concat]) # None, dim
cgc_output_list.append(task_gate_fusion_dnn)
# 如果不是最后一层还需要更新shared expert下一层的输入,也就是当前层需要融合所有task和shared expert
if not is_last:
cur_experts = list(itertools.chain.from_iterable(task_expert_list)) + shared_expert_list
cur_expert_num = len(cur_experts)
expert_concat = tf.keras.layers.Lambda(lambda x: tf.stack(x, axis=1))(cur_experts) # None, cur_expert_num, dim
shared_gate_dnn = DNNs(shared_gate_dnn_units, name=f"{leval_name}_shared_gate")(input_list[-1])
shared_gate_dnn = tf.keras.layers.Dense(cur_expert_num, use_bias=False, activation='softmax')(shared_gate_dnn) # None, cur_expert_num
shared_gate = tf.keras.layers.Lambda(lambda x: tf.expand_dims(x, -1))(shared_gate_dnn) # None, cur_expert_num, 1
shared_gate_fusion_output = tf.keras.layers.Lambda(lambda x: tf.reduce_sum(x[0] * x[1], axis=1, keepdims=False))([shared_gate, expert_concat])
cgc_output_list.append(shared_gate_fusion_output)
return cgc_output_list
CGC解决了知识分离的核心问题,但其本质是单层结构,表征学习深度有限。受深度神经网络逐层抽象特征的启发,PLE (Progressive Layered Extraction) 将多个CGC单元纵向堆叠,形成深层架构,实现渐进式知识提取与融合。

图3.4.4 PLE网络结构¶
第1层 (输入层 CGC):
输入:原始特征 \(\mathbf{x}\)。
结构:一个标准的CGC模块(包含 \(M^{(1)}\)个C-Experts, 每个任务\(t\)有 \(N_t^{(1)}\)个T-Experts, 以及对应的门控 \(g_t^{(1)}\))。
输出:每个任务获得初步融合表示 \(\mathbf{h}_t^{(1)}\)(或更常见的是,该层所有专家(C+T)的输出被拼接/收集起来作为下一层输入)。
第\(l\)层 (\(l \geq 2\)) CGC:
输入关键点:第\(l\)层的输入是第\(l-1\)层所有专家(包括所有C-Experts和所有任务的T-Experts)的输出。设第\(l-1\)层总专家数为 \(E^{(l-1)}\),则输入向量维度相应增加。
结构:一个新的CGC模块(包含 \(M^{(l)}\)个C-Experts, 每个任务\(t\)有 \(N_t^{(l)}\)个T-Experts, 以及门控 \(g_t^{(l)}\))。
处理:在本层输入特征(即上一层更丰富的表征)上,再次进行显式的知识分离(新的C-Experts学习更深层共享模式,新的T-Experts学习更深层任务特定模式)和融合(通过新的门控)。
输出:任务\(t\)的当前层表示 \(\mathbf{h}_t^{(l)}\) 或收集所有专家输出。
输出层 (第\(L\)层):
最后一层(\(L\))各任务的门控输出 \(\mathbf{h}_t^{(L)}\) 送入各自的任务专属塔网络(Tower)\(f_t\),得到最终预测 \(\hat{y}_t = f_t(\mathbf{h}_t^{(L)})\)。
PLE模型的核心代码:
def build_ple_model(
feature_columns,
task_name_list,
ple_level_nums=1,
task_expert_num=4,
shared_expert_num=2,
task_expert_dnn_units=[128, 64],
shared_expert_dnn_units=[128, 64],
task_gate_dnn_units=[128, 64],
shared_gate_dnn_units=[128, 64],
task_tower_dnn_units=[128, 64],
):
input_layer_dict = build_input_layer(feature_columns)
# 构建特征嵌入表字典
group_embedding_feature_dict = build_group_feature_embedding_table_dict(feature_columns, input_layer_dict, prefix="embedding/")
# 连接不同组的嵌入向量作为各个网络的输入
dnn_inputs = concat_group_embedding(group_embedding_feature_dict, 'dnn')
task_num = len(task_name_list)
ple_input_list = [dnn_inputs] * (task_num + 1)
for i in range(ple_level_nums):
if i == ple_level_nums - 1:
cgc_output_list = cgc_net(ple_input_list,
task_num,
task_expert_num,
shared_expert_num,
task_expert_dnn_units,
shared_expert_dnn_units,
task_gate_dnn_units,
shared_gate_dnn_units,
leval_name=f"cgc_level_{str(i)}", is_last=True)
else:
cgc_output_list = cgc_net(ple_input_list,
task_num,
task_expert_num,
shared_expert_num,
task_expert_dnn_units,
shared_expert_dnn_units,
task_gate_dnn_units,
shared_gate_dnn_units,
leval_name=f"cgc_level_{str(i)}",
is_last=False)
ple_input_list = cgc_output_list
# 不同任务融合多个专家的门控
task_output_list = []
for i in range(task_num):
task_output_logit = DNNs(task_tower_dnn_units + [1])(cgc_output_list[i])
task_output_prob = PredictLayer(name="task_" + task_name_list[i])(task_output_logit)
task_output_list.append(task_output_prob)
# 构建模型
model = tf.keras.Model(inputs=list(input_layer_dict.values()), outputs=task_output_list)
return model
完整训练流程如下
1. 导入需要的包,这一步在前面Shared-Bottom的过程已经导入过了
2. 数据和特征准备,由于和Shared-Bottom一样,这里不再赘述
config = funrec.load_config('ple')
train_data, test_data = funrec.load_data(config.data)
feature_columns, processed_data = funrec.prepare_features(config.features, train_data, test_data)
3. 构建模型
model = funrec.train_model(config.training, feature_columns, processed_data)
4. 模型效果评估
metrics = funrec.evaluate_model(model, processed_data, config.evaluation, feature_columns)
print(build_metrics_table(metrics))
+----------------+---------------+-----------------+-------------+-----------------+----------------+------------------+--------------+------------------------+-----------------------+-------------------------+
| auc_is_click | auc_is_like | auc_long_view | auc_macro | gauc_is_click | gauc_is_like | gauc_long_view | gauc_macro | valid_users_is_click | valid_users_is_like | valid_users_long_view |
+================+===============+=================+=============+=================+================+==================+==============+========================+=======================+=========================+
| 0.5971 | 0.4375 | 0.4715 | 0.502 | 0.5732 | 0.4433 | 0.4685 | 0.495 | 928 | 530 | 925 |
+----------------+---------------+-----------------+-------------+-----------------+----------------+------------------+--------------+------------------------+-----------------------+-------------------------+