如何使用Spark ALS实现协同过滤推荐算法

242被浏览21,913分享邀请回答11021 条评论分享收藏感谢收起Spark机器学习库mllib之协同过滤
很久就想写一篇ML的实践文章,虽然看过肯多资料,总觉得纸上谈兵印象不深刻,过不了多久就忘了,现在就借Spark的Mllib来简单的实际一下推荐算法吧。
说起推荐算法,大家耳熟能详的就是CF(协同过滤),这次就拿CF中ALS(alternating least squares),交替最小二乘,来做个例子吧。
CF里面的算法比较多,有基于物品的,基于用户的,ALS是基于矩阵分解的,关于对推荐算法的小结,请参考我的
先介绍下mllib,mllib是运行在Spark上一个机器学习算法库。借助Spark的内存计算,可以使机器学习的模型计算时间大大缩短。
目前,spark1.0.0中的mllib中已经有很多算法了,具体可以参见官方网站
我们知道,协同过滤是基于用户行为的一种推荐算法,需要用户对Item的评价。
于是乎我们还是找到最经典的数据集movielens,地址
Down下来ml-100k,解压后有很多文件,可以看README里面对数据集的介绍。
user id | item id | rating | timestamp
这里有user对某个movie的评分rating和时间timstamp
先预处理一下数据cat u1.base | awk -F "\t" '{print $1"::"$2"::"$3"::"$4}' & ratings.dat
cat u.item | awk -F "|" '{print $1"\t"$2"\t"$3}' & movies.dat
数据结果:
user id::movie id::rating:: timestamp
2::258::3::
2::269::4::
2::272::5::
2::273::4::
2::274::3::
movie id ::movie id :: movie release date
1::Toy Story (1995)::01-Jan-1995
2::GoldenEye (1995)::01-Jan-1995
3::Four Rooms (1995)::01-Jan-1995
4::Get Shorty (1995)::01-Jan-1995
5::Copycat (1995)::01-Jan-1995
6::Shanghai Triad (Yao a yao yao dao waipo qiao) (1995)::01-Jan-1995
7::Twelve Monkeys (1995)::01-Jan-1995
8::Babe (1995)::01-Jan-1995
9::Dead Man Walking (1995)::01-Jan-1995
10::Richard III (1995)::22-Jan-1996
11::Seven (Se7en) (1995)::01-Jan-1995
12::Usual Suspects, The (1995)::14-Aug-1995
OK,下面我们要用官方的ALS算法例子来运行下这个推荐。
首先导入mllib包,我们需要用到ALS算法类和Rating评分类
import org.apache.spark.mllib.recommendation.ALS
import org.apache.spark.mllib.recommendation.Rating//加载数据
val data = sc.textFile("/app/hadoop/ml-100k/ratings.dat")
//data中每条数据经过map的split后会是一个数组,模式匹配后,会new一个Rating对象
val ratings = data.map(_.split("::") match { case Array(user, item, rate, ts) =&
Rating(user.toInt, item.toInt, rate.toDouble)
最终会new成对象
scala& ratings take 2
14/06/25 17:51:06 INFO scheduler.DAGScheduler: Computing the requested partition locally
14/06/25 17:51:06 INFO rdd.HadoopRDD: Input split: file:/app/hadoop/ml-100k/ratings.dat:0+1826544
14/06/25 17:51:07 INFO spark.SparkContext: Job finished: take at &console&:22, took 0. s
res0: Array[org.apache.spark.mllib.recommendation.Rating] = Array(Rating(1,1,5.0), Rating(1,2,3.0))//设置潜在因子个数为10
scala& val rank = 10
rank: Int = 10
//要迭代计算30次
scala& val numIterations = 30
numIterations: Int = 30
接下来调用ALS.train()方法,进行模型训练:
val model = ALS.train(ratings, rank, numIterations, 0.01)
14/06/25 17:53:04 INFO storage.MemoryStore: ensureFreeSpace(200) called with curMem=84002, maxMem=
14/06/25 17:53:04 INFO storage.MemoryStore: Block broadcast_60 stored as values to memory (estimated size 200.0 B, free 294.3 MB)
model: org.apache.spark.mllib.recommendation.MatrixFactorizationModel = org.apache.spark.mllib.recommendation.MatrixFactorizationModel@17596ee0
训练完后,我们要对比一下预测的结果,我们那训练集当作测试集,来进行对比测试:
scala& val usersProducts = ratings.map { case Rating(user, product, rate) =&
(user, product)
usersProducts: org.apache.spark.rdd.RDD[(Int, Int)] = MappedRDD[623] at map at &console&:21//预测后的用户,电影,评分
scala& val predictions =
model.predict(usersProducts).map { case Rating(user, product, rate) =&
((user, product), rate)
predictions: org.apache.spark.rdd.RDD[((Int, Int), Double)] = MappedRDD[632] at map at &console&:30
我们用均方根误差来评价一个模型的好坏,所以我们要算一下MSE,来判定这个模型的准确率,其值越小说明越准确。
join一下,然后再计算:
//原始{(用户,电影),评分} join
预测后的{(用户,电影),评分}
val ratesAndPreds = ratings.map { case Rating(user, product, rate) =&
((user, product), rate)
}.join(predictions)
ratesAndPreds.collect take 3
14/06/25 17:59:35 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 632.0, whose tasks have all completed, from pool
14/06/25 17:59:35 INFO scheduler.DAGScheduler: Stage 632 (collect at &console&:34) finished in 1.906 s
14/06/25 17:59:35 INFO spark.SparkContext: Job finished: collect at &console&:34, took 1. s
res11: Array[((Int, Int), (Double, Double))] = Array(((933,627),(2.0,1.9198)), ((537,24),(1.0,2.8327)), ((717,125),(4.0,3.737)))
join后的结果,就是每个用户对电影的实际打分和预测打分的一个对比,例如:
(用户,电影),(原始评分,预测的评分)
(933,627),(2.0,1.9198)
(537,24),(1.0, 2.8327)
(717,125),(4.0,3.737)
最后计算均方根误差:
val MSE = ratesAndPreds.map { case ((user, product), (r1, r2)) =&
val err = (r1 - r2)
}.mean()14/06/25 18:02:28 INFO scheduler.TaskSetManager: Finished TID 79 in 554 ms on localhost (progress: 1/1)
14/06/25 18:02:28 INFO scheduler.DAGScheduler: Stage 702 (mean at &console&:36) finished in 0.556 s
14/06/25 18:02:28 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 702.0, whose tasks have all completed, from pool
14/06/25 18:02:28 INFO spark.SparkContext: Job finished: mean at &console&:36, took 0. s
MSE: Double = 0.2655
顺便提一下预测的API有三个重载,上面用的是第二个:
调用model的API
scala& model.predict
def predict(user: Int, product: Int): Double
def predict(usersProducts: org.apache.spark.rdd.RDD[(Int, Int)]): org.apache.spark.rdd.RDD[org.apache.spark.mllib.recommendation.Rating]
def predict(usersProductsJRDD: org.apache.spark.api.java.JavaRDD[Array[Byte]]): org.apache.spark.api.java.JavaRDD[Array[Byte]]
我们也可以传入user id, product id 来与此 某个用户 对某个 电影 的评分
model.predict(1,2)
14/06/25 18:17:36 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 963.0, whose tasks have all completed, from pool
14/06/25 18:17:36 INFO scheduler.DAGScheduler: Stage 963 (lookup at MatrixFactorizationModel.scala:46) finished in 0.035 s
14/06/25 18:17:36 INFO spark.SparkContext: Job finished: lookup at MatrixFactorizationModel.scala:46, took 0. s
res13: Double = 2.7363
模型都有了,推荐系统怎么设计就根据实际需求了 :)
MLlib充分利用了Spark的快速内存计算,迭代效率高的优势,将机器学习的模型计算性能提到另一片天地,这也就是为什么最近Spark备受推崇,那么火的原因。
目前Mllib的算法库还不是很多,但是Mahout都宣布不接受Mapreduce算法库,都迁移到spark上来了,看来未来机器学习要靠Spark了。至于为什么对于协同过滤先支持的是ALS,也是看中的ALS算法的并行度比较好,在Spark上更能发挥该算法的优势吧。
——EOF——
看过本文的人也看了:
我要留言技术领域:
取消收藏确定要取消收藏吗?
删除图谱提示你保存在该图谱下的知识内容也会被删除,建议你先将内容移到其他图谱中。你确定要删除知识图谱及其内容吗?
删除节点提示无法删除该知识节点,因该节点下仍保存有相关知识内容!
删除节点提示你确定要删除该知识节点吗?Mahout使用了Taste来提高协同过滤算法的实现,它是一个基于Java实现的可扩展的,高效的推荐引擎。Taste既实现了最基本的基 于用户的和基于内容的推荐算法,同时也提供了扩展接口,使用户可以方便的定义和实现自己的推荐算法。同时,Taste不仅仅只适用于Java应用程序,它 可以作为内部服务器的一个组件以HTTP和Web Service的形式向外界提供推荐的逻辑。Taste的设计使它能满足企业对推荐引擎在性能、灵活性和可扩展性等方面的要求。
接口相关介绍
Taste主要包括以下几个接口:
DataModel 是用户喜好信息的抽象接口,它的具体实现支持从任意类型的数据源抽取用户喜好信息。Taste 默认提供 JDBCDataModel 和 FileDataModel,分别支持从数据库和文件中读取用户的喜好信息。
UserSimilarity 和
ItemSimilarity 。UserSimilarity 用于定义两个用户间的相似度,它是基于协同过滤的推荐引擎的核心部分,可以用来计算用户的“邻居”,这里我们将与当前用户口味相似的用户称为他的邻居。ItemSimilarity 类似的,计算内容之间的相似度。
UserNeighborhood 用于基于用户相似度的推荐方法中,推荐的内容是基于找到与当前用户喜好相似的邻居用户的方式产生的。UserNeighborhood 定义了确定邻居用户的方法,具体实现一般是基于 UserSimilarity 计算得到的。
Recommender 是推荐引擎的抽象接口,Taste 中的核心组件。程序中,为它提供一个 DataModel,它可以计算出对不同用户的推荐内容。实际应用中,主要使用它的实现类 GenericUserBasedRecommender 或者 GenericItemBasedRecommender,分别实现基于用户相似度的推荐引擎或者基于内容的推荐引擎。
RecommenderEvaluator :评分器。
RecommenderIRStatsEvaluator :搜集推荐性能相关的指标,包括准确率、召回率等等。
目前,Mahout为DataModel提供了以下几种实现:
org.apache.mahout.cf.taste.impl.model.GenericDataModel
org.apache.mahout.cf.taste.impl.model.GenericBooleanPrefDataModel
org.apache.mahout.cf.taste.impl.model.PlusAnonymousUserDataModel
org.apache.mahout.cf.taste.impl.model.file.FileDataModel
org.apache.mahout.cf.taste.impl.model.hbase.HBaseDataModel
org.apache.mahout.cf.taste.impl.model.cassandra.CassandraDataModel
org.apache.mahout.cf.taste.impl.model.mongodb.MongoDBDataModel
org.apache.mahout.cf.taste.impl.model.jdbc.SQL92JDBCDataModel
org.apache.mahout.cf.taste.impl.model.jdbc.MySQLJDBCDataModel
org.apache.mahout.cf.taste.impl.model.jdbc.PostgreSQLJDBCDataModel
org.apache.mahout.cf.taste.impl.model.jdbc.GenericJDBCDataModel
org.apache.mahout.cf.taste.impl.model.jdbc.SQL92BooleanPrefJDBCDataModel
org.apache.mahout.cf.taste.impl.model.jdbc.MySQLBooleanPrefJDBCDataModel
org.apache.mahout.cf.taste.impl.model.jdbc.PostgreBooleanPrefSQLJDBCDataModel
org.apache.mahout.cf.taste.impl.model.jdbc.ReloadFromJDBCDataModel
从类名上就可以大概猜出来每个DataModel的用途,奇怪的是竟然没有HDFS的DataModel,有人实现了一个,请参考
UserSimilarity 和
ItemSimilarity 相似度实现有以下几种:
CityBlockSimilarity :基于Manhattan距离相似度
EuclideanDistanceSimilarity :基于欧几里德距离计算相似度
LogLikelihoodSimilarity :基于对数似然比的相似度
PearsonCorrelationSimilarity :基于皮尔逊相关系数计算相似度
SpearmanCorrelationSimilarity :基于皮尔斯曼相关系数相似度
TanimotoCoefficientSimilarity :基于谷本系数计算相似度
UncenteredCosineSimilarity :计算 Cosine 相似度
以上相似度的说明,请参考Mahout推荐引擎介绍。
UserNeighborhood 主要实现有两种:
NearestNUserNeighborhood:对每个用户取固定数量N个最近邻居
ThresholdUserNeighborhood:对每个用户基于一定的限制,取落在相似度限制以内的所有用户为邻居
Recommender分为以下几种实现:
GenericUserBasedRecommender:基于用户的推荐引擎
GenericBooleanPrefUserBasedRecommender:基于用户的无偏好值推荐引擎
GenericItemBasedRecommender:基于物品的推荐引擎
GenericBooleanPrefItemBasedRecommender:基于物品的无偏好值推荐引擎
RecommenderEvaluator有以下几种实现:
AverageAbsoluteDifferenceRecommenderEvaluator :计算平均差值
RMSRecommenderEvaluator :计算均方根差
RecommenderIRStatsEvaluator的实现类是GenericRecommenderIRStatsEvaluator。
首先,需要在maven中加入对mahout的依赖:
&dependency&
&groupId&org.apache.mahout&/groupId&
&artifactId&mahout-core&/artifactId&
&version&0.9&/version&
&/dependency&
&dependency&
&groupId&org.apache.mahout&/groupId&
&artifactId&mahout-integration&/artifactId&
&version&0.9&/version&
&/dependency&
&dependency&
&groupId&org.apache.mahout&/groupId&
&artifactId&mahout-math&/artifactId&
&version&0.9&/version&
&/dependency&
&dependency&
&groupId&org.apache.mahout&/groupId&
&artifactId&mahout-examples&/artifactId&
&version&0.9&/version&
&/dependency&
基于用户的推荐,以FileDataModel为例:
File modelFile modelFile = new File(&intro.csv&);
DataModel model = new FileDataModel(modelFile);
//用户相似度,使用基于皮尔逊相关系数计算相似度
UserSimilarity similarity = new PearsonCorrelationSimilarity(model);
//选择邻居用户,使用NearestNUserNeighborhood实现UserNeighborhood接口,选择邻近的4个用户
UserNeighborhood neighborhood = new NearestNUserNeighborhood(4, similarity, model);
Recommender recommender = new GenericUserBasedRecommender(model, neighborhood, similarity);
//给用户1推荐4个物品
List&RecommendedItem& recommendations = recommender.recommend(1, 4);
for (RecommendedItem recommendation : recommendations) {
System.out.println(recommendation);
FileDataModel要求输入文件中的字段分隔符为逗号或者制表符,如果你想使用其他分隔符,你可以扩展一个FileDataModel的实现,例如,mahout中已经提供了一个解析MoiveLens的数据集(分隔符为
:: )的实现GroupLensDataModel。
对相同用户重复获得推荐结果,我们可以改用CachingRecommender来包装GenericUserBasedRecommender对象,将推荐结果缓存起来:
Recommender cachingRecommender = new CachingRecommender(recommender);
上面代码可以在main方法中直接运行,然后,我们可以获取推荐模型的评分:
//使用平均绝对差值获得评分
RecommenderEvaluator evaluator = new AverageAbsoluteDifferenceRecommenderEvaluator();
// 用RecommenderBuilder构建推荐引擎
RecommenderBuilder recommenderBuilder = new RecommenderBuilder() {
public Recommender buildRecommender(DataModel model) throws TasteException {
UserSimilarity similarity = new PearsonCorrelationSimilarity(model);
UserNeighborhood neighborhood = new NearestNUserNeighborhood(4, similarity, model);
return new GenericUserBasedRecommender(model, neighborhood, similarity);
// Use 70%
test using the other 30%.
double score = evaluator.evaluate(recommenderBuilder, null, model, 0.7, 1.0);
System.out.println(score);
接下来,可以获取推荐结果的查准率和召回率:
RecommenderIRStatsEvaluator statsEvaluator = new GenericRecommenderIRStatsEvaluator();
// Build the same recommender for testing that we did last time:
RecommenderBuilder recommenderBuilder = new RecommenderBuilder() {
public Recommender buildRecommender(DataModel model) throws TasteException {
UserSimilarity similarity = new PearsonCorrelationSimilarity(model);
UserNeighborhood neighborhood = new NearestNUserNeighborhood(4, similarity, model);
return new GenericUserBasedRecommender(model, neighborhood, similarity);
// 计算推荐4个结果时的查准率和召回率
IRStatistics stats = statsEvaluator.evaluate(recommenderBuilder,null, model, null, 4,
GenericRecommenderIRStatsEvaluator.CHOOSE_THRESHOLD,1.0);
System.out.println(stats.getPrecision());
System.out.println(stats.getRecall());
如果是基于物品的推荐,代码大体相似,只是没有了UserNeighborhood,然后将上面代码中的User换成Item即可,完整代码如下:
File modelFile modelFile = new File(&intro.csv&);
DataModel model = new FileDataModel(new File(file));
// Build the same recommender for testing that we did last time:
RecommenderBuilder recommenderBuilder = new RecommenderBuilder() {
public Recommender buildRecommender(DataModel model) throws TasteException {
ItemSimilarity similarity = new PearsonCorrelationSimilarity(model);
return new GenericItemBasedRecommender(model, similarity);
//获取推荐结果
List&RecommendedItem& recommendations = recommenderBuilder.buildRecommender(model).recommend(1, 4);
for (RecommendedItem recommendation : recommendations) {
System.out.println(recommendation);
//计算评分
RecommenderEvaluator evaluator =
new AverageAbsoluteDifferenceRecommenderEvaluator();
// Use 70%
test using the other 30%.
double score = evaluator.evaluate(recommenderBuilder, null, model, 0.7, 1.0);
System.out.println(score);
//计算查全率和查准率
RecommenderIRStatsEvaluator statsEvaluator = new GenericRecommenderIRStatsEvaluator();
// Evaluate precision and recall &at 2&:
IRStatistics stats = statsEvaluator.evaluate(recommenderBuilder,
null, model, null, 4,
GenericRecommenderIRStatsEvaluator.CHOOSE_THRESHOLD,
System.out.println(stats.getPrecision());
System.out.println(stats.getRecall());
在Spark中运行
在Spark中运行,需要将Mahout相关的jar添加到Spark的classpath中,修改/etc/spark/conf/spark-env.sh,添加下面两行代码:
SPARK_DIST_CLASSPATH=&$SPARK_DIST_CLASSPATH:/usr/lib/mahout/lib/*&
SPARK_DIST_CLASSPATH=&$SPARK_DIST_CLASSPATH:/usr/lib/mahout/*&
然后,以本地模式在spark-shell中运行下面代码交互测试:
//注意:这里是本地目录
val model = new FileDataModel(new File(&intro.csv&))
val evaluator = new RMSRecommenderEvaluator()
val recommenderBuilder = new RecommenderBuilder {
override def buildRecommender(dataModel: DataModel): Recommender = {
val similarity = new LogLikelihoodSimilarity(dataModel)
new GenericItemBasedRecommender(dataModel, similarity)
val score = evaluator.evaluate(recommenderBuilder, null, model, 0.95, 0.05)
println(s&Score=$score&)
val recommender=recommenderBuilder.buildRecommender(model)
val users=trainingRatings.map(_.user).distinct().take(20)
import scala.collection.JavaConversions._
val result=users.par.map{user=&
user+&,&+recommender.recommend(user,40).map(_.getItemID).mkString(&,&)
上面有一个评估基于物品或是用户的各种相似度下的评分的类,叫做 RecommenderEvaluator,供大家学习参考。
分布式运行
Mahout提供了
org.apache.mahout.cf.taste.hadoop.item.RecommenderJob 类以MapReduce的方式来实现基于物品的协同过滤,查看该类的使用说明:
$ hadoop jar /usr/lib/mahout/mahout-examples-0.9-cdh5.4.0-job.jar org.apache.mahout.cf.taste.hadoop.item.RecommenderJob
15/06/10 16:19:34 ERROR common.AbstractJob: Missing required option --similarityClassname
Missing required option --similarityClassname
[--input &input& --output &output& --numRecommendations &numRecommendations&
--usersFile &usersFile& --itemsFile &itemsFile& --filterFile &filterFile&
--booleanData &booleanData& --maxPrefsPerUser &maxPrefsPerUser&
--minPrefsPerUser &minPrefsPerUser& --maxSimilaritiesPerItem
&maxSimilaritiesPerItem& --maxPrefsInItemSimilarity &maxPrefsInItemSimilarity&
--similarityClassname &similarityClassname& --threshold &threshold&
--outputPathForSimilarityMatrix &outputPathForSimilarityMatrix& --randomSeed
&randomSeed& --sequencefileOutput --help --tempDir &tempDir& --startPhase
&startPhase& --endPhase &endPhase&]
--similarityClassname (-s) similarityClassname
Name of distributed
similarity measures class to
instantiate, alternatively
use one of the predefined
similarities
([SIMILARITY_COOCCURRENCE,
SIMILARITY_LOGLIKELIHOOD,
SIMILARITY_TANIMOTO_COEFFICIEN
T, SIMILARITY_CITY_BLOCK,
SIMILARITY_COSINE,
SIMILARITY_PEARSON_CORRELATION
SIMILARITY_EUCLIDEAN_DISTANCE]
可见,该类可以接收的命令行参数如下:
--input(path) : 存储用户偏好数据的目录,该目录下可以包含一个或多个存储用户偏好数据的文本文件;
--output(path) : 结算结果的输出目录
--numRecommendations (integer) : 为每个用户推荐的item数量,默认为10
--usersFile (path) : 指定一个包含了一个或多个存储userID的文件路径,仅为该路径下所有文件包含的userID做推荐计算 (该选项可选)
--itemsFile (path) : 指定一个包含了一个或多个存储itemID的文件路径,仅为该路径下所有文件包含的itemID做推荐计算 (该选项可选)
--filterFile (path) : 指定一个路径,该路径下的文件包含了
[userID,itemID] 值对,userID和itemID用逗号分隔。计算结果将不会为user推荐
[userID,itemID] 值对中包含的item (该选项可选)
--booleanData (boolean) : 如果输入数据不包含偏好数值,则将该参数设置为true,默认为false
--maxPrefsPerUser (integer) : 在最后计算推荐结果的阶段,针对每一个user使用的偏好数据的最大数量,默认为10
--minPrefsPerUser (integer) : 在相似度计算中,忽略所有偏好数据量少于该值的用户,默认为1
--maxSimilaritiesPerItem (integer) : 针对每个item的相似度最大值,默认为100
--maxPrefsPerUserInItemSimilarity (integer) : 在item相似度计算阶段,针对每个用户考虑的偏好数据最大数量,默认为1000
--similarityClassname (classname) : 向量相似度计算类
outputPathForSimilarityMatrix :SimilarityMatrix输出目录
--randomSeed :随机种子 --
sequencefileOutput :序列文件输出路径
--tempDir (path) : 存储临时文件的目录,默认为当前用户的home目录下的temp目录
--startPhase
--endPhase
--threshold (double) : 忽略相似度低于该阀值的item对
一个例子如下,使用SIMILARITY_LOGLIKELIHOOD相似度推荐物品:
$ hadoop jar /usr/lib/mahout/mahout-examples-0.9-cdh5.4.0-job.jar org.apache.mahout.cf.taste.hadoop.item.RecommenderJob --input /tmp/mahout/part-00000 --output /tmp/mahout-out
-s SIMILARITY_LOGLIKELIHOOD
上面命令运行完成之后,会在当前用户的hdfs主目录生成temp目录,该目录可由
--tempDir (path) 参数设置:
$ hadoop fs -ls temp
Found 10 items
-rw-r--r--
3 root hadoop
14:42 temp/maxValues.bin
-rw-r--r--
3 root hadoop
5-06-10 14:42 temp/norms.bin
drwxr-xr-x
- root hadoop
14:41 temp/notUsed
-rw-r--r--
3 root hadoop
14:42 temp/numNonZeroEntries.bin
-rw-r--r--
3 root hadoop
5-06-10 14:41 temp/observationsPerColumn.bin
drwxr-xr-x
- root hadoop
14:47 temp/pairwiseSimilarity
drwxr-xr-x
- root hadoop
14:52 temp/partialMultiply
drwxr-xr-x
- root hadoop
14:39 temp/preparePreferenceMatrix
drwxr-xr-x
- root hadoop
14:50 temp/similarityMatrix
drwxr-xr-x
- root hadoop
14:42 temp/weights
观察yarn的管理界面,该命令会生成9个任务,任务名称依次是:
PreparePreferenceMatrixJob-ItemIDIndexMapper-Reducer
PreparePreferenceMatrixJob-ToItemPrefsMapper-Reducer
PreparePreferenceMatrixJob-ToItemVectorsMapper-Reducer
RowSimilarityJob-CountObservationsMapper-Reducer
RowSimilarityJob-VectorNormMapper-Reducer
RowSimilarityJob-CooccurrencesMapper-Reducer
RowSimilarityJob-UnsymmetrifyMapper-Reducer
partialMultiply
RecommenderJob-PartialMultiplyMapper-Reducer
从任务名称,大概可以知道每个任务在做什么,如果你的输入参数不一样,生成的任务数可能不一样,这个需要测试一下才能确认。
在hdfs上查看输出的结果:
843 [.426,.:4.5:4.108,.0467]
[.:3.164,.:3.7:3.3839]
使用Java API方式执行:
StringBuilder sb = new StringBuilder();
sb.append(&--input &).append(inPath);
sb.append(& --output &).append(outPath);
sb.append(& --tempDir &).append(tmpPath);
sb.append(& --booleanData true&);
sb.append(& --similarityClassname
org.apache.mahout.math.hadoop.similarity.
cooccurrence.measures.EuclideanDistanceSimilarity&);
args = sb.toString().split(& &);
JobConf jobConf = new JobConf(conf);
jobConf.setJobName(&MahoutTest&);
RecommenderJob job = new RecommenderJob();
job.setConf(conf);
job.run(args);
在Scala或者Spark中,可以以Java API或者命令方式运行,最后还可以通过Spark来处理推荐的结果,例如:过滤、去重、补足数据,这部分内容不做介绍。
http://www.tuicool.com/articles/FzmQziz
相关 [mahout 协同过滤 spark] 推荐:
Mahout使用了Taste来提高协同过滤算法的实现,它是一个基于Java实现的可扩展的,高效的推荐引擎. Taste既实现了最基本的基 于用户的和基于内容的推荐算法,同时也提供了扩展接口,使用户可以方便的定义和实现自己的推荐算法. 同时,Taste不仅仅只适用于Java应用程序,它 可以作为内部服务器的一个组件以HTTP和Web Service的形式向外界提供推荐的逻辑.
- JavaChen Blog
本文主要通过Spark官方的例子理解ALS协同过滤算法的原理和编码过程,然后通过对电影进行推荐来熟悉一个完整的推荐过程. 协同过滤常被应用于推荐系统,旨在补充用户-商品关联矩阵中所缺失的部分. MLlib当前支持基于模型的协同过滤,其中用户和商品通过一小组隐语义因子进行表达,并且这些因子也用于预测缺失的元素.
Mahout支持2种 M/R 的jobs实现itemBase的协同过滤. 下面我们对RecommenderJob进行分析,版本是mahout-distribution-0.7. 源码包位置:org.apache.mahout.cf.taste.hadoop.item.RecommenderJob. RecommenderJob前几个阶段和ItemSimilarityJob是一样的,不过ItemSimilarityJob 计算出item的相似度矩阵就结束了,而RecommenderJob 会继续使用相似度矩阵,对每个user计算出应该推荐给他的top N 个items.
JavaChen Blog,作者:
Junez. 本文主要记录最近一段时间学习和实现Spark MLlib中的协同过滤的一些总结,希望对大家熟悉Spark ALS算法有所帮助. 【】Spark1.4.0中MatrixFactorizationModel提供了recommendForAll方法实现离线批量推荐,见
SPARK-3066.
又好一阵子没有写文章了,阿弥陀佛...最近项目中要做理财推荐,所以,回过头来回顾一下协同过滤算法在推荐系统中的应用.
说到推荐系统,大家可能立马会想到协同过滤算法. 本文基于Spark MLlib平台实现一个向用户推荐电影的简单应用. 基于模型的协同过滤应用---电影推荐.
一、协同过滤算法概述.
随着大数据时代的到来,数据当中挖取金子的工作越来越有吸引力. 利用Spark在内存迭代运算、机器学习领域强悍性能的优势,使用spark处理数据挖掘问题就显得很有实际价值. 这篇文章给大家分享一个spark MLlib 的推荐实战例子. 我将会分享怎样用spark MLlib做一个电影评分的推荐系统.
- CSDN博客推荐文章
今天要讲的主要内容是
协同过滤,即Collaborative Filtering,简称
关于协同过滤的一个最经典的例子就是看电影,有时候不知道哪一部电影是我们喜欢的或者评分比较高的,那.
么通常的做法就是问问周围的朋友,看看最近有什么好的电影推荐. 在问的时候,都习惯于问跟自己口味差不.
- IT技术博客大学习
协同过滤算法是推荐系统中最古老,也是最简单高效的推荐算法. 简单说协同过滤就是根据以往的用户产生的数据分析,对用户的新行为进行匹配分析来给用户推荐用户最有可能感兴趣的内容.
协同过滤算法是为了解决
长尾现象,也就是说推荐系统是为了解决长尾现象而诞生的. 因为在之前在有限的空间(如:书店的书架、服装店的衣架、商店的货架、网页的展示区域)只能摆有限的物品进行展示,造成大量的非热门物品很难进入人们的视野,也就无法产生任何价值.
- 刘思喆@贝吉塔行星
推荐系统在个性化领域有着广泛的应用,从技术上讲涉及概率、抽样、最优化、机器学习、数据挖掘、搜索引擎、自然语言处理等多个领域. 东西太多,我也不准备写连载,今天仅从基本算法这个很小的切入点来聊聊推荐引擎的原理. 推荐引擎(系统)从不同的角度看有不同的划分,比如:. 按照数据的分类:协同过滤、内容过滤、社会化过滤.
--> 坚持分享优质有趣的原创文章,并保留作者信息和版权声明,任何问题请联系:itarea.。}

我要回帖

更多关于 spark 实现协同过滤 的文章

更多推荐

版权声明:文章内容来源于网络,版权归原作者所有,如有侵权请点击这里与我们联系,我们将及时删除。

点击添加站长微信