我的Spark學習筆記

一、架構設計

我的Spark學習筆記

文章插圖
  • Driver根據用戶代碼構建計算流圖,拆解出分布式任務并分發到 Executors 中去;每個Executors收到任務,然后處理這個 RDD 的一個數據分片子集
  • DAGScheduler根據用戶代碼構建 DAG;以 Shuffle 為邊界切割 Stages;基于 Stages 創建 TaskSets,并將 TaskSets 提交給 TaskScheduler 請求調度
  • TaskScheduler 在初始化的過程中,會創建任務調度隊列,任務調度隊列用于緩存 DAGScheduler 提交的 TaskSets 。TaskScheduler 結合 SchedulerBackend 提供的 WorkerOffer,按照預先設置的調度策略依次對隊列中的任務進行調度 , 也就是把任務分發給SchedulerBackend
  • SchedulerBackend 用一個叫做 ExecutorDataMap 的數據結構,來記錄每一個計算節點中 Executors 的資源狀態 。會與集群內所有 Executors 中的 ExecutorBackend 保持周期性通信 。SchedulerBackend收到TaskScheduler過來的任務,會把任務分發給ExecutorBackend去具體執行
  • ExecutorBackend收到任務后多線程執行(一個線程處理一個Task) 。處理完畢后反饋StatusUpdate給SchedulerBackend,再返回給TaskScheduler,最終給DAGScheduler

我的Spark學習筆記

文章插圖
二、常用算子2.1、RDD概念Spark 主要以一個 彈性分布式數據集_(RDD)的概念為中心,它是一個容錯且可以執行并行操作的元素的集合 。有兩種方法可以創建 RDD:在你的 driver program(驅動程序)中 _parallelizing 一個已存在的集合 , 或者在外部存儲系統中引用一個數據集,例如,一個共享文件系統,HDFS,HBase,或者提供 Hadoop InputFormat 的任何數據源 。
從內存創建RDDimport org.apache.spark.rdd.RDDimport org.apache.spark.{SparkConf, SparkContext}// 從內存創建RDDobject MakeRDDFromMemory {def main(args: Array[String]): Unit = {// 準備環境val sparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD")// 并行度,如果不設置則默認當前運行環境的最大可用核數sparkConf.set("spark.default.parallelism", "2")val sc = new SparkContext(sparkConf)// 從內存中創建RDD,將內存中集合的數據作為處理的數據源val seq = Seq[Int](1, 2, 3, 4, 5, 6)val rdd: RDD[Int] = sc.makeRDD(seq)rdd.collect().foreach(println)// numSlices表示分區的數量,不傳默認spark.default.parallelismval rdd2: RDD[Int] = sc.makeRDD(seq, 3)// 將處理的數據保存成分區文件rdd2.saveAsTextFile("output")sc.stop()}}從文件中創建RDDimport org.apache.spark.{SparkConf, SparkContext}// 從文件中創建RDD(本地文件、HDFS文件)object MakeRDDFromTextFile {def main(args: Array[String]): Unit = {// 準備環境val sparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD")val sc = new SparkContext(sparkConf)// 從文件中創建RDD , 將文件中的數據作為處理的數據源// path路徑默認以當前環境的根路徑為基準 ??梢詫懡^對路徑,也可以寫相對路徑//val rdd: RDD[String] = sc.textFile("datas/1.txt")// path路徑可以是文件的具體路徑 , 也可以目錄名稱//val rdd = sc.textFile("datas")// path路徑還可以使用通配符 *//val rdd = sc.textFile("datas/1*.txt")// path還可以是分布式存儲系統路徑:HDFSval rdd = sc.textFile("hdfs://localhost:8020/test.txt")rdd.collect().foreach(println)sc.stop()}}2.2、常用算子map算子:數據轉換import org.apache.spark.rdd.RDDimport org.apache.spark.{SparkConf, SparkContext}// map算子object map {def main(args: Array[String]): Unit = {val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")val sc = new SparkContext(sparkConf)val rdd = sc.makeRDD(List(1, 2, 3, 4))// 轉換函數def mapFunction(num: Int): Int = {num * 2}// 多種方式如下//val mapRDD: RDD[Int] = rdd.map(mapFunction)//val mapRDD: RDD[Int] = rdd.map((num: Int) => {//num * 2//})//val mapRDD: RDD[Int] = rdd.map((num: Int) => num * 2)//val mapRDD: RDD[Int] = rdd.map((num) => num * 2)//val mapRDD: RDD[Int] = rdd.map(num => num * 2)val mapRDD: RDD[Int] = rdd.map(_ * 2)mapRDD.collect().foreach(println)sc.stop()}}mapPartitions算子:數據轉換(分區批處理)import org.apache.spark.rdd.RDDimport org.apache.spark.{SparkConf, SparkContext}/** * mapPartitions VS map * * map 傳入的是分區中的每個元素 , 是對每個元素就進行一次轉換和改變,但不會減少或增多元素 * mapPartitions 傳入的參數是Iterator返回值也是Iterator,所傳入的計算邏輯是對一個Iterator進行一次運算,可以增加或減少元素 * * * Map 算子因為類似于串行操作,所以性能比較低,而是 mapPartitions 算子類似于批處理 , 所以性能較高 。* 但是 mapPartitions 算子會長時間占用內存 , 這樣會導致內存OOM 。而map會在內存不夠時進行GC 。* * 詳細參考 https://blog.csdn.net/AnameJL/article/details/121689987 */object mapPartitions {def main(args: Array[String]): Unit = {val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")val sc = new SparkContext(sparkConf)val rdd = sc.makeRDD(List(1, 2, 3, 4), 2)// mapPartitions: 可以以分區為單位進行數據轉換操作,但是會將整個分區的數據加載到內存進行引用 。// 在內存較小,數據量較大的場合下 , 容易出現內存溢出 。val mpRDD: RDD[Int] = rdd.mapPartitions(iter => {println("批處理當前分區數據")iter.map(_ * 2)})mpRDD.collect().foreach(println)sc.stop()}}

推薦閱讀