• <noscript id="e0iig"><kbd id="e0iig"></kbd></noscript>
  • <td id="e0iig"></td>
  • <option id="e0iig"></option>
  • <noscript id="e0iig"><source id="e0iig"></source></noscript>
  • Spark Streaming處理文件(本地文件以及hdfs上面的文件)

    標簽: Dstream  Spark Streaming

    標題介紹文件流之前先介紹一下Dstream

    下面是來自官網一段的說明,Discretized Streams或DStream是Spark Streaming提供的基本抽象。它表示連續的數據流,可以是從源接收的輸入數據流,也可以是通過轉換輸入流生成的已處理數據流。在內部,DStream由一系列連續的RDD表示,這是Spark對不可變的分布式數據集的抽象(有關更多詳細信息,請參見Spark編程指南)。DStream中的每個RDD都包含來自特定間隔的數據,如下圖所示。
    在這里插入圖片描述
    在DStream上執行的任何操作都轉換為對基礎RDD的操作。例如,在較早的將行流轉換為單詞的示例中,將flatMap操作應用于linesDStream中的每個RDD 以生成DStream的 wordsRDD。如下圖所示。
    在這里插入圖片描述

    要從與HDFS API兼容的任何文件系統(即HDFS,S3,NFS等)上的文件中讀取數據,可以通過創建DStream StreamingContext.fileStream[KeyClass, ValueClass, InputFormatClass]

    文件流不需要運行接收器,因此無需分配任何內核來接收文件數據。

    對于簡單的文本文件,最簡單的方法是StreamingContext.textFileStream(dataDirectory)

    在本地運行Spark Streaming程序時,請勿使用“ local”或“ local [1]”作為主URL。這兩種方式均意味著僅一個線程將用于本地運行任務。如果您使用的是基于接收器的輸入DStream(例如套接字,Kafka,Flume等),則將使用單個線程來運行接收器,而不會留下任何線程來處理接收到的數據。因此,在本地運行時,請始終使用“ local [ n ]”作為主URL,其中n >要運行的接收者數(有關如何設置主服務器的信息,請參見Spark屬性)。

    為了將邏輯擴展到在集群上運行,分配給Spark Streaming應用程序的內核數必須大于接收器數。否則,系統將接收數據,但無法處理它。

    下面代碼開發本地文件和hdfs的文件流

    package com.zgw.spark.streaming
    
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.dstream.DStream
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    
    /**
      * Created by Zhaogw&Lss on 2019/10/21.
      */
    object SparkStream02 {
    
        def main(args: Array[String]): Unit = {
          var sparkConf =new SparkConf().setMaster("local[*]").setAppName("SparkStream02").set("spark.testing.memory", "2147480000")
    
          //分析環境對象以及采集周期
          val streamContext = new StreamingContext(sparkConf,Seconds(10))
    
          val inputFile = "hdfs://192.168.181.128:8020/spark/"
    
          //文件流
          /*val fileStreamLine: DStream[String] = streamContext.textFileStream("file:///E:/test")*/
          val fileStreamLine: DStream[String] = streamContext.textFileStream(inputFile)
          //將采集數據進行分解
          val dStream: DStream[String] = fileStreamLine.flatMap(line => line.split(" "))
    
          //將數據進行結構轉變
          val map: DStream[(String, Int)] = dStream.map((_,1))
          //聚合處理
          val key: DStream[(String, Int)] = map.reduceByKey(_+_)
          //結果打印
          key.print()
          //啟動采集器
          streamContext.start()
          //等待采集器執行
          streamContext.awaitTermination()
    
    
        }
    
    
    }
    
    

    在這里插入圖片描述
    在這里插入圖片描述
    Spark打印出文件流的信息,這里有幾個要注意的點,一是寫hdfs文件路徑時要注意fs.defaultFS(在core-xml的配置)是要能在外面訪問通的
    在這里插入圖片描述
    在這里插入圖片描述

    telnet 192.168.181.128 8020      能通,不然會報連接拒絕的錯
    

    監控規則:

    • 可以監視一個簡單目錄,例如"hdfs://namenode:8040/logs/"。發現后,將直接處理該路徑下的所有文件

    • A POSIX glob pattern can be supplied, such as "hdfs://namenode:8040/logs/2017/*". Here, the DStream will consist of all files in the directories matching the pattern. That is: it is a pattern of directories, not of files in directories.

    • 所有文件必須使用相同的數據格式。

    • 根據文件的修改時間而非創建時間,將其視為時間段的一部分。

    • 處理后,在當前窗口中對文件的更改將不會導致重新讀取該文件。也就是說:忽略更新。

    • 目錄下的文件越多,掃描更改所需的時間就越長-即使未修改任何文件。

    • 如果使用通配符來標識目錄(例如)“hdfs://namenode:8040/logs/2016-*”,則重命名整個目錄以匹配路徑會將目錄添加到受監視目錄列表 中。流中僅包含目錄中修改時間在當前窗口內的文件。

    • 調用FileSystem.setTimes() 修復時間戳是一種在以后的窗口中拾取文件的方法,即使其內容沒有更改。

    完整代碼托管于https://github.com/daizikaikou/learningSpark

    版權聲明:本文為qq_35885488原創文章,遵循 CC 4.0 BY-SA 版權協議,轉載請附上原文出處鏈接和本聲明。
    本文鏈接:https://blog.csdn.net/qq_35885488/article/details/102681368

    智能推薦

    spark:讀取不了本地文件,Spark默認讀取(當前部署環境)HDFS文件系統

     指定讀取文件的目錄為:  報錯: 若是指定文件路徑為 “file://…”,則讀取的是本地目錄; 未指定默認為HDFS文件系統 修改為: 執行命令為:  則不報錯。 運行結果為:    ...

    Spark 2.2.1 處理HDFS文件數據源的案例與解讀

    Spark 2.2.1 處理HDFS文件數據源的案例與解讀 上一節講解了從Sockets獲取數據,Spark StreamingContext API還提供了從其他基礎數據源創建DStream實例的方法,這里以文件數據源作為例子,對文件流進行處理。 在企業中可引入類似Flume的工具收集數據,企業從各種數據源獲取數據,存入某個文件存儲系統(一般使用HDFS)。例如將從Flume數據源收集來的日志文...

    搭建以hdfs為文件存儲庫的spark集群

     1,搭建啟動hadoop HA集群,之前文章有詳細的搭建過程。 啟動HDFS(在weekend01上執行) sbin/start-dfs.sh 找一個文件上傳到自定義hdfs目錄下面 啟動spark shell 交互命令行(在weekend02 啟動) bin/spark-shell --master spark://weekend02:7077 --executor-memory 5...

    Spark刪除HDFS文件的兩種方式

    github:https://github.com/jpegbert/SparkCoding/tree/master/src/main/scala/delete_hdfs_file  ...

    hive 導出數據之一列多行,轉為一行多列

    需求:提取數據 說明:原數據是一列多行,需要轉化為一行多列 待查詢表為:temp_05 待查詢數據為: 待查詢數據如圖: 需要提取的數據表頭如下: 預定日期 昨日價格 前天價格 2018-02-01 2018-02-02 2018-02-03 2018-02-04 可用提數 SQL 數據如圖: 以下為嘗試過程 數據如圖: 數據如圖: 數據如圖: 數據如圖:...

    猜你喜歡

    asp.net做一個簡易的聊天室

    要求: 結果: 關鍵代碼: Default.aspx Default.aspx.cs Default2.aspx Default2.aspx.cs Default3.aspx Default3.aspx.cs Default4.aspx...

    動態SQL和多表關聯-筆記

    《動態SQL與多表關聯》筆記 學習目標 能夠使用動態SQL完成SQL拼接 能夠使用resultMap完成多表查詢 能夠使用一對一查詢 能夠使用一對多查詢 (注:多對多其實就是兩個一個多) 映射文件:為什么要resultMap 目標 定義結果映射 使用結果映射 回顧 在mybatis中有2種配置文件: 核心配置文件,如:sqlMapConfig.xml 實體類映射文件,如:UserMapper.xm...

    【OpenGL C++ UE4】獲取模型頂點及面索引數據,并優化存儲結構供UE4繪制

    目錄 一、功能需求 二、成果 三、環境配置 四、詳細步驟 4.1 Max制作三棱錐并處理 4.2 核心代碼 4.2.1 傳入結構體數據 4.2.2 頂點去重、更新索引 4.2.3 輸出本地CSV文件 4.3 UE4繪制 一、功能需求 想必你肯定會問我一個問題,UE4直接導入模型不好么? 哈哈,前提是在做畢設時,導師提供的只有頂點與面索引數據,沒有模型。 下文詳細介紹了畢設開發中的難點,涉...

    解決Pyinstaller打包numpy和pandas庫文件過大問題

    解決Pyinstaller壓縮numpy和pandas庫文件過大問題 文件包類型和網上的方法 Windows下docker的安裝 在docker下實現打包     今天是2021年的第一天,先祝各位小伙伴現年快樂哈。最近因為做了一個項目,需要打包文件,文件中包含了numpy和pandas庫,結果打包出來幾百行的代碼居然要900m,人都傻了,翻遍了全網找解決方...

    精品国产乱码久久久久久蜜桃不卡