Tensorflow Serving 要点

最近在使用 Tensorflow serving 时踩了一些坑,这里聊作记录,以供后来者参考。

Tensorflow serving 是用来帮助把模型部署到 Server 上的,模型格式为 SavedModel。
之前我的模型都是最后部署到移动端的,而这次的项目需要部署在服务端,也因此是首次使用 serving。

Serving Model 有以下步骤:
1、将模型转换为 SavedModel 格式
(主要讲两个例子,一个是会讲怎么把已有 checkpoint 截取部分转成用来推演的 SavedModel,另外讲一例在 serving 中使用 assets的麻烦事)
2、使用 tf-serving 把模型跑起来(这里使用docker方式)
3、通过 restful API 或 gRPC API 使用模型(这里只讲后者)

Extract a subgraph and export to SavedModel

首先,读者至少要有能力区分 tf 三类模型主要保存方式,即 checkpoint、frozen model(.pb, protobuffer)、SavedModel。
Serving 需要使用 SavedModel 类型的模型文件,如果训练时不是使用 Estimator, 估计很少会有人用 SavedModel 保存最终的网络,因此这里就需要把之前保存好的 pb 或 checkpoint 额外转一下格式。

简单的模型,看下这里官方教程 Serving a TensorFlow Model 一般足以搞定 SavedModel。

如果是 .pb 文件,使用 tf.import_graph_def, 如果是 ckpt, 使用 tf.train.import_meta_graph,总结起来可以用下面的代码方便地导入 graph:
https://gist.github.com/BenZstory/3ef2d6e59dc8ff133708c8b6122738b1#file-load_model-python

在本节的例子中,我们的目标是将一个已有模型 ckpt 截取部分来实现 serving,具体来说,我希望把 TwinGAN 的 encoder-decoder 架构中,只固化和截取 encoder 部分,然后导出为 SavedModel 格式,从而减少模型参数。而在这个过程中,假设手上并没有整个模型的架构代码,而只根据 ckpt 中现有的节点名指定输入输出来做截断,那就需要下面的方法了。

整个步骤,包括

1、加载graph
2、指定要截取子图的输入输出节点并固化
3、重新加载子图再导出为 SavedModel

关键就在于第2步的实现方式。如果有熟悉 SavedModel 的同学可能会疑问何必这么麻烦,导出 SavedModel 的时候可以设定好输入输出,足以使用子图。但是我实验下来,#build_signature_def时设定的inputsoutputs并不会自动帮助截取最小子图,这造成输出的 SavedModel 里有太多冗余内容,以我的实际问题为例,增加手动截子图后,把SavedModel大小从 270mb 降低到 11mb。

第1步,使用上文提到的load_model加载好ckpt。
第2步,截取子图并固化的方法参考自这里,核心就三行代码:

1
2
3
g = tf.graph_util.convert_variables_to_constants(sess, sess.graph_def, [ENDPOINTS_OP_NAME])
g = tf.graph_util.extract_sub_graph(g, [ENDPOINTS_OP_NAME])
g = tf.graph_util.remove_training_nodes(g, protected_nodes=["images_ph", ENDPOINTS_OP_NAME])

在上面代码中,graph 在 sess 中已经整个加载好,"images_ph" 为输入节点,[ENDPOINTS_OP_NAME] 为输出节点列表。最终 g 是截好的子图,只有子图和固化好的参数。接下来把 sess 中的图 切到子图 g,为保险起见,我这里是把 g 输出后再重新加载:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
TMP_SUBGRAPH_PB_NAME = 'twingan_subgraph_tmp.pb'
with open(os.path.join(DATA_ROOT, TMP_SUBGRAPH_PB_NAME), 'wb') as fout:
fout.write(g.SerializeToString())

tf.reset_default_graph()
pb_path = os.path.join(DATA_ROOT, TMP_SUBGRAPH_PB_NAME)
gd = tf.GraphDef()
with open(pb_path, 'rb') as f:
gd.ParseFromString(f.read())
tf.import_graph_def(gd, name='')

now_graph = tf.get_default_graph()

images_placeholder = now_graph.get_tensor_by_name('images_ph:0')
encoder_endpoints_tensor = now_graph.get_tensor_by_name('encoder_content_4/downsample_to_4x4x256/AvgPool:0')

sess、图、输入输出节点等环境都准备好了,进入第3步,终于导出为 SavedModel 格式:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
EXPORT_DIR = os.path.join(DATA_ROOT, 'twingan_latent_encoder', str(int(time.time())))
if os.path.exists(EXPORT_DIR):
shutil.rmtree(EXPORT_DIR)
builder = tf.saved_model.builder.SavedModelBuilder(EXPORT_DIR)

model_signature = tf.saved_model.signature_def_utils.build_signature_def(
inputs={
"input_images": tf.saved_model.utils.build_tensor_info(images_placeholder)
},
outputs={
"output": tf.saved_model.utils.build_tensor_info(encoder_endpoints_tensor)
},
method_name=tf.saved_model.signature_constants.PREDICT_METHOD_NAME)

builder.add_meta_graph_and_variables(
sess, [tf.saved_model.tag_constants.SERVING],
signature_def_map={
tf.saved_model.signature_constants.DEFAULT_SERVING_SIGNATURE_DEF_KEY:
model_signature,
},
clear_devices=True
)
builder.save()

(Hidden Content)

Export with assets

接下来同样是对模型导出 SavedModel 技巧的讲解,但这一例中要用到 assets。
在我们平常训练、推演模型的时候,由于一般直接在 python 环境下,所以好多信息是通过 python 模块如 numpy 加载数据成为常量后给到模型的,在固化后以 tf.constants 保存,如果导出为 SavedModel,会和整个图结构存在 protobuffer 中。

再回顾下 SavedModel 的目录结构

1
2
3
4
5
6
assets/
assets.extra/
variables/
variables.data-?????-of-?????
variables.index
saved_model.pb|saved_model.pbtxt

参数会保存在 variables 下,constants 和图保存在 .pb 中,那 assets 用来干什么的呢,官网解释如下:

assets 是包含辅助(外部)文件(如词汇表)的子文件夹。资源被复制到 SavedModel 的位置,并且可以在加载特定的 MetaGraphDef 时被读取。

熟悉 Android 开发的都知道项目里有 /assets 用来放资源文件,这里概念也类似。资源文件的分出,从设计上,基本都是因为放不下、加载慢才剥出来,而我这里,正是由于常量太大在转为SavedModel时报错 "A protocol message was rejected because it was too big",我的模型会超过2G,在 github-issue 里有相关讨论

正确的做法是把不必固化在图里的常量放到 assets 中,在加载图后再加载常量。这里以我的模型为例,讲下是什么要放到 assets 中以及具体操作步骤。

模型目标

我的模型实现一个简单的 top-k 算法来查找特征相似图片。将图片库中每个文件使用特定模型(就是上一例模型)计算得到其隐含空间编码(latent embeddings),对新文件做同样编码,在已知图片空间中搜索得到距离最近的k个图片并返回。已知图片的编码都是预处理好的,topk 计算时会把整个矩阵塞进去,而就是这个数据量太大就会造成我们的问题。要使用 ssets 解决该问题,就需要把这些已知的编码使用 numpy 保存成一个文件,放到 assets 目录中去再使用。

模型实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
anime_latents_file = os.path.join(DATA_ROOT, 'latent', 'anime_latents_bytes')
anime_latents_asset_path = os.path.join(DATA_ROOT, 'latent', 'anime_latents_bytes.assets')
anime_latents_index_asset_path = os.path.join(DATA_ROOT, 'latent', 'anime_latents_indexes.assets')
LATENT_SHAPE = [1, 4, 4, 256]

graph = tf.Graph()
with graph.as_default():

original_latents_asset_path = tf.constant(anime_latents_asset_path)
latents_asset_path = tf.Variable(original_latents_asset_path, name='latents_asset_path', trainable=False, collections=[])
assign_latents_asset_path = latents_asset_path.assign(original_latents_asset_path)

original_index_asset_path = tf.constant(anime_latents_index_asset_path)
index_asset_path = tf.Variable(original_index_asset_path, name='index_asset_path', trainable=False, collections=[])
assign_index_asset_path = index_asset_path.assign(original_index_asset_path)

x_input = tf.read_file(latents_asset_path)

x_input = tf.decode_raw(x_input, tf.float32) # decode_raw means the file should be written out like this `f.write(np.array(anime_latents).tobytes())`
x_input = tf.reshape(x_input, [-1, *LATENT_SHAPE])

y_input_ph = tf.placeholder(tf.float32, shape=[1, *LATENT_SHAPE], name='y_ph')
k_num_ph = tf.placeholder(tf.int32)

x_reshaped = tf.layers.flatten(x_input)
y_reshaped = tf.layers.flatten(y_input_ph)

distance = tf.norm(tf.subtract(x_reshaped, y_reshaped), axis=1)
top_k_xvals, top_k_indices = tf.nn.top_k(tf.negative(distance), k=k_num_ph)

table = tf.contrib.lookup.index_to_string_table_from_file(vocabulary_file=index_asset_path, default_value="UNKNOWN")
nearest_filenames = table.lookup(tf.cast(top_k_indices, tf.int64))

tf.add_to_collection(tf.GraphKeys.ASSET_FILEPATHS, original_latents_asset_path)
tf.add_to_collection(tf.GraphKeys.ASSET_FILEPATHS, original_index_asset_path)


sess = tf.InteractiveSession(graph=graph, config=tf.ConfigProto(allow_soft_placement=True, log_device_placement=True))

init_global_val = tf.global_variables_initializer()
init_assign = tf.group([assign_latents_asset_path, assign_index_asset_path])
init_table = tf.group([tf.tables_initializer()])

init_op = tf.group(init_global_val, init_assign, init_table)

sess.run(init_op)

top_k 在 29 行,整个 graph 构建部分,主要进行 资源文件路径和内容的载入,reshape 等预处理,计算top_k,把结果id映射到对应文件名并结束。

处理题述问题及使用 assets 的核心在于 9~11 行代码。资源文件通过路径指明,随后在17行 tf.read_file 加载,而路径则首先用一个 constant 保存具体路径值,然后再使用 tf.assign 方法写入预定的 variable 中,最后使用 variable 中的路径值。为什么要绕一圈这么麻烦而不直接写死路径呢,因为我们的目标是要生成 SavedModel 的,打包放到另外目录时 assets 文件路径当然也会变化,因而路径不能写死,还要借助 34 行 tf.add_to_collection 以及后面导出 SavedModel 时指明 assets_collection 来实现模型导出时路径值的自然过渡。

通过这样的方法,基本实现了 SavedModel 构建模型时 assets 的使用。

读者在上面的代码中应该能明显意识到模型这里其实使用了两个 assets ,除了 anime_latents_asset,还有一个 anime_latents_index。后者是为了在计算得到文件 id 后,便利地映射到对应文件名中去,具体映射的实现借助 31 行代码 [tf.contrib.lookup.index_to_string_table_from_file] 提供的一个 table 数据结构来完成,可以查阅相关文档了解更多具体用法。这个 api 是被设计用来把 word_id 映射到字典中具体 String 的,做 NLP 的会更多用到它,我也是从一个讨论怎样 Serving Seq2Seq 的github-issue 中才知道这个 api 的。

不过在结合使用 index_to_string_table_from_file 和 serving 的时候,还是另外踩进了一个深坑:

这只 api 的实现需要创建一种 tf.lookup.StaticHashTable 格式,而这种格式需要一个额外的初始化动作,即 tf.tables_initializer(),这也是为什么在官方serving基础教程的转换代码中有写 main_op=tf.tables_initializer()。OK,那我们在使用模型以及转为 SavedModel 的时候都注意下要增加它的初始化操作,即上面代码中 44 行,把各个 initilizer 一个不落组合在一起,并在下面 模型导出 一节的代码中,传给legacy_init_op(或main_op)来实现初始化。然后,问题出现了,在实际 serving 跑起来时,会有概率报以下错误:

1
2
E tensorflow_serving/util/retrier.cc:37] Loading servable: {name: latent_search_model version: 1565713790} failed: Failed precondition: Attempting to use uninitialized value index_asset_path
[[{{node index_asset_path/read}}]]

在创建 look_table 的时候,发现 index_asset_path 这个 variable 节点还没初始化而报错。而有时候运气好的话又会正常运行。

这是因为,虽然我们把 variable 和 table 的初始化都给到了初始化参数中,但是这个初始化只接受一个operator,我们使用 tf.group() 把多个 initilizer 组合在一起,在实际初始化时,是并行无序做的。但我们的模型中,确实又有 table 对某个 variable 的依赖,进而报错。

对这个问题,我尝试在构建模型时和 tf.group 各个 initilizer 时增加 tf.control_dependencies() 来控制初始化顺序,但改来改去还是没效果,最是初始化还是并行跑的。也考虑如果简单的话干脆改下 serving 初始化源码,但是因为有概率 serving 能直接跑起来,只要初始化顺利后就能稳定运行,tf-serving 模块又是用 c++ 实现的,于是就没动力继续改了。我认为这个应该是 tf 的设计缺陷,并已经在 tf-serving 提了issue

更新,该 issue 已有结论,需要结合使用 ResourceVariable 以及 read_value 来为 index_path 的 variable 和 assigner 绑定依赖,具体讨论细节详见 issue,针对本项目,修改后的代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
from tensorflow.python.ops import resource_variable_ops as rr

graph = tf.Graph()
with graph.as_default():

original_latents_asset_path = tf.constant(anime_latents_asset_path)
latents_asset_path = rr.ResourceVariable(original_latents_asset_path, name='latents_asset_path', trainable=False, collections=[])
assign_latents_asset_path = latents_asset_path.assign(original_latents_asset_path)

original_index_asset_path = tf.constant(anime_latents_index_asset_path)
index_asset_path = rr.ResourceVariable(original_index_asset_path, name='index_asset_path', trainable=False, collections=[])
assign_index_asset_path = index_asset_path.assign(original_index_asset_path)

with tf.control_dependencies([assign_latents_asset_path]):
x_input = tf.read_file(latents_asset_path.read_value())
x_input = tf.decode_raw(x_input, tf.float32)
x_input = tf.reshape(x_input, [-1, *LATENT_SHAPE])

y_input_ph = tf.placeholder(tf.float32, shape=[1, *LATENT_SHAPE], name='y_ph')
k_num_ph = tf.placeholder(tf.int32)

x_reshaped = tf.layers.flatten(x_input)
y_reshaped = tf.layers.flatten(y_input_ph)

distance = tf.norm(tf.subtract(x_reshaped, y_reshaped), axis=1)
top_k_xvals, top_k_indices = tf.nn.top_k(tf.negative(distance), k=k_num_ph)

with tf.control_dependencies([assign_index_asset_path]):
table = tf.contrib.lookup.index_to_string_table_from_file(vocabulary_file=index_asset_path.read_value(), default_value="UNKNOWN")
nearest_filenames = table.lookup(tf.cast(top_k_indices, tf.int64))

tf.add_to_collection(tf.GraphKeys.ASSET_FILEPATHS, original_latents_asset_path)
tf.add_to_collection(tf.GraphKeys.ASSET_FILEPATHS, original_index_asset_path)

init_op = tf.group(tf.global_variables_initializer(), tf.tables_initializer())

模型导出

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
EXPORT_DIR = os.path.join(DATA_ROOT, 'latent_search_model', str(int(time.time())))
if os.path.exists(EXPORT_DIR):
shutil.rmtree(EXPORT_DIR)
builder = tf.saved_model.builder.SavedModelBuilder(EXPORT_DIR)

model_signature = tf.saved_model.signature_def_utils.build_signature_def(
inputs={
"k_num": tf.saved_model.utils.build_tensor_info(k_num_ph),
"target_latent": tf.saved_model.utils.build_tensor_info(y_input_ph)
},
outputs={
"xvals": tf.saved_model.utils.build_tensor_info(top_k_xvals),
"indices": tf.saved_model.utils.build_tensor_info(top_k_indices),
"filenames": tf.saved_model.utils.build_tensor_info(nearest_filenames)
},
method_name=tf.saved_model.signature_constants.PREDICT_METHOD_NAME)

builder.add_meta_graph_and_variables(
sess, [tf.saved_model.tag_constants.SERVING],
signature_def_map={
tf.saved_model.signature_constants.DEFAULT_SERVING_SIGNATURE_DEF_KEY:
model_signature,
},
assets_collection=tf.get_collection(tf.GraphKeys.ASSET_FILEPATHS),
legacy_init_op=init_op # please try changing `legacy_init_op` to `main_op` if you tf version is not that early
)
builder.save()

这里与一般导出 SavedModel 时代码不同就主要在于 assets_collectionlegacy_init_op 参数。

前者用于 assets 文件处理,会帮助把指定路径文件拷贝到 SavedModel 目录下,并自动更正路径。
后者初始化参数在较新的 api 中应改用 main_op,平时不传值的话,会自动增加 tf.global_variables_initilizer()tf.local_variable_initilizer() 等初始化动作,而我们这里要使用 assets 和 lookup_table,因而不能漏掉。

Start serving

serving 目录样例 serving_models.config
dir_structure_of_portrait2anime_serving_ serving_model_config_content_of_portrait2anime
1
2
3
4
5
6
7
8
sudo docker run -p 8500:8500 -p 8501:8501 \
--mount type=bind,source=path_to_serving_folder/twingan_latent_encoder,target=/models/twingan_latent_encoder \
--mount type=bind,source=path_to_serving_folder/latent_search_model,target=/models/latent_search_model \
--mount type=bind,source=path_to_serving_folder/ugatit_selfie2anime,target=/models/ugatit_selfie2anime \
--mount type=bind,source=path_to_serving_folder/face_detection,target=/models/face_detection \
--mount type=bind,source=path_to_serving_folder/serving_models.config,target=/models/serving_models.config \
-t tensorflow/serving \
--model_config_file=/models/serving_models.config

使用 docker 来执行 serving,需要根据文档安装好 docker 以及 pull 下来 tf/serving 的 docker。(在 windows 环境下如果有 mount 问题,注意要在 docker 客户端内 Settings->Shared Drivers 把权限之类的勾上)

--mount 来传递文件信息,如果要同时 serving 多个模型,则要配置 --model_config_file,注意这个 config 文件也要用 --mount 挂载到 docker 空间内。

model_config_file 对应文件中,每个模型一个 config:
name-> 这个可以自行设定,后面使用模型时 model_spec.name 使用该值;
base_path-> mount 选项中对应 target 的值,
model_platform -> 就是 “tensorflow”, 未找到相关文档。

Execute with flask

模型跑起来了,我们还需要使用 tf 提供的 restful API 或 gRPC API 来与对应端口通信,进而执行模型。这里讲 gRPC API 的使用。

简化版的代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
# 导入依赖
import grpc
from tensorflow_serving.apis import predict_pb2
from tensorflow_serving.apis import prediction_service_pb2_grpc

# 然后建立好通信:
channel = grpc.insecure_channel("localhost:8500")
stub = prediction_service_pb2_grpc.PredictionServiceStub(channel)

# 设定模型参数、输入节点及内容,最后执行predict
request = predict_pb2.PredictRequest()
request.model_spec.name = 'your_model_name'
request.model_spec.signature_name = 'serving_default' # tf.saved_model.signature_constants.DEFAULT_SERVING_SIGNATURE_DEF_KEY
request.inputs['input_y'].CopyFrom(tf.contrib.util.make_tensor_proto(test_endpoints, shape=test_endpoints.shape))
result = stub.Predict(request, 10.0) # 10 secs timeout
print(result)

我的项目是使用 flask 来运行和部署的,以其中人脸检测(使用mobilenet_ssd进行face_detection)模型的部分代码为例的模板代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
app = Flask(__name__, static_url_path='/static')

def allowed_img_file(filename):
return '.' in filename and filename.rsplit('.', 1)[1] in IMG_EXTENSIONS

class ServingModel:
def __init__(self, model_name, signature_name='serving_default'):
channel = grpc.insecure_channel("localhost:8500")
self.stub = prediction_service_pb2_grpc.PredictionServiceStub(channel)

self.request = predict_pb2.PredictRequest()
self.request.model_spec.name = model_name
self.request.model_spec.signature_name = signature_name

def run(self, input_dict, time_out=30.0):
for key, value in input_dict.items():
self.request.inputs[key].CopyFrom(tf.contrib.util.make_tensor_proto(value, shape=value.shape))
result = self.stub.Predict(self.request, time_out)
return result


def parsed_prediction(result, key):
if not key in result.outputs:
return None
output_proto = result.outputs[key]
shape = [d.size for d in output_proto.tensor_shape.dim]
dtype = tf.dtypes.DType(output_proto.dtype)
if dtype.is_floating:
parsed_data = np.reshape(output_proto.float_val, shape)
elif dtype.is_integer:
parsed_data = np.reshape(output_proto.int_val, shape)
else:
print("#parsed_prediction do not support type {} yet:".format(dtype))
return None
return parsed_data

modelFaceDetection = ServingModel(model_name='face_detection')


def detect_face(img_path, thereshold=0.9, scale=1.75):
assert os.path.exists(img_path), "Image not found on path : " + str(img_path)

image = Image.open(img_path)
image = image.convert('RGB')
image_np = np.array(image) #.astype(np.float32)
image_np = np.expand_dims(image_np, axis=0)

input_dict = {'input_images': image_np}
result = modelFaceDetection.run(input_dict=input_dict)

boxes = parsed_prediction(result, "boxes")
scores = parsed_prediction(result, "scores")
classes = parsed_prediction(result, "classes")
num_detections = parsed_prediction(result, "num_detections")

if np.squeeze(scores)[0] < thereshold:
return None
l, t, r, b = get_absolute_bbox(np.squeeze(boxes)[0], image.size[0], image.size[1], scale=scale)
face = image.crop((l, t, r, b))

cv2_face = cv2.cvtColor(np.asarray(face), cv2.COLOR_RGB2BGR)
return cv2_face


@app.route('/detect_face/image', methods=['POST'])
def api_detect_face():
file = request.files['image_file']

if not (file and allowed_img_file(file.filename)):
return jsonify(packed_response(1001, "请检查上传的图片类型,仅限于png、PNG、jpg、JPG、bmp"))

# temporally store the original uploaded images
img_save_path = os.path.join(img_storage_dir, file.filename)
file.save(img_save_path)
img_save_path_no_static = img_save_path.replace("static/", "")

face = detect_face(img_save_path)

face_image_path = os.path.join(img_storage_dir, 'detected_face_' + file.filename)
cv2.imwrite(face_image_path, face)
face_image_path_no_static = face_image_path.replace("static/", "")

return render_template('index.html', uploaded_image_path=img_save_path_no_static,
face_image_path=face_image_path_no_static)


if __name__ == "__main__":
app.run(host="your_server_ip", port=5000)

其中,ServingModel 用于构建模型 Stub, 需要 inference 时,直接配置 input_dict 并 run 即可。
gRPC 的返回是 protobuffer 格式的,解析有时稍麻烦些,这里写了个 parsed_prediction(), 可以比较方便地解析 int、float 类型数据,基本够用了。

本文所涉及项目,几个模型全跑通后,演示效果一例如图:
result_portrait2anime_of_Kim_Jong_Un