表數(shù)據(jù)超過百萬級(jí)別時(shí)使用pandas讀取數(shù)據(jù)速度過慢,如果仍然想用pandas讀取,可以通過多進(jìn)程提高效率。同時(shí)可以將常用數(shù)據(jù)保存為pkl文件,以便后續(xù)使用。
@主要代碼實(shí)現(xiàn)
#按照表中的某字段將表劃分為比較均勻的多個(gè)子集
#本例中需要讀取的表中包含了城市字段,
#且涉及的城市包含了全國大部分城市,數(shù)據(jù)分布較為均勻,因此制作了一張省份城市配置表,將數(shù)據(jù)劃分
#讀取省份-城市配置表,獲取城市列表
def
get_division_list
(
db_connect
,
division_table
)
:
sql
=
'select * from {};'
.
format
(
division_table
)
data
=
pd
.
read_sql
(
sql
,
con
=
db_connect
)
#省份列表
province_list
=
data
[
'province'
]
.
unique
(
)
.
tolist
(
)
#獲取城市列表
city_list
=
[
]
for
code
in
province_list
:
city
=
data
[
data
[
'province'
]
.
str
.
contains
(
code
)
]
[
'city_code'
]
.
unique
(
)
.
tolist
(
)
city_list
.
append
(
city
)
#此處返回的城市列表demo為[['南京','蘇州','揚(yáng)州'],['深圳','廣州']]
return
city_list
#單進(jìn)程讀取目標(biāo)表
def
read_data
(
db_connect
,
target_table
,
code
)
:
start
=
time
.
time
(
)
sql
=
'select * from {0} where city_code in ({1});'
.
format
(
target_table
,
"'"
+
"','"
.
join
(
code
)
+
"'"
)
data_df
=
pd
.
read_sql
(
sql
,
con
=
db_connect
)
print
(
'數(shù)據(jù)讀入成功!'
)
end
=
time
.
time
(
)
print
(
'Task runs %0.2f seconds.'
%
(
(
end
-
start
)
)
)
return
data_df
if
__name__
==
"__main__"
:
# 開啟的進(jìn)程數(shù), 與邏輯核保持一致即可,普通臺(tái)式機(jī)建議18,高性能工作站建議60
target_table
=
'table_name1'
division_table
=
'table_name2'
db_connect
=
connect_db
(
)
#數(shù)據(jù)庫連接函數(shù),詳見最后所附完整項(xiàng)目代碼
city_list
=
get_division_list
(
db_connect
,
division_table
)
proc_num
=
31
#進(jìn)程數(shù)
pool
=
Pool
(
processes
=
proc_num
)
jop_result
=
[
]
for
code
in
city_list
:
# 維持執(zhí)行的進(jìn)程總數(shù)為processes,當(dāng)一個(gè)進(jìn)程執(zhí)行完畢后會(huì)添加新的進(jìn)程進(jìn)去
res
=
pool
.
apply_async
(
read_data
,
(
db_connect
,
target_table
,
code
,
)
)
jop_result
.
append
(
res
)
pool
.
close
(
)
#關(guān)閉進(jìn)程池,防止進(jìn)一步操作。如果所有操作持續(xù)掛起,它們將在工作進(jìn)程終止前完成
pool
.
join
(
)
#調(diào)用join之前,先調(diào)用close函數(shù),否則會(huì)出錯(cuò)。執(zhí)行完close后不會(huì)有新的進(jìn)程加入到pool,join函數(shù)等待所有子進(jìn)程結(jié)束
#獲取結(jié)果,本例中將讀取的數(shù)據(jù)保存到本地的pkl文件中,以便后續(xù)使用
for
index
,
tmp
in
enumerate
(
jop_result
)
:
result_path
=
r
'path\result_'
+
str
(
index
)
+
'.pkl'
tmp_df
=
tmp
.
get
(
)
with
open
(
result_path
,
'wb'
)
as
f
:
pickle
.
dump
(
tmp_df
,
f
)
;
完整代碼鏈接: https://github.com/AlisaAlbert/TransferData/blob/master/ReadData.py
更多文章、技術(shù)交流、商務(wù)合作、聯(lián)系博主
微信掃碼或搜索:z360901061

微信掃一掃加我為好友
QQ號(hào)聯(lián)系: 360901061
您的支持是博主寫作最大的動(dòng)力,如果您喜歡我的文章,感覺我的文章對您有幫助,請用微信掃描下面二維碼支持博主2元、5元、10元、20元等您想捐的金額吧,狠狠點(diǎn)擊下面給點(diǎn)支持吧,站長非常感激您!手機(jī)微信長按不能支付解決辦法:請將微信支付二維碼保存到相冊,切換到微信,然后點(diǎn)擊微信右上角掃一掃功能,選擇支付二維碼完成支付。
【本文對您有幫助就好】元
