Music-Recommendation

一、环境启动

spark-shell --driver-memory 8g --conf spark.sql.crossJoin.enabled=true

二、数据集

使用 Audioscrobbler 公开发布的一个数据集。Audioscrobbler 是 last.fm 的第一个音乐推荐系统。last.fm 创建于 2002 年,是最早的互联 网流媒体广播站点之一。Audioscrobbler 提供了开放 的“scrobbling”API,“scrobbling”可以记录听众播放过哪些艺术家的歌 曲。last.fm(https://www.last.fm/ )使用这些音乐播放记录构建了一个 强大的音乐推荐引擎。由于第三方应用和网站可以把音乐播放数据反馈 给这个推荐引擎,这个推荐引擎系统覆盖了数百万的用户。

然而,人们虽然经常听音乐,但很少给音乐评分。因此 Audioscrobbler 数据集要大得多,它覆盖了更多的用户和艺术家,也包含了更多的总体 信息,虽然单条记录的信息比较少。这种类型的数据通常被称为隐式反 馈数据 ,因为用户和艺术家的关系是通过其他行动隐含体现出来的, 而不是通过显式的评分或点赞得到的。

http://www.iro.umontreal.ca/~lisa/datasets/profiledata_06-May-2005.tar.gz

主要的数据集在文件 user_artist_data.txt 中,它包含 141 000 个用户和 160 万个艺术家,记录 了约 2420 万条用户播放艺术家歌曲的信息,其中包括播放次数信息。数据集在 artist_data.txt 文件中给出了每个艺术家的 ID 和对应的名字。 请注意,记录播放信息时,客户端应用提交的是艺术家的名字。名字如 果有拼写错误,或使用了非标准的名称,事后才能被发现。比 如,“The Smiths”“Smiths, The”和“the smiths”看似代表不同艺术家的 ID,但它们其实明显是指同一个艺术家。因此,为了将拼写错误的艺术 家 ID 或 ID 变体对应到该艺术家的规范 ID,数据集提供了 artist_alias.txt 文件。

三、算法(Alternating Least Square )

Collaborative filtering

Collaborative filtering (CF) is a technique used by recommender systems. Collaborative filtering has two senses, a narrow one and a more general one.

Model-based

In this approach, models are developed using different data mining, machine learning algorithms to predict users' rating of unrated items. There are many model-based CF algorithms. Bayesian networks, clustering models, latent semantic models such as singular value decomposition, probabilistic latent semantic analysis, multiple multiplicative factor, latent Dirichlet allocation and Markov decision process based models.[5]

Through this approach, dimensionality reduction methods are mostly being used as complementary technique to improve robustness and accuracy of memory-based approach. In this sense, methods like singular value decomposition, principal component analysis, known as latent factor models, compress user-item matrix into a low-dimensional representation in terms of latent factors. One advantage of using this approach is that instead of having a high dimensional matrix containing abundant number of missing values we will be dealing with a much smaller matrix in lower-dimensional space. A reduced presentation could be utilized for either user-based or item-based neighborhood algorithms that are presented in the previous section. There are several advantages with this paradigm. It handles the sparsity of the original matrix better than memory based ones. Also comparing similarity on the resulting matrix is much more scalable especially in dealing with large sparse datasets.[6]

Challenges

Data sparsity

In practice, many commercial recommender systems are based on large datasets. As a result, the user-item matrix used for collaborative filtering could be extremely large and sparse, which brings about the challenges in the performances of the recommendation.

One typical problem caused by the data sparsity is the cold start problem. As collaborative filtering methods recommend items based on users' past preferences, new users will need to rate sufficient number of items to enable the system to capture their preferences accurately and thus provides reliable recommendations.

Similarly, new items also have the same problem. When new items are added to the system, they need to be rated by a substantial number of users before they could be recommended to users who have similar tastes to the ones who rated them. The new item problem does not affect content-based recommendations, because the recommendation of an item is based on its discrete set of descriptive qualities rather than its ratings.

数学 上,这些算法把用户和产品数据当成一个大矩阵 A ,矩阵第 i 行和第 j 列上的元素有值,代表用户 i 播放过艺术家 j 的音乐。矩阵 A 是稀疏 的:A 中大多数元素都是 0,因为相对于所有可能的用户 - 艺术家组 合,只有很少一部分组合会出现在数据中。算法将 A 分解为两个小矩阵 XY 的乘积。矩阵 X 和矩阵 Y 非常“瘦”,因为 A 有很多行和列,但 XY 的行很多而列很少(列数用 k 表示)。这 k 个列就是潜在因素,用 于解释数据中的交互关系。

四、准备数据

Spark MLlib 的 ALS 算法实现并不严格要求用户和产品的 ID 必须是数 值型,不过当 ID 为 32 位非负整数时,效率会更高。使用 Int 表示 ID 是有好处的,但同时意味着 ID 不能超过 Int 的最大值 (Int.MaxValue ),即 2147483647。我们的数据集是否已经满足了这 个要求?利用 SparkSession 的 textFile 方法,将数据文件转换成 String 类型的数据集:

val rawUserArtistData = spark.read.textFile("/Users/mark/Learning/profiledata_06-May-2005/user_artist_data.txt")
rawUserArtistData.take(5).foreach(println)

文件的每行包含一个用户 ID、一个艺术家 ID 和播放次数,用空格分 隔。要计算用户 ID 的统计信息,可以用空格拆分每行,并将前两个值 解析为整数,其结果在概念上可以看成 Int 类型的两个列:用户 ID 和 艺术家 ID。将其转换为包含列 user 和 artist 的 DataFrame 是有意义 的,因为这样就可以简单地计算出两列的最大值和最小值:

val userArtistDF = rawUserArtistData.map { line => 
    val Array(user, artist, _*) = line.split(' ')
    (user.toInt, artist.toInt)
}.toDF("user", "artist")

userArtistDF.agg(
min("user"), max("user"), min("artist"), max("artist")).show()

map() 函数要求对每个输入必须严格返回一个值,因此这里不能 用这个函数。另一种可行的方法是用 filter() 方法删除那些无法解析 的行,但这会重复解析逻辑。当需要将每个元素映射为零个、一个或更 多结果时,我们应该使用 flatMap() 函数,因为它将每个输入对应的 零个或多个结果组成的集合简单展开,然后放入到一个更大的数据集中。它可以和 Scala 集合一起使用,也可以和 Scala 的 Option 类一起使 用。Option 代表一个值可以不存在,有点儿像只有 1 或 0 的一个简单 集合,1 对应子类 Some ,0 对应子类 None 。因此在以下代码中,虽然 flatMap 中的函数本可以简单返回一个空 List ,或一个只有一个元素 的 List ,但使用 Some 和 None 更合理,这种方法简单明了。

val rawArtistData = spark.read.textFile("/Users/mark/Learning/profiledata_06-May-2005/artist_data.txt")

val artistByID = rawArtistData.flatMap { line => 
    val (id, name) = line.span(_ != '\t')
    if (name.isEmpty) {
    None 
    } else {
        try {
            Some((id.toInt, name.trim)) } catch {
            case _: NumberFormatException => None }
        }
}.toDF("id", "name")

artist_alias.txt 将拼写错误的艺术家 ID 或非标准的艺术家 ID 映射为艺术 家的正规名字。其中每行有两个 ID,用制表符分隔。这个文件相对较 小,有 200 000 个记录。有必要把它转成 Map 集合的形式,将“不良 的”艺术家 ID 映射到“良好的”ID,而不是简单地把它作为包含艺术家 ID 二元组的数据集。这里又有一点小问题:由于某种原因有些行没有 艺术家的第一个 ID。这些行将被过滤掉:

val rawArtistAlias = spark.read.textFile("/Users/mark/Learning/profiledata_06-May-2005/artist_alias.txt") 
val artistAlias = rawArtistAlias.flatMap { line =>
    val Array(artist, alias) = line.split('\t') 
    if (artist.isEmpty) {   
        None 
    } else {
        Some((artist.toInt, alias.toInt)) }
}.collect().toMap

比如,第一条将 ID 1208690 映射为 1003926。接下来我们可以从包含艺 术家名字的数据集中进行查找:

artistByID.filter($"id" isin (1208690, 1003926)).show()

五、构建模型

import org.apache.spark.sql._ 
import org.apache.spark.broadcast._

def buildCounts(
    rawUserArtistData: Dataset[String],
    bArtistAlias: Broadcast[Map[Int,Int]]): DataFrame = {
    rawUserArtistData.map { line =>
    val Array(userID, artistID, count) = line.split(' ').map(_.toInt) 
    val finalArtistID =
    bArtistAlias.value.getOrElse(artistID, artistID) 
    (userID, finalArtistID, count)
}.toDF("user", "artist", "count") }

val bArtistAlias = spark.sparkContext.broadcast(artistAlias)
val trainData = buildCounts(rawUserArtistData, bArtistAlias) 
trainData.cache()

为 artistAlias 创建一个广播变量,取名为bArtistAlias 。使用广播变量时,Spark 对集群中每个 executor 只发 送一个副本,并且在内存里也只保存一个副本。如果有几千个任务在 executor 上并行执行,使用广播变量能节省巨大的网络流量和内存。

import org.apache.spark.ml.recommendation._ 
import scala.util.Random
val model = new ALS().
    setSeed(Random.nextLong()).
    setImplicitPrefs(true). 
    setRank(10). 
    setRegParam(0.01). 
    setAlpha(1.0).
    setMaxIter(5). 
    setUserCol("user"). 
    setItemCol("artist"). 
    setRatingCol("count").
    setPredictionCol("prediction").
    fit(trainData)

特征向量

model.userFactors.show(1, truncate = false)

检查推荐结果

val userID = 2093760
val existingArtistIDs = trainData. 
    filter($"user" === userID).
    select("artist").as[Int].collect() 
artistByID.filter($"id" isin (existingArtistIDs:_*)).show() 
def makeRecommendations(
    model: ALSModel,
    userID: Int,
    howMany: Int): DataFrame = {

val toRecommend = model.itemFactors. 
    select($"id".as("artist")). 
    withColumn("user", lit(userID)) 

model.transform(toRecommend). 
    select("artist", "prediction"). 
    orderBy($"prediction".desc). 
    limit(howMany) 
}
val topRecommendations = makeRecommendations(model, userID, 5) 
topRecommendations.show()

查找艺术家姓名

val recommendedArtistIDs = topRecommendations.select("artist").as[Int].collect()
artistByID.filter($"id" isin (recommendedArtistIDs:_*)).show()

计算AUC

import scala.collection.mutable.ArrayBuffer

def areaUnderCurve(
      positiveData: DataFrame,
      bAllArtistIDs: Broadcast[Array[Int]],
      predictFunction: (DataFrame => DataFrame)): Double = {

    // What this actually computes is AUC, per user. The result is actually something
    // that might be called "mean AUC".

    // Take held-out data as the "positive".
    // Make predictions for each of them, including a numeric score
    val positivePredictions = predictFunction(positiveData.select("user", "artist")).
      withColumnRenamed("prediction", "positivePrediction")

    // BinaryClassificationMetrics.areaUnderROC is not used here since there are really lots of
    // small AUC problems, and it would be inefficient, when a direct computation is available.

    // Create a set of "negative" products for each user. These are randomly chosen
    // from among all of the other artists, excluding those that are "positive" for the user.
    val negativeData = positiveData.select("user", "artist").as[(Int,Int)].
      groupByKey { case (user, _) => user }.
      flatMapGroups { case (userID, userIDAndPosArtistIDs) =>
        val random = new Random()
        val posItemIDSet = userIDAndPosArtistIDs.map { case (_, artist) => artist }.toSet
        val negative = new ArrayBuffer[Int]()
        val allArtistIDs = bAllArtistIDs.value
        var i = 0
        // Make at most one pass over all artists to avoid an infinite loop.
        // Also stop when number of negative equals positive set size
        while (i < allArtistIDs.length && negative.size < posItemIDSet.size) {
          val artistID = allArtistIDs(random.nextInt(allArtistIDs.length))
          // Only add new distinct IDs
          if (!posItemIDSet.contains(artistID)) {
            negative += artistID
          }
          i += 1
        }
        // Return the set with user ID added back
        negative.map(artistID => (userID, artistID))
      }.toDF("user", "artist")

    // Make predictions on the rest:
    val negativePredictions = predictFunction(negativeData).
      withColumnRenamed("prediction", "negativePrediction")

    // Join positive predictions to negative predictions by user, only.
    // This will result in a row for every possible pairing of positive and negative
    // predictions within each user.
    val joinedPredictions = positivePredictions.join(negativePredictions, "user").
      select("user", "positivePrediction", "negativePrediction").cache()

    // Count the number of pairs per user
    val allCounts = joinedPredictions.
      groupBy("user").agg(count(lit("1")).as("total")).
      select("user", "total")
    // Count the number of correctly ordered pairs per user
    val correctCounts = joinedPredictions.
      filter($"positivePrediction" > $"negativePrediction").
      groupBy("user").agg(count("user").as("correct")).
      select("user", "correct")

    // Combine these, compute their ratio, and average over all users
    val meanAUC = allCounts.join(correctCounts, Seq("user"), "left_outer").
      select($"user", (coalesce($"correct", lit(0)) / $"total").as("auc")).
      agg(mean("auc")).
      as[Double].first()

    joinedPredictions.unpersist()

    meanAUC
  }

训练集只用于 训练 ALS 模型,验证集用于评估模型。

90% 的数据用于训 练,剩余的 10% 用于交叉验证:

val allData = buildCounts(rawUserArtistData, bArtistAlias) 
val Array(trainData, cvData) = allData.randomSplit(Array(0.9, 0.1)) 
trainData.cache()
cvData.cache()
val allArtistIDs = allData.select("artist").as[Int].distinct().collect() 
val bAllArtistIDs = spark.sparkContext.broadcast(allArtistIDs)
val model = new ALS().
    setSeed(Random.nextLong()).
    setImplicitPrefs(true). 
    setRank(10).
    setRegParam(0.01).
    setAlpha(1.0).
    setMaxIter(5). 
    setUserCol("user").
    setItemCol("artist"). 
    setRatingCol("count").
    setPredictionCol("prediction"). 
    fit(trainData)

areaUnderCurve(cvData, bAllArtistIDs, model.transform)

六、超参数

  • setRank(10)

模型的潜在因素的个数,即“用户 - 特征”和“产品 - 特征”矩阵的列 数;一般来说,它也是矩阵的阶。

  • setMaxIter(5)

矩阵分解迭代的次数;迭代的次数越多,花费的时间越长,但分解 的结果可能会更好。

  • setRegParam(0.01)

标准的过拟合参数,通常也称作 lambda;值越大越不容易产生过拟 合,但值太大会降低分解的准确率。

  • setAlpha(1.0)

控制矩阵分解时,被观察到的“用户 - 产品”交互相对没被观察到的 交互的权重。

8 种可能的组合:rank = 5 或 30,regParam = 4.0 或 0.0001,以及 alpha = 1.0 或 40.0

 val evaluations =
  for (rank <- Seq(5, 30);
regParam <- Seq(4.0, 0.0001);
alpha <- Seq(1.0, 40.0)) 
yield {
    val model = new ALS().
    setSeed(Random.nextLong()).
    setImplicitPrefs(true).
    setRank(rank).setRegParam(regParam). 
    setAlpha(alpha).
    setMaxIter(20). 
    setUserCol("user").
    setItemCol("artist"). 
    setRatingCol("count").
    setPredictionCol("prediction"). 
    fit(trainData)
    val auc = areaUnderCurve(cvData, bAllArtistIDs, model.transform)

model.userFactors.unpersist() 
model.itemFactors.unpersist()

(auc, (rank, regParam, alpha))
    }
evaluations.sorted.reverse.foreach(println) 

![image-20200112192514163](/Users/mark/Library/Application Support/typora-user-images/image-20200112192514163.png)