.. _sec_projects_online_pipeline: 在线流程 ======== 本节将详细介绍在线推理流程的实现,包括冷启动检测、多路召回、精准排序和多样性重排。 代码结构 -------- 在线流水线的代码位于 ``web_project/backend/online/`` 目录下: :: online/ ├── pipeline.py # 推荐主流程 ├── cold_start/ # 冷启动处理 │ ├── detector.py # 冷启动检测 │ ├── service.py # 冷启动服务 │ ├── ucb_genre.py # UCB 类型探索 │ └── preferred_genre.py # 偏好类型策略 ├── recall/ # 多路召回 │ ├── service.py # 召回服务与融合 │ ├── youtubednn.py # YoutubeDNN 召回 │ ├── item_based.py # 物品相似度召回 │ └── trending.py # 热门召回 ├── ranking/ # 排序模型 │ ├── service.py # 排序服务 │ └── deepfm.py # DeepFM 排序 └── reranking/ # 重排策略 ├── service.py # 重排服务 └── dispersion.py # 打散策略 离线流程的产出物(共享目录中的模型、Redis中的用户特征、预计算的物品向量)是在线服务的基础。在线流程需要在200毫秒内完成推荐,这要求模型推理快、数据访问高效、多阶段协同。 整个流程封装在 ``RecommendationPipeline`` 类中(\ ``online/pipeline.py``\ ): .. raw:: latex \diilbookstyleinputcell .. code:: python class RecommendationPipeline: def __init__(self): self.recall_service = get_recall_service() self.ranking_service = get_ranking_service() self.reranking_service = get_reranking_service() self.cold_start_service = get_cold_start_service() async def recommend( self, user_features: Dict[str, Any], item_features_provider: Optional[ItemFeaturesProvider] = None, config: Optional[PipelineConfig] = None ) -> RecommendationResult: config = config or PipelineConfig() # 冷启动检测 if config.enable_cold_start and self._is_cold_start(user_features, config): return await self._cold_start_recommend(user_features, item_features_provider, config) # 正常流程:召回 → 排序 → 重排 candidates = await self._recall(user_features, config.recall_top_k) ranked_items, ranking_strategy = await self._rank( user_features, candidates, item_features_provider, config.ranking_top_k ) reranked_items, reranking_strategies = await self._rerank( ranked_items, user_features, item_features_provider ) return RecommendationResult(items=reranked_items, ...) ``PipelineConfig`` 控制各阶段的行为: .. raw:: latex \diilbookstyleinputcell .. code:: python @dataclass class PipelineConfig: recall_top_k: int = 100 # 召回阶段返回的候选数量 ranking_top_k: int = 20 # 排序阶段返回的结果数量 enable_ranking: bool = True # 是否启用排序模型 enable_reranking: bool = True # 是否启用重排 # 冷启动配置 enable_cold_start: bool = True cold_start_threshold: int = 5 # 交互次数少于此阈值的用户为冷启动用户 cold_start_top_k: int = 20 下面依次介绍冷启动检测、多路召回、精准排序和多样性重排的具体实现。 冷启动检测与处理 ---------------- 冷启动是推荐系统中的经典难题。当用户刚注册时,系统没有关于其偏好的行为数据,传统的协同过滤和向量召回都无法有效工作。本项目通过独立的冷启动模块来处理这类用户。 冷启动检测 ~~~~~~~~~~ 本节代码位于 ``online/cold_start/detector.py`` 冷启动检测的逻辑很简单:如果用户的历史交互次数少于设定的阈值,就认为是冷启动用户。 .. raw:: latex \diilbookstyleinputcell .. code:: python class ColdStartDetector: def __init__(self, threshold: int = 5): self.threshold = threshold def is_cold_start(self, user_features: Dict[str, Any]) -> bool: hist_movie_ids = user_features.get("hist_movie_ids", []) if not hist_movie_ids: return True return len(hist_movie_ids) < self.threshold 阈值的选择需要权衡。设得太低,用户可能还没建立稳定的偏好模式就进入正常流程;设得太高,用户需要更长时间才能体验到个性化推荐。本项目默认使用 5 次交互作为阈值,这个值可以根据实际业务需求调整。 冷启动策略的设计 ~~~~~~~~~~~~~~~~ 对于冷启动用户,我们设计了三种策略: 1. **UCB 类型探索策略**\ :使用 Upper Confidence Bound 算法,在探索新类型和利用已知偏好之间取得平衡。 2. **偏好类型策略**\ :如果用户在注册时设置了偏好类型(preferred_genres),基于这些类型推荐高评分电影。 3. **热门推荐策略**\ :作为兜底策略,推荐近期的热门电影。 这三种策略通过 ``ColdStartService`` 统一管理: .. raw:: latex \diilbookstyleinputcell .. code:: python class ColdStartService: def __init__(self): self.detector = ColdStartDetector(threshold=5) self.strategies: List[ColdStartStrategy] = [ UCBGenreStrategy(), # 优先级 1:UCB 探索 PreferredGenreStrategy(), # 优先级 2:用户偏好 PopularRecentStrategy(), # 优先级 3:热门兜底 ] async def recommend(self, user_features: Dict[str, Any], top_k: int = 20): # 找出可用的策略 applicable_strategies = [ s for s in self.strategies if s.can_handle(user_features) ] # 根据用户状态分配配额 if has_ucb_data: allocations = self._get_ucb_weighted_allocation(applicable_strategies, top_k) elif has_preferences: allocations = self._get_preference_weighted_allocation(applicable_strategies, top_k) else: allocations = self._get_fallback_allocation(applicable_strategies, top_k) # 并行执行各策略 results_list = await asyncio.gather(*[ self._run_strategy(strategy, user_features, k) for strategy, k in allocations if k > 0 ]) # 合并结果 return self._merge_results(results_list, top_k) 策略的配额分配根据用户状态动态调整。如果用户已经有一些评分记录,UCB 策略获得 70% 的配额;如果只有偏好设置,偏好策略获得 80% 的配额;如果什么都没有,则完全使用热门推荐。 UCB 类型探索策略 ~~~~~~~~~~~~~~~~ 本节代码位于 ``online/cold_start/ucb_genre.py`` UCB(Upper Confidence Bound)是解决探索与利用(Exploration vs Exploitation)问题的经典算法。在冷启动场景中,我们面临一个选择:是推荐用户已经表现出兴趣的类型(利用),还是尝试推荐新类型来发现潜在兴趣(探索)? UCB 算法通过一个置信上界公式来平衡这两个目标: .. math:: \text{UCB}(g) = \bar{r}_g + c \cdot \sqrt{\frac{\ln N}{n_g}} 其中 :math:`\bar{r}_g` 是类型 :math:`g` 的历史平均评分,\ :math:`N` 是总推荐次数,\ :math:`n_g` 是类型 :math:`g` 被推荐的次数,\ :math:`c` 是探索系数。 公式的第一项代表利用:平均评分越高的类型得分越高。第二项代表探索:推荐次数越少的类型,不确定性越大,探索奖励也越高。 .. raw:: latex \diilbookstyleinputcell .. code:: python class UCBGenreStrategy(ColdStartStrategy): def __init__(self, exploration_c: float = 1.5): self.exploration_c = exploration_c self.redis_client = redis.Redis.from_url(settings.redis_url) def _calculate_ucb_scores( self, stats: Dict[str, Dict[str, float]], total_n: int ) -> Dict[str, float]: scores = {} for genre in self.available_genres: if genre in stats and stats[genre]["n"] > 0: n = stats[genre]["n"] reward = stats[genre]["reward"] avg_reward = reward / n # UCB 探索奖励 exploration_bonus = self.exploration_c * math.sqrt( math.log(total_n + 1) / (n + 1e-6) ) scores[genre] = avg_reward + exploration_bonus else: # 未探索的类型给予最高探索分 scores[genre] = 1.0 + self.exploration_c * 2 return scores 每个用户的 UCB 统计信息存储在 Redis 中,键为 ``user:{user_id}:genre_ucb``\ 。当用户对电影评分时,我们更新对应类型的统计: .. raw:: latex \diilbookstyleinputcell .. code:: python def update_ucb_genre_stats( user_id: int, movie_genres: List[str], rating: int ) -> None: # 归一化评分到 [0, 1] normalized_reward = rating / 10.0 key = f"user:{user_id}:genre_ucb" for genre in movie_genres: current_raw = redis_client.hget(key, genre) if current_raw: current = json.loads(current_raw) current["n"] = current.get("n", 0) + 1 current["reward"] = current.get("reward", 0) + normalized_reward else: current = {"n": 1, "reward": normalized_reward} redis_client.hset(key, genre, json.dumps(current)) 这个设计的优点是:随着用户产生更多的评分行为,系统会逐渐了解用户的偏好,UCB 的“利用”成分会增加;同时,对于用户从未接触过的类型,UCB 仍然会给予一定的推荐机会,避免陷入“信息茧房”。 偏好类型策略 ~~~~~~~~~~~~ 本节代码位于 ``online/cold_start/preferred_genre.py`` 如果用户在注册时设置了偏好类型,这是一个宝贵的信息来源。偏好类型策略通过 Elasticsearch 查询这些类型中的高评分电影: .. raw:: latex \diilbookstyleinputcell .. code:: python class PreferredGenreStrategy(ColdStartStrategy): def can_handle(self, user_features: Dict[str, Any]) -> bool: preferred_genres = user_features.get("preferred_genres", []) return bool(preferred_genres) async def recommend(self, user_features: Dict[str, Any], k: int): preferred_genres = user_features.get("preferred_genres", []) query = { "bool": { "must": [ {"terms": {"genres.keyword": preferred_genres}} ], "filter": [ {"range": {"avg_rating": {"gte": 6.0}}}, {"range": {"rating_count": {"gte": 20}}} ] } } res = es_service.client.search( index=es_service.INDEX_NAME, query=query, sort=[ {"avg_rating": {"order": "desc"}}, {"year": {"order": "desc"}} ], size=k ) # 处理结果... 查询条件中增加了评分和评分数的过滤,确保推荐的是经过一定用户验证的优质电影,而不是冷门或低质量内容。 多路召回 -------- 对于有足够历史行为的用户,系统进入正常的推荐流程。第一阶段是召回,目标是从全量电影库中快速筛选出一批候选电影。 为什么需要多路召回 ~~~~~~~~~~~~~~~~~~ 在 :numref:`two_tower` 中我们介绍了 YoutubeDNN 召回模型。既然已经有了一个不错的召回模型,为什么还需要多路召回? 原因在于单一召回策略的局限性。向量召回依赖模型学习到的表征,但模型可能无法捕捉所有相关性。例如,用户最近看了一部新上映的小众电影,由于训练数据中这部电影的样本很少,模型对它的向量表征可能不够准确,导致召回结果不理想。 多路召回的思路是“不要把鸡蛋放在一个篮子里”。我们同时运行多种召回策略,每种策略从不同角度检索候选,最后将结果合并。这样可以提高召回的覆盖率和多样性。 本项目实现了三种召回策略: .. raw:: latex \diilbookstyleinputcell .. code:: python class RecallService: def __init__(self): self.strategies: List[RecallStrategy] = [ UserPreferenceRecallStrategy(), # 用户偏好类目召回 ItemEmbeddingRecallStrategy(), # 物品相似度召回 YouTubeDNNRecallStrategy(), # 向量召回 ] YoutubeDNN 向量召回 ~~~~~~~~~~~~~~~~~~~ 本节代码位于 ``online/recall/youtubednn.py`` YoutubeDNN 召回的核心思想是:将用户和物品都映射到同一个向量空间,通过向量相似度来衡量用户对物品的兴趣。在离线阶段,我们已经训练好了模型并预计算了所有电影的向量。在线阶段,我们需要: 1. 根据用户的实时特征计算用户向量 2. 在物品向量空间中检索最相似的电影 用户特征的预处理包括类别特征编码和历史序列处理: .. raw:: latex \diilbookstyleinputcell .. code:: python class YouTubeDNNRecallStrategy(RecallStrategy): def preprocess_user(self, user_features: Dict[str, Any], max_hist_len: int = 10): inputs = {} encoders = self.resource_manager.encoders # 1. 处理标量特征(user_id, gender, age, occupation, zip_code) for feat in ["user_id", "gender", "age", "occupation", "zip_code"]: raw_val = user_features.get(feat) if raw_val is not None and feat in encoders: try: val = encoders[feat].transform([str(raw_val)])[0] + 1 except: val = 0 # 未知值 else: val = 0 inputs[feat] = np.array([val]) # 2. 处理历史序列特征 hist_mids_raw = user_features.get("hist_movie_ids", []) # 编码电影ID hist_mids_encoded = [] movie_le = encoders.get("movie_id") if movie_le: for mid in hist_mids_raw: if mid in movie_le.classes_: enc = movie_le.transform([mid])[0] + 1 hist_mids_encoded.append(enc) # 编码类型(展开为列表) hist_genres_encoded = [] for mid in hist_mids_raw: genres = self.resource_manager.movie_genre_map.get(mid, []) hist_genres_encoded.extend(genres) # 填充/截断到固定长度 def pad(seq, max_len): seq = seq[-max_len:] # 保留最近的 return [0] * (max_len - len(seq)) + seq inputs["hist_movie_id"] = np.array([pad(hist_mids_encoded, max_hist_len)]) inputs["hist_genres"] = np.array([pad(hist_genres_encoded, max_hist_len)]) return inputs 有了预处理后的输入,召回过程就是标准的向量检索: .. raw:: latex \diilbookstyleinputcell .. code:: python def _recall_sync(self, user_context: Dict[str, Any], k: int): model_inputs = self.preprocess_user(user_context) # 计算用户向量 user_emb = self.resource_manager.user_model.predict(model_inputs, verbose=0) user_emb = user_emb / np.linalg.norm(user_emb, axis=1, keepdims=True) # 计算与所有物品的相似度(内积) scores = np.dot(user_emb, self.resource_manager.item_embedding_matrix.T)[0] # 取 Top K top_indices = np.argsort(scores)[::-1][:k] results = [] for idx in top_indices: if idx == 0: continue # 跳过 padding if idx - 1 < len(self.resource_manager.all_movie_ids): raw_mid = self.resource_manager.all_movie_ids[idx - 1] results.append({ "movie_id": int(raw_mid), "score": float(scores[idx]), "recall_type": "youtube_dnn" }) return results 由于用户向量和物品向量都经过归一化,内积等价于余弦相似度。在生产环境中,如果物品库很大(百万级以上),可以使用 FAISS 等向量检索库来加速。本项目的电影库只有 3000 多部,直接计算内积即可满足延迟要求。 物品相似度召回(I2I) ~~~~~~~~~~~~~~~~~~~~~ 本节代码位于 ``online/recall/item_based.py`` 物品相似度召回的思路更加直接:用户刚看过什么电影,就推荐与之相似的电影。这种策略特别适合捕捉用户的即时兴趣。 .. raw:: latex \diilbookstyleinputcell .. code:: python class ItemEmbeddingRecallStrategy(RecallStrategy): async def recall(self, user_context: Dict[str, Any], k: int): hist_movie_ids = user_context.get("hist_movie_ids", []) if not hist_movie_ids: return [] # 取最近观看的电影 last_movie_id = hist_movie_ids[0] # 获取该电影的向量 enc_idx = movie_le.transform([last_movie_id])[0] + 1 target_emb = self.resource_manager.item_embedding_matrix[enc_idx] target_emb = target_emb / np.linalg.norm(target_emb) # 计算与所有物品的相似度 scores = np.dot(self.resource_manager.item_embedding_matrix, target_emb) # 取 Top K(排除自身) top_indices = np.argsort(scores)[::-1][:k+2] results = [] for idx in top_indices: if idx == 0 or idx == enc_idx: continue # 构建结果... return results 这里复用了 YoutubeDNN 训练得到的物品向量。由于物品向量本身就蕴含了协同过滤的信息(相似用户喜欢的物品向量接近),I2I 召回可以有效地找到与种子电影风格相近的候选。物品向量也可以用其他方法生成,比如使用Word2Vec、Item2Vec等方法生成。 用户偏好类目召回 ~~~~~~~~~~~~~~~~ 偏好类目召回是一种基于统计的策略。它根据用户的历史行为统计出偏好的类型,然后从这些类型中召回热门电影。 这种策略的优势在于稳定性。即使用户的最近行为出现偏差(例如偶尔看了一部不感兴趣的电影),偏好类目召回仍然能够推荐用户长期喜欢的类型。 用户的偏好类目在离线阶段计算并存入 Redis: .. raw:: latex \diilbookstyleinputcell .. code:: python # 离线流程中的计算逻辑 user_movies = df_movies[df_movies["movie_id"].isin(movie_ids)] all_genres = [] for genres in user_movies["genres"]: all_genres.extend(genres) if all_genres: counts = Counter(all_genres) top_3 = [g for g, c in counts.most_common(3)] redis_client.hset(f"user:{user_id}:profile", "frequent_genres", ",".join(top_3)) 在线召回时,从 Redis 读取偏好类目,查询对应类型的电影。 Snake Merge 多路融合 ~~~~~~~~~~~~~~~~~~~~ 本节代码位于 ``online/recall/service.py`` 多路召回产生多组候选列表后,需要将它们合并成一个统一的候选集。简单的做法是按分数排序后合并,但这可能导致某一路占据主导地位。 我们采用 Snake Merge(蛇形合并)策略:从各路召回中轮流取出候选,确保每路都有代表性的候选进入排序阶段。 .. raw:: latex \diilbookstyleinputcell .. code:: python def _merge_results_round_robin( self, results_list: List[List[Dict[str, Any]]], top_k: int ) -> List[Dict[str, Any]]: merged_candidates = [] seen_movie_ids = set() sources = [r if r else [] for r in results_list] source_pointers = [0] * len(sources) direction = 1 # 1 表示正向,-1 表示反向 current_idx = 0 while len(merged_candidates) < top_k: # 检查是否所有源都已耗尽 all_exhausted = all( source_pointers[i] >= len(sources[i]) for i in range(len(sources)) ) if all_exhausted: break # 从当前源取一个候选 src_list = sources[current_idx] ptr = source_pointers[current_idx] if ptr < len(src_list): item = src_list[ptr] source_pointers[current_idx] += 1 mid = item["movie_id"] if mid not in seen_movie_ids: merged_candidates.append(item) seen_movie_ids.add(mid) # 移动到下一个源(蛇形) current_idx += direction if direction == 1 and current_idx >= len(sources): direction = -1 current_idx = len(sources) - 1 elif direction == -1 and current_idx < 0: direction = 1 current_idx = 0 return merged_candidates 蛇形合并的名称来源于其遍历顺序:假设有三路召回 A、B、C,合并顺序是 A→B→C→C→B→A→A→B→C→…,像蛇一样来回穿梭。 精准排序 -------- 召回阶段从全量电影库中筛选出约 100 个候选电影,但这些候选的顺序是由召回分数决定的,不够精准。排序阶段的任务是使用更复杂的模型,对这些候选进行精确的 CTR(点击率)预估,并按预测分数重新排序。 排序模型的在线推理 ~~~~~~~~~~~~~~~~~~ 本节代码位于 ``online/ranking/deepfm.py`` 本项目使用 DeepFM 作为排序模型。在 :numref:`deepfm` 中我们介绍了 DeepFM 的结构:它同时包含 FM 部分(学习二阶特征交叉)和 DNN 部分(学习高阶非线性),两部分的输出相加后经过 Sigmoid 函数得到点击概率。 在线推理的核心是特征构造。对于每个用户-候选对,我们需要提取用户特征和物品特征,编码成模型期望的输入格式: .. raw:: latex \diilbookstyleinputcell .. code:: python class DeepFMRankingStrategy(RankingStrategy): def _prepare_batch_inputs( self, user_features: Dict[str, Any], candidates: List[Dict[str, Any]] ) -> Dict[str, np.ndarray]: rm = self.resource_manager batch_size = len(candidates) inputs = {} # 用户特征(所有候选共享,需要复制) for feat in rm.user_features: raw_val = user_features.get(feat) encoded_val = rm.encode_feature(feat, raw_val) inputs[feat] = np.full(batch_size, encoded_val, dtype=np.int32) # 物品特征(每个候选不同) for feat in rm.item_features: encoded_values = [] for candidate in candidates: raw_val = candidate.get(feat) encoded_val = rm.encode_feature(feat, raw_val) encoded_values.append(encoded_val) inputs[feat] = np.array(encoded_values, dtype=np.int32) return inputs 用户特征包括 user_id、gender、age、occupation 等,这些对于同一次请求的所有候选都是相同的。物品特征包括 movie_id、genres、isAdult、startYear 等,每个候选各不相同。 特征编码使用离线阶段训练时保存的 LabelEncoder: .. raw:: latex \diilbookstyleinputcell .. code:: python def encode_feature(self, feat_name: str, raw_value: Any) -> int: if raw_value is None: return 0 # 未知值编码为 0 encoder = self.encoders.get(feat_name) if encoder is None: return 0 try: # 确保类型匹配 if isinstance(encoder.classes_[0], str) and not isinstance(raw_value, str): raw_value = str(raw_value) if raw_value in encoder.classes_: return int(encoder.transform([raw_value])[0]) + 1 else: return 0 # 训练时未见过的值 except Exception: return 0 编码时有一个重要的细节:编码值从 1 开始,0 预留给未知值。这与离线训练时的处理保持一致。 准备好输入后,调用模型进行批量预测: .. raw:: latex \diilbookstyleinputcell .. code:: python def _rank_sync( self, user_features: Dict[str, Any], candidates: List[Dict[str, Any]] ) -> List[Dict[str, Any]]: # 准备输入 inputs = self._prepare_batch_inputs(user_features, candidates) # 批量预测 predictions = self.resource_manager.ranking_model.predict( inputs, verbose=0, batch_size=min(len(candidates), 256) ) # 展平预测结果 if predictions.ndim > 1: predictions = predictions.flatten() # 组合结果并排序 ranked_results = [] for i, candidate in enumerate(candidates): ranked_results.append({ "movie_id": candidate["movie_id"], "score": float(predictions[i]), # 排序分数(CTR 预测) "recall_score": candidate.get("score", 0.0), # 保留召回分数 "recall_type": candidate.get("recall_type"), }) ranked_results.sort(key=lambda x: x["score"], reverse=True) return ranked_results 批量预测相比逐个预测有显著的性能优势。TensorFlow 在批量推理时可以更好地利用 CPU 的向量化指令或 GPU 的并行计算能力。对于 100 个候选的批量预测,耗时通常在 10-30 毫秒。 异步执行与降级策略 ~~~~~~~~~~~~~~~~~~ 由于模型推理是 CPU 密集型操作,为了不阻塞事件循环,我们将其放在线程池中执行: .. raw:: latex \diilbookstyleinputcell .. code:: python async def rank( self, user_features: Dict[str, Any], candidates: List[Dict[str, Any]] ) -> List[Dict[str, Any]]: if not self.is_ready: return candidates # 模型未就绪时直接返回 loop = asyncio.get_event_loop() return await loop.run_in_executor( None, self._rank_sync, user_features, candidates ) 当排序模型不可用时(例如模型文件加载失败),系统会自动降级到 FallbackRankingStrategy,直接使用召回分数作为排序依据: .. raw:: latex \diilbookstyleinputcell .. code:: python class FallbackRankingStrategy(RankingStrategy): async def rank( self, user_features: Dict[str, Any], candidates: List[Dict[str, Any]] ) -> List[Dict[str, Any]]: sorted_candidates = sorted( candidates, key=lambda x: x.get("score", 0.0), reverse=True ) return [ { "movie_id": c["movie_id"], "score": c.get("score", 0.0), "recall_score": c.get("score", 0.0), "recall_type": c.get("recall_type"), } for c in sorted_candidates ] 这种降级设计确保了服务的高可用性:即使排序模型出现问题,用户仍然可以获得推荐结果,只是质量有所下降。 多样性重排 ---------- 经过召回和排序后,我们得到了一个按 CTR 预测分数排序的电影列表。但这个列表可能存在多样性不足的问题:如果用户历史中动作片占比较高,排序模型可能会把动作片都排在前面,导致推荐列表清一色都是动作片。 用户体验研究表明,适度的多样性可以提升用户满意度和留存率。重排阶段的任务就是在保持整体相关性的前提下,增加推荐列表的多样性。 连续打散策略 ~~~~~~~~~~~~ 本节代码位于 ``online/reranking/dispersion.py`` 本项目采用“连续打散”(Consecutive Dispersion)策略,核心规则是:\ **不允许超过 N 个相同属性的物品连续出现**\ 。 以类型打散为例,设 N=2,则推荐列表中最多只能有 2 部相同类型的电影连续出现。如果原始排序是 [动作, 动作, 动作, 喜剧],打散后变为 [动作, 动作, 喜剧, 动作]。 算法的实现思路是: 1. 按原始排序顺序遍历候选 2. 如果当前候选可以加入结果列表(不违反连续约束),则加入 3. 如果不能加入,则放入“待定队列” 4. 每次加入新元素后,尝试从待定队列中取出可以加入的元素 5. 最后将待定队列中剩余的元素追加到末尾 .. raw:: latex \diilbookstyleinputcell .. code:: python class ConsecutiveDispersionStrategy(RerankingStrategy): def __init__( self, feature_extractor: Callable[[Dict[str, Any]], Optional[str]], max_consecutive: int = 2, strategy_name: str = "consecutive_dispersion" ): self._feature_extractor = feature_extractor self._max_consecutive = max_consecutive self._strategy_name = strategy_name def _can_add(self, item: Dict[str, Any], result: List[Dict[str, Any]]) -> bool: """检查加入该元素是否会违反连续约束""" if len(result) < self._max_consecutive: return True item_key = self._feature_extractor(item) # 没有该属性的元素总是可以加入 if item_key is None: return True # 检查最近 (max_consecutive - 1) 个元素 recent_keys = [ self._feature_extractor(r) for r in result[-(self._max_consecutive - 1):] ] # 如果最近的元素都与当前元素相同,则不能加入 return not all(k == item_key for k in recent_keys) async def rerank( self, items: List[Dict[str, Any]], user_features: Dict[str, Any] = None ) -> List[Dict[str, Any]]: if len(items) <= self._max_consecutive: return items result = [] deferred = [] for item in items: if self._can_add(item, result): result.append(item) # 尝试插入待定队列中的元素 self._try_insert_deferred(result, deferred) else: deferred.append(item) # 将剩余的待定元素追加到末尾 result.extend(deferred) return result 预定义的打散策略 ~~~~~~~~~~~~~~~~ 基于 ``ConsecutiveDispersionStrategy``\ ,我们定义了两种具体的打散策略: **类型打散**\ :确保不超过 2 部相同类型的电影连续出现。 .. raw:: latex \diilbookstyleinputcell .. code:: python def _extract_genre(item: Dict[str, Any]) -> Optional[str]: """提取主要类型(取第一个类型)""" genres = item.get("genres") if isinstance(genres, list) and genres: return genres[0] if isinstance(genres, str): return genres.split("|")[0].strip() if "|" in genres else genres.strip() return None class GenreDispersionStrategy(ConsecutiveDispersionStrategy): def __init__(self, max_consecutive: int = 2): super().__init__( feature_extractor=_extract_genre, max_consecutive=max_consecutive, strategy_name="genre_dispersion" ) **年代打散**\ :确保不超过 2 部相同年代的电影连续出现。年代按 10 年分桶(1990s、2000s 等)。 .. raw:: latex \diilbookstyleinputcell .. code:: python def _extract_decade(item: Dict[str, Any]) -> Optional[str]: """提取年代桶""" year = item.get("year") or item.get("startYear") if year is None: return None try: year_int = int(year) decade = (year_int // 10) * 10 return f"{decade}s" except (ValueError, TypeError): return None class DecadeDispersionStrategy(ConsecutiveDispersionStrategy): def __init__(self, max_consecutive: int = 2): super().__init__( feature_extractor=_extract_decade, max_consecutive=max_consecutive, strategy_name="decade_dispersion" ) 策略链的组合 ~~~~~~~~~~~~ 重排服务支持多个策略的链式组合。策略按顺序执行,每个策略的输出作为下一个策略的输入: .. raw:: latex \diilbookstyleinputcell .. code:: python class RerankingService: def __init__(self): self._strategies: List[RerankingStrategy] = [ GenreDispersionStrategy(max_consecutive=2), DecadeDispersionStrategy(max_consecutive=2), ] async def rerank( self, items: List[Dict[str, Any]], user_features: Optional[Dict[str, Any]] = None ) -> List[Dict[str, Any]]: if not items or not self._enabled: return items result = items for strategy in self._strategies: if strategy.is_ready: result = await strategy.rerank(result, user_features) return result 通过这种链式设计,我们可以灵活地组合多种打散维度。例如先按类型打散,再按年代打散,确保推荐列表在内容类型和时间跨度两个维度上都具有多样性。 打散算法的一个重要特性是\ **保序性**\ :在满足连续约束的前提下,尽量保持原始的排序顺序。这意味着高分的电影仍然会排在前面,只是位置可能略有调整。这样既保证了推荐的相关性,又增加了多样性。 API 集成与服务启动 ------------------ 在线流程的各个组件开发完成后,需要将它们整合到 FastAPI 应用中,对外提供 HTTP 接口。 推荐 API 的设计 ~~~~~~~~~~~~~~~ 本节代码位于 ``app/api/v1/endpoints/recommendations.py`` 推荐接口的核心逻辑如下: .. raw:: latex \diilbookstyleinputcell .. code:: python @router.post("/recommend") async def get_recommendations( request: RecommendRequest, db: AsyncSession = Depends(get_db), current_user: User = Depends(get_current_user) ): pipeline = get_pipeline() if not pipeline.is_ready: raise HTTPException(status_code=503, detail="Recommendation service not ready") # 构建用户特征 user_features = await build_user_features(current_user, db) # 定义物品特征获取函数 async def item_features_provider(movie_ids: List[int]) -> Dict[int, Dict]: movies = await get_movies_by_ids(db, movie_ids) return { m.id: { "movie_id": m.id, "genres": m.genres.split("|") if m.genres else [], "year": m.year, "isAdult": m.is_adult, } for m in movies } # 执行推荐流程 config = PipelineConfig( recall_top_k=request.recall_top_k or 100, ranking_top_k=request.top_k or 20, enable_cold_start=True, ) result = await pipeline.recommend( user_features=user_features, item_features_provider=item_features_provider, config=config ) # 查询电影详细信息 movie_ids = [item.movie_id for item in result.items] movies = await get_movies_by_ids(db, movie_ids) movie_map = {m.id: m for m in movies} # 组装响应 return { "recommendations": [ { "movie_id": item.movie_id, "title": movie_map[item.movie_id].title, "poster_url": movie_map[item.movie_id].poster_url, "genres": movie_map[item.movie_id].genres, "year": movie_map[item.movie_id].year, "score": item.score, "recall_type": item.recall_type, } for item in result.items if item.movie_id in movie_map ], "is_cold_start": result.is_cold_start, "ranking_strategy": result.ranking_strategy, } 接口的设计有几个要点: 1. **用户特征的构建**\ :从数据库和 Redis 获取用户的基本信息和行为序列,组装成特征字典。 2. **物品特征的惰性加载**\ :通过 ``item_features_provider`` 回调函数,只在需要时才查询物品特征。这避免了在召回阶段就加载大量可能用不到的数据。 3. **结果组装**\ :流程返回的是电影 ID 和分数,还需要查询数据库获取电影的标题、海报等展示信息。 资源加载与单例模式 ~~~~~~~~~~~~~~~~~~ 在线服务启动时需要加载模型文件和词表。这些资源占用内存较大,应该在进程级别共享,而不是每个请求都重新加载。 本项目使用单例模式管理资源。以召回资源管理器为例: .. raw:: latex \diilbookstyleinputcell .. code:: python class RecallResourceManager: _instance = None def __new__(cls): if cls._instance is None: cls._instance = super().__new__(cls) cls._instance._initialized = False return cls._instance def __init__(self): if self._initialized: return self.user_model = None self.item_embedding_matrix = None self.encoders = {} self.all_movie_ids = [] self._initialized = True def _ensure_resources_loaded(self): """懒加载资源""" if self.user_model is not None: return # 从本地目录加载模型 self._load_from_local() def _load_from_local(self): deploy_dir = Path(os.getenv("MODEL_DEPLOY_DIR")) # 加载用户模型 with open(deploy_dir / "model" / "user_recall" / "active.json") as f: version_info = json.load(f) model_path = deploy_dir / version_info["path"] self.user_model = tf.keras.models.load_model(model_path) # 加载物品向量 self.item_embedding_matrix = np.load(deploy_dir / "item_embeddings.npy") # 加载词表 with open(deploy_dir / "vocab_dict.pkl", "rb") as f: vocab_dict = pickle.load(f) self.encoders = vocab_dict 采用惰性加载的方式,资源在第一次使用时才从本地目录加载。这样可以加快服务启动速度,同时确保资源只加载一次。 健康检查 ~~~~~~~~ 为了监控服务状态,流程提供了健康检查接口: .. raw:: latex \diilbookstyleinputcell .. code:: python def get_health_status(self) -> Dict[str, Any]: return { "cold_start": { "available": self.cold_start_service is not None, "ready": self.is_cold_start_ready, "strategies": self.cold_start_service.strategy_names if self.cold_start_service else [], }, "recall": { "available": self.recall_service is not None, "strategies": len(self.recall_service.strategies) if self.recall_service else 0, }, "ranking": { "available": self.ranking_service is not None, "ready": self.is_ranking_ready, "strategy": self.ranking_service.active_strategy if self.ranking_service else None, }, "reranking": { "available": self.reranking_service is not None, "ready": self.is_reranking_ready, "strategies": self.reranking_service.strategy_names if self.reranking_service else [], } } 这个状态信息可以通过 ``/health`` 接口暴露,用于监控系统的运行状况。如果某个组件不可用,可以及时发现并处理。