如何使用Spark ALS实现协同过滤算法java实现

协同过滤与推荐
协同过滤是一种根据用户对各种产品的交互与评分来推荐新产品的推荐系统技术。
协同过滤引入的地方就在于它只需要输入一系列用户/产品的交互记录;
无论是显式的交互(例如在购物网站上进行评分)还是隐式的(例如用户访问了一个
产品的页面但是没有对产品评分)交互皆可。仅仅根据这些交互,协同过滤算法就能
够知道哪些产品之间比较相似(因为相同的用户与它们发生了交互)以及哪些用户之间
比较相似,然后就可以做出新的推荐。
交替最小二乘法
MLlib中包含交替最小二乘法(ALS)的一个实现,这是一个协同过滤的常用算法,可以很好的
扩展到集群上。它位于mllib.recommendation.ALS类中。
ALS会为每个用户和产品都设一个特征向量,这样用户向量与产品向量的点积就接近于它们的得分。
它接收下面所列几个参数:
& & & & 使用的特征向量的大小,更大的特征向量会产生更好的模型,但是也需要话费更大的计算代价,默认10
& iterations
& & & & 要执行的迭代次数,默认10
& & & & 正则化参数,默认0.01
& & & & 用来在ALS中计算置信度的常量,默认1.0
& &numUserBlocks, &numProductBlocks
& & & & 切分用户和产品数据的块的数目,用来控制并行度,可以选择传递-1来上MLlib自动决定.
要使用ALS算法,需要有一个由mllib.recommendation.Rating对象组成的RDD,
其中每个包含一个用户id,一个产品id和一个评分。
实现过程中的一个挑战是每个id都需要是一个32位的整数值。
如果id是字符串或者更大的数字,那么可以直接在ALS中使用id的哈希值,
即使有两个用户或者产品映射到同一个Id上,总体结果依然会不错。
还有一种办法是broadcast()一张从产品id到正兴致的表,来付给每个产品独特的id。
ALS返回一个MatrixFactorizationModel对象来表示结果,
可以调用predict()来对一个由(UserId,productId)对组成的RDD进行预测评分。
也可以对使用model.recommendProducts(userId,numProducts)来为一个给定用户找到最值得推荐的前numProduct个产品。
注意,和MLlib中的其他模型不同,MatrixFactorizationModel对象很大,为每个用户和产品都存储了一个向量。
这样我们就不能把它存储到磁盘上,然后在另一个程序中读取回来。
不过,可以把模型中生成的特征向量RDD,也就是model.userFeatures和model.productFeatures保存到分布式文件系统上。
最后,ALS有两个变种:显示评分(默认情况)和隐式反馈(通过调用ALS.trainImplicit()而非ALS.train()来打开)。
用于显式评分时,每个用户对于一个产品的评分需要是一个得分(例如1到5星),而预测出来的评分也是得分。
而用于隐式反馈时,每个评分代表的是用户会和给定产品发送交互的置信度(比如随着用户访问一个网页次数
的增加,评分也会提高),预测出来的也是置信度。
阅读(...) 评论()Spark机器学习库mllib之协同过滤
很久就想写一篇ML的实践文章,虽然看过肯多资料,总觉得纸上谈兵印象不深刻,过不了多久就忘了,现在就借Spark的Mllib来简单的实际一下推荐算法吧。
说起推荐算法,大家耳熟能详的就是CF(协同过滤),这次就拿CF中ALS(alternating least squares),交替最小二乘,来做个例子吧。
CF里面的算法比较多,有基于物品的,基于用户的,ALS是基于矩阵分解的,关于对推荐算法的小结,请参考我的
先介绍下mllib,mllib是运行在Spark上一个机器学习算法库。借助Spark的内存计算,可以使机器学习的模型计算时间大大缩短。
目前,spark1.0.0中的mllib中已经有很多算法了,具体可以参见官方网站
我们知道,协同过滤是基于用户行为的一种推荐算法,需要用户对Item的评价。
于是乎我们还是找到最经典的数据集movielens,地址
Down下来ml-100k,解压后有很多文件,可以看README里面对数据集的介绍。
user id | item id | rating | timestamp
这里有user对某个movie的评分rating和时间timstamp
先预处理一下数据cat u1.base | awk -F "\t" '{print $1"::"$2"::"$3"::"$4}' & ratings.dat
cat u.item | awk -F "|" '{print $1"\t"$2"\t"$3}' & movies.dat
数据结果:
user id::movie id::rating:: timestamp
2::258::3::
2::269::4::
2::272::5::
2::273::4::
2::274::3::
movie id ::movie id :: movie release date
1::Toy Story (1995)::01-Jan-1995
2::GoldenEye (1995)::01-Jan-1995
3::Four Rooms (1995)::01-Jan-1995
4::Get Shorty (1995)::01-Jan-1995
5::Copycat (1995)::01-Jan-1995
6::Shanghai Triad (Yao a yao yao dao waipo qiao) (1995)::01-Jan-1995
7::Twelve Monkeys (1995)::01-Jan-1995
8::Babe (1995)::01-Jan-1995
9::Dead Man Walking (1995)::01-Jan-1995
10::Richard III (1995)::22-Jan-1996
11::Seven (Se7en) (1995)::01-Jan-1995
12::Usual Suspects, The (1995)::14-Aug-1995
OK,下面我们要用官方的ALS算法例子来运行下这个推荐。
首先导入mllib包,我们需要用到ALS算法类和Rating评分类
import org.apache.spark.mllib.recommendation.ALS
import org.apache.spark.mllib.recommendation.Rating//加载数据
val data = sc.textFile("/app/hadoop/ml-100k/ratings.dat")
//data中每条数据经过map的split后会是一个数组,模式匹配后,会new一个Rating对象
val ratings = data.map(_.split("::") match { case Array(user, item, rate, ts) =&
Rating(user.toInt, item.toInt, rate.toDouble)
最终会new成对象
scala& ratings take 2
14/06/25 17:51:06 INFO scheduler.DAGScheduler: Computing the requested partition locally
14/06/25 17:51:06 INFO rdd.HadoopRDD: Input split: file:/app/hadoop/ml-100k/ratings.dat:0+1826544
14/06/25 17:51:07 INFO spark.SparkContext: Job finished: take at &console&:22, took 0. s
res0: Array[org.apache.spark.mllib.recommendation.Rating] = Array(Rating(1,1,5.0), Rating(1,2,3.0))//设置潜在因子个数为10
scala& val rank = 10
rank: Int = 10
//要迭代计算30次
scala& val numIterations = 30
numIterations: Int = 30
接下来调用ALS.train()方法,进行模型训练:
val model = ALS.train(ratings, rank, numIterations, 0.01)
14/06/25 17:53:04 INFO storage.MemoryStore: ensureFreeSpace(200) called with curMem=84002, maxMem=
14/06/25 17:53:04 INFO storage.MemoryStore: Block broadcast_60 stored as values to memory (estimated size 200.0 B, free 294.3 MB)
model: org.apache.spark.mllib.recommendation.MatrixFactorizationModel = org.apache.spark.mllib.recommendation.MatrixFactorizationModel@17596ee0
训练完后,我们要对比一下预测的结果,我们那训练集当作测试集,来进行对比测试:
scala& val usersProducts = ratings.map { case Rating(user, product, rate) =&
(user, product)
usersProducts: org.apache.spark.rdd.RDD[(Int, Int)] = MappedRDD[623] at map at &console&:21//预测后的用户,电影,评分
scala& val predictions =
model.predict(usersProducts).map { case Rating(user, product, rate) =&
((user, product), rate)
predictions: org.apache.spark.rdd.RDD[((Int, Int), Double)] = MappedRDD[632] at map at &console&:30
我们用均方根误差来评价一个模型的好坏,所以我们要算一下MSE,来判定这个模型的准确率,其值越小说明越准确。
join一下,然后再计算:
//原始{(用户,电影),评分} join
预测后的{(用户,电影),评分}
val ratesAndPreds = ratings.map { case Rating(user, product, rate) =&
((user, product), rate)
}.join(predictions)
ratesAndPreds.collect take 3
14/06/25 17:59:35 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 632.0, whose tasks have all completed, from pool
14/06/25 17:59:35 INFO scheduler.DAGScheduler: Stage 632 (collect at &console&:34) finished in 1.906 s
14/06/25 17:59:35 INFO spark.SparkContext: Job finished: collect at &console&:34, took 1. s
res11: Array[((Int, Int), (Double, Double))] = Array(((933,627),(2.0,1.9198)), ((537,24),(1.0,2.8327)), ((717,125),(4.0,3.737)))
join后的结果,就是每个用户对电影的实际打分和预测打分的一个对比,例如:
(用户,电影),(原始评分,预测的评分)
(933,627),(2.0,1.9198)
(537,24),(1.0, 2.8327)
(717,125),(4.0,3.737)
最后计算均方根误差:
val MSE = ratesAndPreds.map { case ((user, product), (r1, r2)) =&
val err = (r1 - r2)
}.mean()14/06/25 18:02:28 INFO scheduler.TaskSetManager: Finished TID 79 in 554 ms on localhost (progress: 1/1)
14/06/25 18:02:28 INFO scheduler.DAGScheduler: Stage 702 (mean at &console&:36) finished in 0.556 s
14/06/25 18:02:28 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 702.0, whose tasks have all completed, from pool
14/06/25 18:02:28 INFO spark.SparkContext: Job finished: mean at &console&:36, took 0. s
MSE: Double = 0.2655
顺便提一下预测的API有三个重载,上面用的是第二个:
调用model的API
scala& model.predict
def predict(user: Int, product: Int): Double
def predict(usersProducts: org.apache.spark.rdd.RDD[(Int, Int)]): org.apache.spark.rdd.RDD[org.apache.spark.mllib.recommendation.Rating]
def predict(usersProductsJRDD: org.apache.spark.api.java.JavaRDD[Array[Byte]]): org.apache.spark.api.java.JavaRDD[Array[Byte]]
我们也可以传入user id, product id 来与此 某个用户 对某个 电影 的评分
model.predict(1,2)
14/06/25 18:17:36 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 963.0, whose tasks have all completed, from pool
14/06/25 18:17:36 INFO scheduler.DAGScheduler: Stage 963 (lookup at MatrixFactorizationModel.scala:46) finished in 0.035 s
14/06/25 18:17:36 INFO spark.SparkContext: Job finished: lookup at MatrixFactorizationModel.scala:46, took 0. s
res13: Double = 2.7363
模型都有了,推荐系统怎么设计就根据实际需求了 :)
MLlib充分利用了Spark的快速内存计算,迭代效率高的优势,将机器学习的模型计算性能提到另一片天地,这也就是为什么最近Spark备受推崇,那么火的原因。
目前Mllib的算法库还不是很多,但是Mahout都宣布不接受Mapreduce算法库,都迁移到spark上来了,看来未来机器学习要靠Spark了。至于为什么对于协同过滤先支持的是ALS,也是看中的ALS算法的并行度比较好,在Spark上更能发挥该算法的优势吧。
——EOF——
看过本文的人也看了:
我要留言技术领域:
取消收藏确定要取消收藏吗?
删除图谱提示你保存在该图谱下的知识内容也会被删除,建议你先将内容移到其他图谱中。你确定要删除知识图谱及其内容吗?
删除节点提示无法删除该知识节点,因该节点下仍保存有相关知识内容!
删除节点提示你确定要删除该知识节点吗?如何使用Spark ALS实现协同过滤
我的图书馆
如何使用Spark ALS实现协同过滤
http://blog.javachen.com//how-to-implement-collaborative-filtering-using-spark-als.html
本文主要记录最近一段时间学习和实现的一些总结,希望对大家熟悉Spark ALS算法有所帮助。
【】Spark1.4.0中MatrixFactorizationModel提供了recommendForAll方法实现离线批量推荐,见。
为了测试简单,在本地以local方式运行Spark,你需要做的是下载编译好的压缩包解压即可,可以参考。
测试数据使用的,下载之后解压到data目录。数据的格式请参考README中的说明,需要注意的是ratings.dat中的数据被处理过,每个用户至少访问了20个商品。
下面的代码均在spark-shell中运行,启动时候可以根据你的机器内存设置JVM参数,例如:
bin/spark-shell --executor-memory 3g --driver-memory 3g --driver-java-options '-Xms2g -Xmx2g -XX:+UseCompressedOops'
这个例子主要演示如何训练数据、评分并计算根均方差。
首先,启动spark-shell,然后引入mllib包,我们需要用到ALS算法类和Rating评分类:
import org.apache.spark.mllib.recommendation.{ALS, Rating}
Spark的日志级别默认为INFO,你可以手动设置为WARN级别,同样先引入log4j依赖:
import org.apache.log4j.{Logger,Level}
然后,运行下面代码:
Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)
spark-shell启动成功之后,sc为内置变量,你可以通过它来加载测试数据:
val data = sc.textFile("data/ml-1m/ratings.dat")
接下来解析文件内容,获得用户对商品的评分记录:
val ratings = data.map(_.split("::") match { case Array(user, item, rate, ts) =&
Rating(user.toInt, item.toInt, rate.toDouble)
}).cache()
查看第一条记录:
scala& ratings.first
res81: org.apache.spark.mllib.recommendation.Rating = Rating(1,1193,5.0)
我们可以统计文件中用户和商品数量:
val users = ratings.map(_.user).distinct()
val products = ratings.map(_.product).distinct()
println("Got "+ratings.count()+" ratings from "+users.count+" users on "+products.count+" products.")
可以看到如下输出:
//Got 1000209 ratings from 6040 users on 3706 products.
你可以对评分数据生成训练集和测试集,例如:训练集和测试集比例为8比2:
val splits = ratings.randomSplit(Array(0.8, 0.2), seed = 111l)
val training = splits(0).repartition(numPartitions)
val test = splits(1).repartition(numPartitions)
这里,我们是将评分数据全部当做训练集,并且也为测试集。
接下来调用ALS.train()方法,进行模型训练:
val rank = 12
val lambda = 0.01
val numIterations = 20
val model = ALS.train(ratings, rank, numIterations, lambda)
训练完后,我们看看model中的用户和商品特征向量:
model.userFeatures
//res82: org.apache.spark.rdd.RDD[(Int, Array[Double])] = users MapPartitionsRDD[400] at mapValues at ALS.scala:218
model.userFeatures.count
//res84: Long = 6040
model.productFeatures
//res85: org.apache.spark.rdd.RDD[(Int, Array[Double])] = products MapPartitionsRDD[401] at mapValues at ALS.scala:222
model.productFeatures.count
//res86: Long = 3706
我们要对比一下预测的结果,注意:我们将训练集当作测试集来进行对比测试。从训练集中获取用户和商品的映射:
val usersProducts= ratings.map { case Rating(user, product, rate) =&
(user, product)
显然,测试集的记录数等于评分总记录数,验证一下:
usersProducts.count
//Long = 1000209
使用推荐模型对用户商品进行预测评分,得到预测评分的数据集:
var predictions = model.predict(usersProducts).map { case Rating(user, product, rate) =&
((user, product), rate)
查看其记录数:
predictions.count //Long = 1000209
将真实评分数据集与预测评分数据集进行合并,这样得到用户对每一个商品的实际评分和预测评分:
val ratesAndPreds = ratings.map { case Rating(user, product, rate) =&
((user, product), rate)
}.join(predictions)
ratesAndPreds.count
//Long = 1000209
然后计算根均方差:
val rmse= math.sqrt(ratesAndPreds.map { case ((user, product), (r1, r2)) =&
val err = (r1 - r2)
println(s"RMSE = $rmse")
上面这段代码其实就是对测试集进行评分预测并计算相似度,这段代码可以抽象为一个方法,如下:
/** Compute RMSE (Root Mean Squared Error). */
def computeRmse(model: MatrixFactorizationModel, data: RDD[Rating]) = {
val usersProducts = data.map { case Rating(user, product, rate) =&
(user, product)
val predictions = model.predict(usersProducts).map { case Rating(user, product, rate) =&
((user, product), rate)
val ratesAndPreds = data.map { case Rating(user, product, rate) =&
((user, product), rate)
}.join(predictions)
math.sqrt(ratesAndPreds.map { case ((user, product), (r1, r2)) =&
val err = (r1 - r2)
除了RMSE指标,我们还可以及时AUC以及Mean average precision at K (MAPK),关于AUC的计算方法,参考,关于MAPK的计算方法可以参考《》一书第四章节内容,或者你可以看本文后面内容。
保存真实评分和预测评分
我们还可以保存用户对商品的真实评分和预测评分记录到本地文件:
ratesAndPreds.sortByKey().repartition(1).sortBy(_._1).map({
case ((user, product), (rate, pred)) =& (user + "," + product + "," + rate + "," + pred)
}).saveAsTextFile("/tmp/result")
上面这段代码先按用户排序,然后重新分区确保目标目录中只生成一个文件。如果你重复运行这段代码,则需要先删除目标路径:
import scala.sys.process._
"rm -r /tmp/result".!
我们还可以对预测的评分结果按用户进行分组并按评分倒排序:
predictions.map { case ((user, product), rate) =&
(user, (product, rate))
}.groupByKey(numPartitions).map{case (user_id,list)=&
(user_id,list.toList.sortBy {case (goods_id,rate)=& - rate})
给一个用户推荐商品
这个例子主要是记录如何给一个或大量用户进行推荐商品,例如,对用户编号为384的用户进行推荐,查出该用户在测试集中评分过的商品。
找出5个用户:
users.take(5)
//Array[Int] = Array(384, , )
查看用户编号为384的用户的预测结果中预测评分排前10的商品:
val userId = users.take(1)(0) //384
val K = 10
val topKRecs = model.recommendProducts(userId, K)
println(topKRecs.mkString("\n"))
Rating(384,)
Rating(384,129,8.676)
Rating(384,184,8.853)
Rating(384,811,7.284)
Rating(384,)
Rating(384,)
Rating(384,)
Rating(384,)
Rating(384,397,7.967)
Rating(384,97,7.754)
查看该用户的评分记录:
val goodsForUser=ratings.keyBy(_.user).lookup(384)
// Seq[org.apache.spark.mllib.recommendation.Rating] = WrappedArray(Rating(384,), Rating(384,), Rating(384,593,5.0), Rating(384,599,3.0), Rating(384,673,2.0), Rating(384,), Rating(384,), Rating(384,), Rating(384,), Rating(384,204,4.0), Rating(384,), Rating(384,), Rating(384,260,4.0), Rating(384,), Rating(384,), Rating(384,), Rating(384,), Rating(384,), Rating(384,), Rating(384,), Rating(384,), Rating(384,))
productsForUser.size //Int = 22
productsForUser.sortBy(-_.rating).take(10).map(rating =& (rating.product, rating.rating)).foreach(println)
可以看到该用户对22个商品评过分以及浏览的商品是哪些。
我们可以该用户对某一个商品的实际评分和预测评分方差为多少:
val actualRating = productsForUser.take(1)(0)
//actualRating: org.apache.spark.mllib.recommendation.Rating = Rating(384,)
val predictedRating = model.predict(789, actualRating.product)
val predictedRating = model.predict(384, actualRating.product)
//predictedRating: Double = 1.4637
val squaredError = math.pow(predictedRating - actualRating.rating, 2.0)
//squaredError: Double = 0.5075172
如何找出和一个已知商品最相似的商品呢?这里,我们可以使用余弦相似度来计算:
import org.jblas.DoubleMatrix
/* Compute the cosine similarity between two vectors */
def cosineSimilarity(vec1: DoubleMatrix, vec2: DoubleMatrix): Double = {
vec1.dot(vec2) / (vec1.norm2() * vec2.norm2())
以2055商品为例,计算实际评分和预测评分相似度
val itemId = 2055
val itemFactor = model.productFeatures.lookup(itemId).head
//itemFactor: Array[Double] = Array(0.14, -0.86, -1.4, -0.9518, -0.3333, -0.3022, -0.8916, -0.62415, -0.2258)
val itemVector = new DoubleMatrix(itemFactor)
//itemVector: org.jblas.DoubleMatrix = [0..435731; -0..443828; -1..627457; -0.326453; -0.993985; -0.871032; -0.757889; -0.146219; -0.725426]
cosineSimilarity(itemVector, itemVector)
// res99: Double = 0.9999
找到和该商品最相似的10个商品:
val sims = model.productFeatures.map{ case (id, factor) =&
val factorVector = new DoubleMatrix(factor)
val sim = cosineSimilarity(factorVector, itemVector)
val sortedSims = sims.top(K)(Ordering.by[(Int, Double), Double] { case (id, similarity) =& similarity })
//sortedSims: Array[(Int, Double)] = Array((9), (4), (6), (1), (9), (1), (2), (7), (2), (7))
println(sortedSims.mkString("\n"))
显然第一个最相似的商品即为该商品本身,即2055,我们可以修改下代码,取前k+1个商品,然后排除第一个:
val sortedSims2 = sims.top(K + 1)(Ordering.by[(Int, Double), Double] { case (id, similarity) =& similarity })
//sortedSims2: Array[(Int, Double)] = Array((9), (4), (6), (1), (9), (1), (2), (7), (2), (7), (4))
sortedSims2.slice(1, 11).map{ case (id, sim) =& (id, sim) }.mkString("\n")
接下来,我们可以计算给该用户推荐的前K个商品的平均准确度MAPK,该算法定义如下(该算法是否正确还有待考证):
/* Function to compute average precision given a set of actual and predicted ratings */
// Code for this function is based on: https://github.com/benhamner/Metrics
def avgPrecisionK(actual: Seq[Int], predicted: Seq[Int], k: Int): Double = {
val predK = predicted.take(k)
var score = 0.0
var numHits = 0.0
for ((p, i) &- predK.zipWithIndex) {
if (actual.contains(p)) {
numHits += 1.0
score += numHits / (i.toDouble + 1.0)
if (actual.isEmpty) {
score / scala.math.min(actual.size, k).toDouble
给该用户推荐的商品为:
val actualProducts = productsForUser.map(_.product)
//actualProducts: Seq[Int] = ArrayBuffer(, 593, 599, 673, , , 204, , 260, , , , , 1304)
给该用户预测的商品为:
val predictedProducts = topKRecs.map(_.product)
//predictedProducts: Array[Int] = Array(, 184, 811, , , 397, 97)
最后的准确度为:
val apk10 = avgPrecisionK(actualProducts, predictedProducts, 10)
// apk10: Double = 0.0
你可以评分记录中获得所有用户然后依次给每个用户推荐:
val users = ratings.map(_.user).distinct()
users.collect.flatMap { user =&
model.recommendProducts(user, 10)
这种方式是遍历内存中的一个集合然后循环调用RDD的操作,运行会比较慢,另外一种方式是直接操作model中的userFeatures和productFeatures,代码如下:
val itemFactors = model.productFeatures.map { case (id, factor) =& factor }.collect()
val itemMatrix = new DoubleMatrix(itemFactors)
println(itemMatrix.rows, itemMatrix.columns)
//(3706,12)
// broadcast the item factor matrix
val imBroadcast = sc.broadcast(itemMatrix)
//获取商品和索引的映射
var idxProducts=model.productFeatures.map { case (prodcut, factor) =& prodcut }.zipWithIndex().map{case (prodcut, idx) =& (idx,prodcut)}.collectAsMap()
val idxProductsBroadcast = sc.broadcast(idxProducts)
val allRecs = model.userFeatures.map{ case (user, array) =&
val userVector = new DoubleMatrix(array)
val scores = imBroadcast.value.mmul(userVector)
val sortedWithId = scores.data.zipWithIndex.sortBy(-_._1)
//根据索引取对应的商品id
val recommendedProducts = sortedWithId.map(_._2).map{idx=&idxProductsBroadcast.value.get(idx).get}
(user, recommendedProducts)
这种方式其实还不是最优方法,更好的方法可以参考,当然这篇文章中的代码还可以继续优化一下。我修改后的代码如下,供大家参考:
val productFeatures = model.productFeatures.collect()
var productArray = ArrayBuffer[Int]()
var productFeaturesArray = ArrayBuffer[Array[Double]]()
for ((product, features) &- productFeatures) {
productArray += product
productFeaturesArray += features
val productArrayBroadcast = sc.broadcast(productArray)
val productFeatureMatrixBroadcast = sc.broadcast(new DoubleMatrix(productFeaturesArray.toArray).transpose())
start = System.currentTimeMillis()
val allRecs = model.userFeatures.mapPartitions { iter =&
// Build user feature matrix for jblas
var userFeaturesArray = ArrayBuffer[Array[Double]]()
var userArray = new ArrayBuffer[Int]()
while (iter.hasNext) {
val (user, features) = iter.next()
userArray += user
userFeaturesArray += features
var userFeatureMatrix = new DoubleMatrix(userFeaturesArray.toArray)
var userRecommendationMatrix = userFeatureMatrix.mmul(productFeatureMatrixBroadcast.value)
var productArray=productArrayBroadcast.value
var mappedUserRecommendationArray = new ArrayBuffer[String](params.topk)
// Extract ratings from the matrix
for (i &- 0 until userArray.length) {
var ratingSet =
mutable.TreeSet.empty(Ordering.fromLessThan[(Int,Double)](_._2 & _._2))
for (j &- 0 until productArray.length) {
var rating = (productArray(j), userRecommendationMatrix.get(i,j))
ratingSet += rating
mappedUserRecommendationArray += userArray(i)+","+ratingSet.take(params.topk).mkString(",")
mappedUserRecommendationArray.iterator
悲哀的是,上面的方法还是不能解决问题,因为矩阵相乘会撑爆集群内存;可喜的是,如果你关注Spark最新动态,你会发现Spark1.4.0中MatrixFactorizationModel提供了recommendForAll方法实现离线批量推荐,详细说明见。因为,我使用的Hadoop版本是CDH-5.4.0,其中Spark版本还是1.3.0,所以暂且不能在集群上测试Spark1.4.0中添加的新方法。
如果上面结果跑出来了,就可以验证推荐结果是否正确。还是以384用户为例:
allRecs.lookup(384).head.take(10)
//res50: Array[Int] = Array(, , , , 853, 759)
topKRecs.map(_.product)
//res49: Array[Int] = Array(, , , , 853, 759)
接下来,我们可以计算所有推荐结果的准确度了,首先,得到每个用户评分过的所有商品:
val userProducts = ratings.map{ case Rating(user, product, rating) =& (user, product) }.groupBy(_._1)
然后,预测的商品和实际商品关联求准确度:
// finally, compute the APK for each user, and average them to find MAPK
val MAPK = allRecs.join(userProducts).map{ case (userId, (predicted, actualWithIds)) =&
val actual = actualWithIds.map(_._2).toSeq
avgPrecisionK(actual, predicted, K)
}.reduce(_ + _) / allRecs.count
println("Mean Average Precision at K = " + MAPK)
//Mean Average Precision at K = 0.260383
其实,我们也可以使用Spark内置的算法计算RMSE和MAE:
// MSE, RMSE and MAE
import org.apache.spark.mllib.evaluation.RegressionMetrics
val predictedAndTrue = ratesAndPreds.map { case ((user, product), (actual, predicted)) =& (actual, predicted) }
val regressionMetrics = new RegressionMetrics(predictedAndTrue)
println("Mean Squared Error = " + regressionMetrics.meanSquaredError)
println("Root Mean Squared Error = " + regressionMetrics.rootMeanSquaredError)
// Mean Squared Error = 0.8566
// Root Mean Squared Error = 0.0918
import org.apache.spark.mllib.evaluation.RankingMetrics
val predictedAndTrueForRanking = allRecs.join(userProducts).map{ case (userId, (predicted, actualWithIds)) =&
val actual = actualWithIds.map(_._2)
(predicted.toArray, actual.toArray)
val rankingMetrics = new RankingMetrics(predictedAndTrueForRanking)
println("Mean Average Precision = " + rankingMetrics.meanAveragePrecision)
// Mean Average Precision = 0.20426
计算推荐2000个商品时的准确度为:
val MAPK2000 = allRecs.join(userProducts).map{ case (userId, (predicted, actualWithIds)) =&
val actual = actualWithIds.map(_._2).toSeq
avgPrecisionK(actual, predicted, 2000)
}.reduce(_ + _) / allRecs.count
println("Mean Average Precision = " + MAPK2000)
//Mean Average Precision = 0.069083
保存和加载推荐模型
对与实时推荐,我们需要启动一个web server,在启动的时候生成或加载训练模型,然后提供API接口返回推荐接口,需要调用的相关方法为:
save(model: MatrixFactorizationModel, path: String)
load(sc: SparkContext, path: String)
model中的userFeatures和productFeatures也可以保存起来:
val outputDir="/tmp"
model.userFeatures.map{ case (id, vec) =& id + "\t" + vec.mkString(",") }.saveAsTextFile(outputDir + "/userFeatures")
model.productFeatures.map{ case (id, vec) =& id + "\t" + vec.mkString(",") }.saveAsTextFile(outputDir + "/productFeatures")
本文主要记录如何使用ALS算法实现协同过滤并给用户推荐商品,以上代码在仓库中的ScalaLocalALS.scala文件。
如果你想更加深入了解Spark MLlib算法的使用,可以看看这本电子书并下载书中的源码,本文大部分代码参考自该电子书。
喜欢该文的人也喜欢}

我要回帖

更多关于 协同过滤推荐算法 的文章

更多推荐

版权声明:文章内容来源于网络,版权归原作者所有,如有侵权请点击这里与我们联系,我们将及时删除。

点击添加站长微信