如何使用Spark ALSpython实现协同过滤滤

当前位置:
【实战篇】如何在spark分布式矩阵实现协同过滤推荐?
【实战篇】如何在spark分布式矩阵实现协同过滤推荐?
<dd data-toggle='tooltip' data-placement='top' data-original-title='作者: 黄波 &&'>作者: 黄波
<dd data-toggle='tooltip' data-placement='top' data-original-title='添加时间:
14:24:00 &&'>
<span data-toggle='tooltip' data-placement='top' data-original-title=' 阅读:4017'> 4017
上文,介绍了协同过滤中的矩阵分解算法、spark大数据处理框架以及spark分布式矩阵的实现以及相关的运算,请戳蓝色加粗链接:《》。本文将重介绍最受欢迎的协同过滤算法,以及会在文中实战在spark分布式矩阵的基础上模拟协同过滤推荐。
【推荐系统简介】
个性化推荐是根据用户的兴趣特点和购买行为,向用户推荐用户感兴趣的信息和商品。个性化推荐系统是建立在海量数据挖掘基础上的一种高级商业智能平台,以帮助为顾客提供完全个性化的决策支持和信息服务。个性化推荐已经发展出了很多成熟的算法,下面列出一些常用的算法。
▲协同过滤推荐算法
协同过滤推荐是推荐系统中应用最早和最为成功的技术之一。它一般采用最近邻技术,利用用户的历史喜好信息计算用户之间的距离,然后利用目标用户的最近邻用户对商品评价的加权价值来预测目标用户对特定商品的喜好程度,系统从而根据这一喜好程度来对目标用户推荐。协同过滤推荐技术分为3种类型,包括基于用户的推荐、基于项目的推荐和基于模型的推荐。
▲基于关联规则的推荐
基于关联规则的推荐是以关联规则为基础,把已购商品作为规则头,规则体为推荐对象。关联规则就是在一个交易数据库中挖掘购买了商品集X的交易中有多大比例的交易同时购买了商品集合Y。
▲基于效用的推荐
基于效用的推荐是建立在对用户使用项目的效用情况上计算的,其核心问题是怎样为每一个用户去创建一个效用函数。因此,用户资料规模很大程度上是由系统所采用的效用函数决定的。
▲基于知识的推荐
基于知识的推荐在某种程度是可以看成是一种推理技术。效用知识是一种关于一个项目如何满足某一特定用户的知识,因此能解释需要和推荐的关系,所以用户资料可以是任何能支持推理的知识结构,它可以是用户已经规范化的查询,也可以是一个更详细的用户需要的表示。
▲组合推荐
由于各种推荐方法都有优缺点,所以在实际中组合推荐经常被采用。内容推荐和协同过滤推荐的组合是当前工业界和学术界的一个热点。
【协同过滤推荐系统】
协同过滤推荐算法是推荐系统中应用最广泛的推荐算法之一。协同过滤的本质,就是通过预测用户-物品矩阵中缺失的评分,来预测用户对物品的偏好。更加具体地,协同过滤算法主要分为Memery-based CF和Model-based CF,而Memory-based CF包括User-based CF和Item-based CF。
一.基于用户的协同过滤算法User-based CF
基于用户的协同过滤算法是根据相似用户的偏好信息产生对目标用户的推荐。它基于这样一个假设:如果一些用户对某一类项目的打分比较接近,则他们对其他的项目的打分也比较接近。协同过滤推荐系统采用统计计算方式搜索目标用户的相似用户,并根据相似用户对项目的打分来预测目标用户的对指定项目的评分,最后选择相似度较高的前若干个相似用户的评分作为推荐结果,并反馈给用户。
这种算法不仅计算简单且精确度较高,被现有的协同过滤推荐系统广泛采用。User-based协同过滤推荐算法的核心就是通过相似性度量方法计算出最近邻居集合,并将最近邻的评分结果作为推荐预测结果返回给用户。
例如,在下表所示的用户-项目评分矩阵中,行代表用户,列代表电影,表中的数值代表用户对某个电影的评分。现在需要预测用户小李对电影《鬼吹灯》的评分。
由上表可以看出小庄和小李对电影的评分比价接近,小庄对《生化危机》《复仇者联盟》《金刚狼》的评分分别为3、4、4,小李的评分分别为3、5、4,他们之间的相似度最高,因此小庄和小李是最接近的邻居,他们两个看电影的口味相似,因此小庄对《鬼吹灯》的评分结果对预测值的影响占据最大比例。
在真实的预测中,推荐系统只对前若干个邻居进行搜索,并根据这些邻居的评分为目标用户预测指定项目的评分。
二.基于项目的协同过滤算法Item-based CF
基于项目的协同过滤推荐是根据用户对相似项目的评分数据来预测目标项目的评分。它是建立在如下假设基础上的:如果大部分用户对某些项目的打分比较相近,则当前用户对这些项目的打分也会比较接近。Item-based协同过滤算法主要对目标用户所评价的一组项目进行研究,并计算这些项目与目标之间的相似性,然后从选择前k个相似度最大的项目输出,这是与User-based协同过滤的区别所在。
把之前的用户-项目评分矩阵作为例子,还是预测用户小李对电影《鬼吹灯》的评分。通过数据分析发现,电影《生化危机》的评分与《鬼吹灯》的评分非常相似,前三个用户对《生化危机》的评分分别为4、3、2,对《鬼吹灯》的评分分别为4、3、3,它们二者相似度最高,因此电影《生化危机》是电影《鬼吹灯》的最佳邻居,因此《生化危机》对《鬼吹灯》的评分预测值的影响占据最大比例。在真实的预测中,推荐系统只对前若干个邻居进行搜索,并根据这些邻居的评分为目标用户预测指定项目的评分。
Item-based协同过滤推荐算法的主要工作内容是最近邻居查询和产生推荐。因此Item-based协同过滤推荐算法可以分为最近邻查询和产生推荐两个阶段。最近邻查询阶段是要计算项目与项目之间的相似性,搜索目标项目的最近邻居;产生推荐阶段是根据用户对目标项目的最近邻居的评分信息预测目标项目的评分,最后产生前N个推荐信息。
三.基于模型的协同过滤推荐Model-based CF
现实中的用户-项目矩阵极大,而用户的兴趣和消费能力有限,对单个用户来说消费的物品,产生评分记录的物品是极少的。这样造成了用户-项目矩阵含有大量的空值,数据极为稀疏。假设用户的兴趣只受少数几个因素的影响,因此稀疏且高维的用户-项目评分矩阵可以分解为两个低维矩阵,分别表示用户的特征向量和项目的特征向量。
用户的特征向量代表了用户的兴趣,物品的特征向量代表了物品的特点,且每一个维度相互对应,两个向量的内积表示用户对该物品的喜好程度。矩阵分解是Model-based协同过滤推荐中最为关键的技术之一,就是通过用户特征矩阵U和物品特征矩阵V来重构低维矩阵预测用户对物品的评分。常用的协同过滤矩阵分解算法包括奇异值分解、正则化矩阵分解、带偏置的矩阵分解。
【基于Spark分布式矩阵构建协同过滤推荐算法】
一.实现User-based协同过滤的示例
Spark的data/mllib/als/test.data文件提供了用于协同过滤推荐测试的评分数据,文件的每一行都是一项评分,其格式为:[用户ID],[项目ID],[评分]。基于该测试数据,展示如何使用Spark实现Item-based协同过滤。
1.将评分数据读取到坐标矩阵CoordinateMatrix中。
//读取文件
//格式化数据
//创建坐标矩阵
2.将CoordinateMatrix转换为RowMatrix计算两两用户的相似度。
由于RowMatrix只能就是那列的相似度,而用户数量是有行表示,因此CoordinateMatrix需要先计算转置:
//计算转置矩阵并转为行矩阵
//求相似度
3.假设需要预测用户1对项目1的评分,那么预测结果就是用户1的平均评分加上其他用户对项目1评分的按相似度的加权平均。
//计算用户1的平均评分
//计算其他用户对项目1的加权平均分
//求和输出预测结果
二.实现Item-based协同过滤的示例
类似于User-based的协同过滤算法,下面给出Spark在测试数据集上实现Item-based协同过滤的示例:
//读取评分数据
//格式化数据
//创建坐标矩阵
//计算Item相似度
//计算项目1的平均评分
//计算用户1对其他项目的加权平均分
//求和输出预测结果
三.实现Model-based协同过滤的示例
奇异值分解是Model-based协同过滤最简单的一种方法。利用Spark中的矩阵分解奇异值分解操作,可以很容易的实现基于奇异值分解的协同过滤算法,示例如下:
1.首先将评分数据读取到CoordinateMatrix中:
//读取文件数据
//格式化数据
//创建坐标矩阵
2.将CoordinateMatrix转换为RowMatrix,并调用computeSVD计算评分矩阵的秩为2的奇异值分解:
3.假设需要预测用户1对项目1的评分:
4.输出预测结果:
全文首先简单介绍了矩阵和spark分布式矩阵的实现和相关算法,利用案例实战了Spark分布式矩阵的基础上模拟实现协同过滤推荐。其实Spark的MLlib库中已经提供了一个基于ALS交替最小二乘法实现的协同过滤矩阵分解推荐算法,它通过观察到的所有用户给产品打分,来推断每个用户的喜好并向用户推荐合适的产品。其模型可以理解为用户评分矩阵是由用户特征矩阵乘以物品特征矩阵得到,MLlib提供的ALS模型采用一种高效的矩阵分解计算,在海量数据中计算性能非常好,如果使用协同推荐算法,建议直接使用MLlib提供的ALS协同推荐算法。
恒生技术之眼Spark MLlib之协同过滤实例:
[java] view plain copyimport java.util.L
import org.apache.spark.SparkC
import org.apache.spark.api.java.JavaDoubleRDD;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkC
import org.apache.spark.api.java.function.F
import org.apache.spark.mllib.recommendation.ALS;
import org.apache.spark.mllib.recommendation.MatrixFactorizationM
import org.apache.spark.mllib.recommendation.R
import scala.Tuple2;
public class SparkMLlibColbFilter {
public static void main(String[/] args) {
SparkConf conf = new SparkConf().setAppName("Java Collaborative Filtering Example");
JavaSparkContext sc = new JavaSparkContext(conf);
// Load and parse the data
String path = "file:///data/hadoop/spark-2.0.0-bin-hadoop2.7/data/mllib/als/test.data";
JavaRDD&String& data = sc.textFile(path);
JavaRDD&Rating& ratings = data.map(new Function&String, Rating&() {
public Rating call(String s) throws Exception {
String[] sarray = s.split(",");
return new Rating(Integer.parseInt(sarray[0]), Integer.parseInt(sarray[1]), Double.parseDouble(sarray[2]));
// Build the recommendation model using ALS
int rank = 10;
int numIterations = 10;
MatrixFactorizationModel model = ALS.train(JavaRDD.toRDD(ratings), rank, numIterations, 0.01);
JavaRDD&Tuple2&Object, Object&& userProducts
ratings.map(new Function&Rating, Tuple2&Object, Object&&() {
public Tuple2&Object, Object& call(Rating r) throws Exception {
return new Tuple2&Object, Object&(r.user(
), r.product());
JavaPairRDD&Tuple2&Integer, Integer&, Double&
predictions = JavaPairRDD.fromJavaRDD(
model.predict(JavaRDD.toRDD(userProducts)).toJavaRDD().map(
new Function&Rating, Tuple2&Tuple2&Integer, Integer&, Double&&() {
public Tuple2&Tuple2&Integer, Integer&, Double& call(
Rating r) throws Exception {
return new Tuple2&&(new Tuple2&&(r.user(), r.product()), r.rating());
JavaRDD&Tuple2&Double, Double&& ratesAndPreds = JavaPairRDD.fromJavaRDD(ratings.map(
new Function&Rating, Tuple2&Tuple2&Integer, Integer&, Double&&() {
public Tuple2&Tuple2&Integer, Integer&, Double& call(
Rating r) throws Exception {
return new Tuple2&&(new Tuple2&&(r.user(), r.product()), r.rating());
})).join(predictions).values();
double MSE =
JavaDoubleRDD.fromRDD(ratesAndPreds.map(new Function&Tuple2&Double, Double&, Object&() {
public Object call(Tuple2&Double, Double& pair) throws Exception {
Math.pow((pair._1()
pair._2()),2);
}).rdd()).mean();
System.out.println("Mean Squared Error = " + MSE);
// Save and load model
model.save(sc.sc(), "target/tmp/myCollaborativeFilter");
MatrixFactorizationModel sameModel = MatrixFactorizationModel.load(sc.sc(),
"target/tmp/myCollaborativeFilter");
//为每个用户进行推荐,推荐的结果可以以用户id为key,结果为value存入redis或者hbase中
List&String& users = data.map(new Function&String, String&() {
public String call(String s) throws Exception {
String[] sarray = s.split(",");
return sarray[0];
}).distinct().collect();
for (String user : users) {
Rating[] rs
= model.recommendProducts(Integer.parseInt(user), numIterations);
String value = "";
int key = 0;
for (Rating r : rs) {
key = r.user(/);
value = value + r.product(/) + ":" + r.rating() + "," ;
System.out.println(key + "
" + value);
协同过滤ALS算法推荐过程如下:
加载数据到 ratings RDD,每行记录包括:user, product, rate从 ratings 得到用户商品的数据集:(user, product)使用ALS对 ratings 进行训练通过 model 对用户商品进行预测评分:((user, product), rate)从 ratings 得到用户商品的实际评分:((user, product), rate)合并预测评分和实际评分的两个数据集,并求均方差
阅读(...) 评论()        &
  协同过滤(Collaborative Filtering,简称CF,WIKI上的定义是:简单来说是利用某个兴趣相投、拥有共同经验之群体的喜好来推荐感兴趣的资讯给使用者,个人透过合作的机制给予资讯相当程度的回应(如评分)并记录下来以达到过滤的目的,进而帮助别人筛选资讯,回应不一定局限于特别感兴趣的,特别不感兴趣资讯的纪录也相当重要。
  协同过滤常被应用于推荐系统。这些技术旨在补充用户&商品关联矩阵中所缺失的部分。
  MLlib&当前支持基于模型的协同过滤,其中用户和商品通过一小组隐性因子进行表达,并且这些因子也用于预测缺失的元素。MLLib&使用交替最小二乘法(ALS) 来学习这些隐性因子。
  用户对物品或者信息的偏好,根据应用本身的不同,可能包括用户对物品的评分、用户查看物品的记录、用户的购买记录等。其实这些用户的偏好信息可以分为两类:
显式的用户反馈:这类是用户在网站上自然浏览或者使用网站以外,显式地提供反馈信息,例如用户对物品的评分或者对物品的评论。
隐式的用户反馈:这类是用户在使用网站是产生的数据,隐式地反映了用户对物品的喜好,例如用户购买了某物品,用户查看了某物品的信息,等等。
  显式的用户反馈能准确地反映用户对物品的真实喜好,但需要用户付出额外的代价;而隐式的用户行为,通过一些分析和处理,也能反映用户的喜好,只是数据不是很精确,有些行为的分析存在较大的噪音。但只要选择正确的行为特征,隐式的用户反馈也能得到很好的效果,只是行为特征的选择可能在不同的应用中有很大的不同,例如在电子商务的网站上,购买行为其实就是一个能很好表现用户喜好的隐式反馈。
  推荐引擎根据不同的推荐机制可能用到数据源中的一部分,然后根据这些数据,分析出一定的规则或者直接对用户对其他物品的喜好进行预测计算。这样推荐引擎可以在用户进入时给他推荐他可能感兴趣的物品。
  MLlib目前支持基于协同过滤的模型,在这个模型里,用户和产品被一组可以用来预测缺失项目的潜在因子来描述。特别是我们实现交替最小二乘(ALS)算法来学习这些潜在的因子,在&MLlib&中的实现有如下参数:
numBlocks是用于并行化计算的分块个数(设置为-1时 为自动配置);
rank是模型中隐性因子的个数;
iterations是迭代的次数;
lambda是ALS&的正则化参数;
implicitPrefs决定了是用显性反馈ALS&的版本还是用隐性反馈数据集的版本;
alpha是一个针对于隐性反馈&ALS&版本的参数,这个参数决定了偏好行为强度的基准。
        &
  在本实例中将使用协同过滤算法对GroupLens Research(http://grouplens.org/datasets/movielens/)提供的数据进行分析,该数据为一组从20世纪90年末到21世纪初由MovieLens用户提供的电影评分数据,这些数据中包括电影评分、电影元数据(风格类型和年代)以及关于用户的人口统计学数据(年龄、邮编、性别和职业等)。根据不同需求该组织提供了不同大小的样本数据,不同样本信息中包含三种数据:评分、用户信息和电影信息。
  对这些数据分析进行如下步骤:
  1.&装载如下两种数据:
    a)装载样本评分数据,其中最后一列时间戳除10的余数作为key,Rating为值;
    b)装载电影目录对照表(电影ID-&电影标题)
  2.将样本评分表以key值切分成3个部分,分别用于训练&(60%,并加入用户评分),&校验&(20%), and&测试&(20%)
  3.训练不同参数下的模型,并再校验集中验证,获取最佳参数下的模型
  4.用最佳模型预测测试集的评分,计算和实际评分之间的均方根误差
  5.根据用户评分的数据,推荐前十部最感兴趣的电影(注意要剔除用户已经评分的电影)
测试数据说明
  在MovieLens提供的电影评分数据分为三个表:评分、用户信息和电影信息,在该系列提供的附属数据提供大概6000位读者和100万个评分数据,具体位置为/data/class8/movielens/data目录下,对三个表数据说明可以参考该目录下README文档。
  1.评分数据说明(ratings.data)
  该评分数据总共四个字段,格式为UserID::MovieID::Rating::Timestamp,分为为用户编号::电影编号::评分::评分时间戳,其中各个字段说明如下:
用户编号范围1~6040
电影编号1~3952
电影评分为五星评分,范围0~5
评分时间戳单位秒
每个用户至少有20个电影评分
使用的ratings.dat的数据样本如下所示:
1::8300760
1::661::3::
1::914::3::
1::8300275
1::8824291
1::8302268
1::8302039
1::8300719
  2.用户信息(users.dat)
  用户信息五个字段,格式为UserID::Gender::Age::Occupation::Zip-code,分为为用户编号::性别::年龄::职业::邮编,其中各个字段说明如下:
用户编号范围1~6040
性别,其中M为男性,F为女性
不同的数字代表不同的年龄范围,如:25代表25~34岁范围
职业信息,在测试数据中提供了21中职业分类
使用的users.dat的数据样本如下所示:
1::F::1::10::48067
2::M::56::16::70072
3::M::25::15::55117
4::M::45::7::02460
5::M::25::20::55455
6::F::50::9::55117
7::M::35::1::06810
8::M::25::12::11413
3.电影信息(movies.dat)
  电影数据分为三个字段,格式为MovieID::Title::Genres,分为为电影编号::电影名::电影类别,其中各个字段说明如下:
电影编号1~3952
由IMDB提供电影名称,其中包括电影上映年份
电影分类,这里使用实际分类名非编号,如:Action、Crime等
使用的movies.dat的数据样本如下所示:
1::Toy Story (1995)::Animation|Children's|Comedy
2::Jumanji (1995)::Adventure|Children's|Fantasy
3::Grumpier Old Men (1995)::Comedy|Romance
4::Waiting to Exhale (1995)::Comedy|Drama
5::Father of the Bride Part II (1995)::Comedy
6::Heat (1995)::Action|Crime|Thriller
7::Sabrina (1995)::Comedy|Romance
8::Tom and Huck (1995)::Adventure|Children's
  程序代码
import java.io.File
import scala.io.Source
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.rdd._
import org.apache.spark.mllib.recommendation.{ALS, Rating, MatrixFactorizationModel}
object MovieLensALS {
def main(args: Array[String]) {
// 屏蔽不必要的日志显示在终端上
Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)
if (args.length != 2) {
println("Usage: /path/to/spark/bin/spark-submit --driver-memory 2g --class week7.MovieLensALS " +
"week7.jar movieLensHomeDir personalRatingsFile")
sys.exit(1)
// 设置运行环境
val conf = new SparkConf().setAppName("MovieLensALS").setMaster("local[4]")
val sc = new SparkContext(conf)
// 装载用户评分,该评分由评分器生成
val myRatings = loadRatings(args(1))
val myRatingsRDD = sc.parallelize(myRatings, 1)
// 样本数据目录
val movieLensHomeDir = args(0)
// 装载样本评分数据,其中最后一列Timestamp取除10的余数作为key,Rating为值,即(Int,Rating)
val ratings = sc.textFile(new File(movieLensHomeDir, "ratings.dat").toString).map { line =&
val fields = line.split("::")
(fields(3).toLong % 10, Rating(fields(0).toInt, fields(1).toInt, fields(2).toDouble))
// 装载电影目录对照表(电影ID-&电影标题)
val movies = sc.textFile(new File(movieLensHomeDir, "movies.dat").toString).map { line =&
val fields = line.split("::")
(fields(0).toInt, fields(1))
}.collect().toMap
val numRatings = ratings.count()
val numUsers = ratings.map(_._2.user).distinct().count()
val numMovies = ratings.map(_._2.product).distinct().count()
println("Got " + numRatings + " ratings from " + numUsers + " users on " + numMovies + " movies.")
// 将样本评分表以key值切分成3个部分,分别用于训练 (60%,并加入用户评分), 校验 (20%), and 测试 (20%)
// 该数据在计算过程中要多次应用到,所以cache到内存
val numPartitions = 4
val training = ratings.filter(x =& x._1 & 6)
.union(myRatingsRDD) //注意ratings是(Int,Rating),取value即可
.repartition(numPartitions)
val validation = ratings.filter(x =& x._1 &= 6 && x._1 & 8)
.repartition(numPartitions)
val test = ratings.filter(x =& x._1 &= 8).values.cache()
val numTraining = training.count()
val numValidation = validation.count()
val numTest = test.count()
println("Training: " + numTraining + ", validation: " + numValidation + ", test: " + numTest)
// 训练不同参数下的模型,并在校验集中验证,获取最佳参数下的模型
val ranks = List(8, 12)
val lambdas = List(0.1, 10.0)
val numIters = List(10, 20)
var bestModel: Option[MatrixFactorizationModel] = None
var bestValidationRmse = Double.MaxValue
var bestRank = 0
var bestLambda = -1.0
var bestNumIter = -1
for (rank &- lambda &- numIter &- numIters) {
val model = ALS.train(training, rank, numIter, lambda)
val validationRmse = computeRmse(model, validation, numValidation)
println("RMSE (validation) = " + validationRmse + " for the model trained with rank = "
+ rank + ", lambda = " + lambda + ", and numIter = " + numIter + ".")
if (validationRmse & bestValidationRmse) {
bestModel = Some(model)
bestValidationRmse = validationRmse
bestRank = rank
bestLambda = lambda
bestNumIter = numIter
// 用最佳模型预测测试集的评分,并计算和实际评分之间的均方根误差
val testRmse = computeRmse(bestModel.get, test, numTest)
println("The best model was trained with rank = " + bestRank + " and lambda = " + bestLambda
+ ", and numIter = " + bestNumIter + ", and its RMSE on the test set is " + testRmse + ".")
// create a naive baseline and compare it with the best model
val meanRating = training.union(validation).map(_.rating).mean
val baselineRmse =
math.sqrt(test.map(x =& (meanRating - x.rating) * (meanRating - x.rating)).mean)
val improvement = (baselineRmse - testRmse) / baselineRmse * 100
println("The best model improves the baseline by " + "%1.2f".format(improvement) + "%.")
// 推荐前十部最感兴趣的电影,注意要剔除用户已经评分的电影
val myRatedMovieIds = myRatings.map(_.product).toSet
val candidates = sc.parallelize(movies.keys.filter(!myRatedMovieIds.contains(_)).toSeq)
val recommendations = bestModel.get
.predict(candidates.map((0, _)))
.collect()
.sortBy(-_.rating)
println("Movies recommended for you:")
recommendations.foreach { r =&
println("%2d".format(i) + ": " + movies(r.product))
/** 校验集预测数据和实际数据之间的均方根误差 **/
def computeRmse(model: MatrixFactorizationModel, data: RDD[Rating], n: Long): Double = {
val predictions: RDD[Rating] = model.predict(data.map(x =& (x.user, x.product)))
val predictionsAndRatings = predictions.map(x =& ((x.user, x.product), x.rating))
.join(data.map(x =& ((x.user, x.product), x.rating)))
math.sqrt(predictionsAndRatings.map(x =& (x._1 - x._2) * (x._1 - x._2)).reduce(_ + _) / n)
/** 装载用户评分文件 **/
def loadRatings(path: String): Seq[Rating] = {
val lines = Source.fromFile(path).getLines()
val ratings = lines.map { line =&
val fields = line.split("::")
Rating(fields(0).toInt, fields(1).toInt, fields(2).toDouble)
}.filter(_.rating & 0.0)
if (ratings.isEmpty) {
sys.error("No ratings provided.")
ratings.toSeq
      
IDEA执行情况
  第一步&&&使用如下命令启动Spark集群
$cd /app/hadoop/spark-1.1.0
$sbin/start-all.sh
  第二步&&&进行用户评分,生成用户样本数据
  由于该程序中最终推荐给用户十部电影,这需要用户提供对样本电影数据的评分,然后根据生成的最佳模型获取当前用户推荐电影。用户可以使用/home/hadoop/upload/class8/movielens/bin/rateMovies程序进行评分,最终生成personalRatings.txt文件:
第三步&&&在IDEA中设置运行环境
在IDEA运行配置中设置MovieLensALS运行配置,需要设置输入数据所在文件夹和用户的评分文件路径:
输入数据所在目录:输入数据文件目录,在该目录中包含了评分信息、用户信息和电影信息,这里设置为/home/hadoop/upload/class8/movielens/data/
&用户的评分文件路径:前一步骤中用户对十部电影评分结果文件路径,在这里设置为/home/hadoop/upload/class8/movielens/personalRatings.txt
第四步&&&执行并观察输出
输出Got 1000209 ratings from 6040 users on 3706 movies,表示本算法中计算数据包括大概100万评分数据、6000多用户和3706部电影;
输出Training: 602252, validation: 198919, test: 199049,表示对评分数据进行拆分为训练数据、校验数据和测试数据,大致占比为6:2:2;
在计算过程中选择8种不同模型对数据进行训练,然后从中选择最佳模型,其中最佳模型比基准模型提供22.30%
RMSE (validation) = 0.9973 for the model trained with rank = 8, lambda = 0.1, and numIter = 10.
RMSE (validation) = 0.595 for the model trained with rank = 8, lambda = 0.1, and numIter = 20.
RMSE (validation) = 3.2833 for the model trained with rank = 8, lambda = 10.0, and numIter = 10.
RMSE (validation) = 3.2833 for the model trained with rank = 8, lambda = 10.0, and numIter = 20.
RMSE (validation) = 0.1964 for the model trained with rank = 12, lambda = 0.1, and numIter = 10.
RMSE (validation) = 0.5418 for the model trained with rank = 12, lambda = 0.1, and numIter = 20.
RMSE (validation) = 3.2833 for the model trained with rank = 12, lambda = 10.0, and numIter = 10.
RMSE (validation) = 3.2833 for the model trained with rank = 12, lambda = 10.0, and numIter = 20.
The best model was trained with rank = 12 and lambda = 0.1, and numIter = 10, and its RMSE on the test set is 0.0565.
The best model improves the baseline by 22.30%.
利用前面获取的最佳模型,结合用户提供的样本数据,最终推荐给用户如下影片:
Movies recommended for you:
&1: Bewegte Mann, Der (1994)
&2: Chushingura (1962)
&3: Love Serenade (1996)
&4: For All Mankind (1989)
&5: Vie est belle, La (Life is Rosey) (1987)
&6: Bandits (1997)
&7: King of Masks, The (Bian Lian) (1996)
&8: I'm the One That I Want (2000)
&9: Big Trees, The (1952)
10: First Love, Last Rites (1997)
        
阅读(...) 评论()}

我要回帖

更多关于 python实现协同过滤 的文章

更多推荐

版权声明:文章内容来源于网络,版权归原作者所有,如有侵权请点击这里与我们联系,我们将及时删除。

点击添加站长微信