[Movie Recommendation System] A Collaborative Filtering Recommendation Algorithm Based on ALS

谁知我新 · 2023-07-13

Contents

Purpose

Main idea of the user-movie recommendation matrix:

1. Take the Cartesian product of UserId and MovieId to produce (uid, mid) tuples
2. Predict the (uid, mid) tuples with the trained model
3. Sort the prediction results by predicted score
4. Return the K movies with the highest scores as recommendations for the current user
5. Rank the movie similarity matrix by cosine similarity
6. Evaluate the model and tune parameters with root-mean-square error (RMSE)

Since movie features are stable, the movie similarity matrix is trained offline to save real-time computation.

Full code

offlineRecommender.scala

ALSTrainer.scala


Purpose

Train the user-movie recommendation matrix and the movie similarity matrix.

The main idea of the user-movie recommendation matrix is as follows:

1. Take the Cartesian product of UserId and MovieId to produce (uid, mid) tuples:

val userMovies = userRDD.cartesian(movieRDD)

2. Predict the (uid, mid) tuples with the trained model.

3. Sort the prediction results by predicted score.

4. Return the K movies with the highest scores as recommendations for the current user.

Core code:

val preRatings = model.predict(userMovies)
val userRecs = preRatings
  .filter(_.rating > 0)
  .map(rating => (rating.user, (rating.product, rating.rating)))
  .groupByKey()
  .map {
    case (uid, recs) => UserResc(uid, recs.toList
      .sortWith(_._2 > _._2)
      .take(USER_MAX_RECOMMENDATION)
      .map(x => Recommendation(x._1, x._2)))
  }
  .toDF()
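To make the sort-and-take step concrete, here is a tiny standalone illustration with made-up scores (my own toy data, not from the pipeline above): sortWith(_._2 > _._2) orders pairs by score in descending order, and take keeps the top N.

val recs = List((101, 3.2), (205, 4.8), (77, 4.1)) // (mid, score), toy data
recs.sortWith(_._2 > _._2).take(2)
// => List((205, 4.8), (77, 4.1))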

5. Ranking within the movie similarity matrix is done by cosine similarity;

6. Use root-mean-square error (RMSE) for model evaluation and parameter tuning;

The root-mean-square error (RMSE) formula:

RMSE = sqrt( (1/N) · Σ_(u,m) (r_um − r̂_um)² )

where r_um is the observed rating of user u for movie m, r̂_um is the model's predicted rating, and N is the number of (user, movie) pairs in the test set.
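As a quick sanity check on the formula, a toy computation in plain Scala (the numbers are invented for illustration):

val observed  = Array(4.0, 3.0, 5.0)   // actual ratings, toy data
val predicted = Array(3.5, 3.0, 4.0)   // predicted ratings, toy data
// squared errors: 0.25, 0.0, 1.0 -> mean = 1.25 / 3 ≈ 0.4167 -> sqrt ≈ 0.6455
val rmse = math.sqrt(
  observed.zip(predicted).map { case (o, p) => (o - p) * (o - p) }.sum / observed.length
)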

Core code:

def adjustALSParam(trainRDD: RDD[Rating], testRDD: RDD[Rating]): Unit = {
  val result = for (rank <- Array(50, 100, 200, 300); lambda <- Array(0.01, 0.1, 1))
    yield {
      val model = ALS.train(trainRDD, rank, 5, lambda)
      val rmse = getRMSE(model, testRDD)
      (rank, lambda, rmse)
    }
  println(result.minBy(_._3))
}
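One possible extension (my own addition, not part of the original code): print the whole grid sorted by RMSE instead of only the best triple, which makes it easier to see how sensitive the model is to rank and lambda.

// print every (rank, lambda, rmse) combination, best first
result.sortBy(_._3).foreach { case (rank, lambda, rmse) =>
  println(f"rank=$rank%4d  lambda=$lambda%5.2f  rmse=$rmse%.4f")
}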

Since movie features are stable over time, the movie similarity matrix is trained offline, which saves real-time computation.

Core code:

val movieFeatures = model.productFeatures.map {
  case (mid, feature) => (mid, new DoubleMatrix(feature))
}
val movieRecs = movieFeatures.cartesian(movieFeatures)
  .filter {
    case (a, b) => a._1 != b._1 // drop pairs of a movie with itself
  }
  .map {
    case (a, b) =>
      val simScore = this.consinSim(a._2, b._2)
      (a._1, (b._1, simScore))
  }
  .filter(_._2._2 > 0.6) // keep only pairs with similarity above 0.6
  .groupByKey()
  .map {
    case (mid, item) => MoiveResc(mid, item
      .toList
      .sortWith(_._2 > _._2)
      .map(x => Recommendation(x._1, x._2)))
  }
  .toDF()
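For intuition, a tiny standalone check of the cosine computation that consinSim performs (toy vectors, my own example; consinSim itself is defined in the full code below):

import org.jblas.DoubleMatrix

val a = new DoubleMatrix(Array(1.0, 0.0))
val b = new DoubleMatrix(Array(1.0, 1.0))
// cosine = (a · b) / (|a| * |b|) = 1 / (1 * sqrt(2)) ≈ 0.7071
val cos = a.dot(b) / (a.norm2() * b.norm2())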

Full code

offlineRecommender.scala

package com.qh.offline

import org.apache.spark.SparkConf
import org.apache.spark.mllib.recommendation.{ALS, Rating}
import org.apache.spark.sql.SparkSession
import org.jblas.DoubleMatrix

// named MovieRating to distinguish it from the Rating class in Spark MLlib
case class MovieRating(uid: Int, mid: Int, score: Double, timestamp: Int)

case class MongoConfig(uri: String, db: String)

// basic recommendation object: a movie id and its score
case class Recommendation(mid: Int, score: Double)

// user recommendation list based on predicted ratings
case class UserResc(uid: Int, recs: Seq[Recommendation]) // user id with its top-N recommendation objects

// movie similarity list based on the LFM movie feature vectors
case class MoiveResc(mid: Int, recs: Seq[Recommendation])

/**
 * Collaborative filtering recommendation based on a latent factor model (LFM)
 *
 * User-movie recommendation matrix:
 * use the model trained by ALS to compute the recommendation matrix over all current users and movies
 * 1. Take the Cartesian product of UserId and MovieId
 * 2. Predict the (uid, mid) tuples with the model
 * 3. Sort the prediction results by predicted score
 * 4. Return the K movies with the highest scores as recommendations
 * The resulting data is stored in the UserRecs collection
 *
 * Movie similarity matrix
 * Model evaluation and parameter selection
 *
 */
object offline {

  val MONGODB_RATING_COLLECTION = "Rating"
  val USER_RECS = "UserRecs"
  val MOVIE_RECS = "MovieRecs"
  val USER_MAX_RECOMMENDATION = 20

  def main(args: Array[String]): Unit = {
    val config = Map(
      "spark.cores" -> "local[*]",
      "mongo.uri" -> "mongodb://hadoop100:27017/recommender",
      "mongo.db" -> "recommender"
    )
    val sparkConf = new SparkConf().setMaster(config("spark.cores")).setAppName("offline")
    val spark = SparkSession.builder().config(sparkConf).getOrCreate()
    import spark.implicits._
    implicit val mongoConfig = MongoConfig(config("mongo.uri"), config("mongo.db"))

    // load the ratings data
    val ratingRDD = spark.read
      .option("uri", mongoConfig.uri)
      .option("collection", MONGODB_RATING_COLLECTION)
      .format("com.mongodb.spark.sql")
      .load()
      .as[MovieRating]
      .rdd
      .map(rating => (rating.uid, rating.mid, rating.score)) // drop the timestamp
      .cache() // cache: persist the RDD in memory
    // preprocessing: extract the distinct user and movie ids
    val userRDD = ratingRDD.map(_._1).distinct()
    val movieRDD = ratingRDD.map(_._2).distinct()

    // train the latent factor model (LFM)
    val trainData = ratingRDD.map(x => Rating(x._1, x._2, x._3))
    val (rank, iterations, lambda) = (200, 5, 0.1)
    val model = ALS.train(trainData, rank, iterations, lambda)

    // compute predicted ratings from the user and movie latent features to build each user's recommendation list
    //    the Cartesian product enumerates every (user, movie) pair, i.e. the empty rating matrix to be filled in
    val userMovies = userRDD.cartesian(movieRDD)

    // predict
    val preRatings = model.predict(userMovies)

    val userRecs = preRatings
      .filter(_.rating > 0)
      .map(rating => (rating.user, (rating.product, rating.rating)))
      .groupByKey()
      .map {
        case (uid, recs) => UserResc(uid, recs.toList
          .sortWith(_._2 > _._2)
          .take(USER_MAX_RECOMMENDATION)
          .map(
            x => Recommendation(x._1, x._2)
          )
        )
      }
      .toDF()

    userRecs.write
      .option("uri", mongoConfig.uri)
      .option("collection", USER_RECS)
      .mode("overwrite")
      .format("com.mongodb.spark.sql")
      .save()

    // compute the similarity matrix from the movie latent features to build each movie's similarity list
    val movieFeatures = model.productFeatures.map {
      case (mid, feature) => (mid, new DoubleMatrix(feature))
    }

    // movie-movie Cartesian product
    val movieRecs = movieFeatures.cartesian(movieFeatures)
      .filter {
        case (a, b) => a._1 != b._1 // drop pairs of a movie with itself
      }
      .map {
        case (a, b) =>
          val simScore = this.consinSim(a._2, b._2)
          (a._1, (b._1, simScore))
      }
      .filter(_._2._2 > 0.6) // keep only pairs with similarity above 0.6
      .groupByKey()
      .map {
        case (mid, item) => MoiveResc(mid, item
          .toList
          .sortWith(_._2 > _._2)
          .map(x => Recommendation(x._1, x._2)))
      }
      .toDF()

    movieRecs.write
      .option("uri", mongoConfig.uri)
      .option("collection", MOVIE_RECS)
      .mode("overwrite")
      .format("com.mongodb.spark.sql")
      .save()

    spark.stop()
  }

  /*
  Compute the cosine similarity of two feature vectors
   */
  def consinSim(matrixA: DoubleMatrix, matrixB: DoubleMatrix): Double = {
    // .dot is the dot product; .norm2 is the L2 norm, i.e. the vector's length
    matrixA.dot(matrixB) / (matrixA.norm2() * matrixB.norm2())
  }
}
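As a usage sketch (my own addition, assuming the same SparkSession setup and MongoDB connector as in main above), the precomputed recommendations can later be read back from the UserRecs collection for serving:

// load the precomputed user recommendation lists and look up one user
val userRecsDF = spark.read
  .option("uri", mongoConfig.uri)
  .option("collection", USER_RECS)
  .format("com.mongodb.spark.sql")
  .load()
userRecsDF.filter("uid = 1").show(false)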

ALSTrainer.scala

package com.qh.offline

import breeze.numerics.sqrt
import com.qh.offline.offline.MONGODB_RATING_COLLECTION
import org.apache.spark.mllib.recommendation.{ALS, MatrixFactorizationModel, Rating}
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession

/**
 * Parameter tuning
 * Search for the LFM parameters via ALS and print the best combination to the console
 */
object ALSTrainer {
  def main(args: Array[String]): Unit = {
    val config = Map(
      "spark.cores" -> "local[*]",
      "mongo.uri" -> "mongodb://hadoop100:27017/recommender",
      "mongo.db" -> "recommender"
    )
    val sparkConf = new SparkConf().setMaster(config("spark.cores")).setAppName("offline")
    val spark = SparkSession.builder().config(sparkConf).getOrCreate()
    import spark.implicits._
    implicit val mongoConfig = MongoConfig(config("mongo.uri"), config("mongo.db"))
    val ratingRDD = spark.read
      .option("uri", mongoConfig.uri)
      .option("collection", MONGODB_RATING_COLLECTION)
      .format("com.mongodb.spark.sql")
      .load()
      .as[MovieRating]
      .rdd
      .map(rating => Rating(rating.uid, rating.mid, rating.score)) // drop the timestamp
      .cache()
    //   randomly split the dataset into a training set and a test set
    val splits = ratingRDD.randomSplit(Array(0.8, 0.2))
    val trainRDD = splits(0)
    val testRDD = splits(1)

    //    model parameter selection
    adjustALSParam(trainRDD, testRDD)

    spark.close()

  }

  def adjustALSParam(trainRDD: RDD[Rating], testRDD: RDD[Rating]): Unit = {
    val result = for (rank <- Array(50, 100, 200, 300); lambda <- Array(0.01, 0.1, 1))
      yield {
        val model = ALS.train(trainRDD, rank, 5, lambda)
        val rmse = getRMSE(model, testRDD)
        (rank, lambda, rmse)
      }
    println(result.minBy(_._3))
  }

  def getRMSE(model: MatrixFactorizationModel, data: RDD[Rating]): Double = {
    //      compute predicted ratings on the test data
    val userProducts = data.map(item => (item.user, item.product))
    val predictRating = model.predict(userProducts)
    // pair observed and predicted ratings by their common (uid, mid) keys; the observed rating is the minuend
    val observed = data.map(item => ((item.user, item.product), item.rating)) // actual observed ratings
    val predict = predictRating.map(item => ((item.user, item.product), item.rating))

    sqrt(
      observed.join(predict).map {
        case ((uid, mid), (actual, pre)) => // an inner join has no duplicate keys, so no groupBy is needed
          val err = actual - pre
          err * err
      }.mean()
    )
  }

}