我的Spark學習筆記( 二 )

mapPartitionsWithIndex算子:分區索引 + 數據迭代器import org.apache.spark.{SparkConf, SparkContext}// 分區索引object mapPartitionsWithIndex {def main(args: Array[String]): Unit = {val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")val sc = new SparkContext(sparkConf)val rdd = sc.makeRDD(List(1, 2, 3, 4), 2)val mpiRDD = rdd.mapPartitionsWithIndex(//(分區索引,數據迭代器)(index, iter) => {println("index:" + index, "iter[" + iter.mkString(",") + "]")})mpiRDD.collect().foreach(println)sc.stop()}}flatMap算子:數據扁平化import org.apache.spark.rdd.RDDimport org.apache.spark.{SparkConf, SparkContext}// 將處理的數據進行扁平化后再進行映射處理,所以算子也稱之為扁平映射object flatMap {def main(args: Array[String]): Unit = {val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")val sc = new SparkContext(sparkConf)val rdd: RDD[List[Int]] = sc.makeRDD(List(List(1, 2), List(3, 4)))// 多個list合并成一個listval flatRDD: RDD[Int] = rdd.flatMap(list => list)flatRDD.collect().foreach(println)sc.stop()}}glom算子:分區內數據合并import org.apache.spark.rdd.RDDimport org.apache.spark.{SparkConf, SparkContext}// 將同一個分區的數據直接轉換為相同類型的內存數組進行處理 , 分區不變object glom {def main(args: Array[String]): Unit = {val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")val sc = new SparkContext(sparkConf)val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4), 2)// 把每一個分區內數據合并成Arrayval glomRDD: RDD[Array[Int]] = rdd.glom()glomRDD.collect().foreach(array => {println(array.mkString(","))})sc.stop()}}groupBy算子:數據分組import org.apache.spark.rdd.RDDimport org.apache.spark.{SparkConf, SparkContext}// 將數據根據指定的規則進行分組, 分區默認不變,但是數據會被打亂重新組合,我們將這樣的操作稱之為 shuffle 。// 極限情況下,數據可能被分在同一個分區中一個組的數據在一個分區中 , 但是并不是說一個分區中只有一個組,分組和分區沒有必然的關系object groupBy {def main(args: Array[String]): Unit = {val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")val sc = new SparkContext(sparkConf)val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4), 2)// groupBy會將數據源中的每一個數據進行分組判斷,根據返回的分組key進行分組,相同的key值的數據會放置在一個組中// val groupRDD: RDD[(Int, Iterable[Int])] = rdd.groupBy(num => num % 2)val groupRDD: RDD[(Int, Iterable[Int])] = rdd.groupBy(_ % 2)groupRDD.collect().foreach(println)sc.stop()}}filter算子:數據過濾import org.apache.spark.rdd.RDDimport org.apache.spark.{SparkConf, SparkContext}// 將數據根據指定的規則進行篩選過濾 , 符合規則的數據保留,不符合規則的數據丟棄 。// 當數據進行篩選過濾后,分區不變,但是分區內的數據可能不均衡,生產環境下,可能會出現數據傾斜 。object filter {def main(args: Array[String]): Unit = {val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")val sc = new SparkContext(sparkConf)val rdd = sc.makeRDD(List(1, 2, 3, 4))val filterRDD: RDD[Int] = rdd.filter(num => num % 2 != 0)filterRDD.collect().foreach(println)sc.stop()}}sample算子:數據采樣隨機抽取import org.apache.spark.{SparkConf, SparkContext}// 根據指定的規則從數據集中抽取數據object sample {def main(args: Array[String]): Unit = {val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")val sc = new SparkContext(sparkConf)val dataRDD = sc.makeRDD(List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10), 1)// 抽取數據不放回(伯努利算法)// 伯努利算法:又叫 0、 1 分布 。例如扔硬幣,要么正面,要么反面 。// 具體實現:根據種子和隨機算法算出一個數和第二個參數設置幾率比較 , 小于第二個參數要,大于不要// 第一個參數:抽取的數據是否放回,false:不放回// 第二個參數:抽取的幾率 , 范圍只能在[0,1]之間,0:全不?。?1:全?。?// 第三個參數:隨機數種子val dataRDD1 = dataRDD.sample(false, 0.5)// 抽取數據放回(泊松算法)// 第一個參數:抽取的數據是否放回,true:放回; false:不放回// 第二個參數:重復數據的幾率 , 范圍大于等于0,可以大于1 表示每一個元素被期望抽取到的次數// 第三個參數:隨機數種子// 例如數據集內有10個,fraction為1的話抽取10個,0.5的話抽取5個 , 2的話抽取20個val dataRDD2 = dataRDD.sample(true, 2)println(dataRDD1.collect().mkString(","))println(dataRDD2.collect().mkString(","))sc.stop()}}distinct算子:數據去重import org.apache.spark.rdd.RDDimport org.apache.spark.{SparkConf, SparkContext}object distinct {def main(args: Array[String]): Unit = {val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")val sc = new SparkContext(sparkConf)val rdd = sc.makeRDD(List(1, 2, 3, 4, 1, 2, 3, 4))val rdd1: RDD[Int] = rdd.distinct()val rdd2: RDD[Int] = rdd.distinct(2)// 底層相當于這樣寫val rdd3 = rdd.map(x => (x, null)).reduceByKey((x, _) => x).map(_._1)println(rdd.collect().mkString(","))println(rdd1.collect().mkString(","))println(rdd2.collect().mkString(","))println(rdd3.collect().mkString(","))sc.stop()}}

推薦閱讀