
Example: Simulated Log Generation with Python + Flume + Kafka + Spark


Generating simulated data

  1. Write generate_log.py
            
              
        # coding=UTF-8
        import random
        import time

        url_paths = [
            "class/112.html",
            "class/128.html",
            "class/145.html",
            "class/130.html",
            "class/146.html",
            "class/131.html",
            "learn/821",
            "course/list",
        ]

        ip_slices = [132, 156, 124, 10, 29, 167, 143, 187, 30, 46, 55, 63, 72, 87, 98, 168]

        http_referers = [
            "https://www.baidu.com/s?wd={query}",
            "https://www.sogou.com/web?query={query}",
            "https://cn.bing.com/search?q={query}",
            "https://www.so.com/s?q={query}",
        ]

        search_keyword = ["spark sql實戰", "hadoop 基礎", "storm實戰", "spark streaming實戰"]

        status_code = ["200", "404", "500"]


        def sample_status_code():
            return random.sample(status_code, 1)[0]


        def sample_referer():
            # Roughly 80% of requests carry no referer
            if random.uniform(0, 1) > 0.2:
                return "-"
            refer_str = random.sample(http_referers, 1)
            query_str = random.sample(search_keyword, 1)
            return refer_str[0].format(query=query_str[0])


        def sample_url():
            return random.sample(url_paths, 1)[0]


        def sample_ip():
            # Pick four distinct values and join them into a dotted address
            octets = random.sample(ip_slices, 4)
            return ".".join([str(item) for item in octets])


        def generate_log(count=10):
            time_str = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())

            # "w+" truncates the file, so each call overwrites the previous batch
            with open("C:/Users/DaiRenLong/Desktop/streaming_access.log", "w+") as f:
                while count >= 1:
                    query_log = "{ip}\t{local_time}\t\"GET /{url} HTTP/1.1\"\t{status_code}\t{refer}".format(
                        url=sample_url(), ip=sample_ip(), refer=sample_referer(),
                        status_code=sample_status_code(), local_time=time_str)
                    print(query_log)
                    f.write(query_log + "\n")
                    count = count - 1


        if __name__ == '__main__':
            # Generate log entries once a minute
            while True:
                generate_log()
                time.sleep(60)
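Each line the generator writes has five tab-separated fields: IP, timestamp, quoted request, status code, and referer. As a quick way to sanity-check that format, here is a minimal sketch of a parser. The names `AccessRecord` and `parse_log_line` and the sample line are illustrative assumptions and do not appear in the original script.

```python
from collections import namedtuple

# Hypothetical record type for illustration; field names mirror the format string.
AccessRecord = namedtuple(
    "AccessRecord", ["ip", "local_time", "method", "url", "status_code", "referer"]
)


def parse_log_line(line):
    """Split one tab-separated line, as written by generate_log(), into fields."""
    ip, local_time, request, status, referer = line.rstrip("\n").split("\t")
    # The request field is quoted, e.g. "GET /class/112.html HTTP/1.1"
    method, url, _protocol = request.strip('"').split(" ")
    return AccessRecord(ip, local_time, method, url, int(status), referer)


# A made-up sample line in the generator's format
sample = '132.156.124.10\t2019-07-10 10:00:00\t"GET /class/112.html HTTP/1.1"\t200\t-'
print(parse_log_line(sample))
```

Splitting on tabs rather than spaces matters here, because both the timestamp and some referer queries contain spaces.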
              
            
          
  2. Connect the log file to Flume ==> Kafka
    Flume configuration file: https://blog.csdn.net/drl_blogs/article/details/95192574#execkafkaconf_1
  3. Run Flume
            
        flume-ng agent \
        --name exec-memory-logger \
        --conf $FLUME_HOME/conf \
        --conf-file $FLUME_HOME/conf/streaming_project.conf \
        -Dflume.root.logger=INFO,console &
              
            
          
  4. Run the Kafka consumer
            
        kafka-console-consumer.sh \
        --zookeeper hadoop01:2181 \
        --topic kafka_streaming_topic

            
          
  5. Run the Python script to test
            
               python generate_log.py

            
          
  6. Check that the Kafka consumer is receiving messages

  7. Write the Spark Streaming code to complete the pipeline

            
              
        import org.apache.log4j.{Level, Logger}
        import org.apache.spark.SparkConf
        import org.apache.spark.streaming.kafka.KafkaUtils
        import org.apache.spark.streaming.{Seconds, StreamingContext}

        object kafka_Receiver_streaming {
          Logger.getLogger("org").setLevel(Level.WARN)
          def main(args: Array[String]): Unit = {
            val sparkConf = new SparkConf().setAppName("kafka_Receiver_streaming").setMaster("local[*]").set("spark.port.maxRetries", "100")

            val ssc = new StreamingContext(sparkConf, Seconds(60))

            val messages = KafkaUtils.createStream(ssc, "hadoop01:2181", "test", Map("kafka_streaming_topic" -> 1))
            messages.map(_._2).count().print()

            ssc.start()
            ssc.awaitTermination()
          }
        }
              
            
          
  8. Run the code and check the result
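To make the printed result easier to interpret: the receiver-based `KafkaUtils.createStream` yields (key, message) pairs, `map(_._2)` keeps only the message body, and `count()` emits the number of records in each 60-second batch. The plain-Python sketch below mimics that logic for a single batch. It does not use Spark, and `count_values` and the sample batch are illustrative assumptions.

```python
def count_values(batch):
    """Mimic messages.map(_._2).count() for one micro-batch of (key, value) pairs."""
    values = [value for _key, value in batch]  # map(_._2): keep only the message body
    return len(values)                         # count(): number of records in the batch


# A made-up batch of three Kafka records with no keys
batch = [(None, "log line 1"), (None, "log line 2"), (None, "log line 3")]
print(count_values(batch))  # 3
```

In the real job one such number is printed per batch interval, so the value depends on how many log lines reached the topic during that interval.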
