10.4. 在线流程¶
本节将详细介绍在线推理流程的实现,包括冷启动检测、多路召回、精准排序和多样性重排。
10.4.1. 代码结构¶
在线流水线的代码位于 web_project/backend/online/ 目录下:
online/
├── pipeline.py # 推荐主流程
├── cold_start/ # 冷启动处理
│ ├── detector.py # 冷启动检测
│ ├── service.py # 冷启动服务
│ ├── ucb_genre.py # UCB 类型探索
│ └── preferred_genre.py # 偏好类型策略
├── recall/ # 多路召回
│ ├── service.py # 召回服务与融合
│ ├── youtubednn.py # YoutubeDNN 召回
│ ├── item_based.py # 物品相似度召回
│ └── trending.py # 热门召回
├── ranking/ # 排序模型
│ ├── service.py # 排序服务
│ └── deepfm.py # DeepFM 排序
└── reranking/ # 重排策略
├── service.py # 重排服务
└── dispersion.py # 打散策略
离线流程的产出物(共享目录中的模型、Redis中的用户特征、预计算的物品向量)是在线服务的基础。在线流程需要在200毫秒内完成推荐,这要求模型推理快、数据访问高效、多阶段协同。
整个流程封装在 RecommendationPipeline
类中(online/pipeline.py):
class RecommendationPipeline:
def __init__(self):
self.recall_service = get_recall_service()
self.ranking_service = get_ranking_service()
self.reranking_service = get_reranking_service()
self.cold_start_service = get_cold_start_service()
async def recommend(
self,
user_features: Dict[str, Any],
item_features_provider: Optional[ItemFeaturesProvider] = None,
config: Optional[PipelineConfig] = None
) -> RecommendationResult:
config = config or PipelineConfig()
# 冷启动检测
if config.enable_cold_start and self._is_cold_start(user_features, config):
return await self._cold_start_recommend(user_features, item_features_provider, config)
# 正常流程:召回 → 排序 → 重排
candidates = await self._recall(user_features, config.recall_top_k)
ranked_items, ranking_strategy = await self._rank(
user_features, candidates, item_features_provider, config.ranking_top_k
)
reranked_items, reranking_strategies = await self._rerank(
ranked_items, user_features, item_features_provider
)
return RecommendationResult(items=reranked_items, ...)
PipelineConfig 控制各阶段的行为:
@dataclass
class PipelineConfig:
recall_top_k: int = 100 # 召回阶段返回的候选数量
ranking_top_k: int = 20 # 排序阶段返回的结果数量
enable_ranking: bool = True # 是否启用排序模型
enable_reranking: bool = True # 是否启用重排
# 冷启动配置
enable_cold_start: bool = True
cold_start_threshold: int = 5 # 交互次数少于此阈值的用户为冷启动用户
cold_start_top_k: int = 20
下面依次介绍冷启动检测、多路召回、精准排序和多样性重排的具体实现。
10.4.2. 冷启动检测与处理¶
冷启动是推荐系统中的经典难题。当用户刚注册时,系统没有关于其偏好的行为数据,传统的协同过滤和向量召回都无法有效工作。本项目通过独立的冷启动模块来处理这类用户。
10.4.2.1. 冷启动检测¶
本节代码位于
online/cold_start/detector.py
冷启动检测的逻辑很简单:如果用户的历史交互次数少于设定的阈值,就认为是冷启动用户。
class ColdStartDetector:
def __init__(self, threshold: int = 5):
self.threshold = threshold
def is_cold_start(self, user_features: Dict[str, Any]) -> bool:
hist_movie_ids = user_features.get("hist_movie_ids", [])
if not hist_movie_ids:
return True
return len(hist_movie_ids) < self.threshold
阈值的选择需要权衡。设得太低,用户可能还没建立稳定的偏好模式就进入正常流程;设得太高,用户需要更长时间才能体验到个性化推荐。本项目默认使用 5 次交互作为阈值,这个值可以根据实际业务需求调整。
10.4.2.2. 冷启动策略的设计¶
对于冷启动用户,我们设计了三种策略:
UCB 类型探索策略:使用 Upper Confidence Bound 算法,在探索新类型和利用已知偏好之间取得平衡。
偏好类型策略:如果用户在注册时设置了偏好类型(preferred_genres),基于这些类型推荐高评分电影。
热门推荐策略:作为兜底策略,推荐近期的热门电影。
这三种策略通过 ColdStartService 统一管理:
class ColdStartService:
def __init__(self):
self.detector = ColdStartDetector(threshold=5)
self.strategies: List[ColdStartStrategy] = [
UCBGenreStrategy(), # 优先级 1:UCB 探索
PreferredGenreStrategy(), # 优先级 2:用户偏好
PopularRecentStrategy(), # 优先级 3:热门兜底
]
async def recommend(self, user_features: Dict[str, Any], top_k: int = 20):
# 找出可用的策略
applicable_strategies = [
s for s in self.strategies if s.can_handle(user_features)
]
# 根据用户状态分配配额
if has_ucb_data:
allocations = self._get_ucb_weighted_allocation(applicable_strategies, top_k)
elif has_preferences:
allocations = self._get_preference_weighted_allocation(applicable_strategies, top_k)
else:
allocations = self._get_fallback_allocation(applicable_strategies, top_k)
# 并行执行各策略
results_list = await asyncio.gather(*[
self._run_strategy(strategy, user_features, k)
for strategy, k in allocations if k > 0
])
# 合并结果
return self._merge_results(results_list, top_k)
策略的配额分配根据用户状态动态调整。如果用户已经有一些评分记录,UCB 策略获得 70% 的配额;如果只有偏好设置,偏好策略获得 80% 的配额;如果什么都没有,则完全使用热门推荐。
10.4.2.3. UCB 类型探索策略¶
本节代码位于
online/cold_start/ucb_genre.py
UCB(Upper Confidence Bound)是解决探索与利用(Exploration vs Exploitation)问题的经典算法。在冷启动场景中,我们面临一个选择:是推荐用户已经表现出兴趣的类型(利用),还是尝试推荐新类型来发现潜在兴趣(探索)?
UCB 算法通过一个置信上界公式来平衡这两个目标:
其中 \(\bar{r}_g\) 是类型 \(g\) 的历史平均评分,\(N\) 是总推荐次数,\(n_g\) 是类型 \(g\) 被推荐的次数,\(c\) 是探索系数。
公式的第一项代表利用:平均评分越高的类型得分越高。第二项代表探索:推荐次数越少的类型,不确定性越大,探索奖励也越高。
class UCBGenreStrategy(ColdStartStrategy):
def __init__(self, exploration_c: float = 1.5):
self.exploration_c = exploration_c
self.redis_client = redis.Redis.from_url(settings.redis_url)
def _calculate_ucb_scores(
self,
stats: Dict[str, Dict[str, float]],
total_n: int
) -> Dict[str, float]:
scores = {}
for genre in self.available_genres:
if genre in stats and stats[genre]["n"] > 0:
n = stats[genre]["n"]
reward = stats[genre]["reward"]
avg_reward = reward / n
# UCB 探索奖励
exploration_bonus = self.exploration_c * math.sqrt(
math.log(total_n + 1) / (n + 1e-6)
)
scores[genre] = avg_reward + exploration_bonus
else:
# 未探索的类型给予最高探索分
scores[genre] = 1.0 + self.exploration_c * 2
return scores
每个用户的 UCB 统计信息存储在 Redis 中,键为
user:{user_id}:genre_ucb。当用户对电影评分时,我们更新对应类型的统计:
def update_ucb_genre_stats(
user_id: int,
movie_genres: List[str],
rating: int
) -> None:
# 归一化评分到 [0, 1]
normalized_reward = rating / 10.0
key = f"user:{user_id}:genre_ucb"
for genre in movie_genres:
current_raw = redis_client.hget(key, genre)
if current_raw:
current = json.loads(current_raw)
current["n"] = current.get("n", 0) + 1
current["reward"] = current.get("reward", 0) + normalized_reward
else:
current = {"n": 1, "reward": normalized_reward}
redis_client.hset(key, genre, json.dumps(current))
这个设计的优点是:随着用户产生更多的评分行为,系统会逐渐了解用户的偏好,UCB 的“利用”成分会增加;同时,对于用户从未接触过的类型,UCB 仍然会给予一定的推荐机会,避免陷入“信息茧房”。
10.4.2.4. 偏好类型策略¶
本节代码位于
online/cold_start/preferred_genre.py
如果用户在注册时设置了偏好类型,这是一个宝贵的信息来源。偏好类型策略通过 Elasticsearch 查询这些类型中的高评分电影:
class PreferredGenreStrategy(ColdStartStrategy):
def can_handle(self, user_features: Dict[str, Any]) -> bool:
preferred_genres = user_features.get("preferred_genres", [])
return bool(preferred_genres)
async def recommend(self, user_features: Dict[str, Any], k: int):
preferred_genres = user_features.get("preferred_genres", [])
query = {
"bool": {
"must": [
{"terms": {"genres.keyword": preferred_genres}}
],
"filter": [
{"range": {"avg_rating": {"gte": 6.0}}},
{"range": {"rating_count": {"gte": 20}}}
]
}
}
res = es_service.client.search(
index=es_service.INDEX_NAME,
query=query,
sort=[
{"avg_rating": {"order": "desc"}},
{"year": {"order": "desc"}}
],
size=k
)
# 处理结果...
查询条件中增加了评分和评分数的过滤,确保推荐的是经过一定用户验证的优质电影,而不是冷门或低质量内容。
10.4.3. 多路召回¶
对于有足够历史行为的用户,系统进入正常的推荐流程。第一阶段是召回,目标是从全量电影库中快速筛选出一批候选电影。
10.4.3.1. 为什么需要多路召回¶
在 2.3节 中我们介绍了 YoutubeDNN 召回模型。既然已经有了一个不错的召回模型,为什么还需要多路召回?
原因在于单一召回策略的局限性。向量召回依赖模型学习到的表征,但模型可能无法捕捉所有相关性。例如,用户最近看了一部新上映的小众电影,由于训练数据中这部电影的样本很少,模型对它的向量表征可能不够准确,导致召回结果不理想。
多路召回的思路是“不要把鸡蛋放在一个篮子里”。我们同时运行多种召回策略,每种策略从不同角度检索候选,最后将结果合并。这样可以提高召回的覆盖率和多样性。
本项目实现了三种召回策略:
class RecallService:
def __init__(self):
self.strategies: List[RecallStrategy] = [
UserPreferenceRecallStrategy(), # 用户偏好类目召回
ItemEmbeddingRecallStrategy(), # 物品相似度召回
YouTubeDNNRecallStrategy(), # 向量召回
]
10.4.3.2. YoutubeDNN 向量召回¶
本节代码位于
online/recall/youtubednn.py
YoutubeDNN 召回的核心思想是:将用户和物品都映射到同一个向量空间,通过向量相似度来衡量用户对物品的兴趣。在离线阶段,我们已经训练好了模型并预计算了所有电影的向量。在线阶段,我们需要:
根据用户的实时特征计算用户向量
在物品向量空间中检索最相似的电影
用户特征的预处理包括类别特征编码和历史序列处理:
class YouTubeDNNRecallStrategy(RecallStrategy):
def preprocess_user(self, user_features: Dict[str, Any], max_hist_len: int = 10):
inputs = {}
encoders = self.resource_manager.encoders
# 1. 处理标量特征(user_id, gender, age, occupation, zip_code)
for feat in ["user_id", "gender", "age", "occupation", "zip_code"]:
raw_val = user_features.get(feat)
if raw_val is not None and feat in encoders:
try:
val = encoders[feat].transform([str(raw_val)])[0] + 1
except:
val = 0 # 未知值
else:
val = 0
inputs[feat] = np.array([val])
# 2. 处理历史序列特征
hist_mids_raw = user_features.get("hist_movie_ids", [])
# 编码电影ID
hist_mids_encoded = []
movie_le = encoders.get("movie_id")
if movie_le:
for mid in hist_mids_raw:
if mid in movie_le.classes_:
enc = movie_le.transform([mid])[0] + 1
hist_mids_encoded.append(enc)
# 编码类型(展开为列表)
hist_genres_encoded = []
for mid in hist_mids_raw:
genres = self.resource_manager.movie_genre_map.get(mid, [])
hist_genres_encoded.extend(genres)
# 填充/截断到固定长度
def pad(seq, max_len):
seq = seq[-max_len:] # 保留最近的
return [0] * (max_len - len(seq)) + seq
inputs["hist_movie_id"] = np.array([pad(hist_mids_encoded, max_hist_len)])
inputs["hist_genres"] = np.array([pad(hist_genres_encoded, max_hist_len)])
return inputs
有了预处理后的输入,召回过程就是标准的向量检索:
def _recall_sync(self, user_context: Dict[str, Any], k: int):
model_inputs = self.preprocess_user(user_context)
# 计算用户向量
user_emb = self.resource_manager.user_model.predict(model_inputs, verbose=0)
user_emb = user_emb / np.linalg.norm(user_emb, axis=1, keepdims=True)
# 计算与所有物品的相似度(内积)
scores = np.dot(user_emb, self.resource_manager.item_embedding_matrix.T)[0]
# 取 Top K
top_indices = np.argsort(scores)[::-1][:k]
results = []
for idx in top_indices:
if idx == 0: continue # 跳过 padding
if idx - 1 < len(self.resource_manager.all_movie_ids):
raw_mid = self.resource_manager.all_movie_ids[idx - 1]
results.append({
"movie_id": int(raw_mid),
"score": float(scores[idx]),
"recall_type": "youtube_dnn"
})
return results
由于用户向量和物品向量都经过归一化,内积等价于余弦相似度。在生产环境中,如果物品库很大(百万级以上),可以使用 FAISS 等向量检索库来加速。本项目的电影库只有 3000 多部,直接计算内积即可满足延迟要求。
10.4.3.3. 物品相似度召回(I2I)¶
本节代码位于
online/recall/item_based.py
物品相似度召回的思路更加直接:用户刚看过什么电影,就推荐与之相似的电影。这种策略特别适合捕捉用户的即时兴趣。
class ItemEmbeddingRecallStrategy(RecallStrategy):
async def recall(self, user_context: Dict[str, Any], k: int):
hist_movie_ids = user_context.get("hist_movie_ids", [])
if not hist_movie_ids:
return []
# 取最近观看的电影
last_movie_id = hist_movie_ids[0]
# 获取该电影的向量
enc_idx = movie_le.transform([last_movie_id])[0] + 1
target_emb = self.resource_manager.item_embedding_matrix[enc_idx]
target_emb = target_emb / np.linalg.norm(target_emb)
# 计算与所有物品的相似度
scores = np.dot(self.resource_manager.item_embedding_matrix, target_emb)
# 取 Top K(排除自身)
top_indices = np.argsort(scores)[::-1][:k+2]
results = []
for idx in top_indices:
if idx == 0 or idx == enc_idx:
continue
# 构建结果...
return results
这里复用了 YoutubeDNN 训练得到的物品向量。由于物品向量本身就蕴含了协同过滤的信息(相似用户喜欢的物品向量接近),I2I 召回可以有效地找到与种子电影风格相近的候选。物品向量也可以用其他方法生成,比如使用Word2Vec、Item2Vec等方法生成。
10.4.3.4. 用户偏好类目召回¶
偏好类目召回是一种基于统计的策略。它根据用户的历史行为统计出偏好的类型,然后从这些类型中召回热门电影。
这种策略的优势在于稳定性。即使用户的最近行为出现偏差(例如偶尔看了一部不感兴趣的电影),偏好类目召回仍然能够推荐用户长期喜欢的类型。
用户的偏好类目在离线阶段计算并存入 Redis:
# 离线流程中的计算逻辑
user_movies = df_movies[df_movies["movie_id"].isin(movie_ids)]
all_genres = []
for genres in user_movies["genres"]:
all_genres.extend(genres)
if all_genres:
counts = Counter(all_genres)
top_3 = [g for g, c in counts.most_common(3)]
redis_client.hset(f"user:{user_id}:profile", "frequent_genres", ",".join(top_3))
在线召回时,从 Redis 读取偏好类目,查询对应类型的电影。
10.4.3.5. Snake Merge 多路融合¶
本节代码位于
online/recall/service.py
多路召回产生多组候选列表后,需要将它们合并成一个统一的候选集。简单的做法是按分数排序后合并,但这可能导致某一路占据主导地位。
我们采用 Snake Merge(蛇形合并)策略:从各路召回中轮流取出候选,确保每路都有代表性的候选进入排序阶段。
def _merge_results_round_robin(
self,
results_list: List[List[Dict[str, Any]]],
top_k: int
) -> List[Dict[str, Any]]:
merged_candidates = []
seen_movie_ids = set()
sources = [r if r else [] for r in results_list]
source_pointers = [0] * len(sources)
direction = 1 # 1 表示正向,-1 表示反向
current_idx = 0
while len(merged_candidates) < top_k:
# 检查是否所有源都已耗尽
all_exhausted = all(
source_pointers[i] >= len(sources[i])
for i in range(len(sources))
)
if all_exhausted:
break
# 从当前源取一个候选
src_list = sources[current_idx]
ptr = source_pointers[current_idx]
if ptr < len(src_list):
item = src_list[ptr]
source_pointers[current_idx] += 1
mid = item["movie_id"]
if mid not in seen_movie_ids:
merged_candidates.append(item)
seen_movie_ids.add(mid)
# 移动到下一个源(蛇形)
current_idx += direction
if direction == 1 and current_idx >= len(sources):
direction = -1
current_idx = len(sources) - 1
elif direction == -1 and current_idx < 0:
direction = 1
current_idx = 0
return merged_candidates
蛇形合并的名称来源于其遍历顺序:假设有三路召回 A、B、C,合并顺序是 A→B→C→C→B→A→A→B→C→…,像蛇一样来回穿梭。
10.4.4. 精准排序¶
召回阶段从全量电影库中筛选出约 100 个候选电影,但这些候选的顺序是由召回分数决定的,不够精准。排序阶段的任务是使用更复杂的模型,对这些候选进行精确的 CTR(点击率)预估,并按预测分数重新排序。
10.4.4.1. 排序模型的在线推理¶
本节代码位于
online/ranking/deepfm.py
本项目使用 DeepFM 作为排序模型。在 3.2.1.6节 中我们介绍了 DeepFM 的结构:它同时包含 FM 部分(学习二阶特征交叉)和 DNN 部分(学习高阶非线性),两部分的输出相加后经过 Sigmoid 函数得到点击概率。
在线推理的核心是特征构造。对于每个用户-候选对,我们需要提取用户特征和物品特征,编码成模型期望的输入格式:
class DeepFMRankingStrategy(RankingStrategy):
def _prepare_batch_inputs(
self,
user_features: Dict[str, Any],
candidates: List[Dict[str, Any]]
) -> Dict[str, np.ndarray]:
rm = self.resource_manager
batch_size = len(candidates)
inputs = {}
# 用户特征(所有候选共享,需要复制)
for feat in rm.user_features:
raw_val = user_features.get(feat)
encoded_val = rm.encode_feature(feat, raw_val)
inputs[feat] = np.full(batch_size, encoded_val, dtype=np.int32)
# 物品特征(每个候选不同)
for feat in rm.item_features:
encoded_values = []
for candidate in candidates:
raw_val = candidate.get(feat)
encoded_val = rm.encode_feature(feat, raw_val)
encoded_values.append(encoded_val)
inputs[feat] = np.array(encoded_values, dtype=np.int32)
return inputs
用户特征包括 user_id、gender、age、occupation 等,这些对于同一次请求的所有候选都是相同的。物品特征包括 movie_id、genres、isAdult、startYear 等,每个候选各不相同。
特征编码使用离线阶段训练时保存的 LabelEncoder:
def encode_feature(self, feat_name: str, raw_value: Any) -> int:
if raw_value is None:
return 0 # 未知值编码为 0
encoder = self.encoders.get(feat_name)
if encoder is None:
return 0
try:
# 确保类型匹配
if isinstance(encoder.classes_[0], str) and not isinstance(raw_value, str):
raw_value = str(raw_value)
if raw_value in encoder.classes_:
return int(encoder.transform([raw_value])[0]) + 1
else:
return 0 # 训练时未见过的值
except Exception:
return 0
编码时有一个重要的细节:编码值从 1 开始,0 预留给未知值。这与离线训练时的处理保持一致。
准备好输入后,调用模型进行批量预测:
def _rank_sync(
self,
user_features: Dict[str, Any],
candidates: List[Dict[str, Any]]
) -> List[Dict[str, Any]]:
# 准备输入
inputs = self._prepare_batch_inputs(user_features, candidates)
# 批量预测
predictions = self.resource_manager.ranking_model.predict(
inputs,
verbose=0,
batch_size=min(len(candidates), 256)
)
# 展平预测结果
if predictions.ndim > 1:
predictions = predictions.flatten()
# 组合结果并排序
ranked_results = []
for i, candidate in enumerate(candidates):
ranked_results.append({
"movie_id": candidate["movie_id"],
"score": float(predictions[i]), # 排序分数(CTR 预测)
"recall_score": candidate.get("score", 0.0), # 保留召回分数
"recall_type": candidate.get("recall_type"),
})
ranked_results.sort(key=lambda x: x["score"], reverse=True)
return ranked_results
批量预测相比逐个预测有显著的性能优势。TensorFlow 在批量推理时可以更好地利用 CPU 的向量化指令或 GPU 的并行计算能力。对于 100 个候选的批量预测,耗时通常在 10-30 毫秒。
10.4.4.2. 异步执行与降级策略¶
由于模型推理是 CPU 密集型操作,为了不阻塞事件循环,我们将其放在线程池中执行:
async def rank(
self,
user_features: Dict[str, Any],
candidates: List[Dict[str, Any]]
) -> List[Dict[str, Any]]:
if not self.is_ready:
return candidates # 模型未就绪时直接返回
loop = asyncio.get_event_loop()
return await loop.run_in_executor(
None,
self._rank_sync,
user_features,
candidates
)
当排序模型不可用时(例如模型文件加载失败),系统会自动降级到 FallbackRankingStrategy,直接使用召回分数作为排序依据:
class FallbackRankingStrategy(RankingStrategy):
async def rank(
self,
user_features: Dict[str, Any],
candidates: List[Dict[str, Any]]
) -> List[Dict[str, Any]]:
sorted_candidates = sorted(
candidates,
key=lambda x: x.get("score", 0.0),
reverse=True
)
return [
{
"movie_id": c["movie_id"],
"score": c.get("score", 0.0),
"recall_score": c.get("score", 0.0),
"recall_type": c.get("recall_type"),
}
for c in sorted_candidates
]
这种降级设计确保了服务的高可用性:即使排序模型出现问题,用户仍然可以获得推荐结果,只是质量有所下降。
10.4.5. 多样性重排¶
经过召回和排序后,我们得到了一个按 CTR 预测分数排序的电影列表。但这个列表可能存在多样性不足的问题:如果用户历史中动作片占比较高,排序模型可能会把动作片都排在前面,导致推荐列表清一色都是动作片。
用户体验研究表明,适度的多样性可以提升用户满意度和留存率。重排阶段的任务就是在保持整体相关性的前提下,增加推荐列表的多样性。
10.4.5.1. 连续打散策略¶
本节代码位于
online/reranking/dispersion.py
本项目采用“连续打散”(Consecutive Dispersion)策略,核心规则是:不允许超过 N 个相同属性的物品连续出现。
以类型打散为例,设 N=2,则推荐列表中最多只能有 2 部相同类型的电影连续出现。如果原始排序是 [动作, 动作, 动作, 喜剧],打散后变为 [动作, 动作, 喜剧, 动作]。
算法的实现思路是:
按原始排序顺序遍历候选
如果当前候选可以加入结果列表(不违反连续约束),则加入
如果不能加入,则放入“待定队列”
每次加入新元素后,尝试从待定队列中取出可以加入的元素
最后将待定队列中剩余的元素追加到末尾
class ConsecutiveDispersionStrategy(RerankingStrategy):
def __init__(
self,
feature_extractor: Callable[[Dict[str, Any]], Optional[str]],
max_consecutive: int = 2,
strategy_name: str = "consecutive_dispersion"
):
self._feature_extractor = feature_extractor
self._max_consecutive = max_consecutive
self._strategy_name = strategy_name
def _can_add(self, item: Dict[str, Any], result: List[Dict[str, Any]]) -> bool:
"""检查加入该元素是否会违反连续约束"""
if len(result) < self._max_consecutive:
return True
item_key = self._feature_extractor(item)
# 没有该属性的元素总是可以加入
if item_key is None:
return True
# 检查最近 (max_consecutive - 1) 个元素
recent_keys = [
self._feature_extractor(r)
for r in result[-(self._max_consecutive - 1):]
]
# 如果最近的元素都与当前元素相同,则不能加入
return not all(k == item_key for k in recent_keys)
async def rerank(
self,
items: List[Dict[str, Any]],
user_features: Dict[str, Any] = None
) -> List[Dict[str, Any]]:
if len(items) <= self._max_consecutive:
return items
result = []
deferred = []
for item in items:
if self._can_add(item, result):
result.append(item)
# 尝试插入待定队列中的元素
self._try_insert_deferred(result, deferred)
else:
deferred.append(item)
# 将剩余的待定元素追加到末尾
result.extend(deferred)
return result
10.4.5.2. 预定义的打散策略¶
基于 ConsecutiveDispersionStrategy,我们定义了两种具体的打散策略:
类型打散:确保不超过 2 部相同类型的电影连续出现。
def _extract_genre(item: Dict[str, Any]) -> Optional[str]:
"""提取主要类型(取第一个类型)"""
genres = item.get("genres")
if isinstance(genres, list) and genres:
return genres[0]
if isinstance(genres, str):
return genres.split("|")[0].strip() if "|" in genres else genres.strip()
return None
class GenreDispersionStrategy(ConsecutiveDispersionStrategy):
def __init__(self, max_consecutive: int = 2):
super().__init__(
feature_extractor=_extract_genre,
max_consecutive=max_consecutive,
strategy_name="genre_dispersion"
)
年代打散:确保不超过 2 部相同年代的电影连续出现。年代按 10 年分桶(1990s、2000s 等)。
def _extract_decade(item: Dict[str, Any]) -> Optional[str]:
"""提取年代桶"""
year = item.get("year") or item.get("startYear")
if year is None:
return None
try:
year_int = int(year)
decade = (year_int // 10) * 10
return f"{decade}s"
except (ValueError, TypeError):
return None
class DecadeDispersionStrategy(ConsecutiveDispersionStrategy):
def __init__(self, max_consecutive: int = 2):
super().__init__(
feature_extractor=_extract_decade,
max_consecutive=max_consecutive,
strategy_name="decade_dispersion"
)
10.4.5.3. 策略链的组合¶
重排服务支持多个策略的链式组合。策略按顺序执行,每个策略的输出作为下一个策略的输入:
class RerankingService:
def __init__(self):
self._strategies: List[RerankingStrategy] = [
GenreDispersionStrategy(max_consecutive=2),
DecadeDispersionStrategy(max_consecutive=2),
]
async def rerank(
self,
items: List[Dict[str, Any]],
user_features: Optional[Dict[str, Any]] = None
) -> List[Dict[str, Any]]:
if not items or not self._enabled:
return items
result = items
for strategy in self._strategies:
if strategy.is_ready:
result = await strategy.rerank(result, user_features)
return result
通过这种链式设计,我们可以灵活地组合多种打散维度。例如先按类型打散,再按年代打散,确保推荐列表在内容类型和时间跨度两个维度上都具有多样性。
打散算法的一个重要特性是保序性:在满足连续约束的前提下,尽量保持原始的排序顺序。这意味着高分的电影仍然会排在前面,只是位置可能略有调整。这样既保证了推荐的相关性,又增加了多样性。
10.4.6. API 集成与服务启动¶
在线流程的各个组件开发完成后,需要将它们整合到 FastAPI 应用中,对外提供 HTTP 接口。
10.4.6.1. 推荐 API 的设计¶
本节代码位于
app/api/v1/endpoints/recommendations.py
推荐接口的核心逻辑如下:
@router.post("/recommend")
async def get_recommendations(
request: RecommendRequest,
db: AsyncSession = Depends(get_db),
current_user: User = Depends(get_current_user)
):
pipeline = get_pipeline()
if not pipeline.is_ready:
raise HTTPException(status_code=503, detail="Recommendation service not ready")
# 构建用户特征
user_features = await build_user_features(current_user, db)
# 定义物品特征获取函数
async def item_features_provider(movie_ids: List[int]) -> Dict[int, Dict]:
movies = await get_movies_by_ids(db, movie_ids)
return {
m.id: {
"movie_id": m.id,
"genres": m.genres.split("|") if m.genres else [],
"year": m.year,
"isAdult": m.is_adult,
}
for m in movies
}
# 执行推荐流程
config = PipelineConfig(
recall_top_k=request.recall_top_k or 100,
ranking_top_k=request.top_k or 20,
enable_cold_start=True,
)
result = await pipeline.recommend(
user_features=user_features,
item_features_provider=item_features_provider,
config=config
)
# 查询电影详细信息
movie_ids = [item.movie_id for item in result.items]
movies = await get_movies_by_ids(db, movie_ids)
movie_map = {m.id: m for m in movies}
# 组装响应
return {
"recommendations": [
{
"movie_id": item.movie_id,
"title": movie_map[item.movie_id].title,
"poster_url": movie_map[item.movie_id].poster_url,
"genres": movie_map[item.movie_id].genres,
"year": movie_map[item.movie_id].year,
"score": item.score,
"recall_type": item.recall_type,
}
for item in result.items
if item.movie_id in movie_map
],
"is_cold_start": result.is_cold_start,
"ranking_strategy": result.ranking_strategy,
}
接口的设计有几个要点:
用户特征的构建:从数据库和 Redis 获取用户的基本信息和行为序列,组装成特征字典。
物品特征的惰性加载:通过
item_features_provider回调函数,只在需要时才查询物品特征。这避免了在召回阶段就加载大量可能用不到的数据。结果组装:流程返回的是电影 ID 和分数,还需要查询数据库获取电影的标题、海报等展示信息。
10.4.6.2. 资源加载与单例模式¶
在线服务启动时需要加载模型文件和词表。这些资源占用内存较大,应该在进程级别共享,而不是每个请求都重新加载。
本项目使用单例模式管理资源。以召回资源管理器为例:
class RecallResourceManager:
_instance = None
def __new__(cls):
if cls._instance is None:
cls._instance = super().__new__(cls)
cls._instance._initialized = False
return cls._instance
def __init__(self):
if self._initialized:
return
self.user_model = None
self.item_embedding_matrix = None
self.encoders = {}
self.all_movie_ids = []
self._initialized = True
def _ensure_resources_loaded(self):
"""懒加载资源"""
if self.user_model is not None:
return
# 从本地目录加载模型
self._load_from_local()
def _load_from_local(self):
deploy_dir = Path(os.getenv("MODEL_DEPLOY_DIR"))
# 加载用户模型
with open(deploy_dir / "model" / "user_recall" / "active.json") as f:
version_info = json.load(f)
model_path = deploy_dir / version_info["path"]
self.user_model = tf.keras.models.load_model(model_path)
# 加载物品向量
self.item_embedding_matrix = np.load(deploy_dir / "item_embeddings.npy")
# 加载词表
with open(deploy_dir / "vocab_dict.pkl", "rb") as f:
vocab_dict = pickle.load(f)
self.encoders = vocab_dict
采用惰性加载的方式,资源在第一次使用时才从本地目录加载。这样可以加快服务启动速度,同时确保资源只加载一次。
10.4.6.3. 健康检查¶
为了监控服务状态,流程提供了健康检查接口:
def get_health_status(self) -> Dict[str, Any]:
return {
"cold_start": {
"available": self.cold_start_service is not None,
"ready": self.is_cold_start_ready,
"strategies": self.cold_start_service.strategy_names if self.cold_start_service else [],
},
"recall": {
"available": self.recall_service is not None,
"strategies": len(self.recall_service.strategies) if self.recall_service else 0,
},
"ranking": {
"available": self.ranking_service is not None,
"ready": self.is_ranking_ready,
"strategy": self.ranking_service.active_strategy if self.ranking_service else None,
},
"reranking": {
"available": self.reranking_service is not None,
"ready": self.is_reranking_ready,
"strategies": self.reranking_service.strategy_names if self.reranking_service else [],
}
}
这个状态信息可以通过 /health
接口暴露,用于监控系统的运行状况。如果某个组件不可用,可以及时发现并处理。