博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
19-hadoop-fof好友推荐
阅读量:6284 次
发布时间:2019-06-22

本文共 9493 字,大约阅读时间需要 31 分钟。

好友推荐的案例需要两个 job:第一个 job 计算好友关系度(即两个非直接好友之间的共同好友数),第二个 job 根据计算出的关系度进行排序并向用户推荐好友。

 

1, fof关系类

package com.wenbronk.friend;

import org.apache.hadoop.io.Text;

/**
 * A friend-of-friend (FOF) pair, stored as a single {@link Text} value of the
 * form {@code "<first>\t<second>"}.
 *
 * <p>The two names are always written in lexicographic order, so the pair
 * (a, b) and the pair (b, a) produce the same text.
 *
 * @author root
 */
public class Fof extends Text {

    /** Creates an instance through the no-arg {@link Text} constructor. */
    public Fof() {
        super();
    }

    /**
     * Creates the pair for two user names; the argument order does not matter.
     *
     * @param a one user name
     * @param b the other user name
     */
    public Fof(String a, String b) {
        super(getFof(a, b));
    }

    /**
     * Joins two names with a tab, placing them in a fixed order so that both
     * orderings of the same two names yield the same string.
     *
     * @param a one user name
     * @param b the other user name
     * @return {@code a + "\t" + b} when {@code a} sorts strictly before
     *         {@code b}, otherwise {@code b + "\t" + a}
     */
    public static String getFof(String a, String b) {
        boolean aSortsFirst = a.compareTo(b) < 0;
        String first = aSortsFirst ? a : b;
        String second = aSortsFirst ? b : a;
        return first + "\t" + second;
    }
}

2, user类

package com.wenbronk.friend;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

/**
 * A user name together with a count, usable as a MapReduce key or value.
 *
 * <p>Serialized as a UTF string (the name) followed by an int (the count).
 * The natural order is by name, then by count ascending.
 */
public class User implements WritableComparable<User> {

    private String uname;
    private int friedsCount;

    /** Creates an empty instance; fields are filled in by {@link #readFields}. */
    public User() {
        super();
    }

    /**
     * @param uname       the user name
     * @param friedsCount the count associated with this user
     */
    public User(String uname, int friedsCount) {
        super();
        this.uname = uname;
        this.friedsCount = friedsCount;
    }

    public String getUname() {
        return uname;
    }

    public void setUname(String uname) {
        this.uname = uname;
    }

    public int getFriedsCount() {
        return friedsCount;
    }

    public void setFriedsCount(int friedsCount) {
        this.friedsCount = friedsCount;
    }

    /**
     * Restores the fields in the same order {@link #write} emits them.
     *
     * @throws IOException if the underlying stream fails
     */
    @Override
    public void readFields(DataInput arg0) throws IOException {
        this.uname = arg0.readUTF();
        this.friedsCount = arg0.readInt();
    }

    /**
     * Writes the name as a UTF string followed by the count as an int.
     *
     * @throws IOException if the underlying stream fails
     */
    @Override
    public void write(DataOutput arg0) throws IOException {
        arg0.writeUTF(uname);
        arg0.writeInt(friedsCount);
    }

    /**
     * Orders by name first and, for equal names, by count ascending.
     */
    @Override
    public int compareTo(User o) {
        int result = this.uname.compareTo(o.getUname());
        if (result == 0) {
            return Integer.compare(this.friedsCount, o.getFriedsCount());
        }
        return result;
    }

    /**
     * Hashes on the name only.
     *
     * <p>Hadoop's default partitioner routes a key by its {@code hashCode()},
     * so the hash must be stable across JVMs and must send every record of
     * one user to the same reducer regardless of the count. Without this
     * override the identity hash from {@code Object} would be used.
     */
    @Override
    public int hashCode() {
        return uname == null ? 0 : uname.hashCode();
    }

    /**
     * Two users are equal when both name and count match, which is consistent
     * with {@link #compareTo(User)} returning zero.
     */
    @Override
    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (!(obj instanceof User)) {
            return false;
        }
        User other = (User) obj;
        if (friedsCount != other.friedsCount) {
            return false;
        }
        return uname == null ? other.uname == null : uname.equals(other.uname);
    }
}

3, sort

package com.wenbronk.friend;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

/**
 * Sort comparator for {@link User} keys: by user name, then by count.
 *
 * @author root
 */
public class FofSort extends WritableComparator {

    /** Registers {@link User} as the key class and asks for key instances to be created. */
    public FofSort() {
        super(User.class, true);
    }

    /**
     * Compares two {@link User} keys.
     *
     * @return the name comparison when the names differ, otherwise the
     *         ascending comparison of the two counts
     */
    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        User left = (User) a;
        User right = (User) b;

        int byName = left.getUname().compareTo(right.getUname());
        if (byName != 0) {
            return byName;
        }
        return Integer.compare(left.getFriedsCount(), right.getFriedsCount());
    }
}

 

4, group

package com.wenbronk.friend;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

/**
 * Grouping comparator for {@link User} keys: two keys fall into the same
 * group when their user names are equal; the count is ignored.
 *
 * @author root
 */
public class FofGroup extends WritableComparator {

    /** Registers {@link User} as the key class and asks for key instances to be created. */
    public FofGroup() {
        super(User.class, true);
    }

    /**
     * Compares two {@link User} keys by user name only.
     *
     * @return the result of comparing the two names
     */
    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        String leftName = ((User) a).getUname();
        String rightName = ((User) b).getUname();
        return leftName.compareTo(rightName);
    }
}

5, job

package com.wenbronk.friend;

import java.io.IOException;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * Driver for the friend-of-friend (FOF) recommendation example.
 *
 * <p>Two MapReduce jobs run in sequence: the first finds every FOF pair and
 * counts how often it occurs, the second sorts those pairs per user and
 * writes one recommendation line per user.
 *
 * @author root
 */
public class RunJob {

    /** Output directory of the first job; the second job reads from it. */
    private static final String FOF_OUTPUT_DIR = "/root/usr/fof/f1";

    /** Output directory of the second job. */
    private static final String RECOMMEND_OUTPUT_DIR = "/root/usr/fof/f2";

    /**
     * Runs the FOF job and, only if it succeeds, the recommendation job.
     *
     * @param args unused
     * @throws IOException declared by {@link #runFindFof(Configuration)}
     */
    public static void main(String[] args) throws IOException {
        Configuration configuration = new Configuration();

        // NOTE(review): "fs.default" and "yarn.resourcemanager" do not look like
        // standard Hadoop property names (the usual ones are "fs.defaultFS" and
        // "yarn.resourcemanager.hostname"), and the first value carries trailing
        // spaces. They are left untouched because the job below reads a local
        // Windows input path; confirm the intended target before changing them.
        configuration.set("fs.default", "hdfs://wenbronk.hdfs.com:8020    ");
        configuration.set("yarn.resourcemanager", "hdfs://192.168.208.106");

        if (runFindFof(configuration)) {
            // Sort and recommend based on the pairs found by the first job.
            run2(configuration);
        }
    }

    /**
     * Job 1: finds all FOF pairs and writes each pair with its count to
     * {@link #FOF_OUTPUT_DIR}. An existing output directory is deleted first.
     *
     * @param conf the Hadoop configuration to run with
     * @return {@code true} if the job completed successfully, {@code false}
     *         if it failed or an exception was caught
     * @throws IOException declared for callers; exceptions raised while
     *         setting up or running the job are caught and printed here
     */
    private static boolean runFindFof(Configuration conf) throws IOException {
        try {
            FileSystem fs = FileSystem.get(conf);
            Job job = Job.getInstance(conf);
            job.setJobName("friend");

            job.setJarByClass(RunJob.class);
            job.setMapperClass(FofMapper.class);
            job.setReducerClass(FofReduce.class);
            job.setMapOutputKeyClass(Fof.class);
            job.setMapOutputValueClass(IntWritable.class);

            job.setInputFormatClass(KeyValueTextInputFormat.class);
            FileInputFormat.addInputPath(job, new Path("E:\\sxt\\1-MapReduce\\data\\friend.txt"));

            // The output directory must not exist when the job starts.
            Path path = new Path(FOF_OUTPUT_DIR);
            if (fs.exists(path)) {
                fs.delete(path, true);
            }
            FileOutputFormat.setOutputPath(job, path);

            return job.waitForCompletion(true);
        } catch (Exception e) {
            e.printStackTrace();
        }
        return false;
    }

    /**
     * Mapper of job 1. The input key is a user and the input value is that
     * user's tab-separated friend list.
     *
     * <p>For every friend it emits (user, friend) with value 0 to mark a
     * direct friendship, and for every pair of friends in the list it emits
     * (friend1, friend2) with value 1, since the two share this user as a
     * common friend.
     */
    static class FofMapper extends Mapper<Text, Text, Fof, IntWritable> {
        @Override
        protected void map(Text key, Text value, Context context)
                throws IOException, InterruptedException {
            String user = key.toString();
            String[] frieds = StringUtils.split(value.toString(), '\t');
            for (int i = 0; i < frieds.length; i++) {
                String f1 = frieds[i];
                // Direct friends are flagged with 0; the reducer drops any
                // group that contains a 0.
                context.write(new Fof(user, f1), new IntWritable(0));
                for (int j = i + 1; j < frieds.length; j++) {
                    String f2 = frieds[j];
                    context.write(new Fof(f1, f2), new IntWritable(1));
                }
            }
        }
    }

    /**
     * Reducer of job 1. Sums the values of one FOF pair and writes the pair
     * with its sum, unless any value is 0, in which case the whole group is
     * discarded because the two users are already direct friends.
     */
    static class FofReduce extends Reducer<Fof, IntWritable, Fof, IntWritable> {
        @Override
        protected void reduce(Fof key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            boolean directFriends = false;
            int sum = 0;
            for (IntWritable count : values) {
                if (count.get() == 0) {
                    // A single 0 is enough to discard the group.
                    directFriends = true;
                    break;
                }
                sum += count.get();
            }
            if (!directFriends) {
                context.write(key, new IntWritable(sum));
            }
        }
    }

    /**
     * Job 2: reads the output of job 1 from {@link #FOF_OUTPUT_DIR}, sorts
     * with {@link FofSort}, groups with {@link FofGroup}, and writes the
     * recommendations to {@link #RECOMMEND_OUTPUT_DIR}. An existing output
     * directory is deleted first. Exceptions are caught and printed.
     *
     * @param config the Hadoop configuration to run with
     */
    public static void run2(Configuration config) {
        try {
            FileSystem fileSystem = FileSystem.get(config);
            Job job = Job.getInstance(config);
            job.setJobName("fof2");

            job.setJarByClass(RunJob.class);
            job.setMapperClass(SortMapper.class);
            job.setReducerClass(SortReduce.class);
            job.setSortComparatorClass(FofSort.class);
            job.setGroupingComparatorClass(FofGroup.class);
            job.setMapOutputKeyClass(User.class);
            job.setMapOutputValueClass(User.class);

            job.setInputFormatClass(KeyValueTextInputFormat.class);

            // Read exactly what job 1 wrote.
            FileInputFormat.addInputPath(job, new Path(FOF_OUTPUT_DIR));

            // The output directory must not exist when the job starts.
            Path path = new Path(RECOMMEND_OUTPUT_DIR);
            if (fileSystem.exists(path)) {
                fileSystem.delete(path, true);
            }
            FileOutputFormat.setOutputPath(job, path);

            boolean f = job.waitForCompletion(true);
            if (f) {
                System.out.println("job, 成功执行");
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Mapper of job 2. The input key is the first user of a FOF pair and the
     * input value is {@code "<second user>\t<count>"}.
     *
     * <p>Each pair is emitted twice, once keyed by each user, so that both
     * users receive the other as a recommendation.
     */
    static class SortMapper extends Mapper<Text, Text, User, User> {
        @Override
        protected void map(Text key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] args = StringUtils.split(value.toString(), '\t');
            String other = args[0];
            int friendsCount = Integer.parseInt(args[1]);
            context.write(new User(key.toString(), friendsCount), new User(other, friendsCount));
            context.write(new User(other, friendsCount), new User(key.toString(), friendsCount));
        }
    }

    /**
     * Reducer of job 2. Writes one line per user: the user name as key and,
     * as value, every recommended user formatted as {@code "name: count, "}
     * and concatenated in iteration order.
     */
    static class SortReduce extends Reducer<User, User, Text, Text> {
        @Override
        protected void reduce(User key, Iterable<User> values, Context context)
                throws IOException, InterruptedException {
            String uname = key.getUname();
            StringBuilder stringBuilder = new StringBuilder();
            for (User user : values) {
                stringBuilder.append(user.getUname() + ": " + user.getFriedsCount());
                stringBuilder.append(", ");
            }
            context.write(new Text(uname), new Text(stringBuilder.toString()));
        }
    }
}

 

初始文档 

小明    老王    如花    林志玲
老王    小明    凤姐
如花    小明    李刚    凤姐
林志玲    小明    李刚    凤姐    郭美美
李刚    如花    凤姐    林志玲
郭美美    凤姐    林志玲
凤姐    如花    老王    林志玲    郭美美

 

系列来自尚学堂视频

转载地址:http://dpxva.baihongyu.com/

你可能感兴趣的文章
轻松学PHP
查看>>
Linux中的网络监控命令
查看>>
this的用法
查看>>
windows下安装redis
查看>>
CentOS7 yum 安装git
查看>>
启动日志中频繁出现以下信息
查看>>
httpd – 对Apache的DFOREGROUND感到困惑
查看>>
分布式锁的一点理解
查看>>
idea的maven项目,install下载重复下载本地库中已有的jar包,而且下载后jar包都是lastupdated问题...
查看>>
2019测试指南-web应用程序安全测试(二)指纹Web服务器
查看>>
树莓派3链接wifi
查看>>
js面向对象编程
查看>>
Ruby中类 模块 单例方法 总结
查看>>
jQuery的validate插件
查看>>
5-4 8 管道符 作业控制 shell变量 环境变量配置
查看>>
Enumberable
查看>>
开发者论坛一周精粹(第五十四期) 求购备案服务号1枚!
查看>>
validate表单验证及自定义方法
查看>>
javascript 中出现missing ) after argument list的错误
查看>>
使用Swagger2构建强大的RESTful API文档(2)(二十三)
查看>>