[BigQuery] Python上傳DataFrame至BigQuery
前言
- 前陣子被問到能不能分享python與bigquery互相串接,能夠定期上傳資料或是下載資料,而筆者本身也挺有興趣的,畢竟BigQuery有著動態儲存前10G免費以及每個月免費1T查詢,資料量不大的情況下,用起來還是挺舒服的,這篇文章筆者將會介紹如何把pandas-DataFrame上傳到google-BigQuery。
- 如果你正在盤算著做一個自己的Data Science Project,思考資料要存哪裡,BigQuery可能會是個好的選擇,除了儲存前10G免費,每個月1T查詢免費,DataStudio也能夠直接連接做視覺化喔。
下載
pip install google-cloud-bigquery==1.24.0
pip install pyarrow==0.16.0# 感謝網友Dennis Dai提出bigquery版號不同會造成的問題
查了一下官方文件,目前把dataframe上傳到BigQuery有兩種方式,一種是pandas-gbq,另一種則是google.cloud.bigquery。
以前雖然就是直接用gbq,不過google本身的API鐵定是有更完整的功能,因此這次選了google本身的API,有興趣的朋友想看兩者的比較可以看這裡。
官方甚至做了詳細的比較(企圖把人從gbq拉回官方API ?)
pyarrow則是一個中介物,在傳輸的過程中會先將dataframe轉換成 Parquet格式,再進行上傳,而gbq則會以csv格式上傳。
申請金鑰
接著我們要有API的金鑰,你可以選擇在GCP主控台上申請或是使用Command line,筆者在這邊從GCP主控台介紹 :
建立金鑰時需要建立基本資訊以及權限,這裏的角色選擇Project擁有者。建立完成後就會下載一組金鑰的json檔案,我們把它放到要執行檔案的python資料夾中。
建立DataSet以及Table
終於來到這一步了,實際上也可以從google-console的BigQuery圖形化介面中建立,不過這邊筆者就用python做連接,讓讀者能夠熟悉API的使用方式
- 建立連線,確認金鑰沒有問題
from google.cloud import bigquery as bqimport osos.environ["GOOGLE_APPLICATION_CREDENTIALS"]="./TEST_BQ_keys.json"client = bq.Client()print(client)
<google.cloud.bigquery.client.Client object at 0x10eab2438>
正確的情況下應該要回應上述,表示Client物件建立成功,這裏補充一下一個坑,這邊使用 :
import osos.environ["GOOGLE_APPLICATION_CREDENTIALS"]="./TEST_BQ_keys.json"
來建立環境變數,是因為google文件中的方法沒辦法讀取到,在stackoverflow上的解答中使用os.environ來建立變數,這樣就讀的到了。
2. 建立DataSet以及Table
dataset_id = f"{client.project}.NEW_DATA_SET"table_id = f"{dataset_id}.TEST_TABLE"tschema = [bq.SchemaField("full_name", "STRING", mode="NULLABLE"),bq.SchemaField("age", "INTEGER", mode="NULLABLE"),]dataset = bq.Dataset(dataset_id)dataset.location = "asia-east1"dataset = client.create_dataset(dataset)table = bq.Table(table_id, schema=tschema)table = client.create_table(table) # API requestprint(f"Created dataset {client.project}.{dataset.dataset_id}")print(f"Created table {table.project}.{table.dataset_id}.{table.table_id}")# 感謝網友陳伯翰修正地區會跳回us的bug
這裡需要注意的地方是,在 SchemaField 中的mode要設定成 NULLABLE ,因為我們pandas.DataFrame預設的狀態是允許null的,接著應該就可以看到成功建立的訊息,同時也能夠從GCP的console主頁上看到自己建立的DataSet及Table
上傳DataFrame
import pandas as pddataset_id = "NEW_DATA_SET"dataset_ref = client.dataset(dataset_id)table_ref = dataset_ref.table("TEST_TABLE")df = pd.DataFrame({u'full_name':['A','B','C','D'],u'age':[200, 100, 30, 4]})job = client.load_table_from_dataframe(df, table_ref, location="asia-east1")job.result() # Waits for table load to complete.assert job.state == "DONE"
這裏要注意的點是, load_tabl_from_dataframe
這個方法吃的是table_ref而不是table_id,因此前面 dataset_ref, table_ref
要稍作調整。
正常情況下就可以把dataframe上傳上去了,但是好景不常,沒遇到個什麼錯誤還真的覺得不太對勁,結果錯誤就來了XD
google.api_core.exceptions.NotFound: 404 POST https://www.googleapis.com/upload/bigquery/v2/projects/test-252313/jobs?uploadType=resumable: Not found: Dataset test-252313:NEW_DATA_SET
竟然說找不到DATA_SET !?,google大神啊,別鬧我了,剛剛不是跟我說建好了嗎QQ,仔細檢查了一下DataSet的描述 :
上圖是已經修正好的,看來還是有一些bug,儘管我們已經把dataset的location屬性調整成’asia-east-1’,但bigquery常常建出來卻還是在US,這點各位讀者要小心確認一下,畢竟如果要傳資料,傳到彰化(asia-east-1)去和傳到美國去(US)等待時間可是有差啊!
修改之後再次上傳,就沒有問題摟!
如此一來我們就完成了上傳的部分啦!
至於想要了解更深的朋友,就拜訪BigQuery API,一起交流吧XD
費用
看起來BigQuery的費用區分成幾種 :
- 動態儲存 / 靜態儲存
- 查詢
- 資料串流
- 動態儲存
- 查詢
- 串流
如果使用量較大的朋友,就要注意瞜(使用量大應該不會需要這麼基本的教學),使用量不會太大的朋友們,看來BigQuery花到我們錢的機會不多,只要小心自己上傳的DataFrame就可以瞜!
今天的分享就到這邊,如果各位讀者覺得有幫助,就幫我拍個手吧!
如果有任何問題,也可以在下方留言或是寄信到yltsai0609@gmail.com
有空的話我都會盡量回的!
完整程式碼
from google.cloud import bigquery as bqimport osimport pandas as pdos.environ["GOOGLE_APPLICATION_CREDENTIALS"]="./TEST_BQ_keys.json"client = bq.Client()print(client)dataset_id = f"{client.project}.NEW_DATA_SET"table_id = f"{dataset_id}.TEST_TABLE"tschema = [bq.SchemaField("full_name", "STRING", mode="NULLABLE"),bq.SchemaField("age", "INTEGER", mode="NULLABLE"),]dataset = bq.Dataset(dataset_id)dataset.location = "asia-east1"dataset = client.create_dataset(dataset)table = bq.Table(table_id, schema=schema)table = client.create_table(table) # API requestprint(f"Created dataset {client.project}.{dataset.dataset_id}")print(f"Created table {table.project}.{table.dataset_id}.{table.table_id}")dataset_id = "NEW_DATA_SET"dataset_ref = client.dataset(dataset)table_ref = dataset_ref.table("TEST_TABLE")df = pd.DataFrame({u'full_name':['A','B','C','D'],u'age':[200, 100, 30, 4]})job = client.load_table_from_dataframe(df, table_ref, location="asia-east1")job.result() # Waits for table load to complete.assert job.state == "DONE"