日韩久久久精品,亚洲精品久久久久久久久久久,亚洲欧美一区二区三区国产精品 ,一区二区福利

Storm 中drpc調用

系統 2005 0
      package storm.starter;



import backtype.storm.Config;

import backtype.storm.LocalCluster;

import backtype.storm.LocalDRPC;

import backtype.storm.StormSubmitter;

import backtype.storm.drpc.DRPCSpout;

import backtype.storm.task.ShellBolt;

import backtype.storm.topology.BasicOutputCollector;

import backtype.storm.topology.IRichBolt;

import backtype.storm.topology.OutputFieldsDeclarer;

import backtype.storm.topology.TopologyBuilder;

import backtype.storm.topology.base.BaseBasicBolt;

import backtype.storm.tuple.Fields;

import backtype.storm.tuple.Tuple;

import backtype.storm.tuple.Values;

import storm.starter.spout.RandomSentenceSpout;



import java.lang.management.ManagementFactory;

import java.util.HashMap;

import java.util.Map;



import org.apache.log4j.Logger;

import org.apache.log4j.PropertyConfigurator;



/**

 * This topology demonstrates Storm's stream groupings and multilang

 * capabilities.

 */

public class Drpctest {

	public static final Logger logger = Logger.getLogger(Drpctest.class);

	public static class WordCount extends BaseBasicBolt {

		Map<String, Integer> counts = new HashMap<String, Integer>();



		@Override

		public void execute(Tuple tuple, BasicOutputCollector collector) {

			String word = tuple.getString(0);

			logger.error(this.toString() + "word = " + word);

			Integer count = counts.get(word);

			if (count == null)

				count = 0;

			count++;

			counts.put(word, count);

			logger.error(this.toString() + "count = " + count);

			collector.emit(new Values(word, count));

		}



		String str = Thread.currentThread().getName();



		@Override

		public void declareOutputFields(OutputFieldsDeclarer declarer) {

			logger.error("declareOutputFields :");

			declarer.declare(new Fields("result", "count"));

		}

	}



	public static class DrpcBolt extends BaseBasicBolt {

		Map<String, Integer> counts = new HashMap<String, Integer>();



		@Override

		public void execute(Tuple tuple, BasicOutputCollector collector) {

			String logString = tuple.getString(0);

			logger.error("DrpcBolt recve :" + logString);

		}



		@Override

		public void declareOutputFields(OutputFieldsDeclarer declarer) {

			// 暫時沒用

			declarer.declare(new Fields("word1", "count1"));

		}

	}



	public static void main(String[] args) throws Exception {

		TopologyBuilder builder = new TopologyBuilder();



		// drpc

		LocalDRPC drpc = new LocalDRPC();

		DRPCSpout drpc_spout = new DRPCSpout("testdrpc", drpc);

		builder.setSpout("drpcspout", drpc_spout, 3);



		PropertyConfigurator

				.configure("/home/hadoop/code1/Kafka/src/Log4j.properties");



		// 接入drpc

		builder.setBolt("DrpcBolt", new DrpcBolt(), 1).shuffleGrouping(

				"drpcspout");



		Config conf = new Config();

		conf.setDebug(true);



		if (args != null && args.length > 0) {

			conf.setNumWorkers(3);



			StormSubmitter.submitTopology(args[0], conf,

					builder.createTopology());

		} else {

			conf.setMaxTaskParallelism(3);

			conf.setDebug(true);



			LocalCluster cluster = new LocalCluster();

			cluster.submitTopology("word-count", conf, builder.createTopology());



			String str = "send test drpc"; // 和 DRPCSpout 名字對應

			drpc.execute("testdrpc", str);



			Thread.sleep(10000);



			cluster.shutdown();

		}

	}

}


    

?

Storm 中drpc調用


更多文章、技術交流、商務合作、聯系博主

微信掃碼或搜索:z360901061

微信掃一掃加我為好友

QQ號聯系: 360901061

您的支持是博主寫作最大的動力,如果您喜歡我的文章,感覺我的文章對您有幫助,請用微信掃描下面二維碼支持博主2元、5元、10元、20元等您想捐的金額吧,狠狠點擊下面給點支持吧,站長非常感激您!手機微信長按不能支付解決辦法:請將微信支付二維碼保存到相冊,切換到微信,然后點擊微信右上角掃一掃功能,選擇支付二維碼完成支付。

【本文對您有幫助就好】

您的支持是博主寫作最大的動力,如果您喜歡我的文章,感覺我的文章對您有幫助,請用微信掃描上面二維碼支持博主2元、5元、10元、自定義金額等您想捐的金額吧,站長會非常 感謝您的哦!!!

發表我的評論
最新評論 總共0條評論
主站蜘蛛池模板: 辰溪县| 无极县| 崇文区| 景德镇市| 麟游县| 汪清县| 阜新市| 巴楚县| 嘉义县| 那坡县| 靖西县| 潞城市| 东方市| 阳新县| 五河县| 沁水县| 尼木县| 郸城县| 杭锦旗| 绿春县| 重庆市| 淅川县| 宁城县| 永嘉县| 广宁县| 县级市| 获嘉县| 渭南市| 永胜县| 富平县| 许昌市| 绍兴县| 景德镇市| 绍兴市| 黄浦区| 内丘县| 丰顺县| 贺兰县| 钦州市| 隆安县| 公主岭市|