日韩久久久精品,亚洲精品久久久久久久久久久,亚洲欧美一区二区三区国产精品 ,一区二区福利

Spark學習實例(Python):窗口操作 Window

系統(tǒng) 1773 0

說到流處理,Spark為我們提供了窗口函數(shù),允許在滑動數(shù)據窗口上應用轉換,常用場景如每五分鐘商場人流密度、每分鐘流量等等,接下來我們通過畫圖來了解Spark Streaming的窗口函數(shù)如何工作的,處理過程圖如下所示:

Spark學習實例(Python):窗口操作 Window Operations_第1張圖片

上圖中綠色的小框框是一批一批的數(shù)據流,虛線框和實線框分別是前一個窗口和后一個窗口,從圖中可以看出后一個窗口在前一個窗口基礎上移動了兩個批次的數(shù)據流,而我們真正通過算子操作的數(shù)據其實就是窗口內所有的數(shù)據流。

在代碼實現(xiàn)前了解下窗口操作常用的函數(shù)有:

  • window
  • countByWindow
  • reduceByWindow
  • reduceByKeyAndWindow
  • reduceByKeyAndWindow
  • countByValueAndWindow

window最原始的窗口,提供兩個參數(shù),第一個參數(shù)是窗口長度,第二個參數(shù)是滑動間隔,返回一個新的DStream, 返回的結果可以進行算子操作,代碼實現(xiàn)如下

            
              from pyspark import SparkContext
from pyspark.streaming import StreamingContext

if __name__ == '__main__':
    sc = SparkContext(appName="windowStream", master="local[*]")
    # 第二個參數(shù)指統(tǒng)計多長時間的數(shù)據
    ssc = StreamingContext(sc, 5)
    lines = ssc.socketTextStream("localhost", 9999)
    # 第一個參數(shù)是窗口長度,這里是60秒, 第二個參數(shù)是滑動間隔,這里是10秒
    dstream = lines.window(60, 10)
    dstream.pprint()
    # -------------------------------------------
    # Time: 2019-08-13 19:46:45
    # -------------------------------------------
    # hello
    # world
    ssc.start()
    ssc.awaitTermination()
            
          

現(xiàn)在終端使用nc發(fā)送數(shù)據

root@root:~$ nc -lk 9999
hello
world

countByWindow統(tǒng)計每個滑動窗口內數(shù)據條數(shù),要注意的是使用該函數(shù)要加上checkpoint機制,代碼實現(xiàn)如下

            
              from pyspark import SparkContext
from pyspark.streaming import StreamingContext

if __name__ == '__main__':
    sc = SparkContext(appName="windowStream", master="local[*]")
    # 第二個參數(shù)指統(tǒng)計多長時間的數(shù)據
    ssc = StreamingContext(sc, 5)
    ssc.checkpoint("/tmp/window")
    lines = ssc.socketTextStream("localhost", 9999)
    # 第一個參數(shù)是窗口長度,這里是60秒, 第二個參數(shù)是滑動間隔,這里是10秒
    dstream = lines.countByWindow(60, 10)
    dstream.pprint()
    # -------------------------------------------
    # Time: 2019-08-14 18:56:40
    # -------------------------------------------
    # 2
    ssc.start()
    ssc.awaitTermination()
            
          

reduceByWindow聚合每個鍵的值,底層執(zhí)行的是reduceByKeyAndWindow,實現(xiàn)代碼如下

            
              from pyspark import SparkContext
from pyspark.streaming import StreamingContext

def fun(x):
    return x

if __name__ == '__main__':
    sc = SparkContext(appName="windowStream", master="local[*]")
    # 第二個參數(shù)指統(tǒng)計多長時間的數(shù)據
    ssc = StreamingContext(sc, 5)
    ssc.checkpoint("/tmp/window")
    lines = ssc.socketTextStream("localhost", 9999)
    # 第一個參數(shù)執(zhí)行指定函數(shù), 第二個參數(shù)是窗口長度,這里是60秒, 第三個參數(shù)是滑動間隔,這里是10秒
    dstream = lines.reduceByWindow(fun, 60, 10)
    dstream.pprint()
    # -------------------------------------------
    # Time: 2019-08-14 19:23:30
    # -------------------------------------------
    # hello
    ssc.start()
    ssc.awaitTermination()
            
          

reduceByKeyAndWindow是對(K,V)窗口數(shù)據相同的K執(zhí)行對應的fun,實現(xiàn)代碼如下

            
              from pyspark import SparkContext
from pyspark.streaming import StreamingContext

def fun(x,y):
    return x+y

if __name__ == '__main__':
    sc = SparkContext(appName="windowStream", master="local[*]")
    # 第二個參數(shù)指統(tǒng)計多長時間的數(shù)據
    ssc = StreamingContext(sc, 5)
    ssc.checkpoint("/tmp/window")
    lines = ssc.socketTextStream("localhost", 9999)
    # 第一個參數(shù)執(zhí)行的功能函數(shù)fun, 第二個參數(shù)是窗口長度,這里是60秒, 第三個參數(shù)是滑動間隔,這里是10秒,
    # 第四個參數(shù)設定并行度
    dstream = lines.map(lambda x:(x,1)).reduceByKeyAndWindow(fun, 60, 10)
    dstream.pprint()
    # -------------------------------------------
    # Time: 2019-08-14 19:23:30
    # -------------------------------------------
    # ('hello', 2)
    ssc.start()
    ssc.awaitTermination()
            
          

countByValueAndWindow是對窗口數(shù)據進行單詞統(tǒng)計,實現(xiàn)代碼如下

            
              from pyspark import SparkContext
from pyspark.streaming import StreamingContext

if __name__ == '__main__':
    sc = SparkContext(appName="windowStream", master="local[*]")
    # 第二個參數(shù)指統(tǒng)計多長時間的數(shù)據
    ssc = StreamingContext(sc, 5)
    ssc.checkpoint("/tmp/window")
    lines = ssc.socketTextStream("localhost", 9999)
    # 第一個參數(shù)是窗口長度,這里是60秒, 第二個參數(shù)是滑動間隔,這里是10秒, 第三個參數(shù)任務并行度
    dstream = lines.countByValueAndWindow(60, 10)
    dstream.pprint()
    # -------------------------------------------
    # Time: 2019-08-14 19:23:30
    # -------------------------------------------
    # ('hello', 3)
    # ('world', 1)
    ssc.start()
    ssc.awaitTermination()
            
          

以上就是所有窗口函數(shù)的使用

?

Spark學習目錄:

  • Spark學習實例1(Python):單詞統(tǒng)計 Word Count
  • Spark學習實例2(Python):加載數(shù)據源Load Data Source
  • Spark學習實例3(Python):保存數(shù)據Save Data
  • Spark學習實例4(Python):RDD轉換 Transformations
  • Spark學習實例5(Python):RDD執(zhí)行 Actions
  • Spark學習實例6(Python):共享變量Shared Variables
  • Spark學習實例7(Python):RDD、DataFrame、DataSet相互轉換
  • Spark學習實例8(Python):輸入源實時處理 Input Sources Streaming
  • Spark學習實例9(Python):窗口操作 Window Operations

?

?


更多文章、技術交流、商務合作、聯(lián)系博主

微信掃碼或搜索:z360901061

微信掃一掃加我為好友

QQ號聯(lián)系: 360901061

您的支持是博主寫作最大的動力,如果您喜歡我的文章,感覺我的文章對您有幫助,請用微信掃描下面二維碼支持博主2元、5元、10元、20元等您想捐的金額吧,狠狠點擊下面給點支持吧,站長非常感激您!手機微信長按不能支付解決辦法:請將微信支付二維碼保存到相冊,切換到微信,然后點擊微信右上角掃一掃功能,選擇支付二維碼完成支付。

【本文對您有幫助就好】

您的支持是博主寫作最大的動力,如果您喜歡我的文章,感覺我的文章對您有幫助,請用微信掃描上面二維碼支持博主2元、5元、10元、自定義金額等您想捐的金額吧,站長會非常 感謝您的哦!!!

發(fā)表我的評論
最新評論 總共0條評論
主站蜘蛛池模板: 库车县| 南木林县| 衢州市| 平武县| 罗田县| 鄯善县| 尚义县| 阿拉善右旗| 新竹县| 洛川县| 北海市| 宜丰县| 通化市| 莫力| 鹤山市| 化州市| 东阳市| 元阳县| 晋中市| 蒲江县| 前郭尔| 青州市| 观塘区| 永嘉县| 双峰县| 平谷区| 兴城市| 沾化县| 江油市| 昭通市| 伊宁县| 吴江市| 太湖县| 松江区| 青岛市| 太仆寺旗| 明水县| 白河县| 丰都县| 六枝特区| 华容县|