最近在使用 Tensorflow serving 时踩了一些坑,这里聊作记录,以供后来者参考。
Tensorflow serving 是用来帮助把模型部署到 Server 上的,模型格式为 SavedModel。 之前我的模型都是最后部署到移动端的,而这次的项目需要部署在服务端,也因此是首次使用 serving。
Serving Model 有以下步骤: 1、将模型转换为 SavedModel 格式 (主要讲两个例子,一个是会讲怎么把已有 checkpoint 截取部分转成用来推演的 SavedModel,另外讲一例在 serving 中使用 assets的麻烦事) 2、使用 tf-serving 把模型跑起来(这里使用docker方式) 3、通过 restful API 或 gRPC API 使用模型(这里只讲后者)
Extract a subgraph and export to SavedModel 首先,读者至少要有能力区分 tf 三类模型主要保存方式,即 checkpoint、frozen model(.pb, protobuffer)、SavedModel。 Serving 需要使用 SavedModel 类型的模型文件,如果训练时不是使用 Estimator, 估计很少会有人用 SavedModel 保存最终的网络,因此这里就需要把之前保存好的 pb 或 checkpoint 额外转一下格式。
简单的模型,看下这里官方教程 Serving a TensorFlow Model 一般足以搞定 SavedModel。
如果是 .pb 文件,使用 tf.import_graph_def, 如果是 ckpt, 使用 tf.train.import_meta_graph,总结起来可以用下面的代码方便地导入 graph:https://gist.github.com/BenZstory/3ef2d6e59dc8ff133708c8b6122738b1#file-load_model-python
在本节的例子中,我们的目标是将一个已有模型 ckpt 截取部分来实现 serving,具体来说,我希望把 TwinGAN 的 encoder-decoder 架构中,只固化和截取 encoder 部分,然后导出为 SavedModel 格式,从而减少模型参数。而在这个过程中,假设手上并没有整个模型的架构代码,而只根据 ckpt 中现有的节点名指定输入输出来做截断,那就需要下面的方法了。
整个步骤,包括
1、加载graph 2、指定要截取子图的输入输出节点并固化 3、重新加载子图再导出为 SavedModel
关键就在于第2步的实现方式。如果有熟悉 SavedModel 的同学可能会疑问何必这么麻烦,导出 SavedModel 的时候可以设定好输入输出,足以使用子图。但是我实验下来,#build_signature_def
时设定的inputs
、outputs
并不会自动帮助截取最小子图,这造成输出的 SavedModel 里有太多冗余内容,以我的实际问题为例,增加手动截子图后,把SavedModel大小从 270mb 降低到 11mb。
第1步,使用上文提到的load_model加载好ckpt。 第2步,截取子图并固化的方法参考自这里 ,核心就三行代码:
1 2 3 g = tf.graph_util.convert_variables_to_constants(sess, sess.graph_def, [ENDPOINTS_OP_NAME]) g = tf.graph_util.extract_sub_graph(g, [ENDPOINTS_OP_NAME]) g = tf.graph_util.remove_training_nodes(g, protected_nodes=["images_ph", ENDPOINTS_OP_NAME])
在上面代码中,graph 在 sess 中已经整个加载好,"images_ph"
为输入节点,[ENDPOINTS_OP_NAME]
为输出节点列表。最终 g 是截好的子图,只有子图和固化好的参数。接下来把 sess 中的图 切到子图 g,为保险起见,我这里是把 g 输出后再重新加载:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 TMP_SUBGRAPH_PB_NAME = 'twingan_subgraph_tmp.pb' with open(os.path.join(DATA_ROOT, TMP_SUBGRAPH_PB_NAME), 'wb') as fout: fout.write(g.SerializeToString()) tf.reset_default_graph() pb_path = os.path.join(DATA_ROOT, TMP_SUBGRAPH_PB_NAME) gd = tf.GraphDef() with open(pb_path, 'rb') as f: gd.ParseFromString(f.read()) tf.import_graph_def(gd, name='') now_graph = tf.get_default_graph() images_placeholder = now_graph.get_tensor_by_name('images_ph:0') encoder_endpoints_tensor = now_graph.get_tensor_by_name('encoder_content_4/downsample_to_4x4x256/AvgPool:0')
sess、图、输入输出节点等环境都准备好了,进入第3步,终于导出为 SavedModel 格式:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 EXPORT_DIR = os.path.join(DATA_ROOT, 'twingan_latent_encoder', str(int(time.time()))) if os.path.exists(EXPORT_DIR): shutil.rmtree(EXPORT_DIR) builder = tf.saved_model.builder.SavedModelBuilder(EXPORT_DIR) model_signature = tf.saved_model.signature_def_utils.build_signature_def( inputs={ "input_images": tf.saved_model.utils.build_tensor_info(images_placeholder) }, outputs={ "output": tf.saved_model.utils.build_tensor_info(encoder_endpoints_tensor) }, method_name=tf.saved_model.signature_constants.PREDICT_METHOD_NAME) builder.add_meta_graph_and_variables( sess, [tf.saved_model.tag_constants.SERVING], signature_def_map={ tf.saved_model.signature_constants.DEFAULT_SERVING_SIGNATURE_DEF_KEY: model_signature, }, clear_devices=True ) builder.save()
(Hidden Content)
Export with assets 接下来同样是对模型导出 SavedModel 技巧的讲解,但这一例中要用到 assets。 在我们平常训练、推演模型的时候,由于一般直接在 python 环境下,所以好多信息是通过 python 模块如 numpy 加载数据成为常量后给到模型的,在固化后以 tf.constants 保存,如果导出为 SavedModel,会和整个图结构存在 protobuffer 中。
再回顾下 SavedModel 的目录结构 :
1 2 3 4 5 6 assets/ assets.extra/ variables/ variables.data-?????-of-????? variables.index saved_model.pb|saved_model.pbtxt
参数会保存在 variables 下,constants 和图保存在 .pb 中,那 assets 用来干什么的呢,官网解释如下:
assets 是包含辅助(外部)文件(如词汇表)的子文件夹。资源被复制到 SavedModel 的位置,并且可以在加载特定的 MetaGraphDef 时被读取。
熟悉 Android 开发的都知道项目里有 /assets
用来放资源文件,这里概念也类似。资源文件的分出,从设计上,基本都是因为放不下、加载慢才剥出来,而我这里,正是由于常量太大在转为SavedModel时报错 "A protocol message was rejected because it was too big"
,我的模型会超过2G,在 github-issue 里有相关讨论 。
正确的做法是把不必固化在图里的常量放到 assets 中,在加载图后再加载常量。这里以我的模型为例,讲下是什么要放到 assets 中以及具体操作步骤。
模型目标 我的模型实现一个简单的 top-k 算法来查找特征相似图片。将图片库中每个文件使用特定模型(就是上一例模型)计算得到其隐含空间编码(latent embeddings),对新文件做同样编码,在已知图片空间中搜索得到距离最近的k个图片并返回。已知图片的编码都是预处理好的,topk 计算时会把整个矩阵塞进去,而就是这个数据量太大就会造成我们的问题。要使用 ssets 解决该问题,就需要把这些已知的编码使用 numpy 保存成一个文件,放到 assets 目录中去再使用。
模型实现 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 anime_latents_file = os.path.join(DATA_ROOT, 'latent', 'anime_latents_bytes') anime_latents_asset_path = os.path.join(DATA_ROOT, 'latent', 'anime_latents_bytes.assets') anime_latents_index_asset_path = os.path.join(DATA_ROOT, 'latent', 'anime_latents_indexes.assets') LATENT_SHAPE = [1, 4, 4, 256] graph = tf.Graph() with graph.as_default(): original_latents_asset_path = tf.constant(anime_latents_asset_path) latents_asset_path = tf.Variable(original_latents_asset_path, name='latents_asset_path', trainable=False, collections=[]) assign_latents_asset_path = latents_asset_path.assign(original_latents_asset_path) original_index_asset_path = tf.constant(anime_latents_index_asset_path) index_asset_path = tf.Variable(original_index_asset_path, name='index_asset_path', trainable=False, collections=[]) assign_index_asset_path = index_asset_path.assign(original_index_asset_path) x_input = tf.read_file(latents_asset_path) x_input = tf.decode_raw(x_input, tf.float32) # decode_raw means the file should be written out like this `f.write(np.array(anime_latents).tobytes())` x_input = tf.reshape(x_input, [-1, *LATENT_SHAPE]) y_input_ph = tf.placeholder(tf.float32, shape=[1, *LATENT_SHAPE], name='y_ph') k_num_ph = tf.placeholder(tf.int32) x_reshaped = tf.layers.flatten(x_input) y_reshaped = tf.layers.flatten(y_input_ph) distance = tf.norm(tf.subtract(x_reshaped, y_reshaped), axis=1) top_k_xvals, top_k_indices = tf.nn.top_k(tf.negative(distance), k=k_num_ph) table = tf.contrib.lookup.index_to_string_table_from_file(vocabulary_file=index_asset_path, default_value="UNKNOWN") nearest_filenames = table.lookup(tf.cast(top_k_indices, tf.int64)) tf.add_to_collection(tf.GraphKeys.ASSET_FILEPATHS, original_latents_asset_path) tf.add_to_collection(tf.GraphKeys.ASSET_FILEPATHS, original_index_asset_path) sess = tf.InteractiveSession(graph=graph, config=tf.ConfigProto(allow_soft_placement=True, log_device_placement=True)) init_global_val = tf.global_variables_initializer() init_assign = tf.group([assign_latents_asset_path, assign_index_asset_path]) init_table = tf.group([tf.tables_initializer()]) init_op = tf.group(init_global_val, init_assign, init_table) sess.run(init_op)
top_k 在 29 行,整个 graph 构建部分,主要进行 资源文件路径和内容的载入,reshape 等预处理,计算top_k,把结果id映射到对应文件名并结束。
处理题述问题及使用 assets 的核心在于 9~11 行代码。资源文件通过路径指明,随后在17行 tf.read_file 加载,而路径则首先用一个 constant 保存具体路径值,然后再使用 tf.assign
方法写入预定的 variable 中,最后使用 variable 中的路径值。为什么要绕一圈这么麻烦而不直接写死路径呢,因为我们的目标是要生成 SavedModel 的,打包放到另外目录时 assets 文件路径当然也会变化,因而路径不能写死,还要借助 34 行 tf.add_to_collection
以及后面导出 SavedModel 时指明 assets_collection
来实现模型导出时路径值的自然过渡。
通过这样的方法,基本实现了 SavedModel 构建模型时 assets 的使用。
读者在上面的代码中应该能明显意识到模型这里其实使用了两个 assets ,除了 anime_latents_asset
,还有一个 anime_latents_index
。后者是为了在计算得到文件 id 后,便利地映射到对应文件名中去,具体映射的实现借助 31 行代码 [tf.contrib.lookup.index_to_string_table_from_file
] 提供的一个 table 数据结构来完成,可以查阅相关文档 了解更多具体用法。这个 api 是被设计用来把 word_id 映射到字典中具体 String 的,做 NLP 的会更多用到它,我也是从一个讨论怎样 Serving Seq2Seq 的github-issue 中才知道这个 api 的。
不过在结合使用 index_to_string_table_from_file 和 serving 的时候,还是另外踩进了一个深坑:
这只 api 的实现需要创建一种 tf.lookup.StaticHashTable
格式,而这种格式需要一个额外的初始化动作,即 tf.tables_initializer(),这也是为什么在官方serving基础教程 的转换代码中有写 main_op=tf.tables_initializer()
。OK,那我们在使用模型以及转为 SavedModel 的时候都注意下要增加它的初始化操作,即上面代码中 44 行,把各个 initilizer 一个不落组合在一起,并在下面 模型导出
一节的代码中,传给legacy_init_op(或main_op)来实现初始化。然后,问题出现了,在实际 serving 跑起来时,会有概率报以下错误:
1 2 E tensorflow_serving/util/retrier.cc:37] Loading servable: {name: latent_search_model version: 1565713790} failed: Failed precondition: Attempting to use uninitialized value index_asset_path [[{{node index_asset_path/read}}]]
在创建 look_table 的时候,发现 index_asset_path 这个 variable 节点还没初始化而报错。而有时候运气好的话又会正常运行。
这是因为,虽然我们把 variable 和 table 的初始化都给到了初始化参数中,但是这个初始化只接受一个operator,我们使用 tf.group() 把多个 initilizer 组合在一起,在实际初始化时,是并行无序做的。但我们的模型中,确实又有 table 对某个 variable 的依赖,进而报错。
对这个问题,我尝试在构建模型时和 tf.group 各个 initilizer 时增加 tf.control_dependencies()
来控制初始化顺序,但改来改去还是没效果,最是初始化还是并行跑的。也考虑如果简单的话干脆改下 serving 初始化源码,但是因为有概率 serving 能直接跑起来,只要初始化顺利后就能稳定运行,tf-serving 模块又是用 c++ 实现的,于是就没动力继续改了。我认为这个应该是 tf 的设计缺陷,并已经在 tf-serving 提了issue 。
更新,该 issue 已有结论,需要结合使用 ResourceVariable
以及 read_value
来为 index_path 的 variable 和 assigner 绑定依赖,具体讨论细节详见 issue,针对本项目,修改后的代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 from tensorflow.python.ops import resource_variable_ops as rr graph = tf.Graph() with graph.as_default(): original_latents_asset_path = tf.constant(anime_latents_asset_path) latents_asset_path = rr.ResourceVariable(original_latents_asset_path, name='latents_asset_path', trainable=False, collections=[]) assign_latents_asset_path = latents_asset_path.assign(original_latents_asset_path) original_index_asset_path = tf.constant(anime_latents_index_asset_path) index_asset_path = rr.ResourceVariable(original_index_asset_path, name='index_asset_path', trainable=False, collections=[]) assign_index_asset_path = index_asset_path.assign(original_index_asset_path) with tf.control_dependencies([assign_latents_asset_path]): x_input = tf.read_file(latents_asset_path.read_value()) x_input = tf.decode_raw(x_input, tf.float32) x_input = tf.reshape(x_input, [-1, *LATENT_SHAPE]) y_input_ph = tf.placeholder(tf.float32, shape=[1, *LATENT_SHAPE], name='y_ph') k_num_ph = tf.placeholder(tf.int32) x_reshaped = tf.layers.flatten(x_input) y_reshaped = tf.layers.flatten(y_input_ph) distance = tf.norm(tf.subtract(x_reshaped, y_reshaped), axis=1) top_k_xvals, top_k_indices = tf.nn.top_k(tf.negative(distance), k=k_num_ph) with tf.control_dependencies([assign_index_asset_path]): table = tf.contrib.lookup.index_to_string_table_from_file(vocabulary_file=index_asset_path.read_value(), default_value="UNKNOWN") nearest_filenames = table.lookup(tf.cast(top_k_indices, tf.int64)) tf.add_to_collection(tf.GraphKeys.ASSET_FILEPATHS, original_latents_asset_path) tf.add_to_collection(tf.GraphKeys.ASSET_FILEPATHS, original_index_asset_path) init_op = tf.group(tf.global_variables_initializer(), tf.tables_initializer())
模型导出 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 EXPORT_DIR = os.path.join(DATA_ROOT, 'latent_search_model', str(int(time.time()))) if os.path.exists(EXPORT_DIR): shutil.rmtree(EXPORT_DIR) builder = tf.saved_model.builder.SavedModelBuilder(EXPORT_DIR) model_signature = tf.saved_model.signature_def_utils.build_signature_def( inputs={ "k_num": tf.saved_model.utils.build_tensor_info(k_num_ph), "target_latent": tf.saved_model.utils.build_tensor_info(y_input_ph) }, outputs={ "xvals": tf.saved_model.utils.build_tensor_info(top_k_xvals), "indices": tf.saved_model.utils.build_tensor_info(top_k_indices), "filenames": tf.saved_model.utils.build_tensor_info(nearest_filenames) }, method_name=tf.saved_model.signature_constants.PREDICT_METHOD_NAME) builder.add_meta_graph_and_variables( sess, [tf.saved_model.tag_constants.SERVING], signature_def_map={ tf.saved_model.signature_constants.DEFAULT_SERVING_SIGNATURE_DEF_KEY: model_signature, }, assets_collection=tf.get_collection(tf.GraphKeys.ASSET_FILEPATHS), legacy_init_op=init_op # please try changing `legacy_init_op` to `main_op` if you tf version is not that early ) builder.save()
这里与一般导出 SavedModel 时代码不同就主要在于 assets_collection
和 legacy_init_op
参数。
前者用于 assets 文件处理,会帮助把指定路径文件拷贝到 SavedModel 目录下,并自动更正路径。 后者初始化参数在较新的 api 中应改用 main_op
,平时不传值的话,会自动增加 tf.global_variables_initilizer()
、tf.local_variable_initilizer()
等初始化动作,而我们这里要使用 assets 和 lookup_table,因而不能漏掉。
Start serving
serving 目录样例
serving_models.config
1 2 3 4 5 6 7 8 sudo docker run -p 8500:8500 -p 8501:8501 \ --mount type=bind,source=path_to_serving_folder/twingan_latent_encoder,target=/models/twingan_latent_encoder \ --mount type=bind,source=path_to_serving_folder/latent_search_model,target=/models/latent_search_model \ --mount type=bind,source=path_to_serving_folder/ugatit_selfie2anime,target=/models/ugatit_selfie2anime \ --mount type=bind,source=path_to_serving_folder/face_detection,target=/models/face_detection \ --mount type=bind,source=path_to_serving_folder/serving_models.config,target=/models/serving_models.config \ -t tensorflow/serving \ --model_config_file=/models/serving_models.config
使用 docker 来执行 serving ,需要根据文档安装好 docker 以及 pull 下来 tf/serving 的 docker。(在 windows 环境下如果有 mount 问题,注意要在 docker 客户端内 Settings->Shared Drivers 把权限之类的勾上)
--mount
来传递文件信息,如果要同时 serving 多个模型,则要配置 --model_config_file
,注意这个 config 文件也要用 --mount
挂载到 docker 空间内。
在 model_config_file
对应文件中,每个模型一个 config:name
-> 这个可以自行设定,后面使用模型时 model_spec.name
使用该值;base_path
-> mount
选项中对应 target 的值,model_platform
-> 就是 “tensorflow”, 未找到相关文档。
Execute with flask 模型跑起来了,我们还需要使用 tf 提供的 restful API 或 gRPC API 来与对应端口通信,进而执行模型。这里讲 gRPC API 的使用。
简化版的代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 # 导入依赖 import grpc from tensorflow_serving.apis import predict_pb2 from tensorflow_serving.apis import prediction_service_pb2_grpc # 然后建立好通信: channel = grpc.insecure_channel("localhost:8500") stub = prediction_service_pb2_grpc.PredictionServiceStub(channel) # 设定模型参数、输入节点及内容,最后执行predict request = predict_pb2.PredictRequest() request.model_spec.name = 'your_model_name' request.model_spec.signature_name = 'serving_default' # tf.saved_model.signature_constants.DEFAULT_SERVING_SIGNATURE_DEF_KEY request.inputs['input_y'].CopyFrom(tf.contrib.util.make_tensor_proto(test_endpoints, shape=test_endpoints.shape)) result = stub.Predict(request, 10.0) # 10 secs timeout print(result)
我的项目是使用 flask 来运行和部署的,以其中人脸检测(使用mobilenet_ssd进行face_detection)模型的部分代码为例的模板代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 app = Flask(__name__, static_url_path='/static') def allowed_img_file(filename): return '.' in filename and filename.rsplit('.', 1)[1] in IMG_EXTENSIONS class ServingModel: def __init__(self, model_name, signature_name='serving_default'): channel = grpc.insecure_channel("localhost:8500") self.stub = prediction_service_pb2_grpc.PredictionServiceStub(channel) self.request = predict_pb2.PredictRequest() self.request.model_spec.name = model_name self.request.model_spec.signature_name = signature_name def run(self, input_dict, time_out=30.0): for key, value in input_dict.items(): self.request.inputs[key].CopyFrom(tf.contrib.util.make_tensor_proto(value, shape=value.shape)) result = self.stub.Predict(self.request, time_out) return result def parsed_prediction(result, key): if not key in result.outputs: return None output_proto = result.outputs[key] shape = [d.size for d in output_proto.tensor_shape.dim] dtype = tf.dtypes.DType(output_proto.dtype) if dtype.is_floating: parsed_data = np.reshape(output_proto.float_val, shape) elif dtype.is_integer: parsed_data = np.reshape(output_proto.int_val, shape) else: print("#parsed_prediction do not support type {} yet:".format(dtype)) return None return parsed_data modelFaceDetection = ServingModel(model_name='face_detection') def detect_face(img_path, thereshold=0.9, scale=1.75): assert os.path.exists(img_path), "Image not found on path : " + str(img_path) image = Image.open(img_path) image = image.convert('RGB') image_np = np.array(image) #.astype(np.float32) image_np = np.expand_dims(image_np, axis=0) input_dict = {'input_images': image_np} result = modelFaceDetection.run(input_dict=input_dict) boxes = parsed_prediction(result, "boxes") scores = parsed_prediction(result, "scores") classes = parsed_prediction(result, "classes") num_detections = parsed_prediction(result, "num_detections") if np.squeeze(scores)[0] < thereshold: return None l, t, r, b = get_absolute_bbox(np.squeeze(boxes)[0], image.size[0], image.size[1], scale=scale) face = image.crop((l, t, r, b)) cv2_face = cv2.cvtColor(np.asarray(face), cv2.COLOR_RGB2BGR) return cv2_face @app.route('/detect_face/image', methods=['POST']) def api_detect_face(): file = request.files['image_file'] if not (file and allowed_img_file(file.filename)): return jsonify(packed_response(1001, "请检查上传的图片类型,仅限于png、PNG、jpg、JPG、bmp")) # temporally store the original uploaded images img_save_path = os.path.join(img_storage_dir, file.filename) file.save(img_save_path) img_save_path_no_static = img_save_path.replace("static/", "") face = detect_face(img_save_path) face_image_path = os.path.join(img_storage_dir, 'detected_face_' + file.filename) cv2.imwrite(face_image_path, face) face_image_path_no_static = face_image_path.replace("static/", "") return render_template('index.html', uploaded_image_path=img_save_path_no_static, face_image_path=face_image_path_no_static) if __name__ == "__main__": app.run(host="your_server_ip", port=5000)
其中,ServingModel 用于构建模型 Stub, 需要 inference 时,直接配置 input_dict 并 run 即可。 gRPC 的返回是 protobuffer 格式的,解析有时稍麻烦些,这里写了个 parsed_prediction(), 可以比较方便地解析 int、float 类型数据,基本够用了。
本文所涉及项目,几个模型全跑通后,演示效果一例如图: