首页 技术 正文
技术 2022年11月19日
0 收藏 621 点赞 5,071 浏览 13784 个字

1 协同过滤算法

  协同过滤算法是现在推荐系统的一种常用算法。分为user-CF和item-CF。

  本文的电影推荐系统使用的是item-CF,主要是由于用户数远远大于电影数,构建矩阵的代价更小;另外,电影推荐系统中使用基于物品的推荐对用户来说更有说服力。因此本文对user-CF只做简单介绍,主要介绍item-CF。

  1.1 基于用户的协同过滤算法

      a 计算出用户两两之间的相似度,得到用户相似度矩阵;

      b 预测用户的喜好,使用公式:

    

     其中,p(u,i)表示用户u对物品i的感兴趣程度,S(u,k)表示和用户u兴趣最接近的K个用户,N(i)表示对物品i有过行为的用户集合,Wuv表示用户u和用户v的兴趣相似度,Rvi表示用户v对物品i的兴趣。

     c 根据预测出来的喜好度来做推荐。

   1.2 基于物品的协同过滤算法

     1.2.1 物品相似度计算

     物品相似度的计算有多种。在这里使用同现矩阵。其中第m行第n列的元素表示物品m和物品n的相似度,具体是:如果一个用户同时看过电影m和n,则m和n的相似度就加1。还要对如下所示:

     之后还要对同现矩阵做归一化,注意归一化之后矩阵不是对称的:

      1.2.2 预测用户对未看电影的打分

      用户打分的预测值由下式计算:

      

      因此,最后得到的预测矩阵可由同现矩阵与评分矩阵直接相乘得到:

      1.2.3 推荐

      根据预测的打分,选出未看电影中的topk即生成推荐列表。

2 mapReduce工作流程

2.1 输入数据形式

表示userID, movieID,评分

2.2 总体流程

2.3 MR1

  MR1负责数据预处理,将同一个user的数据merge到一起。

  mapper负责拆分数据:

  reducer负责合并:

2.4 MR2

  MR2负责构建同现矩阵。

  mapper将一个用户看过的每部电影进行两两组合发送:

  reducer负责merge这些值,就得到同现矩阵的每个单元(行号:列号):

2.5 MR3

   MR3负责将同现矩阵归一化。

   mapper 负责读取上一个MR产生的同现矩阵cells,然后按行号发送到reducer(由于归一化是按行的,所以这里要以行号为Key)。

     reducer将得到的一行sum之后,用原来的值除以sum得到归一化的值,然后将每个单元按照列号写入HDFS(按列号写是为之后的矩阵相乘做准备)。

   因此,MR3的输入输出如下:

2.6 MR4

  MR4将完成矩阵小单元相乘的工作。

  mapper1负责读入归一化的同现矩阵的小单元,然后按列号发送(之前已经按列号存储了,这里直接读取并发送就行)

  mapper2负责读取输入的rowdata文件,即评分矩阵的每个小单元,然后按行号(movie id)发送:

   在reducer中,接收到的值分别来自同现矩阵的第x列和评分矩阵的第x行。我们知道,最终生成的预测矩阵i行j列的小单元(i,j)是等于对应的同现矩阵的(i, x)乘以评分矩阵的(x, j),再对所有x求和。而这里的reducer中聚集了所有x值相同的来自两个矩阵的小单元,因此它们两两之间是可以互乘的。这里我们用=和:来区分两个矩阵的小单元。下图中橘黄色是处于同一个reducer里面的小单元,将来自同现矩阵和评分矩阵的小单元区分开后,将它们两两相乘,得到预测矩阵的行号与列号的不同组合,以它为key写入hdfs。

2.7 MR5

  MR5负责将乘积的结果相加。

  

3 主要代码

DataDividerByUser.java

 import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; import java.io.IOException; public class DataDividerByUser {     public static class DataDividerMapper extends Mapper<LongWritable, Text, IntWritable, Text> {         // map method         @Override         public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {             //input user,movie,rating             String[] user_movie_rating = value.toString().split(",");             int userId = Integer.parseInt(user_movie_rating[0]);             String outPutKey = user_movie_rating[1] + ":" + user_movie_rating[2];             //divide data by user             context.write(new IntWritable(userId), new Text(outPutKey));         }     }     public static class DataDividerReducer extends Reducer<IntWritable, Text, IntWritable, Text> {         // reduce method         @Override         public void reduce(IntWritable key, Iterable<Text> values, Context context)                 throws IOException, InterruptedException {             StringBuilder sb = new StringBuilder();             //merge data for one user             for (Text value : values) {                 sb.append(value.toString());                 sb.append(",");             }             sb.deleteCharAt(sb.length() - 1);             context.write(key, new Text(sb.toString()));         }     }     public static void main(String[] args) throws Exception {         Configuration conf = new Configuration();         Job job = Job.getInstance(conf);         job.setMapperClass(DataDividerMapper.class);         job.setReducerClass(DataDividerReducer.class);         job.setJarByClass(DataDividerByUser.class);         job.setInputFormatClass(TextInputFormat.class);         job.setOutputFormatClass(TextOutputFormat.class);         job.setOutputKeyClass(IntWritable.class);         job.setOutputValueClass(Text.class);         TextInputFormat.setInputPaths(job, new Path(args[0]));         TextOutputFormat.setOutputPath(job, new Path(args[1]));         job.waitForCompletion(true);     } }

CoOccurrenceMatrixGenerator.java

 import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; import java.io.IOException; import java.util.ArrayList; import java.util.List; public class CoOccurrenceMatrixGenerator {     public static class MatrixGeneratorMapper extends Mapper<LongWritable, Text, Text, IntWritable> {         // map method         @Override         public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {             //value = userid \t movie1: rating, movie2: rating...             String[] movie_rating = value.toString().split("\t")[1].split(",");             //key = movie1: movie2 value = 1             //calculate each user rating list: <movieA, movieB>             for (int i = 0; i < movie_rating.length; i++) {                 for (int j = 0; j < movie_rating.length; j++) {                     String outPutKey = movie_rating[i].split(":")[0] + ":" + movie_rating[j].split(":")[0];                     context.write(new Text(outPutKey), new IntWritable(1));                 }             }         }     }     public static class MatrixGeneratorReducer extends Reducer<Text, IntWritable, Text, IntWritable> {         // reduce method         @Override         public void reduce(Text key, Iterable<IntWritable> values, Context context)                 throws IOException, InterruptedException {             //key movie1:movie2 value = iterable<1, 1, 1>             //calculate each two movies have been watched by how many people             int sum = 0;             for (IntWritable value : values) {                 sum += value.get();             }             context.write(key, new IntWritable(sum));         }     }     public static void main(String[] args) throws Exception{         Configuration conf = new Configuration();         Job job = Job.getInstance(conf);         job.setMapperClass(MatrixGeneratorMapper.class);         job.setReducerClass(MatrixGeneratorReducer.class);         job.setJarByClass(CoOccurrenceMatrixGenerator.class);         job.setInputFormatClass(TextInputFormat.class);         job.setOutputFormatClass(TextOutputFormat.class);         job.setOutputKeyClass(Text.class);         job.setOutputValueClass(IntWritable.class);         TextInputFormat.setInputPaths(job, new Path(args[0]));         TextOutputFormat.setOutputPath(job, new Path(args[1]));         job.waitForCompletion(true);     } }

Normalize.java

 import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; import java.io.IOException; import java.util.HashMap; import java.util.Map; public class Normalize {     public static class NormalizeMapper extends Mapper<LongWritable, Text, Text, Text> {         // map method         @Override         public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {             //movieA:movieB \t relation             String movieA = value.toString().split("\t")[0].split(":")[0];             String movieB = value.toString().split("\t")[0].split(":")[1];             String relation = value.toString().split("\t")[1];             //collect the relationship list for movieA             context.write(new Text(movieA), new Text(movieB + ":" + relation));         }     }     public static class NormalizeReducer extends Reducer<Text, Text, Text, Text> {         // reduce method         @Override         public void reduce(Text key, Iterable<Text> values, Context context)                 throws IOException, InterruptedException {             //key = movieA, value=<movieB:relation, movieC:relation...>             //normalize each unit of co-occurrence matrix             Map<String, Double> map = new HashMap<String, Double>();             double sum = 0;             for (Text value : values) {                 String[] movie_relation = value.toString().split(":");                 map.put(movie_relation[0], Double.parseDouble(movie_relation[1]));                 sum += Double.parseDouble(movie_relation[1]);             }             for (Map.Entry<String, Double> entry : map.entrySet()) {                 String outputKey = entry.getKey();                 String outputValue = key.toString() + "=" + String.valueOf(entry.getValue() / sum);                 context.write(new Text(outputKey), new Text(outputValue));             }         }     }     public static void main(String[] args) throws Exception {         Configuration conf = new Configuration();         Job job = Job.getInstance(conf);         job.setMapperClass(NormalizeMapper.class);         job.setReducerClass(NormalizeReducer.class);         job.setJarByClass(Normalize.class);         job.setInputFormatClass(TextInputFormat.class);         job.setOutputFormatClass(TextOutputFormat.class);         job.setOutputKeyClass(Text.class);         job.setOutputValueClass(Text.class);         TextInputFormat.setInputPaths(job, new Path(args[0]));         TextOutputFormat.setOutputPath(job, new Path(args[1]));         job.waitForCompletion(true);     } }

Multiplication.java

 import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.DoubleWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.chain.ChainMapper; import org.apache.hadoop.mapreduce.lib.input.MultipleInputs; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; import java.io.IOException; import java.util.HashMap; import java.util.List; import java.util.Map; public class Multiplication {     public static class CooccurrenceMapper extends Mapper<LongWritable, Text, Text, Text> {         // map method         @Override         public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {             //input: movieB \t movieA=relation             //pass data to reducer             String[] movieB_movieARelation = value.toString().split("\t");             context.write(new Text(movieB_movieARelation[0]), new Text(movieB_movieARelation[1]));         }     }     public static class RatingMapper extends Mapper<LongWritable, Text, Text, Text> {         // map method         @Override         public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {             //input: user,movie,rating             //pass data to reducer             String[] user_movie_rating = value.toString().split(",");             String outputKey = user_movie_rating[0] + ":" + user_movie_rating[2];             context.write(new Text(user_movie_rating[1]), new Text(outputKey));         }     }     public static class MultiplicationReducer extends Reducer<Text, Text, Text, DoubleWritable> {         // reduce method         @Override         public void reduce(Text key, Iterable<Text> values, Context context)                 throws IOException, InterruptedException {             //key = movieB value = <movieA=relation, movieC=relation... userA:rating, userB:rating...>             //collect the data for each movie, then do the multiplication             Map<String, Double> coMap = new HashMap<String, Double>();             Map<String, Double> ratingMap = new HashMap<String, Double>();             for (Text value : values) {                 String s = value.toString();                 if (s.contains("=")) {                     coMap.put(s.split("=")[0], Double.parseDouble(s.split("=")[1]));                 } else {                     ratingMap.put(s.split(":")[0], Double.parseDouble(s.split(":")[1]));                 }             }             for (Map.Entry<String, Double> entry1 : coMap.entrySet()) {                 for (Map.Entry<String, Double> entry2 : ratingMap.entrySet()) {                     double mult = entry1.getValue() * entry2.getValue();                     String outputKey = entry2.getKey() + ":" + entry1.getKey();                     context.write(new Text(outputKey), new DoubleWritable(mult));                 }             }          }     }     public static void main(String[] args) throws Exception {         Configuration conf = new Configuration();         Job job = Job.getInstance(conf);         job.setJarByClass(Multiplication.class);         ChainMapper.addMapper(job, CooccurrenceMapper.class, LongWritable.class, Text.class, Text.class, Text.class, conf);         ChainMapper.addMapper(job, RatingMapper.class, Text.class, Text.class, Text.class, Text.class, conf);         job.setMapperClass(CooccurrenceMapper.class);         job.setMapperClass(RatingMapper.class);         job.setReducerClass(MultiplicationReducer.class);         job.setMapOutputKeyClass(Text.class);         job.setMapOutputValueClass(Text.class);         job.setOutputKeyClass(Text.class);         job.setOutputValueClass(DoubleWritable.class);         MultipleInputs.addInputPath(job, new Path(args[0]), TextInputFormat.class, CooccurrenceMapper.class);         MultipleInputs.addInputPath(job, new Path(args[1]), TextInputFormat.class, RatingMapper.class);         TextOutputFormat.setOutputPath(job, new Path(args[2]));         job.waitForCompletion(true);     } }

Sum.java

 import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.DoubleWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; import java.io.IOException; /**  * Created by Michelle on 11/12/16.  */ public class Sum {     public static class SumMapper extends Mapper<LongWritable, Text, Text, DoubleWritable> {         // map method         @Override         public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {             //pass data to reducer             String[] key_value = value.toString().split("\t");             context.write(new Text(key_value[0]), new DoubleWritable(Double.parseDouble(key_value[1])));         }     }     public static class SumReducer extends Reducer<Text, DoubleWritable, Text, DoubleWritable> {         // reduce method         @Override         public void reduce(Text key, Iterable<DoubleWritable> values, Context context)                 throws IOException, InterruptedException {             //user:movie relation            //calculate the sum             double sum = 0;             for (DoubleWritable value : values) {                 sum += value.get();             }             context.write(key, new DoubleWritable(sum));         }     }     public static void main(String[] args) throws Exception {         Configuration conf = new Configuration();         Job job = Job.getInstance(conf);         job.setMapperClass(SumMapper.class);         job.setReducerClass(SumReducer.class);         job.setJarByClass(Sum.class);         job.setInputFormatClass(TextInputFormat.class);         job.setOutputFormatClass(TextOutputFormat.class);         job.setOutputKeyClass(Text.class);         job.setOutputValueClass(DoubleWritable.class);         TextInputFormat.setInputPaths(job, new Path(args[0]));         TextOutputFormat.setOutputPath(job, new Path(args[1]));         job.waitForCompletion(true);     } }

Driver.java

 public class Driver {     public static void main(String[] args) throws Exception {         DataDividerByUser dataDividerByUser = new DataDividerByUser();         CoOccurrenceMatrixGenerator coOccurrenceMatrixGenerator = new CoOccurrenceMatrixGenerator();         Normalize normalize = new Normalize();         Multiplication multiplication = new Multiplication();         Sum sum = new Sum();         String rawInput = args[0];         String userMovieListOutputDir = args[1];         String coOccurrenceMatrixDir = args[2];         String normalizeDir = args[3];         String multiplicationDir = args[4];         String sumDir = args[5];         String[] path1 = {rawInput, userMovieListOutputDir};         String[] path2 = {userMovieListOutputDir, coOccurrenceMatrixDir};         String[] path3 = {coOccurrenceMatrixDir, normalizeDir};         String[] path4 = {normalizeDir, rawInput, multiplicationDir};         String[] path5 = {multiplicationDir, sumDir};         dataDividerByUser.main(path1);         coOccurrenceMatrixGenerator.main(path2);         normalize.main(path3);         multiplication.main(path4);         sum.main(path5);     } }
相关推荐
python开发_常用的python模块及安装方法
adodb:我们领导推荐的数据库连接组件bsddb3:BerkeleyDB的连接组件Cheetah-1.0:我比较喜欢这个版本的cheeta…
日期:2022-11-24 点赞:878 阅读:9,488
Educational Codeforces Round 11 C. Hard Process 二分
C. Hard Process题目连接:http://www.codeforces.com/contest/660/problem/CDes…
日期:2022-11-24 点赞:807 阅读:5,903
下载Ubuntn 17.04 内核源代码
zengkefu@server1:/usr/src$ uname -aLinux server1 4.10.0-19-generic #21…
日期:2022-11-24 点赞:569 阅读:6,737
可用Active Desktop Calendar V7.86 注册码序列号
可用Active Desktop Calendar V7.86 注册码序列号Name: www.greendown.cn Code: &nb…
日期:2022-11-24 点赞:733 阅读:6,489
Android调用系统相机、自定义相机、处理大图片
Android调用系统相机和自定义相机实例本博文主要是介绍了android上使用相机进行拍照并显示的两种方式,并且由于涉及到要把拍到的照片显…
日期:2022-11-24 点赞:512 阅读:8,128
Struts的使用
一、Struts2的获取  Struts的官方网站为:http://struts.apache.org/  下载完Struts2的jar包,…
日期:2022-11-24 点赞:671 阅读:5,290