10.3. 离线流水线¶
在上一节中,我们设计了推荐系统的整体架构,明确了离线系统和在线系统的边界。本节将聚焦于离线流程的具体实现,包括特征工程、模型训练,以及将在线推理时使用的特征和模型部署上线三个核心环节。
10.3.1. 代码结构¶
离线流水线的代码位于 web_project/backend/offline/ 目录下:
offline/
├── pipeline.py # 流水线入口
├── config.py # 配置管理
├── feature/ # 特征工程
│ ├── preprocess_retrieval.py # 召回模型特征处理
│ └── preprocess_ranking.py # 排序模型特征处理
├── training/ # 模型训练
│ ├── train_retrieval.py # 召回模型训练
│ └── train_ranking.py # 排序模型训练
└── storage/ # 存储部署
├── redis_ingest.py # 特征上线
└── local_deploy.py # 模型部署
整个流程通过 pipeline.py 统一管理,支持按需运行指定步骤:
# offline/pipeline.py
def main():
parser = argparse.ArgumentParser(description="FunRec Offline Pipeline")
parser.add_argument("--steps", type=str, default="all")
args = parser.parse_args()
steps = args.steps.split(",")
if "all" in steps:
steps = ["retrieval_preprocess", "ranking_preprocess",
"retrieval_training", "ranking_training",
"ingest", "deploy"]
if "retrieval_preprocess" in steps:
run_retrieval_preprocessing()
if "ranking_preprocess" in steps:
run_ranking_preprocessing()
if "retrieval_training" in steps:
run_retrieval_training()
if "ranking_training" in steps:
run_ranking_training()
if "ingest" in steps:
ingest_to_redis(flush=args.flush_redis)
if "deploy" in steps:
deploy_local()
这种模块化设计便于开发调试:可以选择只运行某些步骤,比如只重新训练排序模型而不改动召回模型。
配置集中在 offline/config.py:
class Config:
# 数据路径
TEMP_DIR = Path(os.getenv("FUNREC_PROCESSED_DATA_PATH")) / "web_project"
DATASET_DIR = Path(os.getenv("FUNREC_RAW_DATA_PATH"))
# 特征工程参数
MAX_SEQ_LEN = 10 # 历史序列最大长度
EMB_DIM = 16 # Embedding 维度
NEG_SAMPLE_SIZE = 20 # 负采样数量
# 训练参数
BATCH_SIZE = 128
EPOCHS = 3
LEARNING_RATE = 0.001
# 存储服务配置
DEPLOY_DIR = TEMP_DIR / "deployed_models" # 模型部署目录
REDIS_URL = os.getenv("REDIS_URL", "redis://localhost:6379/0")
环境变量配置数据路径和服务地址,便于切换部署环境。下面依次介绍特征工程、模型训练、特征上线和模型部署的具体实现。
10.3.2. 特征工程¶
特征工程是机器学习项目中最耗时但也最关键的环节。好的特征可以大幅提升模型效果,而特征处理的错误往往会导致模型完全失效。本项目需要为召回模型和排序模型分别构建训练样本。
10.3.2.1. 原始数据加载¶
MovieLens-1M 数据集包含三个核心表:
users.pkl:6040 个用户,包含性别、年龄、职业、邮编等人口统计特征
movies.pkl:3883 部电影,包含标题、类型、年份等属性
ratings.pkl:约 100 万条评分记录,每条记录包含用户 ID、电影 ID、评分(1-5)和时间戳
数据加载的代码很简单:
def load_raw_data():
df_movies = pd.read_pickle(config.DATASET_DIR / "movies.pkl")
df_ratings = pd.read_pickle(config.DATASET_DIR / "ratings.pkl")
df_users = pd.read_pickle(config.DATASET_DIR / "users.pkl")
return df_movies, df_ratings, df_users
原始数据经过了预处理,从 IMDB 补充了电影的海报 URL、演员、导演等元数据。这些元数据主要用于前端展示,在模型训练中使用的特征相对简单。
10.3.2.2. 召回模型的特征处理¶
本节代码位于
offline/feature/preprocess_retrieval.py
召回模型(YoutubeDNN)采用序列建模的方式,将用户的历史观影序列作为重要输入。这里需要解决几个问题:如何编码类别特征?如何构建行为序列?如何生成训练样本?
类别特征编码
推荐系统中的特征大多是类别型的:用户 ID、电影 ID、性别、类型等。这些特征需要编码为整数,才能输入神经网络的 Embedding 层。
def process_features(df_movies, df_ratings, df_users):
# 用户特征编码
user_sparse_feature_columns = ["user_id", "gender", "age", "occupation", "zip_code"]
user_vocab = {}
for feat_name in user_sparse_feature_columns:
label_encoder = LabelEncoder()
new_user_feature_df[feat_name + "_encode"] = (
label_encoder.fit_transform(new_user_feature_df[feat_name]) + 1
)
user_vocab[feat_name] = label_encoder.classes_
# 电影特征编码
movie_encode_feature_columns = ["movie_id", "genres", "isAdult", "startYear"]
movie_vocab = {}
for feat_name in movie_encode_feature_columns:
label_encoder = LabelEncoder()
label_encoder.fit(new_movie_sparse_feature_df[feat_name].explode())
if feat_name == "genres":
# genres 是列表类型,需要特殊处理
new_movie_sparse_feature_df[feat_name + "_encode"] = (
new_movie_sparse_feature_df[feat_name].apply(
lambda x: label_encoder.transform(x) + 1
)
)
else:
new_movie_sparse_feature_df[feat_name + "_encode"] = (
label_encoder.transform(new_movie_sparse_feature_df[feat_name]) + 1
)
movie_vocab[feat_name] = label_encoder.classes_
编码时有一个细节:所有编码值都从 1 开始,0 预留给未知值和填充。这是因为 Embedding 层的第 0 个向量通常用于表示“不存在”或“未知”的情况。
行为序列构建
YoutubeDNN 的核心思想是“预测用户下一个会观看的视频”。因此,我们需要构建滑动窗口形式的训练样本:给定用户的前 \(k\) 次观影记录,预测第 \(k+1\) 次观看的电影。
def generate_train_eval_samples(
data_df, user_columns, item_columns,
max_hist_seq_len=10, padding_value=0
):
data_df.sort_values("timestamp", inplace=True)
train_data_dict = defaultdict(list)
test_data_dict = defaultdict(list)
for user_id, grouped_feats in data_df.groupby("user_id"):
if len(grouped_feats["movie_id"]) < 2:
continue
len_hist_seq = len(grouped_feats["movie_id"])
# 测试集:使用最后一条记录
test_data_dict["user_id"].append(user_id)
for col in user_columns:
test_data_dict[col].append(grouped_feats[col].iloc[0])
for col in item_columns:
# 历史序列:除最后一条外的所有记录
test_data_dict["hist_" + col].append(
add_padding(grouped_feats[col].tolist()[:-1],
padding_value, max_hist_seq_len)
)
# 目标物品:最后一条记录
test_data_dict[col].append(grouped_feats[col].tolist()[-1])
# 训练集:滑动窗口
for i in range(1, len_hist_seq - 1):
train_data_dict["user_id"].append(user_id)
for col in user_columns:
train_data_dict[col].append(grouped_feats[col].iloc[0])
for col in item_columns:
# 历史序列:前 i 条记录
train_data_dict["hist_" + col].append(
add_padding(grouped_feats[col].tolist()[:i],
padding_value, max_hist_seq_len)
)
# 目标物品:第 i 条记录
train_data_dict[col].append(grouped_feats[col].tolist()[i])
这种时序划分方式模拟了真实的推荐场景:模型只能使用过去的信息来预测未来。如果使用随机划分,可能会导致未来信息泄露,使模型在训练时表现良好但在实际服务中效果不佳。
序列填充
不同用户的历史行为长度不同,但模型需要固定长度的输入。我们采用左填充的方式,将较短的序列在左侧补零:
def add_padding(val, padding_value, max_seq_len):
if isinstance(val, (list, tuple, np.ndarray)):
if len(val) > 0 and isinstance(val[0], (list, tuple, np.ndarray)):
# 处理嵌套列表(如 genres)
val = list(itertools.chain(*val))[:max_seq_len]
else:
val = list(val)[:max_seq_len]
# 左填充:新观看的在右边,符合时间顺序
return [padding_value] * (max_seq_len - len(val)) + val
else:
return val
左填充的好处是最近的行为总是在序列的右侧,这与时间顺序一致。对于序列模型(如 RNN 或 Transformer)来说,这种排列方式更自然。
10.3.2.3. 排序模型的特征处理¶
本节代码位于
offline/feature/preprocess_ranking.py
排序模型(DeepFM)与召回模型的样本格式不同。排序模型是一个点击率预测模型,输入是用户-物品对,输出是该用户点击该物品的概率。因此,我们需要构建正负样本。
标签定义
MovieLens 数据集只有评分,没有显式的点击信号。我们根据评分高低来定义标签:
# 计算每个用户的平均评分
user_avg_ratings = df_ratings.groupby("user_id")["rating"].mean().reset_index()
df_ratings = df_ratings.merge(user_avg_ratings, on="user_id", how="left")
# 如果评分高于用户平均分 - 1,认为是正样本
df_ratings['is_click'] = (
df_ratings['rating'] >= df_ratings['user_avg_rating'] - 1
).astype(int)
这种定义方式考虑了用户的评分偏好。有些用户普遍给高分,有些用户比较严格。使用相对于个人平均分的偏移来判断,可以减少这种个体差异的影响。
负样本采样
正样本直接来自用户的评分记录,但负样本需要额外构造。我们采用两种负采样策略:
困难负样本(Hard Negatives):用户曝光过但没有产生正向交互的物品。这些样本是“困难”的,因为用户看到了但选择不点击。
随机负样本(Random Negatives):从用户没有交互过的物品中随机采样。这些样本相对“简单”,用于扩充负样本数量。
def generate_negative_samples(
df_merged, movie_vocab,
neg_ratio_from_exposure=1, # 每个正样本对应的困难负样本数
neg_ratio_random=2, # 每个正样本对应的随机负样本数
):
all_movie_ids = set(range(1, len(movie_vocab["movie_id"]) + 1))
positive_samples = df_merged[df_merged["is_click"] == 1].copy()
hard_negative_pool = df_merged[df_merged["is_click"] == 0].copy()
# 构建每个用户的困难负样本池
user_hard_negatives = {}
for user_id, group in hard_negative_pool.groupby("user_id_original"):
user_hard_negatives[user_id] = group
# 构建每个用户的交互历史
user_interactions = df_merged.groupby("user_id_original")["movie_id"].apply(set).to_dict()
negative_samples = []
# 1. 采样困难负样本
for user_id, user_positives in positive_samples.groupby("user_id_original"):
user_hard_neg_pool = user_hard_negatives.get(user_id)
if user_hard_neg_pool is None or len(user_hard_neg_pool) == 0:
continue
n_positives = len(user_positives)
n_hard_neg_needed = n_positives * neg_ratio_from_exposure
n_to_sample = min(len(user_hard_neg_pool), n_hard_neg_needed)
if n_to_sample > 0:
sampled = user_hard_neg_pool.sample(n=n_to_sample)
negative_samples.append(sampled)
# 2. 采样随机负样本
for _, row in positive_samples.iterrows():
user_id_orig = row["user_id_original"]
user_interacted = user_interactions.get(user_id_orig, set())
available_movies = list(all_movie_ids - user_interacted)
if len(available_movies) < neg_ratio_random:
continue
neg_movie_ids = random.sample(available_movies, neg_ratio_random)
# 构建负样本记录...
负采样的比例需要权衡。过多的负样本会导致正负样本不平衡,影响模型学习;过少则可能使模型无法区分正负样本。本项目使用 1:3 的正负比例(1 个困难负样本 + 2 个随机负样本)。
训练/测试集划分
与召回模型类似,排序模型也采用时序划分:按时间排序后,较早的样本用于训练,较晚的样本用于测试。
def split_train_test(df_final, test_ratio=0.2, by_time=True):
if by_time and "timestamp" in df_final.columns:
df_final = df_final.sort_values("timestamp")
split_idx = int(len(df_final) * (1 - test_ratio))
train_df = df_final.iloc[:split_idx]
test_df = df_final.iloc[split_idx:]
else:
from sklearn.model_selection import train_test_split
train_df, test_df = train_test_split(df_final, test_size=test_ratio)
return train_df, test_df
时序划分可以更真实地评估模型在未来数据上的表现。在实际生产环境中,模型总是使用历史数据训练,然后对未来的请求进行预测。
10.3.3. 召回模型训练¶
本节代码位于
offline/training/train_retrieval.py
召回模型的任务是从全量物品库中快速筛选出候选集。本项目使用 YoutubeDNN 作为召回模型,它是一个双塔结构:用户塔将用户特征编码为向量,物品塔将物品特征编码为向量,二者的内积表示匹配程度。
10.3.3.1. 模型配置¶
模型配置定义了特征结构和训练参数:
model_config_dict = {
"features": {
"emb_dim": 16,
"max_seq_len": 10,
"task_names": ["movie_id"],
"features": [
# 用户特征
{"name": "user_id", "group": ["user_dnn"], "vocab_size": feature_dict["user_id"]},
{"name": "age", "group": ["user_dnn"], "vocab_size": feature_dict["age"]},
{"name": "gender", "group": ["user_dnn"], "vocab_size": feature_dict["gender"]},
{"name": "occupation", "group": ["user_dnn"], "vocab_size": feature_dict["occupation"]},
{"name": "zip_code", "group": ["user_dnn"], "vocab_size": feature_dict["zip_code"]},
# 目标物品
{"name": "movie_id", "group": ["target_item"], "vocab_size": feature_dict["movie_id"]},
# 历史行为序列
{
"name": "hist_movie_id",
"emb_name": "movie_id", # 与目标物品共享 Embedding
"group": ["raw_hist_seq"],
"combiner": "mean", # 序列聚合方式
"vocab_size": feature_dict["movie_id"]
},
{
"name": "hist_genres",
"emb_name": "genres",
"group": ["raw_hist_seq"],
"combiner": "mean",
"vocab_size": feature_dict["genres"]
},
]
},
"training": {
"build_function": "funrec.models.youtubednn.build_youtubednn_model",
"model_params": {
"emb_dim": 16,
"neg_sample": 20,
"dnn_units": [64, 32],
"label_name": "movie_id"
},
"optimizer": "adam",
"optimizer_params": {"learning_rate": 0.001},
"loss": "sampledsoftmaxloss",
"batch_size": 128,
"epochs": 3,
},
}
这里有几个要点:
Embedding 共享:历史序列中的电影 ID(
hist_movie_id)与目标电影(movie_id)共享同一个 Embedding 表。这种设计减少了参数量,也使得历史偏好和目标物品在同一个向量空间中表示。序列聚合:历史序列通过
mean聚合,将变长的序列压缩为固定维度的向量。更复杂的聚合方式(如注意力机制)可能带来更好的效果,但也增加了计算成本。Sampled Softmax:由于物品数量较多(3000+),直接计算完整的 Softmax 开销较大。Sampled Softmax 只对正样本和一定数量的负样本计算损失,大幅加速训练。
10.3.3.2. 训练流程¶
训练流程封装在 run_retrieval_training 函数中:
def run_retrieval_training():
# 加载数据
train_eval_samples = pickle.load(open(config.TRAIN_DATA_PATH, "rb"))
feature_dict = pickle.load(open(config.FEATURE_DICT_PATH, "rb"))
train_data = train_eval_samples["train"]
test_data = train_eval_samples["test"]
# 构建配置
cfg = FunRecConfig(
data=model_config_dict["data"],
features=model_config_dict["features"],
training=model_config_dict["training"],
evaluation=model_config_dict["evaluation"]
)
# 准备特征
feature_columns, processed_data = prepare_features(
cfg.features, train_data, test_data,
dataset_config=data_config[cfg.data["dataset_name"]]
)
# 训练模型
models = train_model(cfg.training, feature_columns, processed_data)
# 评估
metrics = evaluate_model(models, processed_data, cfg.evaluation, feature_columns)
print(build_metrics_table(metrics))
# 保存用户塔和物品塔
user_model = models[1]
item_model = models[2]
user_model.save(config.SAVED_MODELS_DIR / "user_model")
item_model.save(config.SAVED_MODELS_DIR / "item_model")
YoutubeDNN 训练完成后会返回三个模型:完整模型、用户塔模型、物品塔模型。在线服务时,我们只需要用户塔模型(实时计算用户向量)和预计算的物品向量(通过物品塔生成)。
10.3.3.3. 物品向量生成¶
召回阶段需要计算用户向量与所有物品向量的相似度。为了加速在线检索,我们在离线阶段预计算所有物品的向量:
# 生成所有物品的编码 ID
vocab_dict = pickle.load(open(config.VOCAB_DICT_PATH, "rb"))
all_movie_ids = sorted(list(vocab_dict["movie_id"]))
encoded_ids = np.arange(1, len(all_movie_ids) + 1)
# 通过物品塔计算向量
item_inputs = {"movie_id": encoded_ids}
embeddings = item_model.predict(item_inputs, verbose=0)
# 归一化(使内积等价于余弦相似度)
embeddings = embeddings / np.linalg.norm(embeddings, axis=1, keepdims=True)
# 保存
np.save(config.ITEM_EMB_PATH, embeddings)
np.save(config.MOVIE_IDS_PATH, np.array(all_movie_ids))
向量归一化是一个重要步骤。归一化后,向量的内积等价于余弦相似度,取值范围在 [-1, 1] 之间。这使得相似度的解释更加直观,也便于设置阈值进行过滤。
在 2.3节 中,我们详细介绍了 YoutubeDNN 的模型结构和训练技巧。这里不再赘述模型细节,重点关注工程实现。
10.3.4. 排序模型训练¶
本节代码位于
offline/training/train_ranking.py
排序模型的任务是对召回阶段筛选出的候选进行精确打分。本项目使用 DeepFM 作为排序模型,它结合了 FM(捕捉二阶特征交叉)和 DNN(捕捉高阶非线性)的优势。
10.3.4.1. 模型配置¶
DeepFM 的配置与 YoutubeDNN 有所不同。它不需要区分用户塔和物品塔,所有特征都作为模型的输入:
model_config_dict = {
"features": {
"emb_dim": 16,
"task_names": ["is_click"], # 二分类标签
"features": [
# 用户特征
{"name": "user_id", "group": ["deepfm", "linear"],
"vocab_size": feature_dict["user_id"]},
{"name": "gender", "group": ["deepfm", "linear"],
"vocab_size": feature_dict["gender"]},
{"name": "age", "group": ["deepfm", "linear"],
"vocab_size": feature_dict["age"]},
{"name": "occupation", "group": ["deepfm", "linear"],
"vocab_size": feature_dict["occupation"]},
{"name": "zip_code", "group": ["deepfm", "linear"],
"vocab_size": feature_dict["zip_code"]},
# 物品特征
{"name": "movie_id", "group": ["deepfm", "linear"],
"vocab_size": feature_dict["movie_id"]},
{"name": "genres", "group": ["deepfm", "linear"],
"vocab_size": feature_dict["genres"]},
{"name": "isAdult", "group": ["deepfm", "linear"],
"vocab_size": feature_dict["isAdult"]},
{"name": "startYear", "group": ["deepfm", "linear"],
"vocab_size": feature_dict["startYear"]},
]
},
"training": {
"build_function": "funrec.models.deepfm.build_deepfm_model",
"model_params": {
"dnn_units": [128, 64, 32],
"dropout_rate": 0.1,
"linear_logits": True,
},
"optimizer": "adam",
"optimizer_params": {"learning_rate": 0.001},
"loss": ["binary_crossentropy"],
"metrics": ["binary_accuracy", "AUC"],
"batch_size": 128,
"epochs": 3,
"validation_split": 0.1,
},
}
配置中的 group 字段指定了特征所属的组件:deepfm 表示特征参与
FM 的二阶交叉计算,linear
表示特征参与一阶线性计算。将特征同时放入两个组,可以让模型同时学习一阶效应和二阶交互。
10.3.4.2. 训练流程¶
排序模型的训练流程与召回模型类似:
def run_ranking_training():
# 加载数据
train_eval_samples = pickle.load(open(ranking_data_path, "rb"))
feature_dict = pickle.load(open(ranking_feature_dict_path, "rb"))
train_data = train_eval_samples["train"]
test_data = train_eval_samples["test"]
# 构建配置
cfg = FunRecConfig(
data=model_config_dict["data"],
features=model_config_dict["features"],
training=model_config_dict["training"],
evaluation=model_config_dict["evaluation"]
)
# 准备特征
feature_columns, processed_data = prepare_features(
cfg.features, train_data, test_data,
dataset_config=data_config[dataset_name]
)
# 训练模型
models = train_model(cfg.training, feature_columns, processed_data)
main_model = models[0]
# 评估
metrics = evaluate_model(models, processed_data, cfg.evaluation, feature_columns)
print(build_metrics_table(metrics))
# 保存模型
main_model.save(config.RANKING_MODEL_PATH)
# 保存模型配置(供在线推理使用)
pickle.dump({
"feature_dict": feature_dict,
"feature_columns": [fc.name for fc in feature_columns],
"model_config": model_config_dict
}, open(config.TEMP_DIR / "ranking_model_config.pkl", "wb"))
排序模型的评估指标通常是 AUC(Area Under the ROC Curve)。AUC 衡量的是模型区分正负样本的能力,不受正负样本比例的影响。在推荐场景中,AUC 反映了模型将用户喜欢的物品排在不喜欢的物品前面的能力。
在 3.2.1.6节 中,我们详细介绍了 DeepFM 的模型结构。它将 Wide & Deep 中需要人工设计的 Wide 部分替换为 FM,实现了端到端的特征交叉学习。
10.3.5. 特征上线与模型部署¶
模型训练完成后,需要将相关产出物部署到在线系统可以访问的存储中。本项目使用 Redis 存储用户特征,使用共享文件目录存储模型文件。
10.3.5.1. Redis 特征写入¶
本节代码位于
offline/storage/redis_ingest.py
用户特征需要在在线推理时快速读取。Redis 作为内存数据库,读取延迟通常在 1 毫秒以内,非常适合这个场景。
def ingest_to_redis(flush: bool = False):
r = redis.Redis.from_url(config.REDIS_URL, decode_responses=True)
if flush:
r.flushdb() # 清空数据库(开发调试用)
df_movies, df_ratings, df_users = load_raw_data()
# 1. 写入用户画像
pipeline = r.pipeline()
for _, row in df_users.iterrows():
user_id = row['user_id']
key = f"user:{user_id}:profile"
profile_data = {
"gender": row["gender"],
"age": row["age"],
"occupation": row["occupation"],
"zip_code": row["zip_code"]
}
pipeline.hset(key, mapping=profile_data)
if _ % 1000 == 0:
pipeline.execute() # 批量执行,提升性能
pipeline.execute()
用户画像使用 Redis Hash 类型存储,键为
user:{id}:profile,值为包含多个字段的哈希表。这种结构便于读取单个字段或全部字段。
# 2. 写入用户行为历史和偏好类目
df_ratings.sort_values("timestamp", inplace=True)
grouped = df_ratings.groupby("user_id")
for user_id, group in grouped:
# 行为历史(电影 ID 列表)
history_key = f"user:{user_id}:history"
movie_ids = group["movie_id"].tolist()
pipeline.delete(history_key)
if movie_ids:
for i in range(0, len(movie_ids), 1000):
chunk = movie_ids[i:i+1000]
pipeline.rpush(history_key, *chunk)
# 偏好类目(Top 3 类型)
user_movies = df_movies[df_movies["movie_id"].isin(movie_ids)]
all_genres = []
for genres in user_movies["genres"]:
all_genres.extend(genres)
if all_genres:
from collections import Counter
counts = Counter(all_genres)
top_3 = [g for g, c in counts.most_common(3)]
pipeline.hset(
f"user:{user_id}:profile",
"frequent_genres",
",".join(top_3)
)
pipeline.execute()
用户行为历史使用 Redis List 类型存储,保持时间顺序。偏好类目是从历史行为中统计出的 Top 3 类型,存储在用户画像中,供偏好类目召回策略使用。
使用 Pipeline 批量执行命令是一个重要的优化。Redis 的命令执行速度很快,但每次网络往返都有开销。Pipeline 将多个命令打包发送,减少网络往返次数,可以显著提升写入速度。
10.3.5.2. 本地模型部署¶
本节代码位于
offline/storage/local_deploy.py
模型文件通常较大(几十 MB 到几百 MB),不适合存储在 Redis 中。本项目使用共享文件目录来管理模型文件,离线流程将模型写入该目录,在线服务从该目录加载。
def deploy_local(recall: bool = True, ranking: bool = True):
deploy_dir = config.DEPLOY_DIR
deploy_dir.mkdir(parents=True, exist_ok=True)
if recall:
deploy_recall_models(deploy_dir)
if ranking:
deploy_ranking_models(deploy_dir)
召回模型的部署包括用户塔模型、物品向量和词表:
def deploy_recall_models(deploy_dir: Path):
recall_dir = deploy_dir / "recall"
recall_dir.mkdir(parents=True, exist_ok=True)
# 1. 词表(用于特征编码)
shutil.copy2(config.VOCAB_DICT_PATH, recall_dir / "vocab_dict.pkl")
shutil.copy2(config.VOCAB_DICT_PATH, deploy_dir / "vocab_dict.pkl")
# 2. 物品向量(用于相似度检索)
shutil.copy2(config.ITEM_EMB_PATH, recall_dir / "item_embeddings.npy")
shutil.copy2(config.ITEM_EMB_PATH, deploy_dir / "item_embeddings.npy")
# 3. 用户塔模型(复制整个目录)
user_model_path = config.SAVED_MODELS_DIR / "user_model"
model_deploy_dir = deploy_dir / "model" / "user_recall" / "v1"
model_deploy_dir.mkdir(parents=True, exist_ok=True)
shutil.copytree(user_model_path, model_deploy_dir / "user_model")
# 更新版本指针
version_info = {"version": "v1", "path": "model/user_recall/v1/user_model"}
with open(deploy_dir / "model" / "user_recall" / "active.json", "w") as f:
json.dump(version_info, f)
TensorFlow SavedModel 是一个目录,包含模型结构和权重。我们将整个目录复制到部署目录,保持目录结构完整。
版本管理是生产环境中的重要功能。通过 active.json
指针文件,在线服务可以知道当前应该加载哪个版本的模型。模型更新时,先部署新版本的模型文件,再更新指针,可以实现无感知的模型热更新。
排序模型的部署类似:
def deploy_ranking_models(deploy_dir: Path):
ranking_dir = deploy_dir / "ranking"
ranking_dir.mkdir(parents=True, exist_ok=True)
# 1. 词表和特征配置
shutil.copy2(config.RANKING_VOCAB_DICT_PATH, ranking_dir / "vocab_dict.pkl")
shutil.copy2(config.RANKING_FEATURE_DICT_PATH, ranking_dir / "feature_dict.pkl")
# 2. DeepFM 模型
model_deploy_dir = deploy_dir / "model" / "ranking" / "v1"
model_deploy_dir.mkdir(parents=True, exist_ok=True)
shutil.copytree(config.RANKING_MODEL_PATH, model_deploy_dir / "ranking_model")
# 更新版本指针
version_info = {
"version": "v1",
"path": "model/ranking/v1/ranking_model",
"model_type": "deepfm"
}
with open(deploy_dir / "model" / "ranking" / "active.json", "w") as f:
json.dump(version_info, f)