org.apache.stormstorm-core0.9.3org.apache.kafka

日韩久久久精品,亚洲精品久久久久久久久久久,亚洲欧美一区二区三区国产精品 ,一区二区福利

kafka+storm連接

系統 1878 0

本項目為maven項目,需要添加必要的storm庫,以及kafka依賴,使用storm自帶的storm-kafka進行連接,根據自己集群環境

      		<dependency>

			<groupId>org.apache.storm</groupId>

			<artifactId>storm-core</artifactId>

			<version>0.9.3</version>

		</dependency>



		<dependency>

			<groupId>org.apache.kafka</groupId>

			<artifactId>kafka_2.10</artifactId>

			<version>0.8.2.1</version>

			<exclusions>

				<exclusion>

					<groupId>org.apache.zookeeper</groupId>

					<artifactId>zookeeper</artifactId>

				</exclusion>

				<exclusion>

					<groupId>log4j</groupId>

					<artifactId>log4j</artifactId>

				</exclusion>

			</exclusions>

		</dependency>



		<dependency>

			<groupId>org.apache.storm</groupId>

			<artifactId>storm-kafka</artifactId>

			<version>0.9.3</version>

		</dependency>


    

  實例topology:

      package com.xh.kafka.test;



import storm.kafka.BrokerHosts;

import storm.kafka.KafkaSpout;

import storm.kafka.SpoutConfig;

import storm.kafka.StringScheme;

import storm.kafka.ZkHosts;

import backtype.storm.Config;

import backtype.storm.LocalCluster;

import backtype.storm.StormSubmitter;

import backtype.storm.generated.AlreadyAliveException;

import backtype.storm.generated.InvalidTopologyException;

import backtype.storm.spout.SchemeAsMultiScheme;

import backtype.storm.topology.TopologyBuilder;



public class KafkaSpoutTest {



	public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException {

		

		BrokerHosts brokerHosters = new ZkHosts("zookeeperip1:2181,zookeeperip2:2181/kafka/65_250-252");

		

		String topic = "log_test";

		

		//offsetZkRoot 和 offsetZkId 自定義即可

		String offsetZkRoot = "/storm_test";

		String offsetZkId = "kafka-storm";

		

		SpoutConfig spoutConfig = new SpoutConfig(brokerHosters, topic, offsetZkRoot, offsetZkId);

		

		spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());		

		

				

		Config conf = new Config();

			

		TopologyBuilder builder = new TopologyBuilder();



		builder.setSpout("spout", new KafkaSpout(spoutConfig));

		builder.setBolt("bolt", new SequenceBolt()).shuffleGrouping("spout");

	

		if(args != null && args.length > 0){

			conf.setNumWorkers(3);

			StormSubmitter.submitTopology(args[0], conf, builder.createTopology());

		}else{

			LocalCluster cluster = new LocalCluster();

			cluster.submitTopology("my-topology", conf, builder.createTopology());

		}

	}



}


    

  此外,不管是本地運行還是集群運行,都需要 修改host文件,添加,kafka集群的機器名 ,例如:

      192.168.*.* kafka-01

192.168.**.** kafka-02

192.168.***.*** kafka-03


    

  否則會報錯如下:

      23810 [Thread-10-spout] INFO  kafka.consumer.SimpleConsumer - Reconnect due to socket error: java.nio.channels.ClosedChannelException



23815 [Thread-10-spout] ERROR backtype.storm.util - Async loop died!

java.lang.RuntimeException: java.nio.channels.ClosedChannelException

at storm.kafka.ZkCoordinator.refresh(ZkCoordinator.java:103) ~[storm-kafka-0.9.3.jar:0.9.3]

at storm.kafka.ZkCoordinator.getMyManagedPartitions(ZkCoordinator.java:69) ~[storm-kafka-0.9.3.jar:0.9.3]

at storm.kafka.KafkaSpout.nextTuple(KafkaSpout.java:135) ~[storm-kafka-0.9.3.jar:0.9.3]

at backtype.storm.daemon.executor$fn__3373$fn__3388$fn__3417.invoke(executor.clj:565) ~[storm-core-0.9.3.jar:0.9.3]

at backtype.storm.util$async_loop$fn__464.invoke(util.clj:463) ~[storm-core-0.9.3.jar:0.9.3]

at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]

at java.lang.Thread.run(Unknown Source) [na:1.7.0_65]

Caused by: java.nio.channels.ClosedChannelException: null

at kafka.network.BlockingChannel.send(BlockingChannel.scala:100) ~[kafka_2.10-0.8.2.1.jar:na]

at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:78) ~[kafka_2.10-0.8.2.1.jar:na]

at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:68) ~[kafka_2.10-0.8.2.1.jar:na]

at kafka.consumer.SimpleConsumer.getOffsetsBefore(SimpleConsumer.scala:127) ~[kafka_2.10-0.8.2.1.jar:na]

at kafka.javaapi.consumer.SimpleConsumer.getOffsetsBefore(SimpleConsumer.scala:79) ~[kafka_2.10-0.8.2.1.jar:na]

at storm.kafka.KafkaUtils.getOffset(KafkaUtils.java:77) ~[storm-kafka-0.9.3.jar:0.9.3]

at storm.kafka.KafkaUtils.getOffset(KafkaUtils.java:67) ~[storm-kafka-0.9.3.jar:0.9.3]

at storm.kafka.PartitionManager.<init>(PartitionManager.java:83) ~[storm-kafka-0.9.3.jar:0.9.3]

at storm.kafka.ZkCoordinator.refresh(ZkCoordinator.java:98) ~[storm-kafka-0.9.3.jar:0.9.3]

... 6 common frames omitted


    

kafka+storm連接


更多文章、技術交流、商務合作、聯系博主

微信掃碼或搜索:z360901061

微信掃一掃加我為好友

QQ號聯系: 360901061

您的支持是博主寫作最大的動力,如果您喜歡我的文章,感覺我的文章對您有幫助,請用微信掃描下面二維碼支持博主2元、5元、10元、20元等您想捐的金額吧,狠狠點擊下面給點支持吧,站長非常感激您!手機微信長按不能支付解決辦法:請將微信支付二維碼保存到相冊,切換到微信,然后點擊微信右上角掃一掃功能,選擇支付二維碼完成支付。

【本文對您有幫助就好】

您的支持是博主寫作最大的動力,如果您喜歡我的文章,感覺我的文章對您有幫助,請用微信掃描上面二維碼支持博主2元、5元、10元、自定義金額等您想捐的金額吧,站長會非常 感謝您的哦!!!

發表我的評論
最新評論 總共0條評論
主站蜘蛛池模板: 册亨县| 微博| 建水县| 惠来县| 深州市| 凉山| 岚皋县| 桦甸市| 大石桥市| 蒙城县| 牙克石市| 库车县| 秭归县| 余干县| 佛坪县| 涡阳县| 若尔盖县| 无锡市| 耿马| 阿巴嘎旗| 南昌市| 抚远县| 图片| 鄢陵县| 温宿县| 元江| 双城市| 贵德县| 若尔盖县| 米泉市| 邯郸市| 福清市| 瓮安县| 毕节市| 弥渡县| 南康市| 白城市| 墨脱县| 潢川县| 鞍山市| 和田县|