[python] 並行的向資料庫撈取資料 (query DB concurrently)

點閱: 181

如何並行的向資料庫撈取資料

公司的專案有時候需要從資料庫撈出歷年的資料,即便已經正確的使用到 PK 在進行撈取,但一次要撈多年的資料,如果只透過單一 cursor 去拿還是容易造成因為資料量太大,導致 DB 在執行計畫運算的過程中算出 full table scan 的效率比 pk scan 還快。

初步的想法式,我透過迴圈,向資料庫發出請求一次查一個月的資料,查出來的資料存到一個空串列中,最後轉成 DataFrame 後轉存 excel 。

但這個解法有個顯而易見的缺點,就是我得等第一個月的資料處理完後,才能進行第二個月,如果一次撈取的資料有好多年,就算 SQL 寫的再好,整段查詢完成也是曠日廢時的。因此就有了並行處理 (concurrently)的作法,所以也產生了這篇文章,紀錄如何使用 python 去並行處理撈資料的請求。

程式碼

先上完整程式碼,再來陸續講解每段做的事情

# main.py
import pandas as pd
import multiprocess as mp
import traceback
import cx_Oracle
from itertools import chain
result = []

def get_data(param):
    sql_str = """ select * from my_table where month = :month """
    db_pool = cx_Oracle.SessionPool('id', 'password', 'ip:port/services', encoding='UTF-8', min=4, max=8, increment=1)
    with db_pool.acquire() as conn:
        cursor = conn.cursor()
        data = cursor.execute(sql_str, {'month': param}).fetchall()
    if data:
        result.extend(data)
    return result

def main():
    try:
        month = ['202001', '202002', '202003', '202004', '202005', '202006']
        mp_pool = mp.Pool()  # init the multrprocess.Pool object to concurrently
        data = mp_pool.map(get_data, month)
        df = pd.DataFrame(list(chain.from_iterable(data)),  # unpack "list of list of tuples"
        columns=['col1', 'col2', 'col3']
        )
        df.to_excel('./result.xlsx', index=False)
    except:
        traceback.print_exc()

if __name__ == '__main__':
    main()

程式詳解

第一步: 實體化 multiprocess 類別

首先透過 mp_pool = mp.Pool() 實體化 Pool 類別。來看看官方文件如何描述這個實體化的方法:

file

Pool()可指定最大的 process 數量,若未指定,則使用 os.cpu_count() 的值來填入

第二步: map a iterable on function within multiprocess class

接著呼叫 Pool 類別的 map 方法,來 map a iterable on function,這邊我要做的事情就是把 month 這個 iterable 以並行式的 map 到自己定義的 get_data 函數中。

備註: pool.map() 只能將1個 iterable 丟給1個 map function 執行,若要該 map function 可以接受多個 iterable argument, 請執行 map.starmap()

https://docs.python.org/3/library/multiprocessing.html#multiprocessing.pool.Pool.starmap

第三步:詳解如何並發的向資料庫發出請求

get_data 函數中,我要作的事情就是把 iterable 內的每個元素(每個月份),透過 cx_Oracle.SessionPool() 產生的 group of cursor ,並行式的向 oracle 發出請求並把資料要回來。為了怕對 oracle 發出的連線過多把 DB 搞爆,可以透過 with db_pool.acquire() 來控制連線,當我的請求完成後就自動的把連線關掉。

那我們可以怎麼確認自己真的有同時對 oracle 發出多個請求呢?我們可以用下列的 SQL 指令檢查:

SELECT program, count(1)
FROM V$SESSION 
WHERE machine LIKE '%my_machine_name%' AND status = 'ACTIVE' AND PROGRAM LIKE '%python%'
GROUP BY PROGRAM

file

在 python 執行的過程中,我可以透過上述的 SQL 指令,觀察我現在的程式同時有多少個連線在向資料庫發出請求並索取資料。

在每次的資料索取完畢後,把他塞入空串列當中 return 回到外層的程式中。

第四步: 資料取出後的處理

資料取出來後,透過 itertool.chain.from_iterable() 這個方法,來解開串列,list of list of tuples 解成 list of tuples 後,就可以變成 pd.DataFrame 的形式,進行後續的資料操作啦。

以上就是紀錄如何透過 python 的 multiprocess 並行化向資料庫索取任務的過程。

About the Author

發佈留言

發佈留言必須填寫的電子郵件地址不會公開。 必填欄位標示為 *

Related Posts