.. _sec_projects_offline_pipeline: 离线流水线 ========== 在上一节中,我们设计了推荐系统的整体架构,明确了离线系统和在线系统的边界。本节将聚焦于离线流程的具体实现,包括特征工程、模型训练,以及将在线推理时使用的特征和模型部署上线三个核心环节。 代码结构 -------- 离线流水线的代码位于 ``web_project/backend/offline/`` 目录下: :: offline/ ├── pipeline.py # 流水线入口 ├── config.py # 配置管理 ├── feature/ # 特征工程 │ ├── preprocess_retrieval.py # 召回模型特征处理 │ └── preprocess_ranking.py # 排序模型特征处理 ├── training/ # 模型训练 │ ├── train_retrieval.py # 召回模型训练 │ └── train_ranking.py # 排序模型训练 └── storage/ # 存储部署 ├── redis_ingest.py # 特征上线 └── local_deploy.py # 模型部署 整个流程通过 ``pipeline.py`` 统一管理,支持按需运行指定步骤: .. raw:: latex \diilbookstyleinputcell .. code:: python # offline/pipeline.py def main(): parser = argparse.ArgumentParser(description="FunRec Offline Pipeline") parser.add_argument("--steps", type=str, default="all") args = parser.parse_args() steps = args.steps.split(",") if "all" in steps: steps = ["retrieval_preprocess", "ranking_preprocess", "retrieval_training", "ranking_training", "ingest", "deploy"] if "retrieval_preprocess" in steps: run_retrieval_preprocessing() if "ranking_preprocess" in steps: run_ranking_preprocessing() if "retrieval_training" in steps: run_retrieval_training() if "ranking_training" in steps: run_ranking_training() if "ingest" in steps: ingest_to_redis(flush=args.flush_redis) if "deploy" in steps: deploy_local() 这种模块化设计便于开发调试:可以选择只运行某些步骤,比如只重新训练排序模型而不改动召回模型。 配置集中在 ``offline/config.py``\ : .. raw:: latex \diilbookstyleinputcell .. code:: python class Config: # 数据路径 TEMP_DIR = Path(os.getenv("FUNREC_PROCESSED_DATA_PATH")) / "web_project" DATASET_DIR = Path(os.getenv("FUNREC_RAW_DATA_PATH")) # 特征工程参数 MAX_SEQ_LEN = 10 # 历史序列最大长度 EMB_DIM = 16 # Embedding 维度 NEG_SAMPLE_SIZE = 20 # 负采样数量 # 训练参数 BATCH_SIZE = 128 EPOCHS = 3 LEARNING_RATE = 0.001 # 存储服务配置 DEPLOY_DIR = TEMP_DIR / "deployed_models" # 模型部署目录 REDIS_URL = os.getenv("REDIS_URL", "redis://localhost:6379/0") 环境变量配置数据路径和服务地址,便于切换部署环境。下面依次介绍特征工程、模型训练、特征上线和模型部署的具体实现。 特征工程 -------- 特征工程是机器学习项目中最耗时但也最关键的环节。好的特征可以大幅提升模型效果,而特征处理的错误往往会导致模型完全失效。本项目需要为召回模型和排序模型分别构建训练样本。 原始数据加载 ~~~~~~~~~~~~ MovieLens-1M 数据集包含三个核心表: - **users.pkl**\ :6040 个用户,包含性别、年龄、职业、邮编等人口统计特征 - **movies.pkl**\ :3883 部电影,包含标题、类型、年份等属性 - **ratings.pkl**\ :约 100 万条评分记录,每条记录包含用户 ID、电影 ID、评分(1-5)和时间戳 数据加载的代码很简单: .. raw:: latex \diilbookstyleinputcell .. code:: python def load_raw_data(): df_movies = pd.read_pickle(config.DATASET_DIR / "movies.pkl") df_ratings = pd.read_pickle(config.DATASET_DIR / "ratings.pkl") df_users = pd.read_pickle(config.DATASET_DIR / "users.pkl") return df_movies, df_ratings, df_users 原始数据经过了预处理,从 IMDB 补充了电影的海报 URL、演员、导演等元数据。这些元数据主要用于前端展示,在模型训练中使用的特征相对简单。 召回模型的特征处理 ~~~~~~~~~~~~~~~~~~ 本节代码位于 ``offline/feature/preprocess_retrieval.py`` 召回模型(YoutubeDNN)采用序列建模的方式,将用户的历史观影序列作为重要输入。这里需要解决几个问题:如何编码类别特征?如何构建行为序列?如何生成训练样本? **类别特征编码** 推荐系统中的特征大多是类别型的:用户 ID、电影 ID、性别、类型等。这些特征需要编码为整数,才能输入神经网络的 Embedding 层。 .. raw:: latex \diilbookstyleinputcell .. code:: python def process_features(df_movies, df_ratings, df_users): # 用户特征编码 user_sparse_feature_columns = ["user_id", "gender", "age", "occupation", "zip_code"] user_vocab = {} for feat_name in user_sparse_feature_columns: label_encoder = LabelEncoder() new_user_feature_df[feat_name + "_encode"] = ( label_encoder.fit_transform(new_user_feature_df[feat_name]) + 1 ) user_vocab[feat_name] = label_encoder.classes_ # 电影特征编码 movie_encode_feature_columns = ["movie_id", "genres", "isAdult", "startYear"] movie_vocab = {} for feat_name in movie_encode_feature_columns: label_encoder = LabelEncoder() label_encoder.fit(new_movie_sparse_feature_df[feat_name].explode()) if feat_name == "genres": # genres 是列表类型,需要特殊处理 new_movie_sparse_feature_df[feat_name + "_encode"] = ( new_movie_sparse_feature_df[feat_name].apply( lambda x: label_encoder.transform(x) + 1 ) ) else: new_movie_sparse_feature_df[feat_name + "_encode"] = ( label_encoder.transform(new_movie_sparse_feature_df[feat_name]) + 1 ) movie_vocab[feat_name] = label_encoder.classes_ 编码时有一个细节:所有编码值都从 1 开始,0 预留给未知值和填充。这是因为 Embedding 层的第 0 个向量通常用于表示“不存在”或“未知”的情况。 **行为序列构建** YoutubeDNN 的核心思想是“预测用户下一个会观看的视频”。因此,我们需要构建滑动窗口形式的训练样本:给定用户的前 :math:`k` 次观影记录,预测第 :math:`k+1` 次观看的电影。 .. raw:: latex \diilbookstyleinputcell .. code:: python def generate_train_eval_samples( data_df, user_columns, item_columns, max_hist_seq_len=10, padding_value=0 ): data_df.sort_values("timestamp", inplace=True) train_data_dict = defaultdict(list) test_data_dict = defaultdict(list) for user_id, grouped_feats in data_df.groupby("user_id"): if len(grouped_feats["movie_id"]) < 2: continue len_hist_seq = len(grouped_feats["movie_id"]) # 测试集:使用最后一条记录 test_data_dict["user_id"].append(user_id) for col in user_columns: test_data_dict[col].append(grouped_feats[col].iloc[0]) for col in item_columns: # 历史序列:除最后一条外的所有记录 test_data_dict["hist_" + col].append( add_padding(grouped_feats[col].tolist()[:-1], padding_value, max_hist_seq_len) ) # 目标物品:最后一条记录 test_data_dict[col].append(grouped_feats[col].tolist()[-1]) # 训练集:滑动窗口 for i in range(1, len_hist_seq - 1): train_data_dict["user_id"].append(user_id) for col in user_columns: train_data_dict[col].append(grouped_feats[col].iloc[0]) for col in item_columns: # 历史序列:前 i 条记录 train_data_dict["hist_" + col].append( add_padding(grouped_feats[col].tolist()[:i], padding_value, max_hist_seq_len) ) # 目标物品:第 i 条记录 train_data_dict[col].append(grouped_feats[col].tolist()[i]) 这种时序划分方式模拟了真实的推荐场景:模型只能使用过去的信息来预测未来。如果使用随机划分,可能会导致未来信息泄露,使模型在训练时表现良好但在实际服务中效果不佳。 **序列填充** 不同用户的历史行为长度不同,但模型需要固定长度的输入。我们采用左填充的方式,将较短的序列在左侧补零: .. raw:: latex \diilbookstyleinputcell .. code:: python def add_padding(val, padding_value, max_seq_len): if isinstance(val, (list, tuple, np.ndarray)): if len(val) > 0 and isinstance(val[0], (list, tuple, np.ndarray)): # 处理嵌套列表(如 genres) val = list(itertools.chain(*val))[:max_seq_len] else: val = list(val)[:max_seq_len] # 左填充:新观看的在右边,符合时间顺序 return [padding_value] * (max_seq_len - len(val)) + val else: return val 左填充的好处是最近的行为总是在序列的右侧,这与时间顺序一致。对于序列模型(如 RNN 或 Transformer)来说,这种排列方式更自然。 排序模型的特征处理 ~~~~~~~~~~~~~~~~~~ 本节代码位于 ``offline/feature/preprocess_ranking.py`` 排序模型(DeepFM)与召回模型的样本格式不同。排序模型是一个点击率预测模型,输入是用户-物品对,输出是该用户点击该物品的概率。因此,我们需要构建正负样本。 **标签定义** MovieLens 数据集只有评分,没有显式的点击信号。我们根据评分高低来定义标签: .. raw:: latex \diilbookstyleinputcell .. code:: python # 计算每个用户的平均评分 user_avg_ratings = df_ratings.groupby("user_id")["rating"].mean().reset_index() df_ratings = df_ratings.merge(user_avg_ratings, on="user_id", how="left") # 如果评分高于用户平均分 - 1,认为是正样本 df_ratings['is_click'] = ( df_ratings['rating'] >= df_ratings['user_avg_rating'] - 1 ).astype(int) 这种定义方式考虑了用户的评分偏好。有些用户普遍给高分,有些用户比较严格。使用相对于个人平均分的偏移来判断,可以减少这种个体差异的影响。 **负样本采样** 正样本直接来自用户的评分记录,但负样本需要额外构造。我们采用两种负采样策略: 1. **困难负样本(Hard Negatives)**\ :用户曝光过但没有产生正向交互的物品。这些样本是“困难”的,因为用户看到了但选择不点击。 2. **随机负样本(Random Negatives)**\ :从用户没有交互过的物品中随机采样。这些样本相对“简单”,用于扩充负样本数量。 .. raw:: latex \diilbookstyleinputcell .. code:: python def generate_negative_samples( df_merged, movie_vocab, neg_ratio_from_exposure=1, # 每个正样本对应的困难负样本数 neg_ratio_random=2, # 每个正样本对应的随机负样本数 ): all_movie_ids = set(range(1, len(movie_vocab["movie_id"]) + 1)) positive_samples = df_merged[df_merged["is_click"] == 1].copy() hard_negative_pool = df_merged[df_merged["is_click"] == 0].copy() # 构建每个用户的困难负样本池 user_hard_negatives = {} for user_id, group in hard_negative_pool.groupby("user_id_original"): user_hard_negatives[user_id] = group # 构建每个用户的交互历史 user_interactions = df_merged.groupby("user_id_original")["movie_id"].apply(set).to_dict() negative_samples = [] # 1. 采样困难负样本 for user_id, user_positives in positive_samples.groupby("user_id_original"): user_hard_neg_pool = user_hard_negatives.get(user_id) if user_hard_neg_pool is None or len(user_hard_neg_pool) == 0: continue n_positives = len(user_positives) n_hard_neg_needed = n_positives * neg_ratio_from_exposure n_to_sample = min(len(user_hard_neg_pool), n_hard_neg_needed) if n_to_sample > 0: sampled = user_hard_neg_pool.sample(n=n_to_sample) negative_samples.append(sampled) # 2. 采样随机负样本 for _, row in positive_samples.iterrows(): user_id_orig = row["user_id_original"] user_interacted = user_interactions.get(user_id_orig, set()) available_movies = list(all_movie_ids - user_interacted) if len(available_movies) < neg_ratio_random: continue neg_movie_ids = random.sample(available_movies, neg_ratio_random) # 构建负样本记录... 负采样的比例需要权衡。过多的负样本会导致正负样本不平衡,影响模型学习;过少则可能使模型无法区分正负样本。本项目使用 1:3 的正负比例(1 个困难负样本 + 2 个随机负样本)。 **训练/测试集划分** 与召回模型类似,排序模型也采用时序划分:按时间排序后,较早的样本用于训练,较晚的样本用于测试。 .. raw:: latex \diilbookstyleinputcell .. code:: python def split_train_test(df_final, test_ratio=0.2, by_time=True): if by_time and "timestamp" in df_final.columns: df_final = df_final.sort_values("timestamp") split_idx = int(len(df_final) * (1 - test_ratio)) train_df = df_final.iloc[:split_idx] test_df = df_final.iloc[split_idx:] else: from sklearn.model_selection import train_test_split train_df, test_df = train_test_split(df_final, test_size=test_ratio) return train_df, test_df 时序划分可以更真实地评估模型在未来数据上的表现。在实际生产环境中,模型总是使用历史数据训练,然后对未来的请求进行预测。 召回模型训练 ------------ 本节代码位于 ``offline/training/train_retrieval.py`` 召回模型的任务是从全量物品库中快速筛选出候选集。本项目使用 YoutubeDNN 作为召回模型,它是一个双塔结构:用户塔将用户特征编码为向量,物品塔将物品特征编码为向量,二者的内积表示匹配程度。 模型配置 ~~~~~~~~ 模型配置定义了特征结构和训练参数: .. raw:: latex \diilbookstyleinputcell .. code:: python model_config_dict = { "features": { "emb_dim": 16, "max_seq_len": 10, "task_names": ["movie_id"], "features": [ # 用户特征 {"name": "user_id", "group": ["user_dnn"], "vocab_size": feature_dict["user_id"]}, {"name": "age", "group": ["user_dnn"], "vocab_size": feature_dict["age"]}, {"name": "gender", "group": ["user_dnn"], "vocab_size": feature_dict["gender"]}, {"name": "occupation", "group": ["user_dnn"], "vocab_size": feature_dict["occupation"]}, {"name": "zip_code", "group": ["user_dnn"], "vocab_size": feature_dict["zip_code"]}, # 目标物品 {"name": "movie_id", "group": ["target_item"], "vocab_size": feature_dict["movie_id"]}, # 历史行为序列 { "name": "hist_movie_id", "emb_name": "movie_id", # 与目标物品共享 Embedding "group": ["raw_hist_seq"], "combiner": "mean", # 序列聚合方式 "vocab_size": feature_dict["movie_id"] }, { "name": "hist_genres", "emb_name": "genres", "group": ["raw_hist_seq"], "combiner": "mean", "vocab_size": feature_dict["genres"] }, ] }, "training": { "build_function": "funrec.models.youtubednn.build_youtubednn_model", "model_params": { "emb_dim": 16, "neg_sample": 20, "dnn_units": [64, 32], "label_name": "movie_id" }, "optimizer": "adam", "optimizer_params": {"learning_rate": 0.001}, "loss": "sampledsoftmaxloss", "batch_size": 128, "epochs": 3, }, } 这里有几个要点: 1. **Embedding 共享**\ :历史序列中的电影 ID(\ ``hist_movie_id``\ )与目标电影(\ ``movie_id``\ )共享同一个 Embedding 表。这种设计减少了参数量,也使得历史偏好和目标物品在同一个向量空间中表示。 2. **序列聚合**\ :历史序列通过 ``mean`` 聚合,将变长的序列压缩为固定维度的向量。更复杂的聚合方式(如注意力机制)可能带来更好的效果,但也增加了计算成本。 3. **Sampled Softmax**\ :由于物品数量较多(3000+),直接计算完整的 Softmax 开销较大。Sampled Softmax 只对正样本和一定数量的负样本计算损失,大幅加速训练。 训练流程 ~~~~~~~~ 训练流程封装在 ``run_retrieval_training`` 函数中: .. raw:: latex \diilbookstyleinputcell .. code:: python def run_retrieval_training(): # 加载数据 train_eval_samples = pickle.load(open(config.TRAIN_DATA_PATH, "rb")) feature_dict = pickle.load(open(config.FEATURE_DICT_PATH, "rb")) train_data = train_eval_samples["train"] test_data = train_eval_samples["test"] # 构建配置 cfg = FunRecConfig( data=model_config_dict["data"], features=model_config_dict["features"], training=model_config_dict["training"], evaluation=model_config_dict["evaluation"] ) # 准备特征 feature_columns, processed_data = prepare_features( cfg.features, train_data, test_data, dataset_config=data_config[cfg.data["dataset_name"]] ) # 训练模型 models = train_model(cfg.training, feature_columns, processed_data) # 评估 metrics = evaluate_model(models, processed_data, cfg.evaluation, feature_columns) print(build_metrics_table(metrics)) # 保存用户塔和物品塔 user_model = models[1] item_model = models[2] user_model.save(config.SAVED_MODELS_DIR / "user_model") item_model.save(config.SAVED_MODELS_DIR / "item_model") YoutubeDNN 训练完成后会返回三个模型:完整模型、用户塔模型、物品塔模型。在线服务时,我们只需要用户塔模型(实时计算用户向量)和预计算的物品向量(通过物品塔生成)。 物品向量生成 ~~~~~~~~~~~~ 召回阶段需要计算用户向量与所有物品向量的相似度。为了加速在线检索,我们在离线阶段预计算所有物品的向量: .. raw:: latex \diilbookstyleinputcell .. code:: python # 生成所有物品的编码 ID vocab_dict = pickle.load(open(config.VOCAB_DICT_PATH, "rb")) all_movie_ids = sorted(list(vocab_dict["movie_id"])) encoded_ids = np.arange(1, len(all_movie_ids) + 1) # 通过物品塔计算向量 item_inputs = {"movie_id": encoded_ids} embeddings = item_model.predict(item_inputs, verbose=0) # 归一化(使内积等价于余弦相似度) embeddings = embeddings / np.linalg.norm(embeddings, axis=1, keepdims=True) # 保存 np.save(config.ITEM_EMB_PATH, embeddings) np.save(config.MOVIE_IDS_PATH, np.array(all_movie_ids)) 向量归一化是一个重要步骤。归一化后,向量的内积等价于余弦相似度,取值范围在 [-1, 1] 之间。这使得相似度的解释更加直观,也便于设置阈值进行过滤。 在 :numref:`two_tower` 中,我们详细介绍了 YoutubeDNN 的模型结构和训练技巧。这里不再赘述模型细节,重点关注工程实现。 排序模型训练 ------------ 本节代码位于 ``offline/training/train_ranking.py`` 排序模型的任务是对召回阶段筛选出的候选进行精确打分。本项目使用 DeepFM 作为排序模型,它结合了 FM(捕捉二阶特征交叉)和 DNN(捕捉高阶非线性)的优势。 模型配置 ~~~~~~~~ DeepFM 的配置与 YoutubeDNN 有所不同。它不需要区分用户塔和物品塔,所有特征都作为模型的输入: .. raw:: latex \diilbookstyleinputcell .. code:: python model_config_dict = { "features": { "emb_dim": 16, "task_names": ["is_click"], # 二分类标签 "features": [ # 用户特征 {"name": "user_id", "group": ["deepfm", "linear"], "vocab_size": feature_dict["user_id"]}, {"name": "gender", "group": ["deepfm", "linear"], "vocab_size": feature_dict["gender"]}, {"name": "age", "group": ["deepfm", "linear"], "vocab_size": feature_dict["age"]}, {"name": "occupation", "group": ["deepfm", "linear"], "vocab_size": feature_dict["occupation"]}, {"name": "zip_code", "group": ["deepfm", "linear"], "vocab_size": feature_dict["zip_code"]}, # 物品特征 {"name": "movie_id", "group": ["deepfm", "linear"], "vocab_size": feature_dict["movie_id"]}, {"name": "genres", "group": ["deepfm", "linear"], "vocab_size": feature_dict["genres"]}, {"name": "isAdult", "group": ["deepfm", "linear"], "vocab_size": feature_dict["isAdult"]}, {"name": "startYear", "group": ["deepfm", "linear"], "vocab_size": feature_dict["startYear"]}, ] }, "training": { "build_function": "funrec.models.deepfm.build_deepfm_model", "model_params": { "dnn_units": [128, 64, 32], "dropout_rate": 0.1, "linear_logits": True, }, "optimizer": "adam", "optimizer_params": {"learning_rate": 0.001}, "loss": ["binary_crossentropy"], "metrics": ["binary_accuracy", "AUC"], "batch_size": 128, "epochs": 3, "validation_split": 0.1, }, } 配置中的 ``group`` 字段指定了特征所属的组件:\ ``deepfm`` 表示特征参与 FM 的二阶交叉计算,\ ``linear`` 表示特征参与一阶线性计算。将特征同时放入两个组,可以让模型同时学习一阶效应和二阶交互。 训练流程 ~~~~~~~~ 排序模型的训练流程与召回模型类似: .. raw:: latex \diilbookstyleinputcell .. code:: python def run_ranking_training(): # 加载数据 train_eval_samples = pickle.load(open(ranking_data_path, "rb")) feature_dict = pickle.load(open(ranking_feature_dict_path, "rb")) train_data = train_eval_samples["train"] test_data = train_eval_samples["test"] # 构建配置 cfg = FunRecConfig( data=model_config_dict["data"], features=model_config_dict["features"], training=model_config_dict["training"], evaluation=model_config_dict["evaluation"] ) # 准备特征 feature_columns, processed_data = prepare_features( cfg.features, train_data, test_data, dataset_config=data_config[dataset_name] ) # 训练模型 models = train_model(cfg.training, feature_columns, processed_data) main_model = models[0] # 评估 metrics = evaluate_model(models, processed_data, cfg.evaluation, feature_columns) print(build_metrics_table(metrics)) # 保存模型 main_model.save(config.RANKING_MODEL_PATH) # 保存模型配置(供在线推理使用) pickle.dump({ "feature_dict": feature_dict, "feature_columns": [fc.name for fc in feature_columns], "model_config": model_config_dict }, open(config.TEMP_DIR / "ranking_model_config.pkl", "wb")) 排序模型的评估指标通常是 AUC(Area Under the ROC Curve)。AUC 衡量的是模型区分正负样本的能力,不受正负样本比例的影响。在推荐场景中,AUC 反映了模型将用户喜欢的物品排在不喜欢的物品前面的能力。 在 :numref:`deepfm` 中,我们详细介绍了 DeepFM 的模型结构。它将 Wide & Deep 中需要人工设计的 Wide 部分替换为 FM,实现了端到端的特征交叉学习。 特征上线与模型部署 ------------------ 模型训练完成后,需要将相关产出物部署到在线系统可以访问的存储中。本项目使用 Redis 存储用户特征,使用共享文件目录存储模型文件。 Redis 特征写入 ~~~~~~~~~~~~~~ 本节代码位于 ``offline/storage/redis_ingest.py`` 用户特征需要在在线推理时快速读取。Redis 作为内存数据库,读取延迟通常在 1 毫秒以内,非常适合这个场景。 .. raw:: latex \diilbookstyleinputcell .. code:: python def ingest_to_redis(flush: bool = False): r = redis.Redis.from_url(config.REDIS_URL, decode_responses=True) if flush: r.flushdb() # 清空数据库(开发调试用) df_movies, df_ratings, df_users = load_raw_data() # 1. 写入用户画像 pipeline = r.pipeline() for _, row in df_users.iterrows(): user_id = row['user_id'] key = f"user:{user_id}:profile" profile_data = { "gender": row["gender"], "age": row["age"], "occupation": row["occupation"], "zip_code": row["zip_code"] } pipeline.hset(key, mapping=profile_data) if _ % 1000 == 0: pipeline.execute() # 批量执行,提升性能 pipeline.execute() 用户画像使用 Redis Hash 类型存储,键为 ``user:{id}:profile``\ ,值为包含多个字段的哈希表。这种结构便于读取单个字段或全部字段。 .. raw:: latex \diilbookstyleinputcell .. code:: python # 2. 写入用户行为历史和偏好类目 df_ratings.sort_values("timestamp", inplace=True) grouped = df_ratings.groupby("user_id") for user_id, group in grouped: # 行为历史(电影 ID 列表) history_key = f"user:{user_id}:history" movie_ids = group["movie_id"].tolist() pipeline.delete(history_key) if movie_ids: for i in range(0, len(movie_ids), 1000): chunk = movie_ids[i:i+1000] pipeline.rpush(history_key, *chunk) # 偏好类目(Top 3 类型) user_movies = df_movies[df_movies["movie_id"].isin(movie_ids)] all_genres = [] for genres in user_movies["genres"]: all_genres.extend(genres) if all_genres: from collections import Counter counts = Counter(all_genres) top_3 = [g for g, c in counts.most_common(3)] pipeline.hset( f"user:{user_id}:profile", "frequent_genres", ",".join(top_3) ) pipeline.execute() 用户行为历史使用 Redis List 类型存储,保持时间顺序。偏好类目是从历史行为中统计出的 Top 3 类型,存储在用户画像中,供偏好类目召回策略使用。 使用 Pipeline 批量执行命令是一个重要的优化。Redis 的命令执行速度很快,但每次网络往返都有开销。Pipeline 将多个命令打包发送,减少网络往返次数,可以显著提升写入速度。 本地模型部署 ~~~~~~~~~~~~ 本节代码位于 ``offline/storage/local_deploy.py`` 模型文件通常较大(几十 MB 到几百 MB),不适合存储在 Redis 中。本项目使用共享文件目录来管理模型文件,离线流程将模型写入该目录,在线服务从该目录加载。 .. raw:: latex \diilbookstyleinputcell .. code:: python def deploy_local(recall: bool = True, ranking: bool = True): deploy_dir = config.DEPLOY_DIR deploy_dir.mkdir(parents=True, exist_ok=True) if recall: deploy_recall_models(deploy_dir) if ranking: deploy_ranking_models(deploy_dir) 召回模型的部署包括用户塔模型、物品向量和词表: .. raw:: latex \diilbookstyleinputcell .. code:: python def deploy_recall_models(deploy_dir: Path): recall_dir = deploy_dir / "recall" recall_dir.mkdir(parents=True, exist_ok=True) # 1. 词表(用于特征编码) shutil.copy2(config.VOCAB_DICT_PATH, recall_dir / "vocab_dict.pkl") shutil.copy2(config.VOCAB_DICT_PATH, deploy_dir / "vocab_dict.pkl") # 2. 物品向量(用于相似度检索) shutil.copy2(config.ITEM_EMB_PATH, recall_dir / "item_embeddings.npy") shutil.copy2(config.ITEM_EMB_PATH, deploy_dir / "item_embeddings.npy") # 3. 用户塔模型(复制整个目录) user_model_path = config.SAVED_MODELS_DIR / "user_model" model_deploy_dir = deploy_dir / "model" / "user_recall" / "v1" model_deploy_dir.mkdir(parents=True, exist_ok=True) shutil.copytree(user_model_path, model_deploy_dir / "user_model") # 更新版本指针 version_info = {"version": "v1", "path": "model/user_recall/v1/user_model"} with open(deploy_dir / "model" / "user_recall" / "active.json", "w") as f: json.dump(version_info, f) TensorFlow SavedModel 是一个目录,包含模型结构和权重。我们将整个目录复制到部署目录,保持目录结构完整。 版本管理是生产环境中的重要功能。通过 ``active.json`` 指针文件,在线服务可以知道当前应该加载哪个版本的模型。模型更新时,先部署新版本的模型文件,再更新指针,可以实现无感知的模型热更新。 排序模型的部署类似: .. raw:: latex \diilbookstyleinputcell .. code:: python def deploy_ranking_models(deploy_dir: Path): ranking_dir = deploy_dir / "ranking" ranking_dir.mkdir(parents=True, exist_ok=True) # 1. 词表和特征配置 shutil.copy2(config.RANKING_VOCAB_DICT_PATH, ranking_dir / "vocab_dict.pkl") shutil.copy2(config.RANKING_FEATURE_DICT_PATH, ranking_dir / "feature_dict.pkl") # 2. DeepFM 模型 model_deploy_dir = deploy_dir / "model" / "ranking" / "v1" model_deploy_dir.mkdir(parents=True, exist_ok=True) shutil.copytree(config.RANKING_MODEL_PATH, model_deploy_dir / "ranking_model") # 更新版本指针 version_info = { "version": "v1", "path": "model/ranking/v1/ranking_model", "model_type": "deepfm" } with open(deploy_dir / "model" / "ranking" / "active.json", "w") as f: json.dump(version_info, f)