大数据sparkmllib推荐算法(代码片段)

赵广陆 赵广陆     2023-01-19     389

关键词:


1 相似度算法

无论是基于用户还是基于商品的推荐,都是需要找到相似的用户或者商品,才能做推荐,所以,相似度算法就变得非常重要了。

常见的相似度算法有:

  • 欧几里德距离算法(Euclidean Distance)
  • 皮尔逊相似度算法(Pearson Correlation Coefficient)
  • 基于夹角余弦相似度算法(Consine Similarity)
  • 基于Tanimoto系数相似度(Tanimoto Coefficient)

1.1 欧几里德距离算法

上图即二维空间中6位用户对Snakes 和 Dupree 这两Item评价的直观体现。

根据两用户之间共同评价的Item为维度,建立一个多维的空间,那么通过用户对单一维度上的评价Score组成的坐标系X(s1,s2,s3……,si)即可定位该用户在这个多维度空间中的位置,那么任意两个位置之间的距离Distance(X,Y)(即:欧式距离)就能在一定程度上反应了两用户兴趣的相似程度。

就其意义而言,欧氏距离越小,两个用户相似度就越大,欧氏距离越大,两个用户相似度就越小。

1.2 基于夹角余弦相似度算法

计算夹角,并得出夹角对应的余弦值,此余弦值就可以用来表征,这两个向量的相似性。夹角越小,余弦值越接近于1,它们的方向更加吻合,则越相似。

计算公式:

2 最近邻域

通过相似度计算,可以计算出邻居,问题来了,我们如果选取出几个邻居作为参考,进行推荐呢?

通常有2种方式:

  • 固定数量的邻居:K-neighborhoods
  • 基于相似度门槛的邻居:Threshold-based neighborhoods

3 交替最小二乘法

交替最小二乘法(ALS)是统计分析中最常用的逼近计算的一种算法,其交替计算结果使得最终结果尽可能地逼近真实结果。而ALS的基础是最小二乘法(LS算法),LS算法是一种常用的机器学习算法,它通过最小化误差的平方和寻找数据的最佳函数匹配。利用最小二乘法可以简便的求得未知的数据,并使得这些求得的数据与实际数据之间误差的平法和为最小。也就是评分未知,稀疏矩阵预测评分.由于ALS算法的目标函数不是凸的,而且变量互相耦合在一起,所以它并不容易求解。但如果把用户特征矩阵U和物品特征矩阵V固定其一,其目标函数就立刻变成了一个凸的而且是可拆分的。

3.1 最小二乘法

以一个变量为例,在二维空间中最小二乘法的原理图如下:

若干个点依次分布在向量空间中,如果希望找出一条直线和这些点达到最佳匹配,那么最简单的一个方法就是希望这些点到直线的距离最小,则可得出最小二乘法的公式如下:

2是开方

这里的f(x)是直接的拟合公式,也是所求的目标函数,在这里希望各个点到直接的值最小,因此第二个公式就是求所有点到该直接的距离,我们可以微分求得其最小值。

3.2 交替最小二乘法

以用户,商品为例说明,一个基于用户名,物品表的用户评分矩阵可以被分解成2个较为小型化的矩阵,即M=U的转置矩阵*V。

在这里U和V分别表示 用户和物品的矩阵,在MLlib的ALS算法中,首先会对U或者V矩阵随机生成,之后固定某一个特定的对象,去求取另一个未随机化的矩阵对象。

之后利用求取的矩阵对象去求随机化矩阵对象。最后两个对象相互迭代计算,求取与实际矩阵差异达到程序设定的最小阀值位置。

通俗的说就是先固定U矩阵,求取V,然后再固定V矩阵再求取U矩阵,一直这样交替迭代计算直到误差达到一定的阀值条件或者达到迭代次数的上限。例如,固定U求V,这个问题就是经典的最小二乘问题。所谓交替,就是指先随机生成U(0),然后固定它,去 求 解V ( 0 ) ;再 固 定V ( 0 ) ,然 后 求 解U ( 1 ) ,这 样 交 替 进 行 下 去 。 因 为 每一次迭代都会降低重构误差,并且误差是有下界的,所以ALS算法一定会收敛。但由于目标函数是非凸的,所以ALS算法并不保证会收敛到全局最优解。然而在实际应用中,ALS算法对初始
点不是很敏感,且是不是全局最优解也不会有大的影响。

3.3 ALS算法流程

通常,产品的用户评分矩阵是庞大且稀疏的,因此在非常稀疏的数据集上采用简单的用户(或物品)相似度比较进行推荐,直观上给人的感觉是这样做缺少依据。理论上分析一下我们也能理解,基于记忆的协同过滤推荐实际上并没有充分挖掘数据集中的潜在因素。本节介绍的交替最小二乘法(AlternatingLeastSquares,ALS)算法,其核心思想就是要进一步挖掘通过观察得到的所有用户给产品的评分,并通过引入用户特征矩阵(UserFeaturesMatrix)和物品特征矩阵(ItemFeaturesMatrix)来建立一个机器学习模型,然后利用采集的数据对这个模型进行训练(反复迭代),最后得到用于推荐计算的用户特征矩阵和物品特征矩阵,从而来推断(也就是预测)每个用户的喜好并向用户推荐适合的物品。

ALS算法解决了用户评分矩阵中的缺失因子问题,实现了用预测得到的缺失因子进行推荐。

ALS 算法的思路就不同了。 ALS 算法基于下面这个假设:评分矩阵是近似低秩 (Low・Rank) 矩阵;换句话说,评分矩阵 A(mxn) 可以用两个小矩阵 U(mxk) 和 V(nxk) 来近似表示,即:

式中, k 远小于 m 和 n, 这样就把整个系统的自由度从 O(mn) 降到了 O((m+n)k) 。其实,
ALS 算法的低秩假设是建立在客观存在的合理性基础上的。例如,用户特征有很多,如年龄、性别、职业、身高、学历、婚姻、地区、存款等,可以说不胜枚举,但我们没有必要把用户的所有特征都用起来,因为并不是所有特征都起同样的作用。例如,在后面展示的电影推荐示例中,用户特征矩阵仅仅包含了用户编号、性别、年龄、职业、邮编5 个字段。同样,物品的属性也有很多,以电影为例,可以有主演、导演、特效、剧情、类型等,但实际应用中我们只需要描述少数关键属性即可,因此我们仅仅考虑了三个属性,即电影编号、电影名和电影类别(当然这只是示例,到底 k 取什么值,可以采用系统自适应调节方法,通过应用逐步找到最佳的 k 值)。

总之, ALS 算法的巧妙之处就在于,引入了两个特征矩阵,一个是用户特征矩阵,用 U表示,另一个是物品特征矩阵,用 V 表示,这两个矩阵的秩都比较低。接下来的问题是怎样得到这两个抽象的低秩序阵。既然已经假设评分矩阵 A 可以通过UVT 来近似,那么一个最直接的可以量化的东西就是通过 U 和 V 重构 A 时产生的误差。在ALS 算法中,使用式给出的 Frobenius 范数(又称为 Euclid 范数)

来表示重构误差,也就是每个元素的重构误差的平方和,如式所示。

这里存在一个问题,由于只观察到部分评分,A中有大量的未知元素是需要推断的,所以这个重构误差包含了未知数。解决方案很简单,就是只计算对已知评分的重构误差。

当然,也可以先用一个简单的方法把评分矩阵填满,再进行重构误差计算,但是这样做似乎也没有太多道理。总之,ALS算法就是求解下面的优化问题:

经过上面的处理,一个协同推荐问题就通过低秩假设被成功地转换成了一个优化问题。
但是,这个优化问题怎么解呢?不要忘记,我们的目标是求出U和V这两个矩阵。

ALS 算法可以大体描述如下。

第一步,用小于 1 的数随机初始化 V 。
第二步,在训练数据集上反复迭代、交替计算 U 和 V, 直到 RMSE (均方根误差,一种常用的离散性度量方法)值收敛或迭代次数足够多。
第三步,返回 UVT, 进行预测推荐。之所以说上述算法是一个大体描述,是因为第二步中还包含了如何计算 U 和 V 的表达式,它们是通过求偏导推出的。

3.4 ALS算法实战

3.4.1 数据说明

ALS 算法对 GroupLens Research ( http://grouplens.org/datasets/movielens/) 提供的数据进行学习并推荐。该数据为一组从 20 世纪 90 年代末到 21 世纪初由 MovieLens 用户提供的电影评价数据,包括评分、电影元数据(如风格类型和年代),以及关于用户的人口统计学数据(如年龄、邮编、性别和职业等)。根据不同需求, GroupLens Research 提供了不同大小的样本数据,包含了评分、用户信息和电影信息三种数据。下面先来看看待处理的电影评价数据,再给出应用程序并进行分析。

示例数据是用户、电影、评分的数据,其中用户有943人,电影有1682部,评价有100000条。文件在资料中。

196	242	3	881250949
186	302	3	891717742
22	377	1	878887116
244	51	2	880606923
166	346	1	886397596
298	474	4	884182806
115	265	2	881171488
253	465	5	891628467
305	451	3	886324817
6	86	3	883603013
62	257	2	879372434
286	1014	5	879781125
200	222	5	876042340
210	40	3	891035994
................

3.4.2 数据建模

ALS算法的第二步就是数据建模,其实在MLlib算法库中有可以直接使用的训练算法,ALS.tran方法源码如下:

 def train(
      ratings: RDD[Rating],       //需要训练的数据集
      rank: Int,                  //模型中隐藏因子数,rank一般选在8到20之间
      iterations: Int,            //算法中迭代次数,一般10次即可
      lambda: Double,             //ALS中的正则化参数,一般设置0.01
      blocks: Int,                //并行计算的block数(-1为自动配置)
      alpha: Double,              //ALS隐式反馈变化率用于控制每次拟合修正的幅度
      seed: Long                  //加载矩阵的随机数
    ): MatrixFactorizationModel = 
    new ALS(blocks, blocks, rank, iterations, lambda, true, alpha, seed).run(ratings)
  

3.4.3 实战

package cn.oldlu.spark;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.mllib.recommendation.ALS;
import org.apache.spark.mllib.recommendation.MatrixFactorizationModel;
import org.apache.spark.mllib.recommendation.Rating;

public class MyRecommend 

    public static void main(String[] args) 
        SparkConf sparkConf = new SparkConf()
                .setAppName("MyRecommend")
                .setMaster("local[*]");
        JavaSparkContext jsc = new JavaSparkContext(sparkConf);
        jsc.setLogLevel("WARN"); //设置日志级别

        JavaRDD<String> rawData = jsc.textFile("F://ml-100k//u.data");//设置数据集文件
        JavaRDD<String[]> rawRatings = rawData.map(v1 -> v1.split("\\t"));//将数据按照\\t分割
        //转化为Rating结构,参数分别为:用户id,商品id,评分
        JavaRDD<Rating> ratings = rawRatings.map(v1 -> new Rating(Integer.valueOf(v1[0]), Integer.valueOf(v1[1]), Double.valueOf(v1[2])));

        //设置训练模型
        MatrixFactorizationModel model = ALS.train(ratings.rdd(), 8, 10, 0.01);

        //为789用户推荐10个商品
        Rating[] recommendProducts = model.recommendProducts(789, 10);

        //打印推荐结果
        for (Rating rating : recommendProducts) 
            System.out.println(rating.user() + "->" + rating.product()+": " + rating.rating());
        
    


3.4.4 优化改进

在上面的实战中,rank、iterations、lambda参数都是写死的,根据不同环境和数据集需要作出调整,所以我们需要计算中最佳的参数,才能得到最佳的训练集。

package cn.oldlu.spark;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.mllib.recommendation.ALS;
import org.apache.spark.mllib.recommendation.MatrixFactorizationModel;
import org.apache.spark.mllib.recommendation.Rating;
import scala.Tuple2;

import java.util.List;

public class MyRecommend2 

    public static void main(String[] args) 
        SparkConf sparkConf = new SparkConf()
                .setAppName("MyRecommend")
                .setMaster("local[*]");
        JavaSparkContext jsc = new JavaSparkContext(sparkConf);
        jsc.setLogLevel("WARN"); //设置日志级别

        JavaRDD<String> rawData = jsc.textFile("F://ml-100k//u.data");//设置数据集文件
        JavaRDD<String[]> rawRatings = rawData.map(v1 -> v1.split("\\t"));//将数据按照\\t分割
        //装载样本评分数据,其中最后一列Timestamp取除10的余数作为key,Rating为值,即(Int,Rating)
        JavaPairRDD<Long, Rating> ratings = rawRatings.mapToPair(v1 -> 
            Rating rating = new Rating(Integer.valueOf(v1[0]), Integer.valueOf(v1[1]), Double.valueOf(v1[2]));
            return new Tuple2<>(Long.valueOf(v1[3]) % 10, rating);
        );

        //装载电影目录对照表(电影ID->电影标题)
        List<Tuple2> movies = jsc.textFile("F://ml-100k//u.item").map(v1 -> 
            String[] ss = v1.split("\\\\|");
            return new Tuple2(ss[0], ss[1]);
        ).collect();

        //统计有用户数量和电影数量以及用户对电影的评分数目
        Long numRatings = ratings.count();
        Long numUsers = ratings.map(v1 -> ((Rating) v1._2()).user()).distinct().count();
        Long numMovies = ratings.map(v1 -> ((Rating) v1._2()).product()).distinct().count();
        System.out.println("用户:" + numUsers + "电影:" + numMovies + "评论:" + numRatings);

        //将样本评分表以key值切分成3个部分,分别用于训练 (60%,并加入用户评分), 校验 (20%), and 测试 (20%)
        //该数据在计算过程中要多次应用到,所以cache到内存

        Integer numPartitions = 4; // 分区数
        // 训练集
        JavaRDD<Rating> training = ratings
                .filter(v -> v._1() < 6)
                .values()
                .repartition(numPartitions)
                .cache();

        // 校验集
        JavaRDD<Rating> validation = ratings
                .filter(v -> v._1() >= 6 && v._1() < 8)
                .values()
                .repartition(numPartitions).cache();

        // 测试集
        JavaRDD<Rating> test = ratings
                .filter(v -> v._1() >= 8)
                .values()
                .cache();

        Long numTraining = training.count();
        Long numValidation = validation.count();
        Long numTest = test.count();
        System.out.println("训练集:" + numTraining + " 校验集:" + numValidation + " 测试集:" + numTest);

        //训练不同参数下的模型,并在校验集中验证,获取最佳参数下的模
        int[] ranks = new int[]10, 11, 12;
//        double[] lambdas = new double[]0.01, 0.03, 0.1, 0.3, 1, 3;
        double[] lambdas = new double[]0.01;
//        int[] numIters = new int[]8, 9, 10, 11, 12, 13, 14, 15;
        int[] numIters = new int[]8, 9, 10;

        MatrixFactorizationModel bestModel = null;
        double bestValidationRmse = Double.MAX_VALUE;
        int bestRank = 0;
        double bestLambda = -0.01;
        int bestNumIter = 0;

        for (int rank : ranks) 
            for (int numIter : numIters) 
                for (double lambda : lambdas) 
                    MatrixFactorizationModel model = ALS.train(training.rdd(), rank, numIter, lambda);
                    Double validationRmse = computeRmse(model, validation, numValidation);
                    System.out.println("RMSE(校验集) = " + validationRmse + ", rank = " + rank + ", lambda = " + lambda + ", numIter = " + numIter);

                    if (validationRmse < bestValidationRmse) 
                        bestModel = model;
                        bestValidationRmse = validationRmse;
                        bestRank = rank;
                        bestLambda = lambda;
                        bestNumIter = numIter;
                    
                
            
        



        double testRmse = computeRmse(bestModel, test, numTest);
        System.out.println("测试数据集在 最佳训练模型 rank = " + bestRank + ", lambda = " + bestLambda + ", numIter = " + bestNumIter + ", RMSE = " + testRmse);

        // 计算均值
        Double meanRating = training.union(validation).mapToDouble(v -> v.rating()).mean();

        // 计算标准误差值
        Double baselineRmse = Math.sqrt(test.map(v -> (meanRating - v.rating()) * (meanRating - v.rating())).reduce((v1, v2) -> (v1 + v2) / numTest));

        // 计算准确率提升了多少
        double improvement = (baselineRmse - testRmse) / baselineRmse * 100;

        System.out.println("最佳训练模型的准确率提升了:" + String.format("%.2f", improvement) + "%.");


        // 构建最佳训练模型
        bestModel = ALS.

大数据sparkmllib推荐系统

目录1从广告说起推荐系统2什么是推荐系统?3电商是推荐系统的先行者4推荐系统业务流程5推荐系统所涉及到的知识6协同过滤算法6.1基于用户的推荐UserCF6.2基于商品的推荐ItemCF6.3如何选择?7用户偏好收集7.1数据的降噪和... 查看详情

大数据sparkmllib机器学习(代码片段)

目录1什么是SparkMLlib?2支持的数据类型2.1本地向量集2.1.1、密集型数据集2.1.2稀疏型数据集2.2向量标签2.3本地矩阵2.4分布式矩阵2.4.1行矩阵2.4.2行索引矩阵2.4.3坐标矩阵2.4.4分块矩阵3RDD、DataSet、Dataframe区别及转化1什么是SparkMLlib... 查看详情

十sparkmllib的scala示例(代码片段)

简介spark MLlib官网:http://spark.apache.org/docs/latest/ml-guide.htmlmllib是spark core之上的算法库,包含了丰富的机器学习的一系列算法。你可以通过简单的API来构建算法模型,然后利用模型来进行预测分析推荐之类的。它包含了一... 查看详情

14.sparkmllib之快速入门(代码片段)

简介??MLlib是Spark提供提供机器学习的库,专为在集群上并行运行的情况而设计。MLlib包含很多机器学习算法,可在Spark支持的所有编程语言中使用。??MLlib设计理念是将数据以RDD的形式表示,然后在分布式数据集上调用各种算法。... 查看详情

sparkmllib数据类型(代码片段)

  MLlib支持几种数据类型:本地向量(localvectors),和存储在本地或者基于RDD的分布式矩阵(matrices)。底层的线性代数转换操作是基于Breeze和jblas实现的。在MLlib中有监督学习算法使用的训练样本数据类型被称为“带标签的... 查看详情

基于sparkmllib平台的协同过滤算法---电影推荐系统

协同过滤算法概述 基于模型的协同过滤应用---电影推荐实时推荐架构分析          一、协同过滤算法概述      本人对算法的研究,目前还不是很深入,这里简单的介... 查看详情

sparkmllib---sgd随机梯度下降算法(代码片段)

代码:packagemllibimportorg.apache.log4j.Level,Loggerimportorg.apache.spark.SparkContext,SparkConfimportscala.collection.mutable.HashMap/***随机梯度下降算法*Createdby汪本成on2016/8/5.*/objectSGD//屏蔽不必要的日志显示在 查看详情

sparkmllib之水塘抽样算法(reservoirsampling)(代码片段)

1.理解  问题定义可以简化如下:在不知道文件总行数的情况下,如何从文件中随机的抽取一行?  首先想到的是我们做过类似的题目吗?当然,在知道文件行数的情况下,我们可以很容易的用C运行库的rand函数随机的获得一... 查看详情

机器学习讲座,如何利用sparkmllib进行个性推荐?

随着互联网发展,更多电商网站更加提倡用户参与和用户贡献。而在现今的推荐技术和算法中,最被大家广泛认可和采用的就是基于协同过滤的推荐方法。这种在信息过滤和信息系统中很受欢迎的技术,与传统的基于内容过滤直... 查看详情

sparkmllib算法调用展示平台及其实现过程(代码片段)

1.软件版本:IDE:IntellijIDEA14,Java:1.7,Scala:2.10.6;Tomcat:7,CDH:5.8.0; Spark:1.6.0-cdh5.8.0-hadoop2.6.0-cdh5.8.0; 查看详情

java语言在spark3.2.4集群中使用sparkmllib库完成xgboost算法(代码片段)

一、概述XGBoost是一种基于决策树的集成学习算法,它在处理结构化数据方面表现优异。相比其他算法,XGBoost能够处理大量特征和样本,并且支持通过正则化控制模型的复杂度。XGBoost也可以自动进行特征选择并对缺失值进行处理... 查看详情

大数据推荐系统算法代码全接触(企业内训,现场实录,机器学习算法+spark实现)

【学途无忧网】大数据推荐系统算法代码全接触(企业内训,现场实录,机器学习算法+Spark实现)课程下载:https://pan.baidu.com/s/1piCNIxC2Sv0zMY0yWxY9Ug提取码:b10v一、课程简介:推荐系统是利用电子商务网站向客户提供商品信息和建... 查看详情

大数据推荐系统算法代码全接触(企业内训,现场实录,机器学习算法+spark实现)

【学途无忧网】大数据推荐系统算法代码全接触(企业内训,现场实录,机器学习算法+Spark实现)课程下载:https://pan.baidu.com/s/1piCNIxC2Sv0zMY0yWxY9Ug提取码:b10v一、课程简介:推荐系统是利用电子商务网站向客户提供商品信息和建... 查看详情

sparkmllib---linearregression(线性回归)logisticregression(逻辑回归)(代码片段)

1、随机梯度下降首先介绍一下随机梯度下降算法:1.1、代码一:packagemllibimportorg.apache.log4j.Level,Loggerimportorg.apache.spark.SparkContext,SparkConfimportscala.collection.mutable.HashMap/***随机梯度下降算法*Createdby汪本成 查看详情

spark学习10_1sparkmllib入门与相关资料索引(代码片段)

...文指南关于spark机器学习的知乎专栏Spark入门实战系列--8.SparkMLlib(上)--机器学习及SparkMLlib简介基本Kmeans算法介绍及其实现sparkMLlib概念1:相关系数(PPMCCorPCCorPearson‘sr皮尔森相关系数)andSpearman‘scorrelation(史匹曼等级相关系... 查看详情

如何利用sparkmllib进行个性推荐?

在现今的推荐技术和算法中,最被大家广泛认可和采用的就是基于协同过滤的推荐方法。协同过滤(CollaborativeFiltering,简称CF)是利用集体智慧的一个典型方法。换句话说,就是借鉴和你相关人群的观点来进行推荐。MLlib中的协同过... 查看详情

学习笔记spark——sparkmllib应用——sparkmllib应用(代码片段)

三、SparkMLlib应用3.1、SparkML线性模型数据准备基于SparkML的线性模型需要DataFrame类型的模型数据,DataFrame需要包含:一列标签列,一列由多个特征合并得到的特征列训练模型模型应用模型评估任务1:某专门面向年轻人制... 查看详情

学习笔记spark——sparkmllib应用——sparkmllib应用(代码片段)

三、SparkMLlib应用3.1、SparkML线性模型数据准备基于SparkML的线性模型需要DataFrame类型的模型数据,DataFrame需要包含:一列标签列,一列由多个特征合并得到的特征列训练模型模型应用模型评估任务1:某专门面向年轻人制... 查看详情