http://www.cnblogs.com/panfeng412/category/367117.html
本文首先介紹了Storm的基本概念和數(shù)據(jù)流模型,然后結(jié)合一個(gè)典型應(yīng)用場景來說明Storm支持Topology之間數(shù)據(jù)流訂閱的必要性,最后對比了Storm與另一個(gè)流處理系統(tǒng)在數(shù)據(jù)流模型上的區(qū)別之處。
Storm基本概念
Storm是一個(gè)開源的實(shí)時(shí)計(jì)算系統(tǒng),它提供了一系列的基本元素用于進(jìn)行計(jì)算:Topology、Stream、Spout、Bolt等等。
在Storm中,一個(gè)實(shí)時(shí)應(yīng)用的計(jì)算任務(wù)被打包作為Topology發(fā)布,這同Hadoop的MapReduce任務(wù)相似。但是有一點(diǎn)不同的是:在Hadoop中,MapReduce任務(wù)最終會執(zhí)行完成后結(jié)束;而在Storm中,Topology任務(wù)一旦提交后永遠(yuǎn)不會結(jié)束,除非你顯示去停止任務(wù)。
計(jì)算任務(wù)Topology是由不同的Spouts和Bolts,通過數(shù)據(jù)流(Stream)連接起來的圖。下面是一個(gè)Topology的結(jié)構(gòu)示意圖:
其中包含有:
Spout:Storm中的消息源,用于為Topology生產(chǎn)消息(數(shù)據(jù)),一般是從外部數(shù)據(jù)源(如Message Queue、RDBMS、NoSQL、Realtime Log)不間斷地讀取數(shù)據(jù)并發(fā)送給Topology消息(tuple元組)。
Bolt:Storm中的消息處理者,用于為Topology進(jìn)行消息的處理,Bolt可以執(zhí)行過濾, 聚合, 查詢數(shù)據(jù)庫等操作,而且可以一級一級的進(jìn)行處理。
最終,Topology會被提交到storm集群中運(yùn)行;也可以通過命令停止Topology的運(yùn)行,將Topology占用的計(jì)算資源歸還給Storm集群。
Storm數(shù)據(jù)流模型
數(shù)據(jù)流(Stream)是Storm中對數(shù)據(jù)進(jìn)行的抽象,它是時(shí)間上無界的tuple元組序列。在Topology中,Spout是Stream的源頭,負(fù)責(zé)為Topology從特定數(shù)據(jù)源發(fā)射Stream;Bolt可以接收任意多個(gè)Stream作為輸入,然后進(jìn)行數(shù)據(jù)的加工處理過程,如果需要,Bolt還可以發(fā)射出新的Stream給下級Bolt進(jìn)行處理。
下面是一個(gè)Topology內(nèi)部Spout和Bolt之間的數(shù)據(jù)流關(guān)系:
Topology中每一個(gè)計(jì)算組件(Spout和Bolt)都有一個(gè)并行執(zhí)行度,在創(chuàng)建Topology時(shí)可以進(jìn)行指定,Storm會在集群內(nèi)分配對應(yīng)并行度個(gè)數(shù)的線程來同時(shí)執(zhí)行這一組件。
那么,有一個(gè)問題:既然對于一個(gè)Spout或Bolt,都會有多個(gè)task線程來運(yùn)行,那么如何在兩個(gè)組件(Spout和Bolt)之間發(fā)送tuple元組呢?
Storm提供了若干種數(shù)據(jù)流分發(fā)(Stream Grouping)策略用來解決這一問題。在Topology定義時(shí),需要為每個(gè)Bolt指定接收什么樣的Stream作為其輸入(注:Spout并不需要接收Stream,只會發(fā)射Stream)。
目前Storm中提供了以下7種Stream Grouping策略:Shuffle Grouping、Fields Grouping、All Grouping、Global Grouping、Non Grouping、Direct Grouping、Local or shuffle grouping,具體策略可以參考 這里 。
一種Storm不能支持的場景
以上介紹了一些Storm中的基本概念,可以看出, Storm中Stream的概念是Topology內(nèi)唯一的,只能在Topology內(nèi)按照“發(fā)布-訂閱”方式在不同的計(jì)算組件(Spout和Bolt)之間進(jìn)行數(shù)據(jù)的流動,而Stream在Topology之間是無法流動的 。
這一點(diǎn)限制了Storm在一些場景下的應(yīng)用,下面通過一個(gè)簡單的實(shí)例來說明。
假設(shè)現(xiàn)在有一個(gè)Topology1的結(jié)構(gòu)如下:通過Spout產(chǎn)生數(shù)據(jù)流后,依次需要經(jīng)過Filter Bolt,Join Bolt,Business1 Bolt。其中,F(xiàn)ilter Bolt用于對數(shù)據(jù)進(jìn)行過濾,Join Bolt用于數(shù)據(jù)流的聚合,Business1 Bolt用于進(jìn)行一個(gè)實(shí)際業(yè)務(wù)的計(jì)算邏輯。
目前這個(gè)Topology1已經(jīng)被提交到Storm集群運(yùn)行,而現(xiàn)在我們又有了新的需求,需要計(jì)算一個(gè)新的業(yè)務(wù)邏輯,而這個(gè)Topology的特點(diǎn)是和Topology1公用同樣的數(shù)據(jù)源,而且前期的預(yù)處理過程完全一樣(依次經(jīng)歷Filter Bolt和Join Bolt),那么這時(shí)候Storm怎么來滿足這一需求?據(jù)個(gè)人了解,有以下幾種“曲折”的實(shí)現(xiàn)方式:
1) 第一種方式: 首先kill掉已經(jīng)在集群中運(yùn)行的Topology1計(jì)算任務(wù),然后實(shí)現(xiàn)Business2 Bolt的計(jì)算邏輯,并重新打包形成一個(gè)新的Topology計(jì)算任務(wù)jar包后,提交到Storm集群中重新運(yùn)行,這時(shí)候Storm內(nèi)的整體Topology結(jié)構(gòu)如下:
這種方式的缺點(diǎn)在于:由于要重啟Topology,所以如果Spout或Bolt有狀態(tài)則會丟失掉;同時(shí)由于Topology結(jié)構(gòu)發(fā)生了變化,因此重新運(yùn)行Topology前需要對程序的穩(wěn)定性、正確性進(jìn)行驗(yàn)證;另外Topology結(jié)構(gòu)的變化也會帶來額外的運(yùn)維開銷。
2 ) 第二種方式: 完全開發(fā)部署一套新的Topology,其中前面的公共部分的Spout和Bolt可以直接復(fù)用,只需要重新開發(fā)新的計(jì)算邏輯Business2 Bolt來替換原有的Business1 Bolt即可。然后重新提交新的Topology運(yùn)行。這時(shí)候Storm內(nèi)的整體Topology結(jié)構(gòu)如下:
這種方式的缺點(diǎn)在于:由于兩個(gè)Topology都會從External Data Source讀取同一份數(shù)據(jù),無疑增加了External Data Source的負(fù)載壓力;而且會導(dǎo)致同樣的數(shù)據(jù)在Storm集群內(nèi)被傳輸相同的兩份,被同樣的計(jì)算單元Bolt進(jìn)行處理,浪費(fèi)了Storm的計(jì)算資源和網(wǎng)絡(luò)傳輸帶寬。假設(shè)現(xiàn)在不止有兩個(gè)這樣的Topology計(jì)算任務(wù),而是有N個(gè),那么對Storm的計(jì)算Slot的浪費(fèi)很嚴(yán)重。
注意:上述兩種方式還有一個(gè)公共的缺點(diǎn)——系統(tǒng)可擴(kuò)展性不好,這意味著不管哪種方式,只要以后有這種新增業(yè)務(wù)邏輯的需求,都需要進(jìn)行復(fù)雜的人工操作或線性的資源浪費(fèi)現(xiàn)象。
3) 第三種方式: OK,看了以上兩種方式后,也許你會提出下面的解決方案:通過Kafka這樣的消息中間件,實(shí)現(xiàn)不同Topology的Spout共享數(shù)據(jù)源,而且這樣可以做到消息可靠傳輸、消息rewind回傳等,好處是對于Storm來說,已經(jīng)有了 storm-kafka 插件的支持。這時(shí)候Storm內(nèi)的整體Topology結(jié)構(gòu)如下:
這種實(shí)現(xiàn)方式可以通過引入一層消息中間件減少對External Data Source的重復(fù)訪問的壓力,而且可以通過消息中間件層,屏蔽掉External Data Source的細(xì)節(jié),如果需要擴(kuò)展新的業(yè)務(wù)邏輯,只需要重新部署運(yùn)行新的Topology,應(yīng)該說是現(xiàn)有 Storm版本 下很好的實(shí)現(xiàn)方式了。不過消息中間件的引入,無疑將給系統(tǒng)帶來了一定的復(fù)雜性,這對于Storm上的應(yīng)用開發(fā)來說提高了門檻。
值得注意的是,方案三中仍遺留有一點(diǎn)問題沒有解決:對于Storm集群來說,這種方式還是沒有能夠從根本上避免數(shù)據(jù)在Storm不同Topology內(nèi)的重復(fù)發(fā)送與處理。這是由于Storm的數(shù)據(jù)流模型上的限制所導(dǎo)致的,如果Storm實(shí)現(xiàn)了不同Topology之間Stream的共享,那么這一問題也就迎刃而解了。
一個(gè)流處理系統(tǒng)的數(shù)據(jù)流模型
個(gè)人工作中有幸參與過一個(gè)流處理框架的開發(fā)與應(yīng)用。下面我們來簡單看看其中所采用的數(shù)據(jù)流模型:
其中:
1) 數(shù)據(jù)流(data stream) :時(shí)間分布和數(shù)量上無限的一系列數(shù)據(jù)記錄的集合體;
2) 數(shù)據(jù)記錄(data record) :數(shù)據(jù)流的最小組成單元,每條數(shù)據(jù)記錄包括 3 類數(shù)據(jù):所屬數(shù)據(jù)流名稱(stream name)、用于路由的數(shù)據(jù)(keys)和具體數(shù)據(jù)處理邏輯所需的數(shù)據(jù)(value);
3) 數(shù)據(jù)處理任務(wù)定義(task definition) :定義一個(gè)數(shù)據(jù)處理任務(wù)的基本屬性,無法直接被執(zhí)行,必須特化為具體的任務(wù)實(shí)例。其基本屬性包括:
- (可選)輸入流(input stream):描述該任務(wù)依賴哪些數(shù)據(jù)流作為輸入,是一個(gè)數(shù)據(jù)流名稱列表;數(shù)據(jù)流產(chǎn)生源不會依賴其他數(shù)據(jù)流,可忽略該配置;
- 數(shù)據(jù)處理邏輯(process logic):描述該任務(wù)具體的處理邏輯,例如由獨(dú)立進(jìn)程進(jìn)行的外部處理邏輯;
- (可選)輸出流(output stream):描述該任務(wù)產(chǎn)生哪個(gè)數(shù)據(jù)流,是一個(gè)數(shù)據(jù)流名稱;數(shù)據(jù)流處理鏈末級任務(wù)不會產(chǎn)生新的數(shù)據(jù)流,可忽略該配置;
4) 數(shù)據(jù)處理任務(wù)實(shí)例(task instance) :對一個(gè)數(shù)據(jù)處理任務(wù)定義進(jìn)行具體約束后,可推送到某個(gè)處理結(jié)點(diǎn)上運(yùn)行的邏輯實(shí)體。附加下列屬性:
- 數(shù)據(jù)處理任務(wù)定義:指向該任務(wù)實(shí)例對應(yīng)的數(shù)據(jù)處理任務(wù)定義實(shí)體;
- 輸入流過濾條件(input filting condition):一個(gè) boolean 表達(dá)式列表,描述每個(gè)輸入流中符合什么條件的數(shù)據(jù)記錄可以作為有效數(shù)據(jù)交給處理邏輯;若某個(gè)輸入流中所有數(shù)據(jù)記錄都是有效數(shù)據(jù),則可直接用 true 表示;
- (可選)強(qiáng)制輸出周期(output interval):描述以什么頻率強(qiáng)制該任務(wù)實(shí)例產(chǎn)生輸出流記錄,可以用輸入流記錄個(gè)數(shù)或間隔時(shí)間作為周期;忽略該配置時(shí),輸出流記錄產(chǎn)生周期完全由處理邏輯自身決定,不受框架約束;
5) 數(shù)據(jù)處理結(jié)點(diǎn)(node) :可容納多個(gè)數(shù)據(jù)處理任務(wù)實(shí)例運(yùn)行的實(shí)體機(jī)器,每個(gè)數(shù)據(jù)處理結(jié)點(diǎn)的IPv4地址必須保證唯一。
該分布式流處理系統(tǒng)由多個(gè)數(shù)據(jù)處理結(jié)點(diǎn)(node)組成;每個(gè)數(shù)據(jù)處理結(jié)點(diǎn)(node)上運(yùn)行有多個(gè)數(shù)據(jù)任務(wù)實(shí)例(task instance);每個(gè)數(shù)據(jù)任務(wù)實(shí)例(task instance)屬于一個(gè)數(shù)據(jù)任務(wù)定義(task definition),任務(wù)實(shí)例是在任務(wù)定義的基礎(chǔ)上,添加了輸入流過濾條件和強(qiáng)制輸出周期屬性后,可實(shí)際推送到數(shù)據(jù)處理結(jié)點(diǎn)(node)上運(yùn)行的邏輯實(shí)體;數(shù)據(jù)任務(wù)定義(task definition)包含輸入數(shù)據(jù)流、數(shù)據(jù)處理邏輯以及輸出數(shù)據(jù)流屬性。
該系統(tǒng)中,通過分布式應(yīng)用程序協(xié)調(diào)服務(wù)ZooKeeper集群存儲以上數(shù)據(jù)流模型中的所有配置信息;不同的數(shù)據(jù)處理節(jié)點(diǎn)統(tǒng)一通過ZooKeeper集群獲取數(shù)據(jù)流的配置信息后進(jìn)行任務(wù)實(shí)例的運(yùn)行與停止、數(shù)據(jù)流的流入和流出。
同時(shí),每個(gè)數(shù)據(jù)處理任務(wù)可以接受流系統(tǒng)中已存在的任意數(shù)據(jù)流(data stream)作為輸入,并產(chǎn)出新的任意名稱的數(shù)據(jù)流(data stream),被其他結(jié)點(diǎn)上運(yùn)行的任務(wù)實(shí)例訂閱。不同結(jié)點(diǎn)之間對于各個(gè)數(shù)據(jù)流(data stream)的訂閱關(guān)系,通過ZooKeeper集群來動態(tài)感知并負(fù)責(zé)通知流系統(tǒng)做出變化。
二者在數(shù)據(jù)流模型上的不同之處
至于兩個(gè)系統(tǒng)的實(shí)現(xiàn)細(xì)節(jié),我們先不去做具體比較,下面僅列出二者在數(shù)據(jù)流模型上的一些不同之處(這里并不是為了全面對比二者的不同之處,只是列出其中的關(guān)鍵部分):
1) 在Storm中,數(shù)據(jù)流Stream是在Topology內(nèi)進(jìn)行定義,并在Topology內(nèi)進(jìn)行傳輸?shù)?;而在上面提到的流處理系統(tǒng)中,數(shù)據(jù)流Stream是在整個(gè)系統(tǒng)內(nèi)全局唯一的,可以在整個(gè)集群內(nèi)被訂閱。
2) 在Storm中,數(shù)據(jù)流Stream的發(fā)布和訂閱都是靜態(tài)的,所謂靜態(tài)是指數(shù)據(jù)流的發(fā)布與訂閱關(guān)系在向Storm集群提交Topology計(jì)算任務(wù)時(shí),被一次性生成的,這一關(guān)系在Topology的運(yùn)行過程中是不能被改變的;而在上面提到的流處理系統(tǒng)中,數(shù)據(jù)流Stream的發(fā)布和訂閱都是動態(tài)的,即數(shù)據(jù)處理任務(wù)task可以動態(tài)的發(fā)布Stream,也可以動態(tài)的訂閱系統(tǒng)內(nèi)已經(jīng)生成的任意Stream,數(shù)據(jù)流的訂閱關(guān)于通過分布式應(yīng)用程序協(xié)調(diào)服務(wù)ZooKeeper集群的動態(tài)節(jié)點(diǎn)來維護(hù)管理。
有了以上的對比,我們不難發(fā)現(xiàn),對于本文所舉的應(yīng)用場景實(shí)例,Storm的數(shù)據(jù)流模式尚不能很方便的支持,而在這里提到的這個(gè)流處理系統(tǒng)的全局?jǐn)?shù)據(jù)流模型下,這一應(yīng)用場景的需求可以很方便的滿足。
總結(jié)的話
個(gè)人覺得,Storm有必要實(shí)現(xiàn)不同Topology之間Stream的共享,這個(gè)至少可以在不損失Storm現(xiàn)有功能的前提下,使得Storm在處理實(shí)際生產(chǎn)環(huán)境下的一些應(yīng)用場景時(shí)更加從容應(yīng)對。
至于如何在現(xiàn)有Storm的基礎(chǔ)上實(shí)現(xiàn)這一需求,可能的方式很多。一種簡單的方式是通過Zookeeper來集中存儲、動態(tài)感知Topology之間Stream的“發(fā)布-訂閱”關(guān)系,同時(shí)在Storm的消息分發(fā)過程中對這種情況加以處理。
以上觀點(diǎn),如果不對之處,歡迎大家指出。
更多文章、技術(shù)交流、商務(wù)合作、聯(lián)系博主
微信掃碼或搜索:z360901061

微信掃一掃加我為好友
QQ號聯(lián)系: 360901061
您的支持是博主寫作最大的動力,如果您喜歡我的文章,感覺我的文章對您有幫助,請用微信掃描下面二維碼支持博主2元、5元、10元、20元等您想捐的金額吧,狠狠點(diǎn)擊下面給點(diǎn)支持吧,站長非常感激您!手機(jī)微信長按不能支付解決辦法:請將微信支付二維碼保存到相冊,切換到微信,然后點(diǎn)擊微信右上角掃一掃功能,選擇支付二維碼完成支付。
【本文對您有幫助就好】元
