最近需要提供一個(gè)包含多個(gè)神經(jīng)網(wǎng)絡(luò)推理的python代碼供gRPC調(diào)用,即我需要在這個(gè)主程序的基礎(chǔ)上封裝一個(gè)支持gRPC的服務(wù)端(server)。本教程的目的在于通過簡單的代碼,來幫助有需求的朋友使用python來構(gòu)建屬于自己的gRPC服務(wù)端/客戶端。
0. 前言
最近需要用grpc調(diào)用我們的算法模塊, 對于我來講,就是需要提供一個(gè)grpc的server,供它們的go或者c++的client進(jìn)行消費(fèi)。那么, 在python里面如何定義一個(gè)完整的server–client,并且使其跑的非常好是個(gè)很重要的任務(wù)。
1. gRPC的官方介紹
中文官網(wǎng)的python接口例子直接放在grpc的github中,可能需要我們進(jìn)一步的挖掘,這里,為了避免繁瑣,我將通過一個(gè)簡單的例子來說明如何將我們的任務(wù)封裝為gRPC的 服務(wù)端 (server),并開啟 客戶端 (client)對其進(jìn)行調(diào)用。
在此之前,先簡單介紹一下什么是gRPC:
1.1 什么是gRPC
-
gRPC 是一個(gè)高性能、開源和通用的 RPC(遠(yuǎn)程過程調(diào)用) 框架,面向移動(dòng)和 HTTP/2 設(shè)計(jì)。目前提供 C、Java 和 Go 語言版本,分別是:grpc, grpc-java, grpc-go. 其中 C 版本支持 C, C++, Node.js, Python, Ruby, Objective-C, PHP 和 C# 支持.
gRPC 基于 HTTP/2 標(biāo)準(zhǔn)設(shè)計(jì),帶來諸如雙向流、流控、頭部壓縮、單 TCP 連接上的多復(fù)用請求等特。這些特性使得其在移動(dòng)設(shè)備上表現(xiàn)更好,更省電和節(jié)省空間占用。 -
在 gRPC 里客戶端(client)應(yīng)用可以像調(diào)用本地對象一樣直接調(diào)用另一臺(tái)不同的機(jī)器上服務(wù)端(server)應(yīng)用的方法,使得您能夠更容易地創(chuàng)建分布式應(yīng)用和服務(wù)。與許多 RPC 系統(tǒng)類似,gRPC 也是基于以下理念: ① 定義一個(gè)服務(wù) , ② 指定其能夠被遠(yuǎn)程調(diào)用的方法(包含參數(shù)和返回類型) , ③ 在服務(wù)端實(shí)現(xiàn)這個(gè)接口,并運(yùn)行一個(gè) gRPC 服務(wù)器來處理客戶端調(diào)用。 在客戶端擁有一個(gè) 存根(stub) 能夠像服務(wù)端一樣的方法。
- gRPC 客戶端和服務(wù)端可以在多種環(huán)境中運(yùn)行和交互 - 從 google 內(nèi)部的服務(wù)器到你自己的筆記本,并且可以用任何 gRPC 支持的語言來編寫。所以,你可以很容易地用 Java 創(chuàng)建一個(gè) gRPC 服務(wù)端,用 Go、Python、Ruby 來創(chuàng)建客戶端。此外,Google 最新 API 將有 gRPC 版本的接口,使你很容易地將 Google 的功能集成到你的應(yīng)用里。
1.2 使用 protocol buffers
gRPC 默認(rèn)使用 protocol buffers,這是 Google 開源的一套成熟的結(jié)構(gòu)數(shù)據(jù)序列化機(jī)制(當(dāng)然也可以使用其他數(shù)據(jù)格式如 JSON)。正如你將在下方例子里所看到的,你用 proto files 創(chuàng)建 gRPC 服務(wù),用 protocol buffers 消息類型來定義方法參數(shù)和返回類型。你可以在 Protocol Buffers 文檔找到更多關(guān)于 Protocol Buffers 的資料。
Protocol buffers 版本
盡管 protocol buffers 對于開源用戶來說已經(jīng)存在了一段時(shí)間,例子內(nèi)使用的卻一種名叫 proto3 的新風(fēng)格的 protocol buffers,它擁有輕量簡化的語法、一些有用的新功能,并且支持更多新語言。當(dāng)前針對 Java 和 C++ 發(fā)布了 beta 版本,針對 JavaNano(即 Android Java)發(fā)布 alpha 版本,在protocol buffers Github 源碼庫里有 Ruby 支持, 在golang/protobuf Github 源碼庫里還有針對 Go 語言的生成器, 對更多語言的支持正在開發(fā)中。 你可以在 proto3 語言指南里找到更多內(nèi)容, 在與當(dāng)前默認(rèn)版本的發(fā)布說明比較,看到兩者的主要不同點(diǎn)。更多關(guān)于 proto3 的文檔很快就會(huì)出現(xiàn)。雖然你可以使用 proto2 (當(dāng)前默認(rèn)的 protocol buffers 版本), 我們通常建議你在 gRPC 里使用 proto3,因?yàn)檫@樣你可以使用 gRPC 支持全部范圍的的語言,并且能避免 proto2 客戶端與 proto3 服務(wù)端交互時(shí)出現(xiàn)的兼容性問題,反之亦然。
ps: 我這里使用的都是 protobuf 作為gRPC約定的中間數(shù)據(jù)傳輸格式定義。雖然可以用json,但是我沒看到這方面的教程。
2. 基本步驟
因?yàn)楣俜浇坛逃斜容^全面的grpc的各語言接口的安裝教程,我這里以python為例,來說明對深度學(xué)習(xí)應(yīng)用,我們應(yīng)該如何搭建一個(gè)基于grpc的server–client。
第1步:定義服務(wù)(實(shí)現(xiàn)自己的hellogdh.proto)
一個(gè) RPC 服務(wù)通過參數(shù)和返回類型來指定可以遠(yuǎn)程調(diào)用的方法,gRPC 通過 protocol buffers 來實(shí)現(xiàn)。使用 protocol buffers 接口定義語言來定義服務(wù)方法,用 protocol buffer 來定義參數(shù)和返回類型。客戶端和服務(wù)端均使用服務(wù)定義生成的接口代碼。
本文的
hellogdh.proto
定義如下[2]:
// Copyright 2015 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// 參考資料:python版gRPC快速入門一
// https://blog.csdn.net/Eric_lmy/article/details/81355322
syntax
=
"proto3"
;
option java_multiple_files
=
true
;
option java_package
=
"io.grpc.gdh.proto"
;
option java_outer_classname
=
"GdhProto"
;
option objc_class_prefix
=
"HLW"
;
package
hellogdh
;
// 定義服務(wù).
service Greeter
{
// ① 簡單rpc.
rpc
SayHello
(
HelloRequest
)
returns
(
HelloReply
)
{
}
// ② 應(yīng)答式流rpc.
rpc
LstSayHello
(
HelloRequest
)
returns
(
stream HelloReply
)
{
}
}
// 客戶端傳的消息: HelloRequest.
message HelloRequest
{
string name
=
1
;
}
// 服務(wù)端發(fā)送的消息: HelloReply.
// 不能使用int, 只能使用int32這種.
// 1, 2, 3表示順序.
message HelloReply
{
int32 num_people
=
1
;
// repeated 定義列表對應(yīng)的結(jié)構(gòu).
repeated int32 point
=
2
;
}
其中,
syntax = "proto3"
表示使用proto3版本。option相關(guān)的東西我都沒咋動(dòng);
service Greeter
的意思是定義了一個(gè)叫做
Greeter
的服務(wù)。這個(gè)服務(wù)下面有兩種,關(guān)于gRPC可以定義的服務(wù)有4種,下面會(huì)詳細(xì)說明。
定義完畢之后,生成client和server的代碼(Note: 我之前以為client和server的代碼是自己寫的,實(shí)際實(shí)踐后才知道,是根據(jù)xxx.proto生成的!!根本不需要我們自己寫! )
執(zhí)行這一步需要安裝好一些工具如下:
sudo apt-get install protobuf-compiler-grpc
sudo apt-get install protobuf-compiler
對我的環(huán)境(ubuntu18.04 python3.6) 執(zhí)行:
protoc -I ./grpc --python_out=./grpc --grpc_out=./grpc --plugin=protoc-gen-grpc=`which grpc_python_plugin` hellogdh.proto
在對應(yīng)的目錄下回生成兩個(gè)文件 hellogdh_pb2_grpc.py 和 hellogdh_pb2.py 。
其中, hellogdh_pb2.py包括:
- 定義在hellogdh.proto中的消息類(Message)
-
定義在hellogdh.proto中的服務(wù)的抽象類:
BetaHellogdhServicer
, 定義了Hellogdh 服務(wù)實(shí)現(xiàn)的 接口 。
BetaHellogdhStub
, 定義了可以被客戶端用來激活的Hellogdh RPC的 存根 。 -
應(yīng)用到的函數(shù):
beta_create_Hellogdh_server: 根據(jù)BetaHellogdhServicer對象創(chuàng)建一個(gè)gRPC服務(wù)器(server專用)。
beta_create_Hellogdh_stub: 客戶端用于創(chuàng)建存根stub(client專用)。
第2步:實(shí)現(xiàn)server部分代碼.
本部分分別以 簡單調(diào)用(單項(xiàng)RPC) 和 服務(wù)端流RPC 為例進(jìn)行說明,實(shí)際上,gRPC允許4種類型服務(wù)方法(如果想完整的學(xué)習(xí),還是建議看官方文檔的例子[1]):
對我而言,因?yàn)槲倚枰讯噙M(jìn)程的python程序的最后輸出隊(duì)列封裝給gRPC的server進(jìn)程,
所以我首先需要把待處理的隊(duì)列(Queue)傳入gRPC server的進(jìn)程,再在這個(gè)進(jìn)程中定義好overwrite一些helloworld_pb2.py的方法。
最后,在主進(jìn)程中啟動(dòng)所有的神經(jīng)網(wǎng)絡(luò)任務(wù)進(jìn)程和gRPC進(jìn)程,并 阻塞 (join(),join的作用是保證當(dāng)前進(jìn)程正常結(jié)束, 即不會(huì)因?yàn)橹鬟M(jìn)程先退出而把未執(zhí)行完的子進(jìn)程kill掉。)。
代碼如下,參考自github
grpc/examples/python
下的
route_guide
import
sys
sys
.
path
.
append
(
".."
)
import
grpc
import
hellogdh_pb2
import
hellogdh_pb2_grpc
from
concurrent
import
futures
import
cv2
import
time
import
numpy
as
np
from
utils
.
config
import
*
import
logging
from
capture
import
queue_put
,
queue_get
,
queue_img_put
from
module
.
Worker
import
PoseWorker
import
multiprocessing
as
mp
from
multiprocessing
import
Pool
,
Queue
,
Lock
# 0.0 grpc.
def
grpc_server
(
queue
)
:
class
gdhServicer
(
hellogdh_pb2
.
BetaGreeterServicer
)
:
def
SayHello
(
self
,
request
,
context
)
:
# Note: 傳參的時(shí)候必須要顯式指定參數(shù)名稱, 而不能lynxi_pb2.HelloReply(1, [5, 10])
if
request
.
name
==
'gdh'
:
return
hellogdh_pb2
.
HelloReply
(
num_people
=
1
,
point
=
[
1
,
1
]
)
else
:
return
hellogdh_pb2
.
HelloReply
(
num_people
=
55
,
point
=
[
1
,
1
]
)
def
LstSayHello
(
self
,
request
,
context
)
:
while
request
.
name
==
'gdh'
:
data
=
queue
.
get
(
)
# 因?yàn)槭欠?wù)端流式模式,所以用yield。
yield
hellogdh_pb2
.
HelloReply
(
num_people
=
data
.
num
,
point
=
[
data
.
point
[
0
]
,
data
.
point
[
1
]
]
)
# 1. 之前啟動(dòng)server的方式.
# server = helloworld_gdh_pb2.beta_create_Greeter_server(gdhServicer())
# 2. 在route_guide里面學(xué)到的啟動(dòng)server的方式.
server
=
grpc
.
server
(
futures
.
ThreadPoolExecutor
(
max_workers
=
10
)
)
lynxi_pb2_grpc
.
add_GreeterServicer_to_server
(
gdhServicer
(
)
,
server
)
server
.
add_insecure_port
(
'[::]:50051'
)
# 因?yàn)?start() 不會(huì)阻塞,如果運(yùn)行時(shí)你的代碼沒有其它的事情可做,你可能需要循環(huán)等待。
server
.
start
(
)
try
:
while
True
:
# time.sleep(_ONE_DAY_IN_SECONDS)
time
.
sleep
(
5
)
except
KeyboardInterrupt
:
server
.
stop
(
)
# 2.1 對每個(gè)任務(wù)建立一個(gè)隊(duì)列.
pose_queue_raw
=
Queue
(
)
monitor_queue_raw
=
Queue
(
)
pose_out_queue
=
Queue
(
)
# key: 名稱, val[0]: 隊(duì)列, val[1]: 加載好的模型.
queues
=
{
'pose'
:
pose_queue_raw
,
'monitor'
:
monitor_queue_raw
}
# pose
# /
# /
# 3. 生產(chǎn)者-消費(fèi)者 --- detect
# \
# \
# face_detect
processes
=
[
]
for
key
,
val
in
queues
.
items
(
)
:
processes
.
append
(
mp
.
Process
(
target
=
queue_put
,
args
=
(
val
,
)
)
)
if
key
==
'pose'
:
processes
.
append
(
PoseWorker
(
val
,
pose_out_queue
)
)
else
:
processes
.
append
(
mp
.
Process
(
target
=
queue_get
,
args
=
(
val
,
)
)
)
processes
.
append
(
mp
.
Process
(
target
=
grpc_server
,
args
=
(
pose_out_queue
,
)
)
)
[
process
.
start
(
)
for
process
in
processes
]
[
process
.
join
(
)
for
process
in
processes
]
這段代碼的意思是將PoseWorker處理得到的隊(duì)列
pose_out_queue
喂給gRPC server進(jìn)程,并設(shè)置好根據(jù)client發(fā)來的請求來發(fā)送處理好的數(shù)據(jù)。
queue_put
和
queue_get
是將視頻的幀封裝后放入隊(duì)列A和從隊(duì)列A中讀取并顯示的函數(shù)。
import
cv2
from
multiprocessing
import
Queue
,
Process
from
PIL
import
Image
,
ImageFont
,
ImageDraw
import
cv2
def
queue_put
(
q
,
video_name
=
"/home/samuel/gaodaiheng/handup.mp4"
)
:
cap
=
cv2
.
VideoCapture
(
video_name
)
while
True
:
is_opened
,
frame
=
cap
.
read
(
)
q
.
put
(
frame
)
if
is_opened
else
None
def
queue_get
(
q
,
window_name
=
'image'
)
:
cv2
.
namedWindow
(
window_name
,
flags
=
cv2
.
WINDOW_NORMAL
)
while
True
:
frame
=
q
.
get
(
)
cv2
.
imshow
(
window_name
,
frame
)
cv2
.
waitKey
(
1
)
需要額外注意的是,PoseWorker是繼承
multiprocessing.Process
類的進(jìn)程,其大體定義如下:
from
multiprocessing
import
Queue
,
Process
class
PoseWorker
(
Process
)
:
"""
Pose estimation姿態(tài)估計(jì).
"""
def
__init__
(
self
,
queue
,
out_queue
)
:
Process
.
__init__
(
self
,
name
=
'PoseProcessor'
)
# 輸入隊(duì)列和輸出隊(duì)列.
self
.
in_queue
=
queue
self
.
out_queue
=
out_queue
def
run
(
self
)
:
#set enviornment
os
.
environ
[
"CUDA_VISIBLE_DEVICES"
]
=
"0"
#load models
import
tensorflow
as
tf
.
.
.
model
=
load_model
(
xxx
)
.
.
.
while
True
:
# 從入的隊(duì)列中消費(fèi)數(shù)據(jù).
frame
=
self
.
in_queue
.
get
(
)
# 喂入模型推理得到結(jié)果.
result
=
model
.
inference
(
frame
)
# 將結(jié)果放回到生產(chǎn)者中.
self
.
out_queue
.
put
(
result
)
第3步:實(shí)現(xiàn)server部分代碼.
和第2步類似,代碼如下,參考自github
grpc/examples/python
[3]下的
route_guide
# coding: UTF-8
"""
@author: samuel ko
"""
import
os
import
grpc
import
hellogdh_pb2
as
helloworld_gdh_pb2
import
hellogdh_pb2_grpc
as
helloworld_gdh_pb2_grpc
import
time
_ONE_DAY_IN_SECONDS
=
60
*
60
*
24
# 1. 為了能調(diào)用服務(wù)的方法,我們得先創(chuàng)建一個(gè) 存根。
# 我們使用 .proto 中生成的 route_guide_pb2 模塊的函數(shù)beta_create_RouteGuide_stub。
def
run
(
)
:
with
grpc
.
insecure_channel
(
'localhost:50051'
)
as
channel
:
# 1) 存根方式1.
stub
=
helloworld_gdh_pb2
.
GreeterStub
(
channel
)
# 2) 存根方式2.
# stub = helloworld_gdh_pb2_grpc.GreeterStub(channel)
print
(
"-------------- ① 簡單RPC --------------"
)
# response = stub.SayHello(helloworld_gdh_pb2.HelloRequest(name='gdh'))
features
=
stub
.
SayHello
(
helloworld_gdh_pb2
.
HelloRequest
(
name
=
'gdh'
)
)
print
(
features
)
print
(
"-------------- ② 服務(wù)端流式RPC --------------"
)
features
=
stub
.
LstSayHello
(
helloworld_gdh_pb2
.
HelloRequest
(
name
=
'gdh'
)
)
for
feature
in
features
:
print
(
"哈哈哈 %s at %s, %s"
%
(
feature
.
num_people
,
feature
.
point
[
0
]
,
feature
.
point
[
1
]
)
)
if
__name__
==
"__main__"
:
run
(
)
最后,就會(huì)打印出符合我們服務(wù)端設(shè)定的數(shù)據(jù)結(jié)構(gòu)…
補(bǔ)充知識(shí):protobuf 支持的python數(shù)據(jù)結(jié)構(gòu).
在proto的Message定義中, 我們支持python的string等類型, 在proto中需要顯式標(biāo)明, 我推測是 由于gRPC是支持多種語言接口的,有些語言是 強(qiáng)類型 的(C/C++, Go),所以務(wù)必需要顯式標(biāo)明數(shù)據(jù)類型, 避免帶來不必要的麻煩 :
message HelloRequest {
string name = 1;
}
其中, 1, 2, 3表示的是參數(shù)的順序. 我們支持的數(shù)據(jù)類型如下:
- ① string
- ② float
- ③ int32 / uint32 (不支持int16和int8)
- ④ bool
-
⑤
repeated
int以及我們自定義的Message.
這里需要特別強(qiáng)調(diào), repeated 表示不定長的數(shù)組, 里面可以放built-in的類型,或者自己額外封裝的message. 很靈活. 對應(yīng)python的list.
message BoxInfos
{
message BoxInfo
{
uint32 x0
=
1
;
uint32 y0
=
2
;
uint32 x1
=
3
;
uint32 y1
=
4
;
}
repeated BoxInfo boxInfos
=
1
;
}
- ⑥ bytes 字節(jié)流, 可以用于傳遞圖片. 不過一般性在gRPC中, 每條消息的大小都不大(1MB左右?) 所以一般性都是傳圖片的絕對路徑?
-
⑦
map
字典,對應(yīng)python的dict, 不過需要顯式指定key和value的類型.
總結(jié)
截至目前,一個(gè)封裝多進(jìn)程神經(jīng)網(wǎng)絡(luò)算法的python版 gRPC server-client就已經(jīng)圓滿完成, 因?yàn)槲乙彩莿偨佑|,可能有理解上的偏差,懇請各位指正, 非常感謝~
參考資料
[1] gRPC–python中文官網(wǎng)
[2] python版gRPC快速入門一
[3] grpc/examples/python
更多文章、技術(shù)交流、商務(wù)合作、聯(lián)系博主
微信掃碼或搜索:z360901061

微信掃一掃加我為好友
QQ號(hào)聯(lián)系: 360901061
您的支持是博主寫作最大的動(dòng)力,如果您喜歡我的文章,感覺我的文章對您有幫助,請用微信掃描下面二維碼支持博主2元、5元、10元、20元等您想捐的金額吧,狠狠點(diǎn)擊下面給點(diǎn)支持吧,站長非常感激您!手機(jī)微信長按不能支付解決辦法:請將微信支付二維碼保存到相冊,切換到微信,然后點(diǎn)擊微信右上角掃一掃功能,選擇支付二維碼完成支付。
【本文對您有幫助就好】元
