import os
import pprint
import tempfile
from typing import Dict, Text
import numpy as np
import tensorflow as tf
import tensorflow_datasets as tfds
import tensorflow_recommenders as tfrs
数据集
桶检索阶段使用相同的数据集
ratings = tfds.load("movielens/100k-ratings", split="train")
ratings = ratings.map(lambda x: {
"movie_title": x["movie_title"],
"user_id": x["user_id"],
"user_rating": x["user_rating"]
})
同样随机划分数据集
tf.random.set_seed(42)
shuffled = ratings.shuffle(100_000, seed=42, reshuffle_each_iteration=False)
train = shuffled.take(80_000)
test = shuffled.skip(80_000).take(20_000)
类别特征词汇表
movie_titles = ratings.batch(1_000_000).map(lambda x: x["movie_title"])
user_ids = ratings.batch(1_000_000).map(lambda x: x["user_id"])
unique_movie_titles = np.unique(np.concatenate(list(movie_titles)))
unique_user_ids = np.unique(np.concatenate(list(user_ids)))
由多个堆叠的密集层组成的模型是对任务进行排序的一种相对常见的体系结构
class RankingModel(tf.keras.Model):
def __init__(self):
super().__init__()
embedding_dimension = 32
# Compute embeddings for users.
self.user_embeddings = tf.keras.Sequential([
tf.keras.layers.StringLookup(
vocabulary=unique_user_ids, mask_token=None),
tf.keras.layers.Embedding(len(unique_user_ids) + 1, embedding_dimension)
])
# Compute embeddings for movies.
self.movie_embeddings = tf.keras.Sequential([
tf.keras.layers.StringLookup(
vocabulary=unique_movie_titles, mask_token=None),
tf.keras.layers.Embedding(len(unique_movie_titles) + 1, embedding_dimension)
])
# Compute predictions.
self.ratings = tf.keras.Sequential([
# Learn multiple dense layers.
tf.keras.layers.Dense(256, activation="relu"),
tf.keras.layers.Dense(64, activation="relu"),
# Make rating predictions in the final layer.
tf.keras.layers.Dense(1)
])
def call(self, inputs):
user_id, movie_title = inputs
user_embedding = self.user_embeddings(user_id)
movie_embedding = self.movie_embeddings(movie_title)
return self.ratings(tf.concat([user_embedding, movie_embedding], axis=1))
损失和评估函数
task = tfrs.tasks.Ranking(
loss = tf.keras.losses.MeanSquaredError(),
metrics=[tf.keras.metrics.RootMeanSquaredError()]
)
完整模型结构
class MovielensModel(tfrs.models.Model):
def __init__(self):
super().__init__()
self.ranking_model: tf.keras.Model = RankingModel()
self.task: tf.keras.layers.Layer = tfrs.tasks.Ranking(
loss = tf.keras.losses.MeanSquaredError(),
metrics=[tf.keras.metrics.RootMeanSquaredError()]
)
def call(self, features: Dict[str, tf.Tensor]) -> tf.Tensor:
return self.ranking_model(
(features["user_id"], features["movie_title"]))
def compute_loss(self, features: Dict[Text, tf.Tensor], training=False) -> tf.Tensor:
labels = features.pop("user_rating")
rating_predictions = self(features)
# The task computes the loss and the metrics.
return self.task(labels=labels, predictions=rating_predictions)
模型训练和评估
model = MovielensModel()
model.compile(optimizer=tf.keras.optimizers.Adagrad(learning_rate=0.1))
对训练、测试集进行shuffle
、batch
和 cache
操作:
cached_train = train.shuffle(100_000).batch(8192).cache()
cached_test = test.batch(4096).cache()
训练
cached_train = train.shuffle(100_000).batch(8192).cache()
cached_test = test.batch(4096).cache()
model.evaluate(cached_test, return_dict=True)
测试模型效果
现在我们可以通过计算一系列电影的预测来测试排名模型,然后根据预测对这些电影进行排名:
test_ratings = {}
test_movie_titles = ["M*A*S*H (1970)", "Dances with Wolves (1990)", "Speed (1994)"]
for movie_title in test_movie_titles:
test_ratings[movie_title] = model({
"user_id": np.array(["42"]),
"movie_title": np.array([movie_title])
})
print("Ratings:")
for title, score in sorted(test_ratings.items(), key=lambda x: x[1], reverse=True):
print(f"{title}: {score}")
模型服务
tf.saved_model.save(model, "export")
我们现在可以加载它并执行预测:
loaded = tf.saved_model.load("export")
loaded({"user_id": np.array(["42"]), "movie_title": ["Speed (1994)"]}).numpy()