10.3. 离线流水线

在上一节中,我们设计了推荐系统的整体架构,明确了离线系统和在线系统的边界。本节将聚焦于离线流程的具体实现,包括特征工程、模型训练,以及将在线推理时使用的特征和模型部署上线三个核心环节。

10.3.1. 代码结构

离线流水线的代码位于 web_project/backend/offline/ 目录下:

offline/
├── pipeline.py                     # 流水线入口
├── config.py                       # 配置管理
├── feature/                        # 特征工程
│   ├── preprocess_retrieval.py     # 召回模型特征处理
│   └── preprocess_ranking.py       # 排序模型特征处理
├── training/                       # 模型训练
│   ├── train_retrieval.py          # 召回模型训练
│   └── train_ranking.py            # 排序模型训练
└── storage/                        # 存储部署
    ├── redis_ingest.py             # 特征上线
    └── local_deploy.py             # 模型部署

整个流程通过 pipeline.py 统一管理,支持按需运行指定步骤:

# offline/pipeline.py
def main():
    parser = argparse.ArgumentParser(description="FunRec Offline Pipeline")
    parser.add_argument("--steps", type=str, default="all")
    args = parser.parse_args()

    steps = args.steps.split(",")
    if "all" in steps:
        steps = ["retrieval_preprocess", "ranking_preprocess",
                 "retrieval_training", "ranking_training",
                 "ingest", "deploy"]

    if "retrieval_preprocess" in steps:
        run_retrieval_preprocessing()
    if "ranking_preprocess" in steps:
        run_ranking_preprocessing()
    if "retrieval_training" in steps:
        run_retrieval_training()
    if "ranking_training" in steps:
        run_ranking_training()
    if "ingest" in steps:
        ingest_to_redis(flush=args.flush_redis)
    if "deploy" in steps:
        deploy_local()

这种模块化设计便于开发调试:可以选择只运行某些步骤,比如只重新训练排序模型而不改动召回模型。

配置集中在 offline/config.py

class Config:
    # 数据路径
    TEMP_DIR = Path(os.getenv("FUNREC_PROCESSED_DATA_PATH")) / "web_project"
    DATASET_DIR = Path(os.getenv("FUNREC_RAW_DATA_PATH"))

    # 特征工程参数
    MAX_SEQ_LEN = 10      # 历史序列最大长度
    EMB_DIM = 16          # Embedding 维度
    NEG_SAMPLE_SIZE = 20  # 负采样数量

    # 训练参数
    BATCH_SIZE = 128
    EPOCHS = 3
    LEARNING_RATE = 0.001

    # 存储服务配置
    DEPLOY_DIR = TEMP_DIR / "deployed_models"  # 模型部署目录
    REDIS_URL = os.getenv("REDIS_URL", "redis://localhost:6379/0")

环境变量配置数据路径和服务地址,便于切换部署环境。下面依次介绍特征工程、模型训练、特征上线和模型部署的具体实现。

10.3.2. 特征工程

特征工程是机器学习项目中最耗时但也最关键的环节。好的特征可以大幅提升模型效果,而特征处理的错误往往会导致模型完全失效。本项目需要为召回模型和排序模型分别构建训练样本。

10.3.2.1. 原始数据加载

MovieLens-1M 数据集包含三个核心表:

  • users.pkl:6040 个用户,包含性别、年龄、职业、邮编等人口统计特征

  • movies.pkl:3883 部电影,包含标题、类型、年份等属性

  • ratings.pkl:约 100 万条评分记录,每条记录包含用户 ID、电影 ID、评分(1-5)和时间戳

数据加载的代码很简单:

def load_raw_data():
    df_movies = pd.read_pickle(config.DATASET_DIR / "movies.pkl")
    df_ratings = pd.read_pickle(config.DATASET_DIR / "ratings.pkl")
    df_users = pd.read_pickle(config.DATASET_DIR / "users.pkl")
    return df_movies, df_ratings, df_users

原始数据经过了预处理,从 IMDB 补充了电影的海报 URL、演员、导演等元数据。这些元数据主要用于前端展示,在模型训练中使用的特征相对简单。

10.3.2.2. 召回模型的特征处理

本节代码位于 offline/feature/preprocess_retrieval.py

召回模型(YoutubeDNN)采用序列建模的方式,将用户的历史观影序列作为重要输入。这里需要解决几个问题:如何编码类别特征?如何构建行为序列?如何生成训练样本?

类别特征编码

推荐系统中的特征大多是类别型的:用户 ID、电影 ID、性别、类型等。这些特征需要编码为整数,才能输入神经网络的 Embedding 层。

def process_features(df_movies, df_ratings, df_users):
    # 用户特征编码
    user_sparse_feature_columns = ["user_id", "gender", "age", "occupation", "zip_code"]
    user_vocab = {}

    for feat_name in user_sparse_feature_columns:
        label_encoder = LabelEncoder()
        new_user_feature_df[feat_name + "_encode"] = (
            label_encoder.fit_transform(new_user_feature_df[feat_name]) + 1
        )
        user_vocab[feat_name] = label_encoder.classes_

    # 电影特征编码
    movie_encode_feature_columns = ["movie_id", "genres", "isAdult", "startYear"]
    movie_vocab = {}

    for feat_name in movie_encode_feature_columns:
        label_encoder = LabelEncoder()
        label_encoder.fit(new_movie_sparse_feature_df[feat_name].explode())

        if feat_name == "genres":
            # genres 是列表类型,需要特殊处理
            new_movie_sparse_feature_df[feat_name + "_encode"] = (
                new_movie_sparse_feature_df[feat_name].apply(
                    lambda x: label_encoder.transform(x) + 1
                )
            )
        else:
            new_movie_sparse_feature_df[feat_name + "_encode"] = (
                label_encoder.transform(new_movie_sparse_feature_df[feat_name]) + 1
            )

        movie_vocab[feat_name] = label_encoder.classes_

编码时有一个细节:所有编码值都从 1 开始,0 预留给未知值和填充。这是因为 Embedding 层的第 0 个向量通常用于表示“不存在”或“未知”的情况。

行为序列构建

YoutubeDNN 的核心思想是“预测用户下一个会观看的视频”。因此,我们需要构建滑动窗口形式的训练样本:给定用户的前 \(k\) 次观影记录,预测第 \(k+1\) 次观看的电影。

def generate_train_eval_samples(
    data_df, user_columns, item_columns,
    max_hist_seq_len=10, padding_value=0
):
    data_df.sort_values("timestamp", inplace=True)
    train_data_dict = defaultdict(list)
    test_data_dict = defaultdict(list)

    for user_id, grouped_feats in data_df.groupby("user_id"):
        if len(grouped_feats["movie_id"]) < 2:
            continue

        len_hist_seq = len(grouped_feats["movie_id"])

        # 测试集:使用最后一条记录
        test_data_dict["user_id"].append(user_id)
        for col in user_columns:
            test_data_dict[col].append(grouped_feats[col].iloc[0])
        for col in item_columns:
            # 历史序列:除最后一条外的所有记录
            test_data_dict["hist_" + col].append(
                add_padding(grouped_feats[col].tolist()[:-1],
                           padding_value, max_hist_seq_len)
            )
            # 目标物品:最后一条记录
            test_data_dict[col].append(grouped_feats[col].tolist()[-1])

        # 训练集:滑动窗口
        for i in range(1, len_hist_seq - 1):
            train_data_dict["user_id"].append(user_id)
            for col in user_columns:
                train_data_dict[col].append(grouped_feats[col].iloc[0])
            for col in item_columns:
                # 历史序列:前 i 条记录
                train_data_dict["hist_" + col].append(
                    add_padding(grouped_feats[col].tolist()[:i],
                               padding_value, max_hist_seq_len)
                )
                # 目标物品:第 i 条记录
                train_data_dict[col].append(grouped_feats[col].tolist()[i])

这种时序划分方式模拟了真实的推荐场景:模型只能使用过去的信息来预测未来。如果使用随机划分,可能会导致未来信息泄露,使模型在训练时表现良好但在实际服务中效果不佳。

序列填充

不同用户的历史行为长度不同,但模型需要固定长度的输入。我们采用左填充的方式,将较短的序列在左侧补零:

def add_padding(val, padding_value, max_seq_len):
    if isinstance(val, (list, tuple, np.ndarray)):
        if len(val) > 0 and isinstance(val[0], (list, tuple, np.ndarray)):
            # 处理嵌套列表(如 genres)
            val = list(itertools.chain(*val))[:max_seq_len]
        else:
            val = list(val)[:max_seq_len]
        # 左填充:新观看的在右边,符合时间顺序
        return [padding_value] * (max_seq_len - len(val)) + val
    else:
        return val

左填充的好处是最近的行为总是在序列的右侧,这与时间顺序一致。对于序列模型(如 RNN 或 Transformer)来说,这种排列方式更自然。

10.3.2.3. 排序模型的特征处理

本节代码位于 offline/feature/preprocess_ranking.py

排序模型(DeepFM)与召回模型的样本格式不同。排序模型是一个点击率预测模型,输入是用户-物品对,输出是该用户点击该物品的概率。因此,我们需要构建正负样本。

标签定义

MovieLens 数据集只有评分,没有显式的点击信号。我们根据评分高低来定义标签:

# 计算每个用户的平均评分
user_avg_ratings = df_ratings.groupby("user_id")["rating"].mean().reset_index()
df_ratings = df_ratings.merge(user_avg_ratings, on="user_id", how="left")

# 如果评分高于用户平均分 - 1,认为是正样本
df_ratings['is_click'] = (
    df_ratings['rating'] >= df_ratings['user_avg_rating'] - 1
).astype(int)

这种定义方式考虑了用户的评分偏好。有些用户普遍给高分,有些用户比较严格。使用相对于个人平均分的偏移来判断,可以减少这种个体差异的影响。

负样本采样

正样本直接来自用户的评分记录,但负样本需要额外构造。我们采用两种负采样策略:

  1. 困难负样本(Hard Negatives):用户曝光过但没有产生正向交互的物品。这些样本是“困难”的,因为用户看到了但选择不点击。

  2. 随机负样本(Random Negatives):从用户没有交互过的物品中随机采样。这些样本相对“简单”,用于扩充负样本数量。

def generate_negative_samples(
    df_merged, movie_vocab,
    neg_ratio_from_exposure=1,  # 每个正样本对应的困难负样本数
    neg_ratio_random=2,          # 每个正样本对应的随机负样本数
):
    all_movie_ids = set(range(1, len(movie_vocab["movie_id"]) + 1))

    positive_samples = df_merged[df_merged["is_click"] == 1].copy()
    hard_negative_pool = df_merged[df_merged["is_click"] == 0].copy()

    # 构建每个用户的困难负样本池
    user_hard_negatives = {}
    for user_id, group in hard_negative_pool.groupby("user_id_original"):
        user_hard_negatives[user_id] = group

    # 构建每个用户的交互历史
    user_interactions = df_merged.groupby("user_id_original")["movie_id"].apply(set).to_dict()

    negative_samples = []

    # 1. 采样困难负样本
    for user_id, user_positives in positive_samples.groupby("user_id_original"):
        user_hard_neg_pool = user_hard_negatives.get(user_id)
        if user_hard_neg_pool is None or len(user_hard_neg_pool) == 0:
            continue

        n_positives = len(user_positives)
        n_hard_neg_needed = n_positives * neg_ratio_from_exposure
        n_to_sample = min(len(user_hard_neg_pool), n_hard_neg_needed)

        if n_to_sample > 0:
            sampled = user_hard_neg_pool.sample(n=n_to_sample)
            negative_samples.append(sampled)

    # 2. 采样随机负样本
    for _, row in positive_samples.iterrows():
        user_id_orig = row["user_id_original"]
        user_interacted = user_interactions.get(user_id_orig, set())
        available_movies = list(all_movie_ids - user_interacted)

        if len(available_movies) < neg_ratio_random:
            continue

        neg_movie_ids = random.sample(available_movies, neg_ratio_random)
        # 构建负样本记录...

负采样的比例需要权衡。过多的负样本会导致正负样本不平衡,影响模型学习;过少则可能使模型无法区分正负样本。本项目使用 1:3 的正负比例(1 个困难负样本 + 2 个随机负样本)。

训练/测试集划分

与召回模型类似,排序模型也采用时序划分:按时间排序后,较早的样本用于训练,较晚的样本用于测试。

def split_train_test(df_final, test_ratio=0.2, by_time=True):
    if by_time and "timestamp" in df_final.columns:
        df_final = df_final.sort_values("timestamp")
        split_idx = int(len(df_final) * (1 - test_ratio))
        train_df = df_final.iloc[:split_idx]
        test_df = df_final.iloc[split_idx:]
    else:
        from sklearn.model_selection import train_test_split
        train_df, test_df = train_test_split(df_final, test_size=test_ratio)

    return train_df, test_df

时序划分可以更真实地评估模型在未来数据上的表现。在实际生产环境中,模型总是使用历史数据训练,然后对未来的请求进行预测。

10.3.3. 召回模型训练

本节代码位于 offline/training/train_retrieval.py

召回模型的任务是从全量物品库中快速筛选出候选集。本项目使用 YoutubeDNN 作为召回模型,它是一个双塔结构:用户塔将用户特征编码为向量,物品塔将物品特征编码为向量,二者的内积表示匹配程度。

10.3.3.1. 模型配置

模型配置定义了特征结构和训练参数:

model_config_dict = {
    "features": {
        "emb_dim": 16,
        "max_seq_len": 10,
        "task_names": ["movie_id"],
        "features": [
            # 用户特征
            {"name": "user_id", "group": ["user_dnn"], "vocab_size": feature_dict["user_id"]},
            {"name": "age", "group": ["user_dnn"], "vocab_size": feature_dict["age"]},
            {"name": "gender", "group": ["user_dnn"], "vocab_size": feature_dict["gender"]},
            {"name": "occupation", "group": ["user_dnn"], "vocab_size": feature_dict["occupation"]},
            {"name": "zip_code", "group": ["user_dnn"], "vocab_size": feature_dict["zip_code"]},

            # 目标物品
            {"name": "movie_id", "group": ["target_item"], "vocab_size": feature_dict["movie_id"]},

            # 历史行为序列
            {
                "name": "hist_movie_id",
                "emb_name": "movie_id",  # 与目标物品共享 Embedding
                "group": ["raw_hist_seq"],
                "combiner": "mean",      # 序列聚合方式
                "vocab_size": feature_dict["movie_id"]
            },
            {
                "name": "hist_genres",
                "emb_name": "genres",
                "group": ["raw_hist_seq"],
                "combiner": "mean",
                "vocab_size": feature_dict["genres"]
            },
        ]
    },
    "training": {
        "build_function": "funrec.models.youtubednn.build_youtubednn_model",
        "model_params": {
            "emb_dim": 16,
            "neg_sample": 20,
            "dnn_units": [64, 32],
            "label_name": "movie_id"
        },
        "optimizer": "adam",
        "optimizer_params": {"learning_rate": 0.001},
        "loss": "sampledsoftmaxloss",
        "batch_size": 128,
        "epochs": 3,
    },
}

这里有几个要点:

  1. Embedding 共享:历史序列中的电影 ID(hist_movie_id)与目标电影(movie_id)共享同一个 Embedding 表。这种设计减少了参数量,也使得历史偏好和目标物品在同一个向量空间中表示。

  2. 序列聚合:历史序列通过 mean 聚合,将变长的序列压缩为固定维度的向量。更复杂的聚合方式(如注意力机制)可能带来更好的效果,但也增加了计算成本。

  3. Sampled Softmax:由于物品数量较多(3000+),直接计算完整的 Softmax 开销较大。Sampled Softmax 只对正样本和一定数量的负样本计算损失,大幅加速训练。

10.3.3.2. 训练流程

训练流程封装在 run_retrieval_training 函数中:

def run_retrieval_training():
    # 加载数据
    train_eval_samples = pickle.load(open(config.TRAIN_DATA_PATH, "rb"))
    feature_dict = pickle.load(open(config.FEATURE_DICT_PATH, "rb"))

    train_data = train_eval_samples["train"]
    test_data = train_eval_samples["test"]

    # 构建配置
    cfg = FunRecConfig(
        data=model_config_dict["data"],
        features=model_config_dict["features"],
        training=model_config_dict["training"],
        evaluation=model_config_dict["evaluation"]
    )

    # 准备特征
    feature_columns, processed_data = prepare_features(
        cfg.features, train_data, test_data,
        dataset_config=data_config[cfg.data["dataset_name"]]
    )

    # 训练模型
    models = train_model(cfg.training, feature_columns, processed_data)

    # 评估
    metrics = evaluate_model(models, processed_data, cfg.evaluation, feature_columns)
    print(build_metrics_table(metrics))

    # 保存用户塔和物品塔
    user_model = models[1]
    item_model = models[2]
    user_model.save(config.SAVED_MODELS_DIR / "user_model")
    item_model.save(config.SAVED_MODELS_DIR / "item_model")

YoutubeDNN 训练完成后会返回三个模型:完整模型、用户塔模型、物品塔模型。在线服务时,我们只需要用户塔模型(实时计算用户向量)和预计算的物品向量(通过物品塔生成)。

10.3.3.3. 物品向量生成

召回阶段需要计算用户向量与所有物品向量的相似度。为了加速在线检索,我们在离线阶段预计算所有物品的向量:

# 生成所有物品的编码 ID
vocab_dict = pickle.load(open(config.VOCAB_DICT_PATH, "rb"))
all_movie_ids = sorted(list(vocab_dict["movie_id"]))
encoded_ids = np.arange(1, len(all_movie_ids) + 1)

# 通过物品塔计算向量
item_inputs = {"movie_id": encoded_ids}
embeddings = item_model.predict(item_inputs, verbose=0)

# 归一化(使内积等价于余弦相似度)
embeddings = embeddings / np.linalg.norm(embeddings, axis=1, keepdims=True)

# 保存
np.save(config.ITEM_EMB_PATH, embeddings)
np.save(config.MOVIE_IDS_PATH, np.array(all_movie_ids))

向量归一化是一个重要步骤。归一化后,向量的内积等价于余弦相似度,取值范围在 [-1, 1] 之间。这使得相似度的解释更加直观,也便于设置阈值进行过滤。

2.3节 中,我们详细介绍了 YoutubeDNN 的模型结构和训练技巧。这里不再赘述模型细节,重点关注工程实现。

10.3.4. 排序模型训练

本节代码位于 offline/training/train_ranking.py

排序模型的任务是对召回阶段筛选出的候选进行精确打分。本项目使用 DeepFM 作为排序模型,它结合了 FM(捕捉二阶特征交叉)和 DNN(捕捉高阶非线性)的优势。

10.3.4.1. 模型配置

DeepFM 的配置与 YoutubeDNN 有所不同。它不需要区分用户塔和物品塔,所有特征都作为模型的输入:

model_config_dict = {
    "features": {
        "emb_dim": 16,
        "task_names": ["is_click"],  # 二分类标签
        "features": [
            # 用户特征
            {"name": "user_id", "group": ["deepfm", "linear"],
             "vocab_size": feature_dict["user_id"]},
            {"name": "gender", "group": ["deepfm", "linear"],
             "vocab_size": feature_dict["gender"]},
            {"name": "age", "group": ["deepfm", "linear"],
             "vocab_size": feature_dict["age"]},
            {"name": "occupation", "group": ["deepfm", "linear"],
             "vocab_size": feature_dict["occupation"]},
            {"name": "zip_code", "group": ["deepfm", "linear"],
             "vocab_size": feature_dict["zip_code"]},

            # 物品特征
            {"name": "movie_id", "group": ["deepfm", "linear"],
             "vocab_size": feature_dict["movie_id"]},
            {"name": "genres", "group": ["deepfm", "linear"],
             "vocab_size": feature_dict["genres"]},
            {"name": "isAdult", "group": ["deepfm", "linear"],
             "vocab_size": feature_dict["isAdult"]},
            {"name": "startYear", "group": ["deepfm", "linear"],
             "vocab_size": feature_dict["startYear"]},
        ]
    },
    "training": {
        "build_function": "funrec.models.deepfm.build_deepfm_model",
        "model_params": {
            "dnn_units": [128, 64, 32],
            "dropout_rate": 0.1,
            "linear_logits": True,
        },
        "optimizer": "adam",
        "optimizer_params": {"learning_rate": 0.001},
        "loss": ["binary_crossentropy"],
        "metrics": ["binary_accuracy", "AUC"],
        "batch_size": 128,
        "epochs": 3,
        "validation_split": 0.1,
    },
}

配置中的 group 字段指定了特征所属的组件:deepfm 表示特征参与 FM 的二阶交叉计算,linear 表示特征参与一阶线性计算。将特征同时放入两个组,可以让模型同时学习一阶效应和二阶交互。

10.3.4.2. 训练流程

排序模型的训练流程与召回模型类似:

def run_ranking_training():
    # 加载数据
    train_eval_samples = pickle.load(open(ranking_data_path, "rb"))
    feature_dict = pickle.load(open(ranking_feature_dict_path, "rb"))

    train_data = train_eval_samples["train"]
    test_data = train_eval_samples["test"]

    # 构建配置
    cfg = FunRecConfig(
        data=model_config_dict["data"],
        features=model_config_dict["features"],
        training=model_config_dict["training"],
        evaluation=model_config_dict["evaluation"]
    )

    # 准备特征
    feature_columns, processed_data = prepare_features(
        cfg.features, train_data, test_data,
        dataset_config=data_config[dataset_name]
    )

    # 训练模型
    models = train_model(cfg.training, feature_columns, processed_data)
    main_model = models[0]

    # 评估
    metrics = evaluate_model(models, processed_data, cfg.evaluation, feature_columns)
    print(build_metrics_table(metrics))

    # 保存模型
    main_model.save(config.RANKING_MODEL_PATH)

    # 保存模型配置(供在线推理使用)
    pickle.dump({
        "feature_dict": feature_dict,
        "feature_columns": [fc.name for fc in feature_columns],
        "model_config": model_config_dict
    }, open(config.TEMP_DIR / "ranking_model_config.pkl", "wb"))

排序模型的评估指标通常是 AUC(Area Under the ROC Curve)。AUC 衡量的是模型区分正负样本的能力,不受正负样本比例的影响。在推荐场景中,AUC 反映了模型将用户喜欢的物品排在不喜欢的物品前面的能力。

3.2.1.6节 中,我们详细介绍了 DeepFM 的模型结构。它将 Wide & Deep 中需要人工设计的 Wide 部分替换为 FM,实现了端到端的特征交叉学习。

10.3.5. 特征上线与模型部署

模型训练完成后,需要将相关产出物部署到在线系统可以访问的存储中。本项目使用 Redis 存储用户特征,使用共享文件目录存储模型文件。

10.3.5.1. Redis 特征写入

本节代码位于 offline/storage/redis_ingest.py

用户特征需要在在线推理时快速读取。Redis 作为内存数据库,读取延迟通常在 1 毫秒以内,非常适合这个场景。

def ingest_to_redis(flush: bool = False):
    r = redis.Redis.from_url(config.REDIS_URL, decode_responses=True)

    if flush:
        r.flushdb()  # 清空数据库(开发调试用)

    df_movies, df_ratings, df_users = load_raw_data()

    # 1. 写入用户画像
    pipeline = r.pipeline()
    for _, row in df_users.iterrows():
        user_id = row['user_id']
        key = f"user:{user_id}:profile"

        profile_data = {
            "gender": row["gender"],
            "age": row["age"],
            "occupation": row["occupation"],
            "zip_code": row["zip_code"]
        }
        pipeline.hset(key, mapping=profile_data)

        if _ % 1000 == 0:
            pipeline.execute()  # 批量执行,提升性能
    pipeline.execute()

用户画像使用 Redis Hash 类型存储,键为 user:{id}:profile,值为包含多个字段的哈希表。这种结构便于读取单个字段或全部字段。

# 2. 写入用户行为历史和偏好类目
    df_ratings.sort_values("timestamp", inplace=True)
    grouped = df_ratings.groupby("user_id")

    for user_id, group in grouped:
        # 行为历史(电影 ID 列表)
        history_key = f"user:{user_id}:history"
        movie_ids = group["movie_id"].tolist()

        pipeline.delete(history_key)
        if movie_ids:
            for i in range(0, len(movie_ids), 1000):
                chunk = movie_ids[i:i+1000]
                pipeline.rpush(history_key, *chunk)

        # 偏好类目(Top 3 类型)
        user_movies = df_movies[df_movies["movie_id"].isin(movie_ids)]
        all_genres = []
        for genres in user_movies["genres"]:
            all_genres.extend(genres)

        if all_genres:
            from collections import Counter
            counts = Counter(all_genres)
            top_3 = [g for g, c in counts.most_common(3)]

            pipeline.hset(
                f"user:{user_id}:profile",
                "frequent_genres",
                ",".join(top_3)
            )

    pipeline.execute()

用户行为历史使用 Redis List 类型存储,保持时间顺序。偏好类目是从历史行为中统计出的 Top 3 类型,存储在用户画像中,供偏好类目召回策略使用。

使用 Pipeline 批量执行命令是一个重要的优化。Redis 的命令执行速度很快,但每次网络往返都有开销。Pipeline 将多个命令打包发送,减少网络往返次数,可以显著提升写入速度。

10.3.5.2. 本地模型部署

本节代码位于 offline/storage/local_deploy.py

模型文件通常较大(几十 MB 到几百 MB),不适合存储在 Redis 中。本项目使用共享文件目录来管理模型文件,离线流程将模型写入该目录,在线服务从该目录加载。

def deploy_local(recall: bool = True, ranking: bool = True):
    deploy_dir = config.DEPLOY_DIR
    deploy_dir.mkdir(parents=True, exist_ok=True)

    if recall:
        deploy_recall_models(deploy_dir)

    if ranking:
        deploy_ranking_models(deploy_dir)

召回模型的部署包括用户塔模型、物品向量和词表:

def deploy_recall_models(deploy_dir: Path):
    recall_dir = deploy_dir / "recall"
    recall_dir.mkdir(parents=True, exist_ok=True)

    # 1. 词表(用于特征编码)
    shutil.copy2(config.VOCAB_DICT_PATH, recall_dir / "vocab_dict.pkl")
    shutil.copy2(config.VOCAB_DICT_PATH, deploy_dir / "vocab_dict.pkl")

    # 2. 物品向量(用于相似度检索)
    shutil.copy2(config.ITEM_EMB_PATH, recall_dir / "item_embeddings.npy")
    shutil.copy2(config.ITEM_EMB_PATH, deploy_dir / "item_embeddings.npy")

    # 3. 用户塔模型(复制整个目录)
    user_model_path = config.SAVED_MODELS_DIR / "user_model"
    model_deploy_dir = deploy_dir / "model" / "user_recall" / "v1"
    model_deploy_dir.mkdir(parents=True, exist_ok=True)
    shutil.copytree(user_model_path, model_deploy_dir / "user_model")

    # 更新版本指针
    version_info = {"version": "v1", "path": "model/user_recall/v1/user_model"}
    with open(deploy_dir / "model" / "user_recall" / "active.json", "w") as f:
        json.dump(version_info, f)

TensorFlow SavedModel 是一个目录,包含模型结构和权重。我们将整个目录复制到部署目录,保持目录结构完整。

版本管理是生产环境中的重要功能。通过 active.json 指针文件,在线服务可以知道当前应该加载哪个版本的模型。模型更新时,先部署新版本的模型文件,再更新指针,可以实现无感知的模型热更新。

排序模型的部署类似:

def deploy_ranking_models(deploy_dir: Path):
    ranking_dir = deploy_dir / "ranking"
    ranking_dir.mkdir(parents=True, exist_ok=True)

    # 1. 词表和特征配置
    shutil.copy2(config.RANKING_VOCAB_DICT_PATH, ranking_dir / "vocab_dict.pkl")
    shutil.copy2(config.RANKING_FEATURE_DICT_PATH, ranking_dir / "feature_dict.pkl")

    # 2. DeepFM 模型
    model_deploy_dir = deploy_dir / "model" / "ranking" / "v1"
    model_deploy_dir.mkdir(parents=True, exist_ok=True)
    shutil.copytree(config.RANKING_MODEL_PATH, model_deploy_dir / "ranking_model")

    # 更新版本指针
    version_info = {
        "version": "v1",
        "path": "model/ranking/v1/ranking_model",
        "model_type": "deepfm"
    }
    with open(deploy_dir / "model" / "ranking" / "active.json", "w") as f:
        json.dump(version_info, f)