[Udemy] 學習筆記: Taming Big Data with Apache Spark and Python – Hands On!

點閱: 147

記錄在 udemy 上學 spark 的筆記

環境設定(Windows)

Udemy 上的教學是在 Windows 進行的,以下說明

python

  • python3

java

  • JDK and JRE: 安裝時要注意在路徑中不能有空白, windows 預設的 Program Files 會有問題
  • JAVA_HOME=C:\Program Files\Java\jdk-15.0.2
  • JAVA_HOME=C:\Program Files\Java\jdk-11.0.10
  • JAVA_HOME=C:\Program Files\Java\jdk1.8.0_281 (java 8)
  • 打開 C:\spark\conf 資料夾, 修改 log4j.properties.template 名稱為 log4j.properties ,把第 19 行的 log4j.rotCategory=INFO 改為 log4j.rotCategory=ERROR

hadoop

  • HADOOP_HOME=c:\winutils
  • https://sungod-spark.s3.amazonaws.com/winutils.exe 連結中下載工具,讓 PC 能夠了解 hadoop 存在於環境中而不用真的安裝 hadoop 環境。
  • mkdir c:\wintuils\bin,然後把 winutils.exe 放到此路徑中
  • c:\winutils\bin\winutils.exe chmod 777 \tmp\hive

環境變數

  • SPARK_HOME: c:\spark
  • JAVA_HOME: C:\Program Files\Java\jdk1.8.0_281
  • HADOOP_HOME: C:\winutils
  • path:
    • %SPARK_HOME%\bin
    • %JAVA_HOME%\bin

環境設定(WSL)

自己在 Windows Linux Subsystem(WSL) 也成功裝起來,以下是簡單步驟,主要參考這個網站的步驟。

插播一下, WSL 的實體位置

# 找出 WSL 的根目錄

\\wsl$

安裝步驟

# 更新系統
sudo apt update
sudo apt -y upgrade

# Install Java
sudo apt install curl mlocate default-jdk -y
java -version  # use for check java version

# Download, Extract and Move Spark to working directory
curl -O https://archive.apache.org/dist/spark/spark-3.1.1/spark-3.1.1-bin-hadoop3.2.tgz

tar xvf spark-3.1.1-bin-hadoop3.2.tgz

sudo mv spark-3.1.1-bin-hadoop3.2/ /opt/spark 

# Set Environment Variable
vim ~/.bashrc
    # add inside ~/.bashrc
    export SPARK_HOME=/opt/spark
    export PATH=$PATH:$SPARK_HOME/bin:$SPARK_HOME/sbin
    source ~/.bashrc  # activate it

# Using Spark shell
spark-shell  # use scala CLI

pyspark  # use python CLI

我在執行 pyspark 的時候出錯:

  • env: 'python': No such file or directory

解決方法如下

# 執行 vim ~/.bashrc
alias python=python3  # 新增 python alias
export PYSPARK_PYTHON=python3  # 新增環境變數 pyspark_python 讓他去找 python3 當 executer

# 如果啟動之後還有其他套件要裝,就先安裝 pip 之後再看看有什麼其他要裝的 (我除了 pip 外還裝了 py4j)

CH7 Installing the MovieLens movie rating dataset

CH8 Run Your First Spark Program

出錯

  • 原因是: Java 版本太新 (我裝15, 建議降回11,最後裝回8才搞定)

  • mkdir c:\tmp\hive

  • spark-submit ratings-counter.py

  • 上面這段指令會透過 spark 執行 python script ,因此要修改 ratings-counter.py 裡面要讀取檔案的設定 (第7-9行,依據系統而自行設定檔案路徑)

  • file

  • u.data 的資料長這樣,欄位分別是 userID/movieID/ratingValue/timestamp

  • file

  • Java 版本太新的話會跳出下列錯誤

    Traceback (most recent call last):
    File "C:/sw/00-work/08-learning/12-Spark_Python/ratings-counter.py", line 12, in 
    ...
    ...
    ...
    : java.lang.IllegalArgumentException: Unsupported class file major version 59

CH9 What’s new in Spark3

  • Old version of spark MLLib with RDDs is deprecated -> use dataframe based
  • Faster than spark2 about 17 times
  • Python2 is not support anymore
  • GPU instance support
  • Deeper k8s supported
  • Binary file support
  • [Graphs in CS ]SparkGraph and Cypher supported
  • ACID support in data lakes with Delta Lake

CH10. Introduction to Spark.

  • Spark is a fast ad general engine for large scale data processing
  • Spark is a distributed systems: Cluster manager to Executor
  • Just like MapReduce but it’s faster
  • Less lines of code in Spark than MapReduce for same task
  • Spark Streaming, Spark SQL, MLLib, GraphX, Spark Core

CH11. The Resilient Distributed Dataset (RDD)

  • 有彈性的分散式資料集
  • RDD 就是 spark 的基礎,就是一個很大的 Dataset object ,具有一些屬性與方法,讓他可以操作 big data
  • 可參考文獻
  • spark shell will create a Spark Context(sc) object to run RDD
  • 從下面從下面程式碼讀起,第5行先起始 SparkContext() class object ,再賦予屬性 (sc.textFile(...))
  • file
  • 還可讀取其他分散式系統的連結(e.g. s3n://, hdfs://, JDBC, Cassandra, HBase, Elasticsearch, JSON, CSV … )
  • Function that can operate RDD
    • map
    • flatmap
    • filter
    • distinct
    • sample
    • union, intersection, subtract, cartesian
  • RDD Actions
    • collect
    • count
    • countByValue
    • take
    • top
    • reduce
  • Lazy Evaluation!!! -> Nothing actually happens in your driver program until an action is called.

CH12. Ratings Histogram Walkthrough

    • file
  • 再次回到 ratings-counter.py 來詳細檢視程式碼,可以看到第1行 import SparkConf 與 SparkContext 後,第4行起始 SparkConf 時設定了 setMaster("local"): 代表的意思是使用 local machine 的 single thread/single process 來執行 pyspark.
  • 第11行的 ratings = lines.map(lambda x: x.split()[2]) 意思是把每一行的第3個元素(也就是 ratingValue)抓出來,變成新的 RDD 叫做 ratings
  • 第13行呼叫 ratings (RDD) 的 countByValue() 方法,執行類似 SQL count group by 的函數,依據 value 計算次數,會得到一個 list of tuples [(3,2), (1,2), (2,1)]
  • 第15行就是純 python code,目的是把 list of tuples 變成 OrderedDict ,最後到16行再把這個結果印在 console 上。

CH13. Key-Value RDD’s, and the Average Friends by Age Example

介紹 RDD 方法

  • reduceByKey(): Combine values with same key using some function e.g. rdd.reduceByKey(lambda x, y: x+y)
  • groupByKey(): Group values with the same key
  • sortByKey(): Sort RDD by key values
  • join, rightOuterJoin, leftOuterJoin, cogroup, subtractByKey…
  • 重點: 如果沒有要更改 keys, 那就呼叫 mapValues()/flatMapValues() 而不要只呼叫 map/flatmap ,因為這樣的作法會只把比較有效率 (下一章會執行程式碼並詳解)

CH14. Running The Average Friends by Age Example

file

  • 直接從第15行開始講解: 這邊呼叫 rdd.mapValues() 的方法,會只把 values(也就是 numFriends) 當作參數傳入函式中
  • 看一下 mapValuesreduceByKey 產生的物件長怎樣:
  • file
    • rdd.mapValues(lambda x: x(x, 1)) 的意思是把 keys 保留(33, 55) 並把 values (385, 2, 221) 當作 arguments 丟入 lambda function 的 x 當中
    • reduceByKey(lambda x, y: (x[0]+y[0], x[1]+y[1])) 的意思是把 keys 保留(33, 55) 並把 [(385, 1)] 當作 x, [(2, 1)] 當作 y 傳入 lambda function 的 x, y 當中
  • 在 terminal 輸入 spark-submit friends-by-age.py 即可跑出結果,擷取部分如下

file

CH15. Filtering RDD’s, and the Minimum Temperature by Location Example

  • filter function: 保留輸入的參數,去除其他的欄位
  • 資料欄位: station_id, date, observation_type, value, …其他雜魚欄位

file

CH16. Running the Minimum Temperature Example, and Modifying it for Maximums

  • 課堂程式碼
    file

  • 把第15行印出來,可以看到目前的 RDD 是 (stationID, entryType, temperature) 的 tuple
    file

  • 把第16行印出來,可以看到目前的 RDD 是 (stationID, temperature) 的 tuple
    file

  • 把第17行印出來,可以看到目前的 RDD 是執行 min(tuple)的結果 (min(stationID, temperature))
    file

CH17. Running the Maximum Temperature by Location Example

  • 同 CH16 ,注意要把 1800.csv 的 entryType 改為 TMAX 以及 reduceByKey 改以 max 去計算結果

CH18. Counting Word Occurences using flatmap()

  • 本範例 flatMap 而非 mapBook.txt 裡面的文字出現頻率列出。
  • faltMap: 可以發現不是非常完美的將字串切乾淨,因為指使用 split() 預設是將空白建切開,但有些不乾淨的字串並未切掉(如: ., ;, ?, ,, "),下一章將是泛使用 regular expression 來去除 punctuations

file

  • map: 直接跳出 exception
    file

CH19 Improving The Word Count Script With Regular Expressions

  • 內容一樣,只是多了 re 去解析字串

CH20 Sorting The World Count Results

  • 同樣的程式碼,再多一段排序的指令
  • 第13行先用 mapreduce 來產出 {words: counts} 的字典,接下來14行把字典的 key:value 反轉成 value:key 並用 sortByKey() 去排序 (說白話點,就是用 value 也就是次數排序)
  • 第15行再用 collect() 執行運算
  • file

CH21-CH23

練習題目,就不放上來了

CH24 Introduction SparkSQL

CH25 Executing SQL commands and SQL-style functions on a DataFrame

直接看程式碼

file

  • 第7行定義一個 mapper function ,用來將每行的 csv 讀成 spark.Row 物件
  • 第16行起始化 DataFrame 物件並 cache 起來備用
  • 進行 SQL 操作(20行),或是物件操作(30行)進行 CRUD
  • 第32行結束 spark session

CH 26. Using DataFrames Instead of RDD

直接看程式碼

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("SparkSQL").getOrCreate()
people = spark.read.option("header", "true").option("inferSchema", "true").csv("./fakefriends-header.csv")

print("Here is our inferred schema:")
people.printSchema()

> Here is our inferred schema:
> root
> |-- userID: integer (nullable = true)
> |-- name: string (nullable = true)
> |-- age: integer (nullable = true)
> |-- friends: integer (nullable = true)

print("Let's display the name column:")
people.select(['name', 'age']).show()

> Let's display the name column:
> +--------+---+
> |    name|age|
> +--------+---+
> |    Will| 33|
> |Jean-Luc| 26|
> |    Hugh| 55|
> |  Deanna| 40|
> |   Quark| 68|
> |   .....| ..|

print("Filter out anyone over 21:")
people.filter(people.age < 21).show()

> ilter out anyone over 21:
> +------+-------+---+-------+
> |userID|   name|age|friends|
> +------+-------+---+-------+
> |    21|  Miles| 19|    268|
> |    48|    Nog| 20|      1|
> |    52|Beverly| 19|    269|
> |    54|  Brunt| 19|      5|

print("Group by age")
people.groupBy("age").count().show()

> roup by age
> +---+-----+
> |age|count|
> +---+-----+
> | 31|    8|
> | 65|    5|
> | 53|    7|
> | 34|    6|

print("Make everyone 10 years older:")
people.select(people.name, people.age + 10).show()

> Make everyone 10 years older:
> +--------+----------+
> |    name|(age + 10)|
> +--------+----------+
> |    Will|        43|
> |Jean-Luc|        36|
> |    Hugh|        65|
> |  Deanna|        50|

spark.stop()

CH 27. [Exercise] Friends by age with DataFrames

直接使用 DataFrame 操作

from pyspark.sql import SparkSession
from pyspark.sql import functions as func
spark = SparkSession.builder.appName("SparkSQL").getOrCreate()
people = spark.read.option("header", "true").option("inferSchema", "true").csv("./fakefriends-header.csv")

print('====================Exercise=================')
print('get avg friends group by age')
people.select(['age', 'friends']).groupBy('age').avg('friends').sort('age').show()

# prettier
print('=============prettier=====================')

people.select(['age', 'friends']).groupBy('age').agg(func.round(func.avg('friends'), 2).alias('avg_friends')).\
    sort('age').show()
spark.stop()

結果

get avg friends group by age
+---+------------------+
|age|      avg(friends)|
+---+------------------+
| 18|           343.375|
| 19|213.27272727272728|
| 20|             165.0|
| 21|           350.875|
| 22|206.42857142857142|
| 23|             246.3|
| 24|             233.8|
| 25|197.45454545454547|
| 26|242.05882352941177|
| 27|           228.125|
| 28|             209.1|
| 29|215.91666666666666|
| 30| 235.8181818181818|
| 31|            267.25|
| 32| 207.9090909090909|
| 33| 325.3333333333333|
| 34|             245.5|
| 35|           211.625|
| 36|             246.6|
| 37|249.33333333333334|
+---+------------------+
============prettier=====================
+---+-----------+
|age|avg_friends|
+---+-----------+
| 18|     343.38|
| 19|     213.27|
| 20|      165.0|
| 21|     350.88|
| 22|     206.43|
| 23|      246.3|
| 24|      233.8|
| 25|     197.45|
| 26|     242.06|
| 27|     228.13|
| 28|      209.1|
| 29|     215.92|
| 30|     235.82|
| 31|     267.25|
| 32|     207.91|
| 33|     325.33|
| 34|      245.5|
| 35|     211.63|
| 36|      246.6|
| 37|     249.33|
+---+-----------+
only showing top 20 rows

About the Author

發佈留言

發佈留言必須填寫的電子郵件地址不會公開。 必填欄位標示為 *

Related Posts