博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
利用Mrjob实现Weighted Slope One算法
阅读量:5132 次
发布时间:2019-06-13

本文共 6908 字,大约阅读时间需要 23 分钟。

前面的文章介绍了Weight Slope One算法,这个算法通常用于评分的预测,这种预测通常基于大数据,这篇文章将要讲述的就是hadoop下实现Weight Slpope One算法。Mrjob是python中的一个hadoop框架,为什么用python呢?因为使用python开发起来比较快,而且hadoop程序多是I/O密集型,所以用python比用java慢不了多少。用Mrjob编写和调试hadoop程序是非常简单和直观的,但是它内部一个序列化过程对性能产生的损害,因此现在用mrjob,还是感觉慢了些,希望以后它能修正这个问题。这里有一篇文章()对python的hadoop框架做了个挺不错总结。关于mrjob怎么用的问题,这篇文章()给出了介绍。总之mrjob就是入门快,开发快,调试快,运行慢。

步入正题。通过前面的文章我们知道weight slope one算法假设项目之间的评分符合y=x+d模型。实现weigthedslopeone算法需要多个map-reduce过程,mrjob已经实现了这种chain map-reduce过程,上一次的map-reduce的输入直接变成下一次mapreduce的输出。下面分步介绍。

输入文件:

两个,均为为文本文件,第一个是训练样本,格式为:userId,movieId,score

第二个文件为需要预测的数据,格式为: userId,movieId. 需要预测userId,对movieId的评分。

为了说明方便,这里给出一个数据作为例子,数据如下:

trainsmall.txt

A   1   2

A   2   3
A   3   1
B   1   3
B   2   5
B   3   3
C   1   4
C   3   2
D   1   3
D   2   4

predictsmall.txt

C   2

D   3

第一步:产生user-item数据

输入: 训练数据,预测数据

输出:

key: userId

value: 元组---(movieId,score),如果没有评分,那么score为-1

def mapper1(self,_,line):        item=line.split()        if len(item)>2:            yield item[0],(item[1],item[2])        elif(len(item)==2):            yield item[0],(item[1],-1)        def reducer1(self,key,value):        for v in value:            yield key,v

小样例这一步的输出:

"A" ["1", "2"]

"A" ["2", "3"]
"A" ["3", "1"]
"B" ["1", "3"]
"B" ["2", "5"]
"B" ["3", "3"]
"C" ["1", "4"]
"C" ["2", -1]
"C" ["3", "2"]
"D" ["1", "3"]
"D" ["2", "4"]
"D" ["3", -1]

 

第二步:产生产生两个电影的评分差

需要注意的是,只需要这两个电影必须来自同一个人,那么它们的评分差才是有意义的,因此上一步以userId作为key。

这一步不需要map过程。

输入:上一步的输出

输出: 

key: (movieA,movieB)    注意,movieA要大于movieB,否则会产生重复,浪费资源,并且他们来自同一个人。

value: 如果movieA和movieB都已经评分,那么输出(userId,分数差,-1)

其中-1只是一个标志,为了区分另一种输出。

如果movieA,movieB中有一个是需要预测的(注意,最多有一个是需要预测的,可以看下面的代码),输出 (userId,对已经预测的那个电影的评分,1或者0)

需要说明的是,为了节约空间和计算量,规定将movieA和movieB中较大的放在前面,这种顺序对于第一种输出没有影响,但是对于第二种输出必须做出区分,因此1表示第二个是需要预测的,0表示第一个是需要预测的。

def reducer2(self,key,value):        rateM={}        preM=[]        for movieid,rate in value:            if rate==-1.0:                preM.append(movieid)            else:                rateM[movieid]=rate                        for i,itemi in enumerate(rateM.iterkeys()):            for j,itemj in enumerate(rateM.iterkeys()):                if j<=i :continue                if itemi>itemj:                    yield (itemi,itemj),(key,float(rateM[itemi])-float(rateM[itemj]),-1)                else:                    yield (itemj,itemi),(key,float(rateM[itemj])-float(rateM[itemi]),-1)                        for rm,rate in rateM.iteritems():            for pm in preM:                if rm>pm:                    yield (rm,pm),(key,rate,1)   # the second is the one to be predicted                else:                    yield (pm,rm),(key,rate,0)   # the first is the one to be predicted

样例的输出:

["3", "1"] ["A", -1.0, -1]

["2", "1"] ["A", 1.0, -1]
["3", "2"] ["A", -2.0, -1]
["3", "1"] ["B", 0.0, -1]
["2", "1"] ["B", 2.0, -1]
["3", "2"] ["B", -2.0, -1]
["3", "1"] ["C", -2.0, -1]
["2", "1"] ["C", "4", 0]
["3", "2"] ["C", "2", 1]
["2", "1"] ["D", 1.0, -1]
["3", "1"] ["D", "3", 0]
["3", "2"] ["D", "4", 0]

第三步:得到预测分数和预测频率

不需要map过程

输入:上一步的输出

输出:

key:(userId,movieId)    表示userId需要预测的项目movieId

value: (rate,fres)   表示userId对movieId的评分的一个预测

def reducer3(self,key,value):        total=0        fre=0        needPredict=[]        for userId,diff,flag in value:            if flag!=-1:                #yield key,(userId,diff,flag)                needPredict.append((userId,float(diff),flag))            else:                total+=float(diff)                fre+=1                        if fre>0 and len(needPredict)>0:            avg=total/fre            for userId,rate,flag in needPredict:                if float(flag)==0:                    predictRate=avg+rate                    mp=key[0]                else:                    predictRate=rate-avg                    mp=key[1]                yield (userId,mp),(predictRate,fre)

样例输出:

["C", "2"] [5.333333333333333, 3]

["D", "3"] [2.0, 3]
["C", "2"] [4.0, 2]
["D", "3"] [2.0, 2]

第四步:得到预测分数

不需要map过程

输入:上一次的输出

输出:最终的预测

def reducer4(self,key,value):        totalFre=0        totalRate=0        for predictRate,fre in value:            totalRate+=predictRate*fre            totalFre+=fre        yield key,totalRate/totalFre

样例输出:

["C", "2"] 4.7999999999999998

["D", "3"] 2.0

完整代码:

from mrjob.job import MRJob class PredictBySlopeOne(MRJob):    # first step, generate user-item table, input is the train.txt and the predict predict.txt    def mapper1(self,_,line):        item=line.split()        if len(item)>2:            yield item[0],(item[1],item[2])        elif(len(item)==2):            yield item[0],(item[1],-1)            def reducer1(self,key,value):        for v in value:            yield key,v                  def reducer2(self,key,value):        rateM={}        preM=[]        for movieid,rate in value:            if rate==-1.0:                preM.append(movieid)            else:                rateM[movieid]=rate                        for i,itemi in enumerate(rateM.iterkeys()):            for j,itemj in enumerate(rateM.iterkeys()):                if j<=i :continue                if itemi>itemj:                    yield (itemi,itemj),(key,float(rateM[itemi])-float(rateM[itemj]),-1)                else:                    yield (itemj,itemi),(key,float(rateM[itemj])-float(rateM[itemi]),-1)                        for rm,rate in rateM.iteritems():            for pm in preM:                if rm>pm:                    yield (rm,pm),(key,rate,1)   # the second is the one to be predicted                else:                    yield (pm,rm),(key,rate,0)   # the first is the one to be predicted                            def reducer3(self,key,value):        total=0        fre=0        needPredict=[]        for userId,diff,flag in value:            if flag!=-1:                #yield key,(userId,diff,flag)                needPredict.append((userId,float(diff),flag))            else:                total+=float(diff)                fre+=1                        if fre>0 and len(needPredict)>0:            avg=total/fre            for userId,rate,flag in needPredict:                if float(flag)==0:                    predictRate=avg+rate                    mp=key[0]                else:                    predictRate=rate-avg                    mp=key[1]                yield (userId,mp),(predictRate,fre)                            def reducer4(self,key,value):        totalFre=0        totalRate=0        for predictRate,fre in value:            totalRate+=predictRate*fre            totalFre+=fre        yield key,totalRate/totalFre                                          def steps(self):        return [self.mr(mapper=self.mapper1,combiner=self.reducer1,reducer=self.reducer1),                self.mr(reducer=self.reducer2),                self.mr(reducer=self.reducer3),                self.mr(reducer=self.reducer4)]if __name__=="__main__":    PredictBySlopeOne.run()

 

 

 

 

 

转载于:https://www.cnblogs.com/naniJser/archive/2013/05/02/3054832.html

你可能感兴趣的文章
SDN第四次作业
查看>>
django迁移数据库错误
查看>>
Data truncation: Out of range value for column 'Quality' at row 1
查看>>
字符串处理
查看>>
HtmlUnitDriver 网页内容动态抓取
查看>>
ad logon hour
查看>>
罗马数字与阿拉伯数字转换
查看>>
Eclipse 反编译之 JadClipse
查看>>
距离公式汇总以及Python实现
查看>>
Linux内核态、用户态简介与IntelCPU特权级别--Ring0-3
查看>>
第23月第24天 git命令 .git-credentials git rm --cached git stash clear
查看>>
java SE :标准输入/输出
查看>>
[ JAVA编程 ] double类型计算精度丢失问题及解决方法
查看>>
好玩的-记最近玩的几个经典ipad ios游戏
查看>>
PyQt5--EventSender
查看>>
Sql Server 中由数字转换为指定长度的字符串
查看>>
tmux的简单快捷键
查看>>
[Swift]LeetCode922.按奇偶排序数组 II | Sort Array By Parity II
查看>>
VC6.0调试技巧(一)(转)
查看>>
php match_model的简单使用
查看>>