前面的文章介绍了Weight Slope One算法,这个算法通常用于评分的预测,这种预测通常基于大数据,这篇文章将要讲述的就是hadoop下实现Weight Slpope One算法。Mrjob是python中的一个hadoop框架,为什么用python呢?因为使用python开发起来比较快,而且hadoop程序多是I/O密集型,所以用python比用java慢不了多少。用Mrjob编写和调试hadoop程序是非常简单和直观的,但是它内部一个序列化过程对性能产生的损害,因此现在用mrjob,还是感觉慢了些,希望以后它能修正这个问题。这里有一篇文章()对python的hadoop框架做了个挺不错总结。关于mrjob怎么用的问题,这篇文章()给出了介绍。总之mrjob就是入门快,开发快,调试快,运行慢。
步入正题。通过前面的文章我们知道weight slope one算法假设项目之间的评分符合y=x+d模型。实现weigthedslopeone算法需要多个map-reduce过程,mrjob已经实现了这种chain map-reduce过程,上一次的map-reduce的输入直接变成下一次mapreduce的输出。下面分步介绍。
输入文件:
两个,均为为文本文件,第一个是训练样本,格式为:userId,movieId,score
第二个文件为需要预测的数据,格式为: userId,movieId. 需要预测userId,对movieId的评分。
为了说明方便,这里给出一个数据作为例子,数据如下:
trainsmall.txt
A 1 2
A 2 3A 3 1B 1 3B 2 5B 3 3C 1 4C 3 2D 1 3D 2 4predictsmall.txt
C 2
D 3第一步:产生user-item数据
输入: 训练数据,预测数据
输出:
key: userId
value: 元组---(movieId,score),如果没有评分,那么score为-1
def mapper1(self,_,line): item=line.split() if len(item)>2: yield item[0],(item[1],item[2]) elif(len(item)==2): yield item[0],(item[1],-1) def reducer1(self,key,value): for v in value: yield key,v
小样例这一步的输出:
"A" ["1", "2"]
"A" ["2", "3"]"A" ["3", "1"]"B" ["1", "3"]"B" ["2", "5"]"B" ["3", "3"]"C" ["1", "4"]"C" ["2", -1]"C" ["3", "2"]"D" ["1", "3"]"D" ["2", "4"]"D" ["3", -1]
第二步:产生产生两个电影的评分差
需要注意的是,只需要这两个电影必须来自同一个人,那么它们的评分差才是有意义的,因此上一步以userId作为key。
这一步不需要map过程。
输入:上一步的输出
输出:
key: (movieA,movieB) 注意,movieA要大于movieB,否则会产生重复,浪费资源,并且他们来自同一个人。
value: 如果movieA和movieB都已经评分,那么输出(userId,分数差,-1)
其中-1只是一个标志,为了区分另一种输出。
如果movieA,movieB中有一个是需要预测的(注意,最多有一个是需要预测的,可以看下面的代码),输出 (userId,对已经预测的那个电影的评分,1或者0)
需要说明的是,为了节约空间和计算量,规定将movieA和movieB中较大的放在前面,这种顺序对于第一种输出没有影响,但是对于第二种输出必须做出区分,因此1表示第二个是需要预测的,0表示第一个是需要预测的。
def reducer2(self,key,value): rateM={} preM=[] for movieid,rate in value: if rate==-1.0: preM.append(movieid) else: rateM[movieid]=rate for i,itemi in enumerate(rateM.iterkeys()): for j,itemj in enumerate(rateM.iterkeys()): if j<=i :continue if itemi>itemj: yield (itemi,itemj),(key,float(rateM[itemi])-float(rateM[itemj]),-1) else: yield (itemj,itemi),(key,float(rateM[itemj])-float(rateM[itemi]),-1) for rm,rate in rateM.iteritems(): for pm in preM: if rm>pm: yield (rm,pm),(key,rate,1) # the second is the one to be predicted else: yield (pm,rm),(key,rate,0) # the first is the one to be predicted
样例的输出:
["3", "1"] ["A", -1.0, -1]
["2", "1"] ["A", 1.0, -1]["3", "2"] ["A", -2.0, -1]["3", "1"] ["B", 0.0, -1]["2", "1"] ["B", 2.0, -1]["3", "2"] ["B", -2.0, -1]["3", "1"] ["C", -2.0, -1]["2", "1"] ["C", "4", 0]["3", "2"] ["C", "2", 1]["2", "1"] ["D", 1.0, -1]["3", "1"] ["D", "3", 0]["3", "2"] ["D", "4", 0]第三步:得到预测分数和预测频率
不需要map过程
输入:上一步的输出
输出:
key:(userId,movieId) 表示userId需要预测的项目movieId
value: (rate,fres) 表示userId对movieId的评分的一个预测
def reducer3(self,key,value): total=0 fre=0 needPredict=[] for userId,diff,flag in value: if flag!=-1: #yield key,(userId,diff,flag) needPredict.append((userId,float(diff),flag)) else: total+=float(diff) fre+=1 if fre>0 and len(needPredict)>0: avg=total/fre for userId,rate,flag in needPredict: if float(flag)==0: predictRate=avg+rate mp=key[0] else: predictRate=rate-avg mp=key[1] yield (userId,mp),(predictRate,fre)
样例输出:
["C", "2"] [5.333333333333333, 3]
["D", "3"] [2.0, 3]["C", "2"] [4.0, 2]["D", "3"] [2.0, 2]第四步:得到预测分数
不需要map过程
输入:上一次的输出
输出:最终的预测
def reducer4(self,key,value): totalFre=0 totalRate=0 for predictRate,fre in value: totalRate+=predictRate*fre totalFre+=fre yield key,totalRate/totalFre
样例输出:
["C", "2"] 4.7999999999999998
["D", "3"] 2.0完整代码:
from mrjob.job import MRJob class PredictBySlopeOne(MRJob): # first step, generate user-item table, input is the train.txt and the predict predict.txt def mapper1(self,_,line): item=line.split() if len(item)>2: yield item[0],(item[1],item[2]) elif(len(item)==2): yield item[0],(item[1],-1) def reducer1(self,key,value): for v in value: yield key,v def reducer2(self,key,value): rateM={} preM=[] for movieid,rate in value: if rate==-1.0: preM.append(movieid) else: rateM[movieid]=rate for i,itemi in enumerate(rateM.iterkeys()): for j,itemj in enumerate(rateM.iterkeys()): if j<=i :continue if itemi>itemj: yield (itemi,itemj),(key,float(rateM[itemi])-float(rateM[itemj]),-1) else: yield (itemj,itemi),(key,float(rateM[itemj])-float(rateM[itemi]),-1) for rm,rate in rateM.iteritems(): for pm in preM: if rm>pm: yield (rm,pm),(key,rate,1) # the second is the one to be predicted else: yield (pm,rm),(key,rate,0) # the first is the one to be predicted def reducer3(self,key,value): total=0 fre=0 needPredict=[] for userId,diff,flag in value: if flag!=-1: #yield key,(userId,diff,flag) needPredict.append((userId,float(diff),flag)) else: total+=float(diff) fre+=1 if fre>0 and len(needPredict)>0: avg=total/fre for userId,rate,flag in needPredict: if float(flag)==0: predictRate=avg+rate mp=key[0] else: predictRate=rate-avg mp=key[1] yield (userId,mp),(predictRate,fre) def reducer4(self,key,value): totalFre=0 totalRate=0 for predictRate,fre in value: totalRate+=predictRate*fre totalFre+=fre yield key,totalRate/totalFre def steps(self): return [self.mr(mapper=self.mapper1,combiner=self.reducer1,reducer=self.reducer1), self.mr(reducer=self.reducer2), self.mr(reducer=self.reducer3), self.mr(reducer=self.reducer4)]if __name__=="__main__": PredictBySlopeOne.run()