日韩久久久精品,亚洲精品久久久久久久久久久,亚洲欧美一区二区三区国产精品 ,一区二区福利

Storm源碼淺析之topology的提交

系統(tǒng) 2497 0
最近一直在讀twitter開源的這個分布式流計算框架——storm的源碼,還是有必要記錄下一些比較有意思的地方。我按照storm的主要概念進(jìn)行組織,并且只分析我關(guān)注的東西,因此稱之為淺析。

一、介紹
Storm的開發(fā)語言主要是Java和Clojure,其中Java定義骨架,而Clojure編寫核心邏輯。源碼統(tǒng)計結(jié)果:

<!–

Code highlighting produced by Actipro CodeHighlighter (freeware)
http://www.CodeHighlighter.com/

–> 180 textfiles.
177 uniquefiles.
7 filesignored.

http: // cloc.sourceforge.netv1.55T=1.0s(171.0files/s,46869.0lines/s)
——————————————————————————-
Languagefilesblankcommentcode
——————————————————————————-
Java
125 5010 2414 25661
Lisp
33 732 283 4871
Python
7 742 433 4675
CSS
1 12 45 1837
ruby
2 22 0 104
BourneShell
1 0 0 6
Javascript
2 1 15 6
——————————————————————————-
SUM:
171 6519 3190 37160
——————————————————————————-

Java代碼25000多行,而Clojure(Lisp)只有4871行,說語言不重要再次證明是扯淡。

二、Topology和Nimbus
Topology是storm的核心理念,將spout和bolt組織成一個topology,運行在storm集群里,完成實時分析和計算的任務(wù)。這里我主要想介紹下topology部署到storm集群的大概過程。提交一個topology任務(wù)到Storm集群是通過StormSubmitter.submitTopology方法提交:

<!–

Code highlighting produced by Actipro CodeHighlighter (freeware)
http://www.CodeHighlighter.com/

–> StormSubmitter.submitTopology(name,conf,builder.createTopology());

我們將topology打成jar包后,利用bin/storm這個python腳本,執(zhí)行如下命令:

<!–

Code highlighting produced by Actipro CodeHighlighter (freeware)
http://www.CodeHighlighter.com/

–> bin / stormjarxxxx.jarcom.taobao.MyTopologyargs

將jar包提交給storm集群。storm腳本會啟動JVM執(zhí)行Topology的main方法,執(zhí)行submitTopology的過程。而submitTopology會將jar 文件上傳 到nimbus,上傳是通過socket傳輸。在storm這個python腳本的jar方法里可以看到:

<!–

Code highlighting produced by Actipro CodeHighlighter (freeware)
http://www.CodeHighlighter.com/

–> def jar(jarfile,klass, * args):
exec_storm_class(
klass,
jvmtype
= -client ,
extrajars
= [jarfile,CONF_DIR,STORM_DIR + " /bin " ],
args
= args,
prefix
= exportSTORM_JAR= + jarfile + ; )

將jar文件的地址設(shè)置為環(huán)境變量STORM_JAR,這個環(huán)境變量在執(zhí)行submitTopology的時候用到:

<!–

Code highlighting produced by Actipro CodeHighlighter (freeware)
http://www.CodeHighlighter.com/

–> // StormSubmitter.java
private static void submitJar(Mapconf){
if (submittedJar == null ){
LOG.info(
Jarnotuploadedtomasteryet.Submittingjar );
StringlocalJar
= System.getenv( STORM_JAR ) ;
submittedJar
= submitJar(conf,localJar);
}
else {
LOG.info(
Jaralreadyuploadedtomaster.Notsubmittingjar. );
}
}

通過環(huán)境變量找到j(luò)ar包的地址,然后上傳。利用環(huán)境變量傳參是個小技巧。

其次,nimbus在接收到j(luò)ar文件后,存放到數(shù)據(jù)目錄的inbox目錄, nimbus數(shù)據(jù)目錄的結(jié)構(gòu)

<!–

Code highlighting produced by Actipro CodeHighlighter (freeware)
http://www.CodeHighlighter.com/

–> - nimbus
- inbox
- stormjar - 57f1d694 - 2865 - 4b3b - 8a7c - 99104fc0aea3.jar
- stormjar - 76b4e316 - b430 - 4215 - 9e26 - 4f33ba4ee520.jar

- stormdist
- storm - id
- stormjar.jar
- stormconf.ser
- stormcode.ser

其中inbox用于存放提交的jar文件,每個jar文件都重命名為stormjar加上一個32位的UUID。而stormdist存放的是啟動topology后生成的文件,每個topology都分配一個唯一的id,ID的規(guī)則是“name-計數(shù)-時間戳”。啟動后的topology的jar文件名命名為storm.jar ,而它的配置經(jīng)過java序列化后存放在stormconf.ser文件,而stormcode.ser是將topology本身序列化后存放的文件。 這些文件在部署的時候,supervisor會從這個目錄下載這些文件,然后在supervisor本地執(zhí)行這些代碼。
進(jìn)入重點,topology任務(wù)的分配過程(zookeeper路徑說明忽略root):
1.在zookeeper上創(chuàng)建/taskheartbeats/{storm id} 路徑,用于任務(wù)的心跳檢測。storm對zookeeper的一個重要應(yīng)用就是利用zk的臨時節(jié)點做存活檢測。task將定時刷新節(jié)點的時間戳,然后nimbus會檢測這個時間戳是否超過timeout設(shè)置。
2.從topology中獲取bolts,spouts設(shè)置的并行數(shù)目以及全局配置的最大并行數(shù),然后產(chǎn)生task id列表,如[1 2 3 4]
3.在zookeeper上創(chuàng)建/tasks/{strom id}/{task id}路徑,并存儲task信息
4.開始分配任務(wù)(內(nèi)部稱為assignment), 具體步驟:
(1)從zk上獲得已有的assignment(新的toplogy當(dāng)然沒有了)
(2)查找所有可用的slot,所謂slot就是可用的worker,在所有supervisor上配置的多個worker的端口。
(3)將任務(wù)均勻地分配給可用的worker,這里有兩種情況:
(a)task數(shù)目比worker多,例如task是[1 2 3 4],可用的slot只有[host1:port1 host2:port1],那么最終是這樣分配

<!–

Code highlighting produced by Actipro CodeHighlighter (freeware)
http://www.CodeHighlighter.com/

–> { 1 :[host1:port1] 2 :[host2:port1]
3 :[host1:port1] 4 :[host2:port1]}

,可以看到任務(wù)平均地分配在兩個worker上。
(b)如果task數(shù)目比worker少,例如task是[1 2],而worker有[host1:port1 host1:port2 host2:port1 host2:port2],那么首先會將woker排序, 將不同host間隔排列 ,保證task不會全部分配到同一個worker上,也就是將worker排列成

<!–

Code highlighting produced by Actipro CodeHighlighter (freeware)
http://www.CodeHighlighter.com/

–> [host1:port1host2:port1host1:port2host2:port2]

,然后分配任務(wù)為

<!–

Code highlighting produced by Actipro CodeHighlighter (freeware)
http://www.CodeHighlighter.com/

–> { 1 :host1:port1, 2 :host2:port2}

(4)記錄啟動時間
(5)判斷現(xiàn)有的assignment是否跟重新分配的assignment相同,如果相同,不需要變更,否則更新assignment到zookeeper的/assignments/{storm id}上。
5.啟動topology,所謂啟動,只是將zookeeper上/storms/{storm id}對應(yīng)的數(shù)據(jù)里的active設(shè)置為true。
6.nimbus會檢查task的心跳,如果發(fā)現(xiàn)task心跳超過超時時間,那么會重新跳到第4步做re-assignment。

Storm源碼淺析之topology的提交


更多文章、技術(shù)交流、商務(wù)合作、聯(lián)系博主

微信掃碼或搜索:z360901061

微信掃一掃加我為好友

QQ號聯(lián)系: 360901061

您的支持是博主寫作最大的動力,如果您喜歡我的文章,感覺我的文章對您有幫助,請用微信掃描下面二維碼支持博主2元、5元、10元、20元等您想捐的金額吧,狠狠點擊下面給點支持吧,站長非常感激您!手機(jī)微信長按不能支付解決辦法:請將微信支付二維碼保存到相冊,切換到微信,然后點擊微信右上角掃一掃功能,選擇支付二維碼完成支付。

【本文對您有幫助就好】

您的支持是博主寫作最大的動力,如果您喜歡我的文章,感覺我的文章對您有幫助,請用微信掃描上面二維碼支持博主2元、5元、10元、自定義金額等您想捐的金額吧,站長會非常 感謝您的哦!!!

發(fā)表我的評論
最新評論 總共0條評論
主站蜘蛛池模板: 新泰市| 灵山县| 宝丰县| 疏附县| 定安县| 吴忠市| 新密市| 黑水县| 垫江县| 沅陵县| 新绛县| 三明市| 太康县| 洛扎县| 万盛区| 磐石市| 米易县| 朝阳区| 安阳市| 万山特区| 河西区| 梁平县| 澄迈县| 广南县| 枣强县| 嘉兴市| 苏州市| 洞口县| 湄潭县| 宜州市| 炎陵县| 夏邑县| 曲水县| 东兰县| 朝阳市| 科技| 股票| 隆回县| 台东县| 太白县| 青田县|