4.1. 基于贪心的重排¶
贪心算法以其思路直观、计算高效、易于实现的特点,成为重排阶段解决多样性、新颖性等问题的首选策略之一。它们通常不依赖复杂的模型训练,而是基于预先定义的规则或目标函数,通过逐步选择当前最优解(贪心选择)的方式来构建或调整最终推荐列表。本节将深入剖析两种经典的、基于贪心的重排算法:最大边际相关(Maximal Marginal Relevance, MMR) 和 行列式点过程(Determinantal Point Process, DPP)。
4.1.1. MMR:最大边际相关¶
在精排输出的按CTR降序排列的列表中,头部物品往往具有高度相似性(如连续推荐同品类商品或同风格视频)。这种同质化现象直接导致两大问题:
用户体验恶化:用户浏览时产生审美疲劳,兴趣衰减速度加快;
系统效率损失:长尾优质内容曝光不足,平台生态多样性下降。
MMR算法 (Carbonell and Goldstein, 1998) 的核心目标是在保留高相关性物品的前提下,通过主动引入多样性打破同质化,实现“相关性与多样性的帕累托最优”。
MMR通过定义边际收益函数量化物品对列表的增量价值:
其中:
\(S\):已选物品集合
\(\text{Rel}(i)\):物品\(i\)的相关性分数,直接继承精排模型输出(如CTR预估分)
\(\text{Sim}(i,j)\):物品\(i\)与\(j\)的相似度,计算方式包括:
\(\lambda\):权衡参数 (\(0 \leq \lambda \leq 1\))
\(\lambda \to 1\):退化为精排序(纯相关性优先)
\(\lambda \to 0\):强制多样性优先(可能牺牲相关性)
当精排候选内容数量太多的时候,可以通过滑动窗口来对齐进行优化,也就是计算相似度的时候不是直接计算所有的相似度,而是计算窗口内的相似度,
其中\(W \subseteq S\)是最近选择的\(w\)个物品(\(w = |W| \ll |S|\))。
MMR核心代码实现:
import copy
from typing import Any, Callable, Dict, List
import numpy as np
from numpy.linalg import norm
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity
class Item:
def __init__(
self,
id: str,
rel: float,
dense_vector: List[float] = None,
sparse_features: Dict[str, Any] = None
):
self.id = id
self.rel = rel # 相关性分数(精排分)
self.dense_vector = dense_vector # 稠密向量表示(如嵌入向量)
self.sparse_features = sparse_features # 稀疏特征(标签、类别、作者等)
def MMR_Reranking(
item_pool: List[Item],
k: int,
lambda_param: float, # 权衡参数
sim_func: Callable[[Item, Item], float], # 相似度计算函数
window_size: int = None # 滑动窗口大小
) -> List[Item]:
"""
基于最大边际相关(MMR)算法的重排实现,支持滑动窗口优化
参数:
item_pool -- 候选物品列表
k -- 最终返回的物品数量
lambda_param -- 相关性与多样性权衡参数 (0-1)
sim_func -- 物品相似度计算函数
window_size -- 滑动窗口大小,默认为None(使用所有已选物品)
返回:
重排后的物品列表
"""
# 创建副本避免修改原始输入
candidates = copy.deepcopy(item_pool)
S = [] # 初始化重排结果列表
if not candidates:
return S
# 第一步:选取精排最高分物品
first_item = max(candidates, key=lambda x: x.rel)
S.append(first_item)
candidates.remove(first_item)
# 第二步:贪心迭代选择
while len(S) < k and candidates:
best_score = -float('inf')
best_item = None
# 确定要考虑的已选物品窗口
if window_size and len(S) > window_size:
# 只使用最近选择的window_size个物品
window = S[-window_size:]
else:
# 使用所有已选物品
window = S
for item in candidates:
# 计算与窗口中物品的最大相似度
max_sim = max(sim_func(item, s) for s in window) if window else 0
# 使用MMR公式: MR(i) = $\lambda$ * Rel(i) - (1 - $\lambda$) * max_sim(i, window)
score = lambda_param * item.rel - (1 - lambda_param) * max_sim
if score > best_score:
best_score = score
best_item = item
if best_item:
S.append(best_item)
candidates.remove(best_item)
else:
break # 无有效候选时退出
return S
def cosine_similarity(x, y):
vec1 = np.array(x.dense_vector)
vec2 = np.array(y.dense_vector)
return np.dot(vec1, vec2) / (norm(vec1) * norm(vec2))
# 生成测试数据
item_size = 1000
feature_dimension = 100
max_length = 50
# 创建测试物品
test_items = []
for i in range(item_size):
rel_score = np.exp(0.01 * np.random.randn() + 0.2)
dense_vec = np.random.randn(feature_dimension)
dense_vec /= norm(dense_vec) # 归一化
item = Item(
id=f"item_{i}",
rel=rel_score,
dense_vector=dense_vec.tolist()
)
test_items.append(item)
# 计算相似度
sim_func = cosine_similarity
# 滑动窗口大小
window_size = 3
# 权衡参数
lambda_param = 0.7
# 重排
reranked_items = MMR_Reranking(test_items, 3, lambda_param, sim_func, window_size)
# 输出结果
print(f'选择了 {len(reranked_items)} 个物品')
选择了 3 个物品
假如有5个待重排的物品,已知精排打分和item之间的两两相似度,重排需要从5个物品中筛选出top3条内容的详细计算流程如下: 1. 假设候选集包含5个商品及其精排分(Rel),相似度矩阵如下:
商品 |
Rel |
A |
B |
C |
D |
E |
---|---|---|---|---|---|---|
A |
0.95 |
1.0 |
0.2 |
0.8 |
0.1 |
0.3 |
B |
0.90 |
0.2 |
1.0 |
0.1 |
0.7 |
0.4 |
C |
0.85 |
0.8 |
0.1 |
1.0 |
0.3 |
0.6 |
D |
0.80 |
0.1 |
0.7 |
0.3 |
1.0 |
0.5 |
E |
0.75 |
0.3 |
0.4 |
0.6 |
0.5 |
1.0 |
\(\lambda=0.7\)时的MMR过程:
初始选择:A (Rel=0.95)
第二轮计算:
B: 0.90 - 0.7*max(Sim(A,B)=0.2) = 0.90 - 0.14 = 0.76 C: 0.85 - 0.7*0.8 = 0.85 - 0.56 = 0.29 D: 0.80 - 0.7*0.1 = 0.80 - 0.07 = 0.73 E: 0.75 - 0.7*0.3 = 0.75 - 0.21 = 0.54
选择 B (score=0.76)
第三轮计算(对比当前列表S=[A,B]):
C: 0.85 - 0.7*max(Sim(A,C)=0.8, Sim(B,C)=0.1) = 0.85-0.56=0.29 D: 0.80 - 0.7*max(0.1, 0.7) = 0.80-0.49=0.31 E: 0.75 - 0.7*max(0.3, 0.4) = 0.75-0.28=0.47
选择 E (score=0.47)
最终序列: [A, B, E] (对比精排序[A, B, C] 多样性提升37%)
4.1.2. DPP:行列式点过程¶
4.1.2.1. 行列式如何度量多样性¶
上述MMR原理中可以看出,MMR通过候选内容和已选内容计算两两相似度,贪心的选择一个和已选所有内容相似度最低的内容。这种方式无法捕捉多个物品间的复杂排斥关系(如三个相似物品的冗余效应),而行列式可以实现这一点。为了解释清楚行列式如何度量多样性,下面会花一定的篇幅做详细的介绍。
假设我们通过余弦相似度的方式来计算物品之间的相似度,对于每一个物品都有一个向量表示\(x_i\),那么对于待排序的所有物品\(X\),很容易得到所有物品两两之间的相似度矩阵\(S=X^TX\)。
我们知道矩阵行列式的几何意义表示的是,矩阵列向量张成的超面体的“有向体积”。在矩阵\(S\)中,如果列向量都线性相关,意味着列向量“塌缩”在更低维的空间中(在2D中,两个向量共线;在3D中,三个向量共面),此时矩阵\(S\)的行列式\(det(S)=0\)。反之,如果线性不相关,向量张成的高纬空间没有冗余,线性不相关。
假如我们有4个物品,对应的标签分别为:\(a=\text{科幻动作片},b=\text{科幻喜剧片},c=\text{古装爱情片},d=\text{古装悬疑片}\),计算物品之间的两两相似度,得到相似度矩阵\(S_t\),物品\({a,b,c,d}\)的相似度矩阵:
分别计算物品 \({a,b}\) 和物品 \({b,d}\) 的相似度矩阵 \(S_{a,b}\) 和 \(S_{b,d}\):
它们的行列式分别为:
\(|S_{a,b}|=1*1-0.9*0.9=0.19\)
\(|S_{b,d}|=1*1-0.1*0.1=0.81\)
从行列式的结果可以看出,当相似度矩阵的行列式值较大时,对应物品的多样性越高,反之行列式的值越低,多样性越低。
4.1.2.2. 相关性与多样性融合¶
在推荐中,相关性和多样性是两个重要的指标。相关性指的是物品之间的相似性,即物品的相关性越高,推荐的结果越相似。在DPP中,通过引入一个半正定的核矩阵\(L\)来同时优化物品的相关性和多样性。该半正定核矩阵可以分解为\(L=B^TB\),其中\(B\)的每一列表示重排候选集中物品的表示向量。具体来说,\(B\)的向量是通过相关性得分\(r_i\)和归一化后的物品向量的乘积计算得来。因此核矩阵中的元素\(L_{i,j}\)可以表示为:
其中,\(\langle \mathbf{f}_i, \mathbf{f}_j \rangle\)表示物品\(i\)和物品\(j\)的内积,即相似度得分\(S_{ij}\)。因此,核矩阵\(L\)可以进一步表示为:\(\mathbf{L} = \text{Diag}(\mathbf{r}) \cdot \mathbf{S} \cdot \text{Diag}(\mathbf{r})\),即分别对相似性矩阵的每一行和每一列分别乘以\(r_i\)。
在公式推导之前,我们看一个核矩阵的详细构造过程,假设我们有 3 个物品,它们之间的相似度矩阵 \(S\) 如下:
相关性向量 \(r\) :
构建对角阵:
计算核矩阵:
首先计算\(\text{Diag}(r) \cdot S\)
然后计算 \((\text{Diag}(r) \cdot S) \cdot \text{Diag}(r)\):
构建完核矩阵后,继续上述的公式推导,根据行列式的乘法性质可得到:
对于用户\(u\)来说,被选中的候选物品集合为\(R_u\),核矩阵的行列式表示为:
两边取对数,得到:
其中:
第一项只跟“相关性”有关,越相关 \(r_{u,i}^2\) 越大;
第二项 \(\log |S|\) 只跟“多样性”有关,S 越接近正交(余弦越接近 0),行列式越大。
经过上述的简单推到,我们会发现DPP最终优化的目标也变成了类似MMR的相关性和多样性的线形组合。所以在实际应用时会通过一个超参\(\theta\)来平衡相关性和多样性的权重。
4.1.2.3. 贪心求解过程¶
上述介绍了相似矩阵的行列式可以度量多样性,通过核矩阵可以融合相关性和多样性,下面继续来看一下贪心求解的过程。重排从物品候选列表中选择一个子集,使得\(\log |L_{R_u}|\)的值最大需要通过DPP(行列式点过程)来实现。
DPP是一种性能较高的概率模型,能将复杂的概率计算转换成简单的行列式计算,通过核矩阵的行列式计算每一个子集的概率,这一筛选过程就是行列式点过程的最大后验概率推断MAP(maximum a posteriori inference),行列式点过程的MAP求解是一个复杂的过程,Hulu的论文中提出了一种改进的贪心算法能够快速求解 (Chen et al., 2018)。
这一求解过程简单来说就是每次从候选集中贪心地选择一个能使边际收益(Marginal Gain)最大的商品加入到最终的结果子集中,直到满足停止条件为止,即每次选择物品\(j\)添加到结果集\(Y_g\)中, \(Y_g\)初始化为空集,物品\(j\)需要满足下面的等式:
由于\(L\)是一个半正定矩阵,所有主子矩阵也都是半正定矩阵,假设\(det(L_{Y_g}) > 0\),\(det(L_{Y_g})\)的Cholesky分解可以表示为\(L_{Y_g}=VV^T\),其中\(V\)是一个可逆的下三角矩阵。
对于新加入的物品\(i\),我们构造构造一个新的矩阵\(\mathbf{L}_{Y_g \cup \{i\}}\),它包含了\(L_{Y_g}\)和新物品\(i\)相关的元素,新增物品\(i\)后的核矩阵\(\mathbf{L}_{Y_g \cup \{i\}}\)的Cholesky分解为:
其中:
\(\mathbf{V}\) 是已选择物品集合 \(Y_g\) 对应的Cholesky分解的下三角矩阵
\(\mathbf{L}_{Y_g,i}\) 是核矩阵 \(\mathbf{L}\) 中已选择物品集合 \(Y_g\) 与候选物品 \(i\) 之间的相关性向量
\(L_{ii}\) 是核矩阵 \(\mathbf{L}\) 中物品 \(i\) 的对角元素,\(r_i r_i \langle \mathbf{f}_i, \mathbf{f}_i \rangle=r_i^2\)
此外,行向量\(c_i\)和标量\(d_i\)满足如下条件:
因此,\(c_i\)和\(d_i\)可以求解得到:
\(c_i^T = V^T \mathbf{L}_{Y_g,i}\)
\(d_i = \sqrt{\mathbf{L}_{ii} - \|\mathbf{c}_i\|_2^2}\)
根据上述公式,我们有Cholesky分解的分块形式:
设 \(\mathbf{M} = \begin{bmatrix} \mathbf{V} & \mathbf{0} \\ \mathbf{c}_i & d_i \end{bmatrix}\),则:
根据矩阵行列式的性质:
由于 \(\mathbf{M}\) 是分块下三角矩阵,其行列式等于对角块行列式的乘积:
因此:
而由于 \(\mathbf{L}_{Y_g} = \mathbf{V}\mathbf{V}^T\),所以:
最终得到:
因此,\(\det(\mathbf{L}_{\mathbf{Y}_g \cup \{i\}})\)的计算可以表示为:
再将\(\det(\mathbf{L}_{\mathbf{Y}_g \cup \{i\}})\)的结果代入优化目标可得:
如果上式得解,即可以得到\(L_{Y_k \cup \{j\}}\)的Cholesky分解:
按照上述的思路,如果\(c_j\)和\(d_j\)被求解出来了,\(Y_g\)就会被更新。在此基础上,对于未被选中的内容\(i\),我们可以快速的计算出对应的\(c_i'\)和\(d_i'\)
即(详细推导不展开):
当 \(d_i'\) 得到之后可根据 \(j = \arg\max_{i \in Z \setminus Y_g} \log(d_i^2)\) 得到最优的内容放入到 \(Y_g\)中。
最终贪心算法的算法流程如下
初始化:
\(\mathbf{c}_i = [], d_i^2 = \mathbf{L}_{ii}, j = \arg\max_{i \in Z} \log(d_i^2), Y_g = \{j\}\)
迭代:
当停止条件不满足时,执行以下步骤:
对于每个 \(i \in Z \setminus Y_g\):
计算 \(\mathbf{e}_i = (\mathbf{L}_{ji} - \langle \mathbf{c}_j, \mathbf{c}_i \rangle) / d_j\)
更新 \(\mathbf{c}_i = [\mathbf{c}_i \quad \mathbf{e}_i], d_i^2 = d_i^2 - \mathbf{e}_i^2\)
选择 \(j = \arg\max_{i \in Z \setminus Y_g} \log(d_i^2)\),更新 \(Y_g = Y_g \cup \{j\}\)
返回:返回 \(Y_g\)
DPP贪心求解算法的代码实现如下:
import copy
import math
import time
from typing import Any, Callable, Dict, List
import numpy as np
class Item:
def __init__(
self,
id: str,
rel: float,
dense_vector: List[float] = None,
sparse_features: Dict[str, Any] = None
) -> None:
self.id = id
self.rel = rel # 相关性分数(精排分)
self.dense_vector = dense_vector # 稠密向量表示(如嵌入向量)
self.sparse_features = sparse_features # 稀疏特征(标签、类别、作者等)
def DPP_Reranking(
item_pool: List[Item],
k: int,
kernel_matrix: np.ndarray,
epsilon: float = 1E-10) -> List[Item]:
"""
基于行列式点过程(DPP)的贪心重排实现
参数:
item_pool -- 候选物品列表
k -- 最终返回的物品数量
kernel_matrix -- 核矩阵,融合了相关性和多样性
epsilon -- 数值稳定性阈值
返回:
重排后的物品列表
"""
# 创建副本避免修改原始输入
candidates = copy.deepcopy(item_pool)
item_size = len(candidates)
if not candidates or k <= 0:
return []
# 初始化Cholesky分解相关变量
cis = np.zeros((k, item_size)) # 存储c_i向量
di2s = np.copy(np.diag(kernel_matrix)) # 存储d_i^2值
selected_items = [] # 已选择物品的索引
# 第一步:选择初始物品(d_i^2最大的物品)
selected_item_idx = np.argmax(di2s)
selected_items.append(selected_item_idx)
# 第二步:贪心迭代选择
while len(selected_items) < k and len(selected_items) < item_size:
k_current = len(selected_items) - 1 # 当前迭代轮次
# 获取当前选中物品的c_i和d_i
ci_optimal = cis[:k_current, selected_item_idx]
di_optimal = math.sqrt(di2s[selected_item_idx])
# 计算与当前选中物品的核矩阵元素
elements = kernel_matrix[selected_item_idx, :]
# 更新所有候选物品的c_i和d_i^2
# e_i = (L_{ji} - <c_j, c_i>) / d_j
eis = (elements - np.dot(ci_optimal, cis[:k_current, :])) / di_optimal
cis[k_current, :] = eis
# 更新d_i^2 = d_i^2 - e_i^2
di2s -= np.square(eis)
# 选择下一个物品:argmax log(d_i^2)
selected_item_idx = np.argmax(di2s)
# 数值稳定性检查
if di2s[selected_item_idx] < epsilon:
break
selected_items.append(selected_item_idx)
# 根据选中的索引返回对应的物品
result = [candidates[idx] for idx in selected_items]
return result
def create_kernel_matrix(
item_pool: List[Item],
sim_func: Callable[[Item, Item], float]
) -> np.ndarray:
"""
构建DPP核矩阵,融合相关性和多样性
参数:
item_pool -- 候选物品列表
sim_func -- 物品相似度计算函数
返回:
核矩阵 L = diag(r) * S * diag(r)
"""
n = len(item_pool)
# 构建相关性向量
relevance_scores = np.array([item.rel for item in item_pool])
# 构建相似度矩阵
similarity_matrix = np.zeros((n, n))
for i in range(n):
for j in range(n):
if i == j:
similarity_matrix[i, j] = 1.0
else:
similarity_matrix[i, j] = sim_func(item_pool[i], item_pool[j])
# 构建核矩阵: L = diag(r) * S * diag(r)
kernel_matrix = (
relevance_scores.reshape((n, 1)) *
similarity_matrix *
relevance_scores.reshape((1, n))
)
return kernel_matrix
# 生成测试数据
item_size = 1000
feature_dimension = 100
max_length = 50
# 创建测试物品
test_items = []
for i in range(item_size):
rel_score = np.exp(0.01 * np.random.randn() + 0.2)
dense_vec = np.random.randn(feature_dimension)
dense_vec /= np.linalg.norm(dense_vec) # 归一化
item = Item(
id=f"item_{i}",
rel=rel_score,
dense_vector=dense_vec.tolist()
)
test_items.append(item)
# 定义相似度函数
def cosine_similarity_func(item1: Item, item2: Item) -> float:
vec1 = np.array(item1.dense_vector)
vec2 = np.array(item2.dense_vector)
return np.dot(vec1, vec2) / (np.linalg.norm(vec1) * np.linalg.norm(vec2))
# 构建核矩阵
print('开始构建核矩阵...')
kernel_matrix = create_kernel_matrix(test_items, cosine_similarity_func)
print('核矩阵生成完成!')
# 执行DPP重排
t = time.time()
result = DPP_Reranking(test_items, max_length, kernel_matrix)
print('算法运行时间: ' + '\t' + "{0:.4e}".format(time.time() - t))
print(f'选择了 {len(result)} 个物品')
开始构建核矩阵...
核矩阵生成完成!
算法运行时间: 2.4055e-02
选择了 50 个物品