10.4. 在线流程

本节将详细介绍在线推理流程的实现,包括冷启动检测、多路召回、精准排序和多样性重排。

10.4.1. 代码结构

在线流水线的代码位于 web_project/backend/online/ 目录下:

online/
├── pipeline.py               # 推荐主流程
├── cold_start/               # 冷启动处理
│   ├── detector.py           # 冷启动检测
│   ├── service.py            # 冷启动服务
│   ├── ucb_genre.py          # UCB 类型探索
│   └── preferred_genre.py    # 偏好类型策略
├── recall/                   # 多路召回
│   ├── service.py            # 召回服务与融合
│   ├── youtubednn.py         # YoutubeDNN 召回
│   ├── item_based.py         # 物品相似度召回
│   └── trending.py           # 热门召回
├── ranking/                  # 排序模型
│   ├── service.py            # 排序服务
│   └── deepfm.py             # DeepFM 排序
└── reranking/                # 重排策略
    ├── service.py            # 重排服务
    └── dispersion.py         # 打散策略

离线流程的产出物(共享目录中的模型、Redis中的用户特征、预计算的物品向量)是在线服务的基础。在线流程需要在200毫秒内完成推荐,这要求模型推理快、数据访问高效、多阶段协同。

整个流程封装在 RecommendationPipeline 类中(online/pipeline.py):

class RecommendationPipeline:
    def __init__(self):
        self.recall_service = get_recall_service()
        self.ranking_service = get_ranking_service()
        self.reranking_service = get_reranking_service()
        self.cold_start_service = get_cold_start_service()

    async def recommend(
        self,
        user_features: Dict[str, Any],
        item_features_provider: Optional[ItemFeaturesProvider] = None,
        config: Optional[PipelineConfig] = None
    ) -> RecommendationResult:
        config = config or PipelineConfig()

        # 冷启动检测
        if config.enable_cold_start and self._is_cold_start(user_features, config):
            return await self._cold_start_recommend(user_features, item_features_provider, config)

        # 正常流程:召回 → 排序 → 重排
        candidates = await self._recall(user_features, config.recall_top_k)
        ranked_items, ranking_strategy = await self._rank(
            user_features, candidates, item_features_provider, config.ranking_top_k
        )
        reranked_items, reranking_strategies = await self._rerank(
            ranked_items, user_features, item_features_provider
        )

        return RecommendationResult(items=reranked_items, ...)

PipelineConfig 控制各阶段的行为:

@dataclass
class PipelineConfig:
    recall_top_k: int = 100       # 召回阶段返回的候选数量
    ranking_top_k: int = 20       # 排序阶段返回的结果数量
    enable_ranking: bool = True   # 是否启用排序模型
    enable_reranking: bool = True # 是否启用重排

    # 冷启动配置
    enable_cold_start: bool = True
    cold_start_threshold: int = 5  # 交互次数少于此阈值的用户为冷启动用户
    cold_start_top_k: int = 20

下面依次介绍冷启动检测、多路召回、精准排序和多样性重排的具体实现。

10.4.2. 冷启动检测与处理

冷启动是推荐系统中的经典难题。当用户刚注册时,系统没有关于其偏好的行为数据,传统的协同过滤和向量召回都无法有效工作。本项目通过独立的冷启动模块来处理这类用户。

10.4.2.1. 冷启动检测

本节代码位于 online/cold_start/detector.py

冷启动检测的逻辑很简单:如果用户的历史交互次数少于设定的阈值,就认为是冷启动用户。

class ColdStartDetector:
    def __init__(self, threshold: int = 5):
        self.threshold = threshold

    def is_cold_start(self, user_features: Dict[str, Any]) -> bool:
        hist_movie_ids = user_features.get("hist_movie_ids", [])

        if not hist_movie_ids:
            return True

        return len(hist_movie_ids) < self.threshold

阈值的选择需要权衡。设得太低,用户可能还没建立稳定的偏好模式就进入正常流程;设得太高,用户需要更长时间才能体验到个性化推荐。本项目默认使用 5 次交互作为阈值,这个值可以根据实际业务需求调整。

10.4.2.2. 冷启动策略的设计

对于冷启动用户,我们设计了三种策略:

  1. UCB 类型探索策略:使用 Upper Confidence Bound 算法,在探索新类型和利用已知偏好之间取得平衡。

  2. 偏好类型策略:如果用户在注册时设置了偏好类型(preferred_genres),基于这些类型推荐高评分电影。

  3. 热门推荐策略:作为兜底策略,推荐近期的热门电影。

这三种策略通过 ColdStartService 统一管理:

class ColdStartService:
    def __init__(self):
        self.detector = ColdStartDetector(threshold=5)

        self.strategies: List[ColdStartStrategy] = [
            UCBGenreStrategy(),        # 优先级 1:UCB 探索
            PreferredGenreStrategy(),  # 优先级 2:用户偏好
            PopularRecentStrategy(),   # 优先级 3:热门兜底
        ]

    async def recommend(self, user_features: Dict[str, Any], top_k: int = 20):
        # 找出可用的策略
        applicable_strategies = [
            s for s in self.strategies if s.can_handle(user_features)
        ]

        # 根据用户状态分配配额
        if has_ucb_data:
            allocations = self._get_ucb_weighted_allocation(applicable_strategies, top_k)
        elif has_preferences:
            allocations = self._get_preference_weighted_allocation(applicable_strategies, top_k)
        else:
            allocations = self._get_fallback_allocation(applicable_strategies, top_k)

        # 并行执行各策略
        results_list = await asyncio.gather(*[
            self._run_strategy(strategy, user_features, k)
            for strategy, k in allocations if k > 0
        ])

        # 合并结果
        return self._merge_results(results_list, top_k)

策略的配额分配根据用户状态动态调整。如果用户已经有一些评分记录,UCB 策略获得 70% 的配额;如果只有偏好设置,偏好策略获得 80% 的配额;如果什么都没有,则完全使用热门推荐。

10.4.2.3. UCB 类型探索策略

本节代码位于 online/cold_start/ucb_genre.py

UCB(Upper Confidence Bound)是解决探索与利用(Exploration vs Exploitation)问题的经典算法。在冷启动场景中,我们面临一个选择:是推荐用户已经表现出兴趣的类型(利用),还是尝试推荐新类型来发现潜在兴趣(探索)?

UCB 算法通过一个置信上界公式来平衡这两个目标:

(10.4.1)\[\text{UCB}(g) = \bar{r}_g + c \cdot \sqrt{\frac{\ln N}{n_g}}\]

其中 \(\bar{r}_g\) 是类型 \(g\) 的历史平均评分,\(N\) 是总推荐次数,\(n_g\) 是类型 \(g\) 被推荐的次数,\(c\) 是探索系数。

公式的第一项代表利用:平均评分越高的类型得分越高。第二项代表探索:推荐次数越少的类型,不确定性越大,探索奖励也越高。

class UCBGenreStrategy(ColdStartStrategy):
    def __init__(self, exploration_c: float = 1.5):
        self.exploration_c = exploration_c
        self.redis_client = redis.Redis.from_url(settings.redis_url)

    def _calculate_ucb_scores(
        self,
        stats: Dict[str, Dict[str, float]],
        total_n: int
    ) -> Dict[str, float]:
        scores = {}

        for genre in self.available_genres:
            if genre in stats and stats[genre]["n"] > 0:
                n = stats[genre]["n"]
                reward = stats[genre]["reward"]
                avg_reward = reward / n

                # UCB 探索奖励
                exploration_bonus = self.exploration_c * math.sqrt(
                    math.log(total_n + 1) / (n + 1e-6)
                )

                scores[genre] = avg_reward + exploration_bonus
            else:
                # 未探索的类型给予最高探索分
                scores[genre] = 1.0 + self.exploration_c * 2

        return scores

每个用户的 UCB 统计信息存储在 Redis 中,键为 user:{user_id}:genre_ucb。当用户对电影评分时,我们更新对应类型的统计:

def update_ucb_genre_stats(
    user_id: int,
    movie_genres: List[str],
    rating: int
) -> None:
    # 归一化评分到 [0, 1]
    normalized_reward = rating / 10.0

    key = f"user:{user_id}:genre_ucb"

    for genre in movie_genres:
        current_raw = redis_client.hget(key, genre)

        if current_raw:
            current = json.loads(current_raw)
            current["n"] = current.get("n", 0) + 1
            current["reward"] = current.get("reward", 0) + normalized_reward
        else:
            current = {"n": 1, "reward": normalized_reward}

        redis_client.hset(key, genre, json.dumps(current))

这个设计的优点是:随着用户产生更多的评分行为,系统会逐渐了解用户的偏好,UCB 的“利用”成分会增加;同时,对于用户从未接触过的类型,UCB 仍然会给予一定的推荐机会,避免陷入“信息茧房”。

10.4.2.4. 偏好类型策略

本节代码位于 online/cold_start/preferred_genre.py

如果用户在注册时设置了偏好类型,这是一个宝贵的信息来源。偏好类型策略通过 Elasticsearch 查询这些类型中的高评分电影:

class PreferredGenreStrategy(ColdStartStrategy):
    def can_handle(self, user_features: Dict[str, Any]) -> bool:
        preferred_genres = user_features.get("preferred_genres", [])
        return bool(preferred_genres)

    async def recommend(self, user_features: Dict[str, Any], k: int):
        preferred_genres = user_features.get("preferred_genres", [])

        query = {
            "bool": {
                "must": [
                    {"terms": {"genres.keyword": preferred_genres}}
                ],
                "filter": [
                    {"range": {"avg_rating": {"gte": 6.0}}},
                    {"range": {"rating_count": {"gte": 20}}}
                ]
            }
        }

        res = es_service.client.search(
            index=es_service.INDEX_NAME,
            query=query,
            sort=[
                {"avg_rating": {"order": "desc"}},
                {"year": {"order": "desc"}}
            ],
            size=k
        )

        # 处理结果...

查询条件中增加了评分和评分数的过滤,确保推荐的是经过一定用户验证的优质电影,而不是冷门或低质量内容。

10.4.3. 多路召回

对于有足够历史行为的用户,系统进入正常的推荐流程。第一阶段是召回,目标是从全量电影库中快速筛选出一批候选电影。

10.4.3.1. 为什么需要多路召回

2.3节 中我们介绍了 YoutubeDNN 召回模型。既然已经有了一个不错的召回模型,为什么还需要多路召回?

原因在于单一召回策略的局限性。向量召回依赖模型学习到的表征,但模型可能无法捕捉所有相关性。例如,用户最近看了一部新上映的小众电影,由于训练数据中这部电影的样本很少,模型对它的向量表征可能不够准确,导致召回结果不理想。

多路召回的思路是“不要把鸡蛋放在一个篮子里”。我们同时运行多种召回策略,每种策略从不同角度检索候选,最后将结果合并。这样可以提高召回的覆盖率和多样性。

本项目实现了三种召回策略:

class RecallService:
    def __init__(self):
        self.strategies: List[RecallStrategy] = [
            UserPreferenceRecallStrategy(),  # 用户偏好类目召回
            ItemEmbeddingRecallStrategy(),   # 物品相似度召回
            YouTubeDNNRecallStrategy(),      # 向量召回
        ]

10.4.3.2. YoutubeDNN 向量召回

本节代码位于 online/recall/youtubednn.py

YoutubeDNN 召回的核心思想是:将用户和物品都映射到同一个向量空间,通过向量相似度来衡量用户对物品的兴趣。在离线阶段,我们已经训练好了模型并预计算了所有电影的向量。在线阶段,我们需要:

  1. 根据用户的实时特征计算用户向量

  2. 在物品向量空间中检索最相似的电影

用户特征的预处理包括类别特征编码和历史序列处理:

class YouTubeDNNRecallStrategy(RecallStrategy):
    def preprocess_user(self, user_features: Dict[str, Any], max_hist_len: int = 10):
        inputs = {}
        encoders = self.resource_manager.encoders

        # 1. 处理标量特征(user_id, gender, age, occupation, zip_code)
        for feat in ["user_id", "gender", "age", "occupation", "zip_code"]:
            raw_val = user_features.get(feat)
            if raw_val is not None and feat in encoders:
                try:
                    val = encoders[feat].transform([str(raw_val)])[0] + 1
                except:
                    val = 0  # 未知值
            else:
                val = 0
            inputs[feat] = np.array([val])

        # 2. 处理历史序列特征
        hist_mids_raw = user_features.get("hist_movie_ids", [])

        # 编码电影ID
        hist_mids_encoded = []
        movie_le = encoders.get("movie_id")
        if movie_le:
            for mid in hist_mids_raw:
                if mid in movie_le.classes_:
                    enc = movie_le.transform([mid])[0] + 1
                    hist_mids_encoded.append(enc)

        # 编码类型(展开为列表)
        hist_genres_encoded = []
        for mid in hist_mids_raw:
            genres = self.resource_manager.movie_genre_map.get(mid, [])
            hist_genres_encoded.extend(genres)

        # 填充/截断到固定长度
        def pad(seq, max_len):
            seq = seq[-max_len:]  # 保留最近的
            return [0] * (max_len - len(seq)) + seq

        inputs["hist_movie_id"] = np.array([pad(hist_mids_encoded, max_hist_len)])
        inputs["hist_genres"] = np.array([pad(hist_genres_encoded, max_hist_len)])

        return inputs

有了预处理后的输入,召回过程就是标准的向量检索:

def _recall_sync(self, user_context: Dict[str, Any], k: int):
    model_inputs = self.preprocess_user(user_context)

    # 计算用户向量
    user_emb = self.resource_manager.user_model.predict(model_inputs, verbose=0)
    user_emb = user_emb / np.linalg.norm(user_emb, axis=1, keepdims=True)

    # 计算与所有物品的相似度(内积)
    scores = np.dot(user_emb, self.resource_manager.item_embedding_matrix.T)[0]

    # 取 Top K
    top_indices = np.argsort(scores)[::-1][:k]

    results = []
    for idx in top_indices:
        if idx == 0: continue  # 跳过 padding
        if idx - 1 < len(self.resource_manager.all_movie_ids):
            raw_mid = self.resource_manager.all_movie_ids[idx - 1]
            results.append({
                "movie_id": int(raw_mid),
                "score": float(scores[idx]),
                "recall_type": "youtube_dnn"
            })

    return results

由于用户向量和物品向量都经过归一化,内积等价于余弦相似度。在生产环境中,如果物品库很大(百万级以上),可以使用 FAISS 等向量检索库来加速。本项目的电影库只有 3000 多部,直接计算内积即可满足延迟要求。

10.4.3.3. 物品相似度召回(I2I)

本节代码位于 online/recall/item_based.py

物品相似度召回的思路更加直接:用户刚看过什么电影,就推荐与之相似的电影。这种策略特别适合捕捉用户的即时兴趣。

class ItemEmbeddingRecallStrategy(RecallStrategy):
    async def recall(self, user_context: Dict[str, Any], k: int):
        hist_movie_ids = user_context.get("hist_movie_ids", [])
        if not hist_movie_ids:
            return []

        # 取最近观看的电影
        last_movie_id = hist_movie_ids[0]

        # 获取该电影的向量
        enc_idx = movie_le.transform([last_movie_id])[0] + 1
        target_emb = self.resource_manager.item_embedding_matrix[enc_idx]
        target_emb = target_emb / np.linalg.norm(target_emb)

        # 计算与所有物品的相似度
        scores = np.dot(self.resource_manager.item_embedding_matrix, target_emb)

        # 取 Top K(排除自身)
        top_indices = np.argsort(scores)[::-1][:k+2]

        results = []
        for idx in top_indices:
            if idx == 0 or idx == enc_idx:
                continue
            # 构建结果...

        return results

这里复用了 YoutubeDNN 训练得到的物品向量。由于物品向量本身就蕴含了协同过滤的信息(相似用户喜欢的物品向量接近),I2I 召回可以有效地找到与种子电影风格相近的候选。物品向量也可以用其他方法生成,比如使用Word2Vec、Item2Vec等方法生成。

10.4.3.4. 用户偏好类目召回

偏好类目召回是一种基于统计的策略。它根据用户的历史行为统计出偏好的类型,然后从这些类型中召回热门电影。

这种策略的优势在于稳定性。即使用户的最近行为出现偏差(例如偶尔看了一部不感兴趣的电影),偏好类目召回仍然能够推荐用户长期喜欢的类型。

用户的偏好类目在离线阶段计算并存入 Redis:

# 离线流程中的计算逻辑
user_movies = df_movies[df_movies["movie_id"].isin(movie_ids)]
all_genres = []
for genres in user_movies["genres"]:
    all_genres.extend(genres)

if all_genres:
    counts = Counter(all_genres)
    top_3 = [g for g, c in counts.most_common(3)]
    redis_client.hset(f"user:{user_id}:profile", "frequent_genres", ",".join(top_3))

在线召回时,从 Redis 读取偏好类目,查询对应类型的电影。

10.4.3.5. Snake Merge 多路融合

本节代码位于 online/recall/service.py

多路召回产生多组候选列表后,需要将它们合并成一个统一的候选集。简单的做法是按分数排序后合并,但这可能导致某一路占据主导地位。

我们采用 Snake Merge(蛇形合并)策略:从各路召回中轮流取出候选,确保每路都有代表性的候选进入排序阶段。

def _merge_results_round_robin(
    self,
    results_list: List[List[Dict[str, Any]]],
    top_k: int
) -> List[Dict[str, Any]]:
    merged_candidates = []
    seen_movie_ids = set()

    sources = [r if r else [] for r in results_list]
    source_pointers = [0] * len(sources)
    direction = 1  # 1 表示正向,-1 表示反向
    current_idx = 0

    while len(merged_candidates) < top_k:
        # 检查是否所有源都已耗尽
        all_exhausted = all(
            source_pointers[i] >= len(sources[i])
            for i in range(len(sources))
        )
        if all_exhausted:
            break

        # 从当前源取一个候选
        src_list = sources[current_idx]
        ptr = source_pointers[current_idx]

        if ptr < len(src_list):
            item = src_list[ptr]
            source_pointers[current_idx] += 1

            mid = item["movie_id"]
            if mid not in seen_movie_ids:
                merged_candidates.append(item)
                seen_movie_ids.add(mid)

        # 移动到下一个源(蛇形)
        current_idx += direction

        if direction == 1 and current_idx >= len(sources):
            direction = -1
            current_idx = len(sources) - 1
        elif direction == -1 and current_idx < 0:
            direction = 1
            current_idx = 0

    return merged_candidates

蛇形合并的名称来源于其遍历顺序:假设有三路召回 A、B、C,合并顺序是 A→B→C→C→B→A→A→B→C→…,像蛇一样来回穿梭。

10.4.4. 精准排序

召回阶段从全量电影库中筛选出约 100 个候选电影,但这些候选的顺序是由召回分数决定的,不够精准。排序阶段的任务是使用更复杂的模型,对这些候选进行精确的 CTR(点击率)预估,并按预测分数重新排序。

10.4.4.1. 排序模型的在线推理

本节代码位于 online/ranking/deepfm.py

本项目使用 DeepFM 作为排序模型。在 3.2.1.6节 中我们介绍了 DeepFM 的结构:它同时包含 FM 部分(学习二阶特征交叉)和 DNN 部分(学习高阶非线性),两部分的输出相加后经过 Sigmoid 函数得到点击概率。

在线推理的核心是特征构造。对于每个用户-候选对,我们需要提取用户特征和物品特征,编码成模型期望的输入格式:

class DeepFMRankingStrategy(RankingStrategy):
    def _prepare_batch_inputs(
        self,
        user_features: Dict[str, Any],
        candidates: List[Dict[str, Any]]
    ) -> Dict[str, np.ndarray]:
        rm = self.resource_manager
        batch_size = len(candidates)
        inputs = {}

        # 用户特征(所有候选共享,需要复制)
        for feat in rm.user_features:
            raw_val = user_features.get(feat)
            encoded_val = rm.encode_feature(feat, raw_val)
            inputs[feat] = np.full(batch_size, encoded_val, dtype=np.int32)

        # 物品特征(每个候选不同)
        for feat in rm.item_features:
            encoded_values = []
            for candidate in candidates:
                raw_val = candidate.get(feat)
                encoded_val = rm.encode_feature(feat, raw_val)
                encoded_values.append(encoded_val)
            inputs[feat] = np.array(encoded_values, dtype=np.int32)

        return inputs

用户特征包括 user_id、gender、age、occupation 等,这些对于同一次请求的所有候选都是相同的。物品特征包括 movie_id、genres、isAdult、startYear 等,每个候选各不相同。

特征编码使用离线阶段训练时保存的 LabelEncoder:

def encode_feature(self, feat_name: str, raw_value: Any) -> int:
    if raw_value is None:
        return 0  # 未知值编码为 0

    encoder = self.encoders.get(feat_name)
    if encoder is None:
        return 0

    try:
        # 确保类型匹配
        if isinstance(encoder.classes_[0], str) and not isinstance(raw_value, str):
            raw_value = str(raw_value)

        if raw_value in encoder.classes_:
            return int(encoder.transform([raw_value])[0]) + 1
        else:
            return 0  # 训练时未见过的值
    except Exception:
        return 0

编码时有一个重要的细节:编码值从 1 开始,0 预留给未知值。这与离线训练时的处理保持一致。

准备好输入后,调用模型进行批量预测:

def _rank_sync(
    self,
    user_features: Dict[str, Any],
    candidates: List[Dict[str, Any]]
) -> List[Dict[str, Any]]:
    # 准备输入
    inputs = self._prepare_batch_inputs(user_features, candidates)

    # 批量预测
    predictions = self.resource_manager.ranking_model.predict(
        inputs,
        verbose=0,
        batch_size=min(len(candidates), 256)
    )

    # 展平预测结果
    if predictions.ndim > 1:
        predictions = predictions.flatten()

    # 组合结果并排序
    ranked_results = []
    for i, candidate in enumerate(candidates):
        ranked_results.append({
            "movie_id": candidate["movie_id"],
            "score": float(predictions[i]),      # 排序分数(CTR 预测)
            "recall_score": candidate.get("score", 0.0),  # 保留召回分数
            "recall_type": candidate.get("recall_type"),
        })

    ranked_results.sort(key=lambda x: x["score"], reverse=True)
    return ranked_results

批量预测相比逐个预测有显著的性能优势。TensorFlow 在批量推理时可以更好地利用 CPU 的向量化指令或 GPU 的并行计算能力。对于 100 个候选的批量预测,耗时通常在 10-30 毫秒。

10.4.4.2. 异步执行与降级策略

由于模型推理是 CPU 密集型操作,为了不阻塞事件循环,我们将其放在线程池中执行:

async def rank(
    self,
    user_features: Dict[str, Any],
    candidates: List[Dict[str, Any]]
) -> List[Dict[str, Any]]:
    if not self.is_ready:
        return candidates  # 模型未就绪时直接返回

    loop = asyncio.get_event_loop()
    return await loop.run_in_executor(
        None,
        self._rank_sync,
        user_features,
        candidates
    )

当排序模型不可用时(例如模型文件加载失败),系统会自动降级到 FallbackRankingStrategy,直接使用召回分数作为排序依据:

class FallbackRankingStrategy(RankingStrategy):
    async def rank(
        self,
        user_features: Dict[str, Any],
        candidates: List[Dict[str, Any]]
    ) -> List[Dict[str, Any]]:
        sorted_candidates = sorted(
            candidates,
            key=lambda x: x.get("score", 0.0),
            reverse=True
        )
        return [
            {
                "movie_id": c["movie_id"],
                "score": c.get("score", 0.0),
                "recall_score": c.get("score", 0.0),
                "recall_type": c.get("recall_type"),
            }
            for c in sorted_candidates
        ]

这种降级设计确保了服务的高可用性:即使排序模型出现问题,用户仍然可以获得推荐结果,只是质量有所下降。

10.4.5. 多样性重排

经过召回和排序后,我们得到了一个按 CTR 预测分数排序的电影列表。但这个列表可能存在多样性不足的问题:如果用户历史中动作片占比较高,排序模型可能会把动作片都排在前面,导致推荐列表清一色都是动作片。

用户体验研究表明,适度的多样性可以提升用户满意度和留存率。重排阶段的任务就是在保持整体相关性的前提下,增加推荐列表的多样性。

10.4.5.1. 连续打散策略

本节代码位于 online/reranking/dispersion.py

本项目采用“连续打散”(Consecutive Dispersion)策略,核心规则是:不允许超过 N 个相同属性的物品连续出现

以类型打散为例,设 N=2,则推荐列表中最多只能有 2 部相同类型的电影连续出现。如果原始排序是 [动作, 动作, 动作, 喜剧],打散后变为 [动作, 动作, 喜剧, 动作]。

算法的实现思路是:

  1. 按原始排序顺序遍历候选

  2. 如果当前候选可以加入结果列表(不违反连续约束),则加入

  3. 如果不能加入,则放入“待定队列”

  4. 每次加入新元素后,尝试从待定队列中取出可以加入的元素

  5. 最后将待定队列中剩余的元素追加到末尾

class ConsecutiveDispersionStrategy(RerankingStrategy):
    def __init__(
        self,
        feature_extractor: Callable[[Dict[str, Any]], Optional[str]],
        max_consecutive: int = 2,
        strategy_name: str = "consecutive_dispersion"
    ):
        self._feature_extractor = feature_extractor
        self._max_consecutive = max_consecutive
        self._strategy_name = strategy_name

    def _can_add(self, item: Dict[str, Any], result: List[Dict[str, Any]]) -> bool:
        """检查加入该元素是否会违反连续约束"""
        if len(result) < self._max_consecutive:
            return True

        item_key = self._feature_extractor(item)

        # 没有该属性的元素总是可以加入
        if item_key is None:
            return True

        # 检查最近 (max_consecutive - 1) 个元素
        recent_keys = [
            self._feature_extractor(r)
            for r in result[-(self._max_consecutive - 1):]
        ]

        # 如果最近的元素都与当前元素相同,则不能加入
        return not all(k == item_key for k in recent_keys)

    async def rerank(
        self,
        items: List[Dict[str, Any]],
        user_features: Dict[str, Any] = None
    ) -> List[Dict[str, Any]]:
        if len(items) <= self._max_consecutive:
            return items

        result = []
        deferred = []

        for item in items:
            if self._can_add(item, result):
                result.append(item)
                # 尝试插入待定队列中的元素
                self._try_insert_deferred(result, deferred)
            else:
                deferred.append(item)

        # 将剩余的待定元素追加到末尾
        result.extend(deferred)
        return result

10.4.5.2. 预定义的打散策略

基于 ConsecutiveDispersionStrategy,我们定义了两种具体的打散策略:

类型打散:确保不超过 2 部相同类型的电影连续出现。

def _extract_genre(item: Dict[str, Any]) -> Optional[str]:
    """提取主要类型(取第一个类型)"""
    genres = item.get("genres")
    if isinstance(genres, list) and genres:
        return genres[0]
    if isinstance(genres, str):
        return genres.split("|")[0].strip() if "|" in genres else genres.strip()
    return None

class GenreDispersionStrategy(ConsecutiveDispersionStrategy):
    def __init__(self, max_consecutive: int = 2):
        super().__init__(
            feature_extractor=_extract_genre,
            max_consecutive=max_consecutive,
            strategy_name="genre_dispersion"
        )

年代打散:确保不超过 2 部相同年代的电影连续出现。年代按 10 年分桶(1990s、2000s 等)。

def _extract_decade(item: Dict[str, Any]) -> Optional[str]:
    """提取年代桶"""
    year = item.get("year") or item.get("startYear")
    if year is None:
        return None

    try:
        year_int = int(year)
        decade = (year_int // 10) * 10
        return f"{decade}s"
    except (ValueError, TypeError):
        return None

class DecadeDispersionStrategy(ConsecutiveDispersionStrategy):
    def __init__(self, max_consecutive: int = 2):
        super().__init__(
            feature_extractor=_extract_decade,
            max_consecutive=max_consecutive,
            strategy_name="decade_dispersion"
        )

10.4.5.3. 策略链的组合

重排服务支持多个策略的链式组合。策略按顺序执行,每个策略的输出作为下一个策略的输入:

class RerankingService:
    def __init__(self):
        self._strategies: List[RerankingStrategy] = [
            GenreDispersionStrategy(max_consecutive=2),
            DecadeDispersionStrategy(max_consecutive=2),
        ]

    async def rerank(
        self,
        items: List[Dict[str, Any]],
        user_features: Optional[Dict[str, Any]] = None
    ) -> List[Dict[str, Any]]:
        if not items or not self._enabled:
            return items

        result = items
        for strategy in self._strategies:
            if strategy.is_ready:
                result = await strategy.rerank(result, user_features)

        return result

通过这种链式设计,我们可以灵活地组合多种打散维度。例如先按类型打散,再按年代打散,确保推荐列表在内容类型和时间跨度两个维度上都具有多样性。

打散算法的一个重要特性是保序性:在满足连续约束的前提下,尽量保持原始的排序顺序。这意味着高分的电影仍然会排在前面,只是位置可能略有调整。这样既保证了推荐的相关性,又增加了多样性。

10.4.6. API 集成与服务启动

在线流程的各个组件开发完成后,需要将它们整合到 FastAPI 应用中,对外提供 HTTP 接口。

10.4.6.1. 推荐 API 的设计

本节代码位于 app/api/v1/endpoints/recommendations.py

推荐接口的核心逻辑如下:

@router.post("/recommend")
async def get_recommendations(
    request: RecommendRequest,
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_user)
):
    pipeline = get_pipeline()
    if not pipeline.is_ready:
        raise HTTPException(status_code=503, detail="Recommendation service not ready")

    # 构建用户特征
    user_features = await build_user_features(current_user, db)

    # 定义物品特征获取函数
    async def item_features_provider(movie_ids: List[int]) -> Dict[int, Dict]:
        movies = await get_movies_by_ids(db, movie_ids)
        return {
            m.id: {
                "movie_id": m.id,
                "genres": m.genres.split("|") if m.genres else [],
                "year": m.year,
                "isAdult": m.is_adult,
            }
            for m in movies
        }

    # 执行推荐流程
    config = PipelineConfig(
        recall_top_k=request.recall_top_k or 100,
        ranking_top_k=request.top_k or 20,
        enable_cold_start=True,
    )

    result = await pipeline.recommend(
        user_features=user_features,
        item_features_provider=item_features_provider,
        config=config
    )

    # 查询电影详细信息
    movie_ids = [item.movie_id for item in result.items]
    movies = await get_movies_by_ids(db, movie_ids)
    movie_map = {m.id: m for m in movies}

    # 组装响应
    return {
        "recommendations": [
            {
                "movie_id": item.movie_id,
                "title": movie_map[item.movie_id].title,
                "poster_url": movie_map[item.movie_id].poster_url,
                "genres": movie_map[item.movie_id].genres,
                "year": movie_map[item.movie_id].year,
                "score": item.score,
                "recall_type": item.recall_type,
            }
            for item in result.items
            if item.movie_id in movie_map
        ],
        "is_cold_start": result.is_cold_start,
        "ranking_strategy": result.ranking_strategy,
    }

接口的设计有几个要点:

  1. 用户特征的构建:从数据库和 Redis 获取用户的基本信息和行为序列,组装成特征字典。

  2. 物品特征的惰性加载:通过 item_features_provider 回调函数,只在需要时才查询物品特征。这避免了在召回阶段就加载大量可能用不到的数据。

  3. 结果组装:流程返回的是电影 ID 和分数,还需要查询数据库获取电影的标题、海报等展示信息。

10.4.6.2. 资源加载与单例模式

在线服务启动时需要加载模型文件和词表。这些资源占用内存较大,应该在进程级别共享,而不是每个请求都重新加载。

本项目使用单例模式管理资源。以召回资源管理器为例:

class RecallResourceManager:
    _instance = None

    def __new__(cls):
        if cls._instance is None:
            cls._instance = super().__new__(cls)
            cls._instance._initialized = False
        return cls._instance

    def __init__(self):
        if self._initialized:
            return

        self.user_model = None
        self.item_embedding_matrix = None
        self.encoders = {}
        self.all_movie_ids = []

        self._initialized = True

    def _ensure_resources_loaded(self):
        """懒加载资源"""
        if self.user_model is not None:
            return

        # 从本地目录加载模型
        self._load_from_local()

    def _load_from_local(self):
        deploy_dir = Path(os.getenv("MODEL_DEPLOY_DIR"))

        # 加载用户模型
        with open(deploy_dir / "model" / "user_recall" / "active.json") as f:
            version_info = json.load(f)
        model_path = deploy_dir / version_info["path"]
        self.user_model = tf.keras.models.load_model(model_path)

        # 加载物品向量
        self.item_embedding_matrix = np.load(deploy_dir / "item_embeddings.npy")

        # 加载词表
        with open(deploy_dir / "vocab_dict.pkl", "rb") as f:
            vocab_dict = pickle.load(f)
        self.encoders = vocab_dict

采用惰性加载的方式,资源在第一次使用时才从本地目录加载。这样可以加快服务启动速度,同时确保资源只加载一次。

10.4.6.3. 健康检查

为了监控服务状态,流程提供了健康检查接口:

def get_health_status(self) -> Dict[str, Any]:
    return {
        "cold_start": {
            "available": self.cold_start_service is not None,
            "ready": self.is_cold_start_ready,
            "strategies": self.cold_start_service.strategy_names if self.cold_start_service else [],
        },
        "recall": {
            "available": self.recall_service is not None,
            "strategies": len(self.recall_service.strategies) if self.recall_service else 0,
        },
        "ranking": {
            "available": self.ranking_service is not None,
            "ready": self.is_ranking_ready,
            "strategy": self.ranking_service.active_strategy if self.ranking_service else None,
        },
        "reranking": {
            "available": self.reranking_service is not None,
            "ready": self.is_reranking_ready,
            "strategies": self.reranking_service.strategy_names if self.reranking_service else [],
        }
    }

这个状态信息可以通过 /health 接口暴露,用于监控系统的运行状况。如果某个组件不可用,可以及时发现并处理。