Storm怎么实现单词计数
                                            本篇内容主要讲解“Storm怎么实现单词计数”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“Storm怎么实现单词计数”吧!

“只有客户发展了,才有我们的生存与发展!”这是创新互联的服务宗旨!把网站当作互联网产品,产品思维更注重全局思维、需求分析和迭代思维,在网站建设中就是为了建设一个不仅审美在线,而且实用性极高的网站。创新互联对成都网站制作、成都网站建设、外贸营销网站建设、网站制作、网站开发、网页设计、网站优化、网络推广、探索永无止境。
在上一次单词计数的基础上做如下改动:使用自定义分组策略(CustomStreamGrouping),将首字母相同的单词发送给同一个 task 计数。
自定义 CustomStreamGrouping
package com.zhch.v4;
import backtype.storm.generated.GlobalStreamId;
import backtype.storm.grouping.CustomStreamGrouping;
import backtype.storm.task.WorkerTopologyContext;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
public class ModuleGrouping implements CustomStreamGrouping, Serializable {
    private List tasks;
    @Override
    public void prepare(WorkerTopologyContext workerContext, GlobalStreamId streamId, List targetTasks) {
        this.tasks = targetTasks;
    }
    @Override
    public List chooseTasks(int taskId, List   数据源spout
package com.zhch.v4;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import java.io.BufferedReader;
import java.io.FileReader;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
public class SentenceSpout extends BaseRichSpout {
    private FileReader fileReader = null;
    private boolean completed = false;
    private ConcurrentHashMap pending;
    private SpoutOutputCollector collector;
    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("sentence"));
    }
    @Override
    public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
        this.collector = spoutOutputCollector;
        this.pending = new ConcurrentHashMap();
        try {
            this.fileReader = new FileReader(map.get("wordsFile").toString());
        } catch (Exception e) {
            throw new RuntimeException("Error reading file [" + map.get("wordsFile") + "]");
        }
    }
    @Override
    public void nextTuple() {
        if (completed) {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
            }
        }
        String line;
        BufferedReader reader = new BufferedReader(fileReader);
        try {
            while ((line = reader.readLine()) != null) {
                Values values = new Values(line);
                UUID msgId = UUID.randomUUID();
                this.pending.put(msgId, values);
                this.collector.emit(values, msgId);
            }
        } catch (Exception e) {
            throw new RuntimeException("Error reading tuple", e);
        } finally {
            completed = true;
        }
    }
    @Override
    public void ack(Object msgId) {
        this.pending.remove(msgId);
    }
    @Override
    public void fail(Object msgId) {
        this.collector.emit(this.pending.get(msgId), msgId);
    }
}  实现语句分割bolt
package com.zhch.v4;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import java.util.Map;
public class SplitSentenceBolt extends BaseRichBolt {
    private OutputCollector collector;
    @Override
    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
        this.collector = outputCollector;
    }
    @Override
    public void execute(Tuple tuple) {
        String sentence = tuple.getStringByField("sentence");
        String[] words = sentence.split(" ");
        for (String word : words) {
            collector.emit(tuple, new Values(word));
        }
        this.collector.ack(tuple);
    }
    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("word"));
    }
}实现单词计数bolt
package com.zhch.v4;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import java.io.BufferedWriter;
import java.io.FileWriter;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class WordCountBolt extends BaseRichBolt {
    private OutputCollector collector;
    private HashMap counts = null;
    @Override
    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
        this.collector = outputCollector;
        this.counts = new HashMap();
    }
    @Override
    public void execute(Tuple tuple) {
        String word = tuple.getStringByField("word");
        Long count = this.counts.get(word);
        if (count == null) {
            count = 0L;
        }
        count++;
        this.counts.put(word, count);
        BufferedWriter writer = null;
        try {
            writer = new BufferedWriter(new FileWriter("/home/grid/stormData/result.txt"));
            List keys = new ArrayList();
            keys.addAll(this.counts.keySet());
            Collections.sort(keys);
            for (String key : keys) {
                Long c = this.counts.get(key);
                writer.write(key + " : " + c);
                writer.newLine();
                writer.flush();
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (writer != null) {
                try {
                    writer.close();
                } catch (Exception e) {
                    e.printStackTrace();
                }
                writer = null;
            }
        }
        this.collector.ack(tuple);
    }
    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("word", "count"));
    }
}    实现单词计数topology
package com.zhch.v4;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.topology.TopologyBuilder;
public class WordCountTopology {
    public static final String SENTENCE_SPOUT_ID = "sentence-spout";
    public static final String SPLIT_BOLT_ID = "split-bolt";
    public static final String COUNT_BOLT_ID = "count-bolt";
    public static final String TOPOLOGY_NAME = "word-count-topology-v4";
    public static void main(String[] args) throws Exception {
        SentenceSpout spout = new SentenceSpout();
        SplitSentenceBolt spiltBolt = new SplitSentenceBolt();
        WordCountBolt countBolt = new WordCountBolt();
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout(SENTENCE_SPOUT_ID, spout, 2);
        builder.setBolt(SPLIT_BOLT_ID, spiltBolt, 2).setNumTasks(4)
                .shuffleGrouping(SENTENCE_SPOUT_ID);
        builder.setBolt(COUNT_BOLT_ID, countBolt, 2)
                .customGrouping(SPLIT_BOLT_ID, new ModuleGrouping()); //使用 自定义 分组策略
        Config config = new Config();
        config.put("wordsFile", args[0]);
        if (args != null && args.length > 1) {
            config.setNumWorkers(2);
            //集群模式启动
            StormSubmitter.submitTopology(args[1], config, builder.createTopology());
        } else {
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology(TOPOLOGY_NAME, config, builder.createTopology());
            try {
                Thread.sleep(5 * 1000);
            } catch (InterruptedException e) {
            }
            cluster.killTopology(TOPOLOGY_NAME);
            cluster.shutdown();
        }
    }
}提交到Storm集群
storm jar Storm02-1.0-SNAPSHOT.jar com.zhch.v4.WordCountTopology /home/grid/stormData/input.txt word-count-topology-v4
运行结果(两台机器上各有一份计数结果,可以看到首字母相同的单词只出现在其中一份结果里):
[grid@hadoop5 stormData]$ cat result.txt
Apache : 1
ETL : 1
It : 1
Storm : 4
a : 4
analytics : 1
and : 5
any : 1
at : 1
can : 1
cases: : 1
clocked : 1
computation : 2
continuous : 1
easy : 2
guarantees : 1
is : 6
it : 2
machine : 1
makes : 1
many : 1
million : 1
more : 1
of : 2
online : 1
open : 1
operate : 1
over : 1
scalable : 1
second : 1
set : 1
simple : 1
source : 1
streams : 1
system : 1
unbounded : 1
up : 1
use : 2
used : 1
what : 1
will : 1
with : 1
your : 1
[grid@hadoop6 stormData]$ cat result.txt
Hadoop : 1
RPC : 1
batch : 1
be : 2
benchmark : 1
data : 2
did : 1
distributed : 2
doing : 1
fast: : 1
fault-tolerant : 1
for : 2
free : 1
fun : 1
has : 1
language : 1
learning : 1
lot : 1
node : 1
per : 2
process : 1
processed : 2
processing : 2
programming : 1
realtime : 3
reliably : 1
to : 3
torm : 1
tuples : 1
到此,相信大家对“Storm怎么实现单词计数”有了更深的了解,不妨来实际操作一番吧!这里是创新互联网站,更多相关内容可以进入相关频道进行查询,关注我们,继续学习!
网站栏目:Storm怎么实现单词计数
URL链接:http://www.scyingshan.cn/article/ggeshe.html

 建站
建站
 咨询
咨询 售后
售后
 建站咨询
建站咨询 
 