TopKey怎么设置分隔符
                                            本篇内容介绍了“TopKey怎么设置分隔符”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!
创新互联专业为企业提供昌江黎族网站建设、昌江黎族做网站、昌江黎族网站设计、昌江黎族网站制作等企业网站建设、网页设计与制作、昌江黎族企业网站模板建站服务,十年昌江黎族做网站经验,不只是建网站,更提供有价值的思路和整体网络服务。
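key和value的默认分隔符为tab键(\t)。
设置分隔符:在提交作业之前,通过Configuration修改TextOutputFormat使用的分隔符,例如 conf.set("mapreduce.output.textoutputformat.separator", ",");(Hadoop 1.x 中对应的配置项为 mapred.textoutputformat.separator)。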
程序一
package org.conan.myhadoop.TopKey;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
//单文件最值
public class TopKMapReduce {
    static class TopKMapper extends
            Mapper {
        // 输出的key
        private Text mapOutputKey = new Text();
        // 输出的value
        private LongWritable mapOutputValue = new LongWritable();
        // 存储最大值和初始值
        long topkValue = Long.MIN_VALUE;
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String lineValue = value.toString();
            String[] strs = lineValue.split("\t");
            // 中间值
            long tempValue = Long.valueOf(strs[1]);
            if (topkValue < tempValue) {
                topkValue = tempValue;
                mapOutputKey.set(strs[0]);
            }
        }
        @Override
        protected void cleanup(Context context) throws IOException,
                InterruptedException {
            mapOutputValue.set(topkValue);
            context.write(mapOutputKey, mapOutputValue);
        }
        @Override
        protected void setup(Context context) throws IOException,
                InterruptedException {
            super.setup(context);
        }
    }
    public int run(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = new Job(conf, TopKMapReduce.class.getSimpleName());
        job.setJarByClass(TopKMapReduce.class);
        Path inputDir = new Path(args[0]);
        FileInputFormat.addInputPath(job, inputDir);
        job.setInputFormatClass(TextInputFormat.class);
        job.setMapperClass(TopKMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);
        // job.setReducerClass(ModuleReducer.class);
        // job.setOutputKeyClass(LongWritable.class);
        // job.setOutputValueClass(Text.class);
        job.setNumReduceTasks(0);
        Path outputDir = new Path(args[1]);
        FileOutputFormat.setOutputPath(job, outputDir);
        Boolean isCompletion = job.waitForCompletion(true);
        return isCompletion ? 0 : 1;
    }
    public static void main(String[] args) throws Exception {
        args = new String[] { "hdfs://hadoop-master:9000/data/wcoutput",
                "hdfs://hadoop-master:9000/data/topkoutput" };
        int status = new TopKMapReduce().run(args);
        System.exit(status);
    }
} 程序二
package org.conan.myhadoop.TopKey;
import java.io.IOException;
import java.util.Set;
import java.util.TreeMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
//单文件 top n TreeMap实现
public class TopKMapReduceV2 {
    static class TopKMapper extends
            Mapper {
        
        public static final int K=3;//前三名
        private LongWritable mapKey = new LongWritable();
        private Text mapValue = new Text();
        
        TreeMap topMap = null;//默认按key的升序排列
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String lineValue = value.toString();
            String[] strs = lineValue.split("\t");
        
            long tempValue = Long.valueOf(strs[1]);
               String tempKey=strs[0];
               mapKey.set(tempValue);
               mapValue.set(tempKey);
               topMap.put(mapKey, mapValue);
    
               if(topMap.size()>K){
                   topMap.remove(topMap.firstKey());
                   
               }
        }
        @Override
        protected void cleanup(Context context) throws IOException,
                InterruptedException {
            Set keySet=    topMap.keySet();
            for( LongWritable key:keySet) {
                
                context.write(topMap.get(key), key);
            }
        }
        @Override
        protected void setup(Context context) throws IOException,
                InterruptedException {
            super.setup(context);
        }
    }
    public int run(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = new Job(conf, TopKMapReduceV2.class.getSimpleName());
        job.setJarByClass(TopKMapReduceV2.class);
        Path inputDir = new Path(args[0]);
        FileInputFormat.addInputPath(job, inputDir);
        job.setInputFormatClass(TextInputFormat.class);
        job.setMapperClass(TopKMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);
        // job.setReducerClass(ModuleReducer.class);
        // job.setOutputKeyClass(LongWritable.class);
        // job.setOutputValueClass(Text.class);
        job.setNumReduceTasks(0);
        Path outputDir = new Path(args[1]);
        FileOutputFormat.setOutputPath(job, outputDir);
        Boolean isCompletion = job.waitForCompletion(true);
        return isCompletion ? 0 : 1;
    }
    public static void main(String[] args) throws Exception {
        args = new String[] { "hdfs://hadoop-master:9000/data/wcoutput",
                "hdfs://hadoop-master:9000/data/topkoutput2" };
        int status = new TopKMapReduceV2().run(args);
        System.exit(status);
    }
}   程序三
package org.conan.myhadoop.TopKey;
import java.io.IOException;
import java.util.Comparator;
import java.util.TreeSet;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
//单文件 top n TreeSet实现
public class TopKMapReduceV3 {
    static class TopKMapper extends
            Mapper {
        
        public static final int K=3;//前三名
        
        
        TreeSet topSet = new TreeSet(//
                new Comparator() {
            @Override
            public int compare(TopKWritable o1, TopKWritable o2) {
            
                return o1.getCount().compareTo(o2.getCount());
            }
        }) ;
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String lineValue = value.toString();
            String[] strs = lineValue.split("\t");
        
            long tempValue = Long.valueOf(strs[1]);
             
              
               topSet.add(new TopKWritable(strs[0], tempValue));
    
               if(topSet.size()>K){
                   topSet.remove(topSet.first());
                   
               }
        }
        @Override
        protected void cleanup(Context context) throws IOException,
                InterruptedException {
            
            for( TopKWritable top:topSet) {
                
                context.write(new Text(top.getWord()), new LongWritable(top.getCount()));
            }
        }
        @Override
        protected void setup(Context context) throws IOException,
                InterruptedException {
            super.setup(context);
        }
    }
    public int run(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = new Job(conf, TopKMapReduceV3.class.getSimpleName());
        job.setJarByClass(TopKMapReduceV3.class);
        Path inputDir = new Path(args[0]);
        FileInputFormat.addInputPath(job, inputDir);
        job.setInputFormatClass(TextInputFormat.class);
        job.setMapperClass(TopKMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);
        // job.setReducerClass(ModuleReducer.class);
        // job.setOutputKeyClass(LongWritable.class);
        // job.setOutputValueClass(Text.class);
        job.setNumReduceTasks(0);
        Path outputDir = new Path(args[1]);
        FileOutputFormat.setOutputPath(job, outputDir);
        Boolean isCompletion = job.waitForCompletion(true);
        return isCompletion ? 0 : 1;
    }
    public static void main(String[] args) throws Exception {
        args = new String[] { "hdfs://hadoop-master:9000/data/wcoutput",
                "hdfs://hadoop-master:9000/data/topkoutput3" };
        int status = new TopKMapReduceV3().run(args);
        System.exit(status);
    }
}    程序四 自定义数据类型加比较器
package org.conan.myhadoop.TopKey;
import java.io.IOException;
import java.util.Comparator;
import java.util.TreeSet;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
//多个文件,需要reduce统计top n
public class TopKMapReduceV4 {
    static class TopKMapper extends
            Mapper {
        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String lineValue = value.toString();
            String[] strs = lineValue.split("\t");
            long tempValue = Long.valueOf(strs[1]);
            context.write(new Text(strs[0]), new LongWritable(tempValue));
        }
        @Override
        public void cleanup(Context context) throws IOException,
                InterruptedException {
            super.cleanup(context);
        }
        @Override
        public void setup(Context context) throws IOException,
                InterruptedException {
            super.setup(context);
        }
    }
    public static class TopKReducer extends
            Reducer {
        public static final int K = 3;// 前三名
        TreeSet topSet = new TreeSet(//
                new Comparator() {
                    @Override
                    public int compare(TopKWritable o1, TopKWritable o2) {
                        return o1.getCount().compareTo(o2.getCount());
                    }
                });
        @Override
        public void setup(Context context) throws IOException,
                InterruptedException {
            super.setup(context);
        }
        @Override
        public void reduce(Text key, Iterable values,
                Context context) throws IOException, InterruptedException {
            long count = 0;
            for (LongWritable value : values) {
                count += value.get();
            }
            topSet.add(new TopKWritable(key.toString(), count));
            if (topSet.size() > K) {
                topSet.remove(topSet.first());
            }
        }
        @Override
        public void cleanup(Context context) throws IOException,
                InterruptedException {
            
            for (TopKWritable top : topSet) {
                context.write(new Text(top.getWord()),
                        new LongWritable(top.getCount()));
            }
        }
    }
    public int run(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = new Job(conf, TopKMapReduceV4.class.getSimpleName());
        job.setJarByClass(TopKMapReduceV4.class);
        Path inputDir = new Path(args[0]);
        FileInputFormat.addInputPath(job, inputDir);
        job.setInputFormatClass(TextInputFormat.class);
        job.setMapperClass(TopKMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);
        job.setReducerClass(TopKReducer.class);
         job.setOutputKeyClass(Text.class);
         job.setOutputValueClass(LongWritable.class);
        job.setNumReduceTasks(1);
        Path outputDir = new Path(args[1]);
        FileOutputFormat.setOutputPath(job, outputDir);
        Boolean isCompletion = job.waitForCompletion(true);
        return isCompletion ? 0 : 1;
    }
    public static void main(String[] args) throws Exception {
        args = new String[] { "hdfs://hadoop-master:9000/data/wcoutput",
                "hdfs://hadoop-master:9000/data/topkoutput4" };
        int status = new TopKMapReduceV4().run(args);
        System.exit(status);
    }
}

package org.conan.myhadoop.TopKey;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.WritableComparable;

/**
 * Custom data type holding a word together with its count.
 *
 * <p>Natural ordering is by word first and by count second, which is consistent with
 * {@link #equals(Object)}.
 */
public class TopKWritable implements WritableComparable<TopKWritable> {
    private String word;
    private Long count;

    /** No-argument constructor; fields stay unset until {@code set} or {@code readFields}. */
    public TopKWritable() {
    }

    /**
     * @param word the word
     * @param count the count associated with the word
     */
    public TopKWritable(String word, Long count) {
        this.set(word, count);
    }

    /** Replaces both fields. */
    public void set(String word, Long count) {
        this.word = word;
        this.count = count;
    }

    public String getWord() {
        return word;
    }

    public Long getCount() {
        return count;
    }

    /** Serialises the word (UTF) followed by the count (long). */
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(word);
        out.writeLong(count);
    }

    /** Reads the fields back in the order {@link #write(DataOutput)} wrote them. */
    @Override
    public void readFields(DataInput in) throws IOException {
        this.word = in.readUTF();
        this.count = in.readLong();
    }

    /** Orders by word, then by count. */
    @Override
    public int compareTo(TopKWritable o) {
        int cmp = this.word.compareTo(o.getWord());
        if (0 != cmp) {
            return cmp;
        }
        return this.count.compareTo(o.getCount());
    }

    /** Returns {@code word<TAB>count}. */
    @Override
    public String toString() {
        return word + "\t" + count;
    }

    @Override
    public int hashCode() {
        final int prime = 31;
        int result = 1;
        result = prime * result + ((count == null) ? 0 : count.hashCode());
        result = prime * result + ((word == null) ? 0 : word.hashCode());
        return result;
    }

    @Override
    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (obj == null) {
            return false;
        }
        if (getClass() != obj.getClass()) {
            return false;
        }
        TopKWritable other = (TopKWritable) obj;
        if (count == null) {
            if (other.count != null) {
                return false;
            }
        } else if (!count.equals(other.count)) {
            return false;
        }
        if (word == null) {
            if (other.word != null) {
                return false;
            }
        } else if (!word.equals(other.word)) {
            return false;
        }
        return true;
    }
}
程序五:经典案例

package org.conan.myhadoop.TopKey;
import java.io.IOException;
import java.util.TreeSet;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
/**
 * 
 数据格式: 语言类别 歌曲名称 收藏次数 播放次数 歌手名称
 * 
 * 需求: 统计前十首播放最多的歌曲名称和次数
 * 
 * 
 */
public class TopKeyMapReduce {
    public static final int K = 10;
    static class TopKeyMapper extends
            Mapper {
        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String lineValue = value.toString();
            if (null == lineValue) {
                return;
            }
            String[] strs = lineValue.split("\t");
           if (null!=strs&&strs.length==5){
               
               String languageType=strs[0];
               String singName=strs[1];
               String playTimes=strs[3];
               context.write(//
                       new Text(languageType+"\t"+ singName),//
                       new LongWritable(Long.valueOf(playTimes)));
           }
            
            
        }
        @Override
        public void cleanup(Context context) throws IOException,
                InterruptedException {
            super.cleanup(context);
        }
        @Override
        public void setup(Context context) throws IOException,
                InterruptedException {
            super.setup(context);
        }
    }
    public static class TopKeyReducer extends
            Reducer {
        TreeSet topSet = new TreeSet();
        @Override
        public void setup(Context context) throws IOException,
                InterruptedException {
            super.setup(context);
        }
        @Override
        public void reduce(Text key, Iterable values,
                Context context) throws IOException, InterruptedException {
            if (null==key){
                return;
            }
            
            String[] splited =key.toString().split("\t");
            if(null==splited||splited.length==0){
                return ;
            }
            
            String languageType=splited[0];
            String singName=splited[1];
            
            
            Long playTimes=0L; 
            for (LongWritable value : values) {
                playTimes += value.get();
            }
            topSet.add(new TopKeyWritable(languageType, singName, playTimes));
        
            if (topSet.size() > K) {
                topSet.remove(topSet.last());
            }
        }
        @Override
        public void cleanup(Context context) throws IOException,
                InterruptedException {
            for (TopKeyWritable top : topSet) {
                context.write(top,NullWritable.get());
            }
        }
    }
    public int run(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = new Job(conf, TopKeyMapReduce.class.getSimpleName());
        job.setJarByClass(TopKeyMapReduce.class);
        Path inputDir = new Path(args[0]);
        FileInputFormat.addInputPath(job, inputDir);
        job.setInputFormatClass(TextInputFormat.class);
        
        job.setMapperClass(TopKeyMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);
        
        job.setReducerClass(TopKeyReducer.class);
        job.setOutputKeyClass(TopKeyWritable.class);
        job.setOutputValueClass(NullWritable.class);
        job.setNumReduceTasks(1);
        Path outputDir = new Path(args[1]);
        FileOutputFormat.setOutputPath(job, outputDir);
        Boolean isCompletion = job.waitForCompletion(true);
        return isCompletion ? 0 : 1;
    }
    public static void main(String[] args) throws Exception {
        args = new String[] { "hdfs://hadoop-master:9000/data/topkey/input",
                "hdfs://hadoop-master:9000/data/topkey/output" };
        int status = new TopKMapReduceV4().run(args);
        System.exit(status);
    }
}

package org.conan.myhadoop.TopKey;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.WritableComparable;

/**
 * Custom data type describing one song: language type, song name and play count.
 *
 * <p>Natural ordering is by play count descending, then by language type and song name
 * ascending, which is consistent with {@link #equals(Object)}.
 */
public class TopKeyWritable implements WritableComparable<TopKeyWritable> {
    String languageType;
    String singName;
    Long playTimes;

    /** No-argument constructor; fields stay unset until {@code set} or {@code readFields}. */
    public TopKeyWritable() {
    }

    /**
     * @param languageType language category of the song
     * @param singName song name
     * @param playTimes play count
     */
    public TopKeyWritable(String languageType, String singName, Long playTimes) {
        this.set(languageType, singName, playTimes);
    }

    /** Replaces all three fields. */
    public void set(String languageType, String singName, Long playTimes) {
        this.languageType = languageType;
        this.singName = singName;
        this.playTimes = playTimes;
    }

    public String getLanguageType() {
        return languageType;
    }

    public String getSingName() {
        return singName;
    }

    public Long getPlayTimes() {
        return playTimes;
    }

    /** Reads the fields back in the order {@link #write(DataOutput)} wrote them. */
    @Override
    public void readFields(DataInput in) throws IOException {
        this.languageType = in.readUTF();
        this.singName = in.readUTF();
        this.playTimes = in.readLong();
    }

    /** Serialises language type (UTF), song name (UTF) and play count (long). */
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(languageType);
        out.writeUTF(singName);
        out.writeLong(playTimes);
    }

    /**
     * Orders by play count descending; ties fall back to language type and then song name.
     * Without the tie-breaks, two different songs with the same play count would compare
     * as equal and a sorted set would keep only one of them.
     */
    @Override
    public int compareTo(TopKeyWritable o) {
        // Swapping the operands gives the descending order.
        int cmp = o.getPlayTimes().compareTo(this.getPlayTimes());
        if (cmp != 0) {
            return cmp;
        }
        cmp = this.languageType.compareTo(o.getLanguageType());
        if (cmp != 0) {
            return cmp;
        }
        return this.singName.compareTo(o.getSingName());
    }

    /** Returns {@code languageType<TAB>singName<TAB>playTimes}. */
    @Override
    public String toString() {
        return languageType + "\t" + singName + "\t" + playTimes;
    }

    @Override
    public int hashCode() {
        final int prime = 31;
        int result = 1;
        result = prime * result
                + ((languageType == null) ? 0 : languageType.hashCode());
        result = prime * result
                + ((playTimes == null) ? 0 : playTimes.hashCode());
        result = prime * result
                + ((singName == null) ? 0 : singName.hashCode());
        return result;
    }

    @Override
    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (obj == null) {
            return false;
        }
        if (getClass() != obj.getClass()) {
            return false;
        }
        TopKeyWritable other = (TopKeyWritable) obj;
        if (languageType == null) {
            if (other.languageType != null) {
                return false;
            }
        } else if (!languageType.equals(other.languageType)) {
            return false;
        }
        if (playTimes == null) {
            if (other.playTimes != null) {
                return false;
            }
        } else if (!playTimes.equals(other.playTimes)) {
            return false;
        }
        if (singName == null) {
            if (other.singName != null) {
                return false;
            }
        } else if (!singName.equals(other.singName)) {
            return false;
        }
        return true;
    }
}
“TopKey怎么设置分隔符”的内容就介绍到这里了,感谢大家的阅读。如果想了解更多行业相关的知识可以关注创新互联网站,小编将为大家输出更多高质量的实用文章!
当前标题:TopKey怎么设置分隔符
链接URL:http://www.scyingshan.cn/article/ihghje.html

 建站
建站
 咨询
咨询 售后
售后
 建站咨询
建站咨询 
 