{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "**Chapter 19 – Training and Deploying TensorFlow Models at Scale**" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "_This notebook contains all the sample code and solutions to the exercises in chapter 19._" ] }, { "cell_type": "markdown", "metadata": { "tags": [] }, "source": [ "# WORK IN PROGRESS\n", "\n", "\n", "**I'm still working on updating this chapter to the 3rd edition. Please come back in a few weeks.**" ] }, { "cell_type": "markdown", "metadata": { "id": "dFXIv9qNpKzt", "tags": [] }, "source": [ "# Setup" ] }, { "cell_type": "markdown", "metadata": { "id": "8IPbJEmZpKzu" }, "source": [ "This project requires Python 3.7 or above:" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "TFSU3FCOpKzu" }, "outputs": [], "source": [ "import sys\n", "\n", "assert sys.version_info >= (3, 7)" ] }, { "cell_type": "markdown", "metadata": { "id": "TAlKky09pKzv" }, "source": [ "It also requires Scikit-Learn ≥ 1.0.1:" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "YqCwW7cMpKzw" }, "outputs": [], "source": [ "from packaging import version\n", "import sklearn\n", "\n", "# compare parsed versions: as plain strings, \"1.10\" < \"1.9\"\n", "assert version.parse(sklearn.__version__) >= version.parse(\"1.0.1\")" ] }, { "cell_type": "markdown", "metadata": { "id": "GJtVEqxfpKzw" }, "source": [ "And TensorFlow ≥ 2.6:" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "0Piq5se2pKzx" }, "outputs": [], "source": [ "from packaging import version\n", "import tensorflow as tf\n", "\n", "assert version.parse(tf.__version__) >= version.parse(\"2.6.0\")" ] }, { "cell_type": "markdown", "metadata": { "id": "DDaDoLQTpKzx" }, "source": [ "As we did in earlier chapters, let's define the default font sizes to make the figures prettier:" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "8d4TH3NbpKzx" }, "outputs": [], "source": [ "import matplotlib.pyplot as plt\n", "\n", "plt.rc('font', size=14)\n", "plt.rc('axes', labelsize=14, titlesize=14)\n", "plt.rc('legend', fontsize=14)\n", "plt.rc('xtick', labelsize=10)\n", "plt.rc('ytick', labelsize=10)" ] }, { "cell_type": "markdown", "metadata": { "id": "RcoUIRsvpKzy" }, "source": [ "And let's create the `images/deploy` folder (if it doesn't already exist), and define the `save_fig()` function, which is used throughout this notebook to save the figures in 
high-res for the book:" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "PQFH5Y9PpKzy" }, "outputs": [], "source": [ "from pathlib import Path\n", "\n", "IMAGES_PATH = Path() / \"images\" / \"deploy\"\n", "IMAGES_PATH.mkdir(parents=True, exist_ok=True)\n", "\n", "def save_fig(fig_id, tight_layout=True, fig_extension=\"png\", resolution=300):\n", "    path = IMAGES_PATH / f\"{fig_id}.{fig_extension}\"\n", "    if tight_layout:\n", "        plt.tight_layout()\n", "    plt.savefig(path, format=fig_extension, dpi=resolution)" ] }, { "cell_type": "markdown", "metadata": { "id": "YTsawKlapKzy" }, "source": [ "This chapter can be very slow without a GPU, so let's make sure there's one, or else issue a warning:" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "Ekxzo6pOpKzy" }, "outputs": [], "source": [ "if not tf.config.list_physical_devices('GPU'):\n", "    print(\"No GPU was detected. Neural nets can be very slow without a GPU.\")\n", "    if \"google.colab\" in sys.modules:\n", "        print(\"Go to Runtime > Change runtime type and select a GPU hardware \"\n", "              \"accelerator.\")\n", "    if \"kaggle_secrets\" in sys.modules:\n", "        print(\"Go to Settings > Accelerator and select GPU.\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "If you are running this notebook in Colab or Kaggle, let's install TensorFlow Serving:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "if \"google.colab\" in sys.modules or \"kaggle_secrets\" in sys.modules:\n", "    !echo \"deb http://storage.googleapis.com/tensorflow-serving-apt stable tensorflow-model-server tensorflow-model-server-universal\" > /etc/apt/sources.list.d/tensorflow-serving.list\n", "    !curl https://storage.googleapis.com/tensorflow-serving-apt/tensorflow-serving.release.pub.gpg | apt-key add -\n", "    !apt update && apt-get install -y tensorflow-model-server\n", "    %pip install -q -U tensorflow-serving-api" ] }, { "cell_type": "markdown", "metadata": 
{}, "source": [ "# Deploying TensorFlow models to TensorFlow Serving (TFS)\n", "We will use either the REST API or the gRPC API." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Save/Load a `SavedModel`" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import numpy as np\n", "\n", "(X_train_full, y_train_full), (X_test, y_test) = tf.keras.datasets.mnist.load_data()\n", "X_train_full = X_train_full[..., np.newaxis].astype(np.float32) / 255.\n", "X_test = X_test[..., np.newaxis].astype(np.float32) / 255.\n", "X_valid, X_train = X_train_full[:5000], X_train_full[5000:]\n", "y_valid, y_train = y_train_full[:5000], y_train_full[5000:]\n", "X_new = X_test[:3]" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "np.random.seed(42)\n", "tf.random.set_seed(42)\n", "\n", "model = tf.keras.Sequential([\n", "    tf.keras.layers.Flatten(input_shape=[28, 28, 1]),\n", "    tf.keras.layers.Dense(100, activation=\"relu\"),\n", "    tf.keras.layers.Dense(10, activation=\"softmax\")\n", "])\n", "model.compile(loss=\"sparse_categorical_crossentropy\",\n", "              optimizer=tf.keras.optimizers.SGD(learning_rate=1e-2),\n", "              metrics=[\"accuracy\"])\n", "model.fit(X_train, y_train, epochs=10, validation_data=(X_valid, y_valid))" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "model.predict(X_new).round(2)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "model_version = \"0001\"\n", "model_name = \"my_mnist_model\"\n", "model_path = Path() / model_name / model_version\n", "model_path" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "!rm -rf {model_name}" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "tf.saved_model.save(model, str(model_path))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Let's define a `tree()` function to
view the structure of the `my_mnist_model` directory:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "def tree(path, level=0, indent=4):\n", "    if level == 0:\n", "        print(f\"{path}/\")\n", "        level += 1\n", "    sub_paths = sorted(path.iterdir())\n", "    sub_dirs = [sub_path for sub_path in sub_paths if sub_path.is_dir()]\n", "    filepaths = [sub_path for sub_path in sub_paths if sub_path not in sub_dirs]\n", "    indent_str = \" \" * indent * level\n", "    for sub_dir in sub_dirs:\n", "        print(f\"{indent_str}{sub_dir.name}/\")\n", "        tree(sub_dir, level + 1, indent)\n", "    for filepath in filepaths:\n", "        print(f\"{indent_str}{filepath.name}\")" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "tree(model_path.parent)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "!saved_model_cli show --dir {model_path}" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "!saved_model_cli show --dir {model_path} --tag_set serve" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "!saved_model_cli show --dir {model_path} --tag_set serve \\\n", "    --signature_def serving_default" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "!saved_model_cli show --dir {model_path} --all" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Let's write the new instances to an `.npy` file so we can pass them easily to our model:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "np.save(\"my_mnist_tests.npy\", X_new)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "input_name = model.input_names[0]\n", "input_name" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "And now let's use `saved_model_cli` to make predictions for
the instances we just saved:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "!saved_model_cli run --dir {model_path} --tag_set serve \\\n", " --signature_def serving_default \\\n", " --inputs {input_name}=my_mnist_tests.npy" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "np.round(\n", " [[1.14172166e-04, 1.51857336e-07, 9.79080913e-04, 2.77538411e-03,\n", " 3.75553282e-06, 7.66718149e-05, 3.91490929e-08, 9.95566308e-01,\n", " 5.34432293e-05, 4.30987304e-04],\n", " [8.14584550e-04, 3.54881959e-05, 9.88290966e-01, 7.04165967e-03,\n", " 1.27466748e-07, 2.31963830e-04, 2.55776616e-03, 9.73469416e-10,\n", " 1.02734682e-03, 8.74494361e-08],\n", " [4.42889832e-05, 9.70350444e-01, 9.02883708e-03, 2.26117787e-03,\n", " 4.85437602e-04, 2.87237833e-03, 2.26676138e-03, 8.35481752e-03,\n", " 4.03870409e-03, 2.97143910e-04]], 2)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## TensorFlow Serving" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Install [Docker](https://docs.docker.com/install/) if you don't have it already. Then run:\n", "\n", "```bash\n", "docker pull tensorflow/serving\n", "\n", "export ML_PATH=$HOME/ml # or wherever this project is\n", "docker run -it --rm -p 8500:8500 -p 8501:8501 \\\n", " -v \"$ML_PATH/my_mnist_model:/models/my_mnist_model\" \\\n", " -e MODEL_NAME=my_mnist_model \\\n", " tensorflow/serving\n", "```\n", "Once you are finished using it, press Ctrl-C to shut down the server." 
] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Alternatively, if `tensorflow_model_server` is installed (e.g., if you are running this notebook in Colab), then the following 3 cells will start the server:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import os\n", "\n", "os.environ[\"MODEL_DIR\"] = str(model_path.absolute().parent)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%bash --bg\n", "nohup tensorflow_model_server \\\n", "     --rest_api_port=8501 \\\n", "     --model_name=my_mnist_model \\\n", "     --model_base_path=\"${MODEL_DIR}\" >server.log 2>&1" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "!tail server.log" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import json\n", "\n", "input_data_json = json.dumps({\n", "    \"signature_name\": \"serving_default\",\n", "    \"instances\": X_new.tolist(),\n", "})" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "repr(input_data_json)[:1500] + \"...\"" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Now let's use TensorFlow Serving's REST API to make predictions:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import requests\n", "\n", "SERVER_URL = 'http://localhost:8501/v1/models/my_mnist_model:predict'\n", "response = requests.post(SERVER_URL, data=input_data_json)\n", "response.raise_for_status()  # raise an exception in case of error\n", "response = response.json()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "response.keys()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "y_proba = np.array(response[\"predictions\"])\n", "y_proba.round(2)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Using the gRPC API" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from tensorflow_serving.apis.predict_pb2 import PredictRequest\n", "\n", "request = PredictRequest()\n", "request.model_spec.name = model_name\n", "request.model_spec.signature_name = \"serving_default\"\n", "input_name = model.input_names[0]\n", "request.inputs[input_name].CopyFrom(tf.make_tensor_proto(X_new))" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import grpc\n", "from tensorflow_serving.apis import prediction_service_pb2_grpc\n", "\n", "channel = grpc.insecure_channel('localhost:8500')\n", "predict_service = prediction_service_pb2_grpc.PredictionServiceStub(channel)\n", "response = predict_service.Predict(request, timeout=10.0)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "response" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Convert the response to a NumPy array using `tf.make_ndarray()`:" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "scrolled": true }, "outputs": [], "source": [ "output_name = model.output_names[0]\n", "outputs_proto = response.outputs[output_name]\n", "y_proba = tf.make_ndarray(outputs_proto)\n", "y_proba.round(2)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Or, if your client does not include the TensorFlow library, build the NumPy array manually from the proto's shape and values:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "output_name = model.output_names[0]\n", "outputs_proto = response.outputs[output_name]\n", "shape = [dim.size for dim in outputs_proto.tensor_shape.dim]\n", "y_proba = np.array(outputs_proto.float_val).reshape(shape)\n", "y_proba.round(2)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Deploying a new model version" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "scrolled": true }, "outputs": [], "source": [ "np.random.seed(42)\n",
"tf.random.set_seed(42)\n", "\n", "model = tf.keras.Sequential([\n", " tf.keras.layers.Flatten(input_shape=[28, 28, 1]),\n", " tf.keras.layers.Dense(50, activation=\"relu\"),\n", " tf.keras.layers.Dense(50, activation=\"relu\"),\n", " tf.keras.layers.Dense(10, activation=\"softmax\")\n", "])\n", "model.compile(loss=\"sparse_categorical_crossentropy\",\n", " optimizer=tf.keras.optimizers.SGD(learning_rate=1e-2),\n", " metrics=[\"accuracy\"])\n", "history = model.fit(X_train, y_train, epochs=10, validation_data=(X_valid, y_valid))" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "model_version = \"0002\"\n", "model_name = \"my_mnist_model\"\n", "model_path = Path() / model_name / model_version\n", "model_path" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "tf.saved_model.save(model, str(model_path))" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "tree(model_path.parent)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "**Warning**: You may need to wait a minute before the new model is loaded by TensorFlow Serving." 
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import requests\n", "\n", "SERVER_URL = 'http://localhost:8501/v1/models/my_mnist_model:predict'\n", "\n", "response = requests.post(SERVER_URL, data=input_data_json)\n", "response.raise_for_status()\n", "response = response.json()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "response.keys()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "y_proba = np.array(response[\"predictions\"])\n", "y_proba.round(2)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Deploy the model to Google Cloud AI Platform" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Follow the instructions in the book to deploy the model to Google Cloud AI Platform, download the service account's private key, and save it as `my_service_account_private_key.json` in the project directory. Also, update the `project_id`:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "project_id = \"onyx-smoke-242003\"" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import os\n", "import googleapiclient.discovery\n", "\n", "os.environ[\"GOOGLE_APPLICATION_CREDENTIALS\"] = \"my_service_account_private_key.json\"\n", "model_id = \"my_mnist_model\"\n", "model_path = \"projects/{}/models/{}\".format(project_id, model_id)\n", "model_path += \"/versions/v0001/\"  # if you want to run a specific version\n", "ml_resource = googleapiclient.discovery.build(\"ml\", \"v1\").projects()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "def predict(X):\n", "    input_data_json = {\"signature_name\": \"serving_default\",\n", "                       \"instances\": X.tolist()}\n", "    request = ml_resource.predict(name=model_path, body=input_data_json)\n", "    response = request.execute()\n", "    if \"error\" in response:\n", "        raise RuntimeError(response[\"error\"])\n", "    return np.array([pred[output_name] for pred in response[\"predictions\"]])" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "Y_probas = predict(X_new)\n", "Y_probas.round(2)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Using GPUs" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "**Note**: `tf.test.is_gpu_available()` is deprecated. Instead, please use `tf.config.list_physical_devices('GPU')`." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "#tf.test.is_gpu_available()  # deprecated\n", "tf.config.list_physical_devices('GPU')" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "tf.test.gpu_device_name()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "tf.test.is_built_with_cuda()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from tensorflow.python.client.device_lib import list_local_devices\n", "\n", "devices = list_local_devices()\n", "devices" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Distributed Training" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "tf.keras.backend.clear_session()\n", "tf.random.set_seed(42)\n", "np.random.seed(42)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "def create_model():\n", "    return tf.keras.Sequential([\n", "        tf.keras.layers.Conv2D(filters=64, kernel_size=7, activation=\"relu\",\n", "                               padding=\"same\", input_shape=[28, 28, 1]),\n", "        tf.keras.layers.MaxPooling2D(pool_size=2),\n", "        tf.keras.layers.Conv2D(filters=128, kernel_size=3, activation=\"relu\",\n", "                               padding=\"same\"),\n", "        tf.keras.layers.Conv2D(filters=128, kernel_size=3, activation=\"relu\",\n", "                               padding=\"same\"),\n", "        tf.keras.layers.MaxPooling2D(pool_size=2),\n", "        tf.keras.layers.Flatten(),\n", "        tf.keras.layers.Dense(units=64, activation='relu'),\n", "        tf.keras.layers.Dropout(0.5),\n", "        tf.keras.layers.Dense(units=10, activation='softmax'),\n", "    ])" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "batch_size = 100\n", "model = create_model()\n", "model.compile(loss=\"sparse_categorical_crossentropy\",\n", "              optimizer=tf.keras.optimizers.SGD(learning_rate=1e-2),\n", "              metrics=[\"accuracy\"])\n", "model.fit(X_train, y_train, epochs=10,\n", "          validation_data=(X_valid, y_valid), batch_size=batch_size)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "tf.keras.backend.clear_session()\n", "tf.random.set_seed(42)\n", "np.random.seed(42)\n", "\n", "distribution = tf.distribute.MirroredStrategy()\n", "\n", "# Change the default all-reduce algorithm:\n", "#distribution = tf.distribute.MirroredStrategy(\n", "#    cross_device_ops=tf.distribute.HierarchicalCopyAllReduce())\n", "\n", "# Specify the list of GPUs to use:\n", "#distribution = tf.distribute.MirroredStrategy(devices=[\"/gpu:0\", \"/gpu:1\"])\n", "\n", "# Use the central storage strategy instead:\n", "#distribution = tf.distribute.experimental.CentralStorageStrategy()\n", "\n", "# Use a TPU instead (e.g., on Colab):\n", "#if \"google.colab\" in sys.modules and \"COLAB_TPU_ADDR\" in os.environ:\n", "#    tpu_address = \"grpc://\" + os.environ[\"COLAB_TPU_ADDR\"]\n", "#else:\n", "#    tpu_address = \"\"\n", "#resolver = tf.distribute.cluster_resolver.TPUClusterResolver(tpu_address)\n", "#tf.config.experimental_connect_to_cluster(resolver)\n", "#tf.tpu.experimental.initialize_tpu_system(resolver)\n", "#distribution = tf.distribute.TPUStrategy(resolver)\n", "\n", "with distribution.scope():\n", "    model = create_model()\n", "    model.compile(loss=\"sparse_categorical_crossentropy\",\n", "                  optimizer=tf.keras.optimizers.SGD(learning_rate=1e-2),\n", "                  metrics=[\"accuracy\"])" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "batch_size = 100  # must be divisible by the number of replicas (e.g., GPUs)\n", "model.fit(X_train, y_train, epochs=10,\n", "          validation_data=(X_valid, y_valid), batch_size=batch_size)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "model.predict(X_new)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Custom training loop:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "tf.keras.backend.clear_session()\n", "tf.random.set_seed(42)\n", "np.random.seed(42)\n", "\n", "distribution = tf.distribute.MirroredStrategy()\n", "\n", "with distribution.scope():\n", "    model = create_model()\n", "    optimizer = tf.keras.optimizers.SGD()\n", "\n", "dataset = tf.data.Dataset.from_tensor_slices((X_train, y_train)).repeat().batch(batch_size)\n", "# split each global batch across the replicas\n", "dist_dataset = distribution.experimental_distribute_dataset(dataset)\n", "input_iterator = iter(dist_dataset)\n", "\n", "@tf.function\n", "def train_step(dist_inputs):\n", "    def step_fn(inputs):\n", "        X, y = inputs\n", "        with tf.GradientTape() as tape:\n", "            Y_proba = model(X, training=True)\n", "            # divide by the global batch size, so the replicas' losses add up to the mean loss\n", "            loss = tf.reduce_sum(\n", "                tf.keras.losses.sparse_categorical_crossentropy(y, Y_proba)) / batch_size\n", "        grads = tape.gradient(loss, model.trainable_variables)\n", "        optimizer.apply_gradients(zip(grads, model.trainable_variables))\n", "        return loss\n", "\n", "    per_replica_losses = distribution.run(step_fn, args=(dist_inputs,))\n", "    mean_loss = distribution.reduce(tf.distribute.ReduceOp.SUM,\n", "                                    per_replica_losses, axis=None)\n", "    return mean_loss\n", "\n", "n_epochs = 10\n", "for epoch in range(n_epochs):\n", "    print(f\"Epoch {epoch + 1}/{n_epochs}\")\n", "    for iteration in range(len(X_train) // batch_size):\n", "        loss = train_step(next(input_iterator))\n", "        print(f\"\\rLoss: {loss.numpy():.3f}\", end=\"\")\n", "    print()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Training across multiple servers" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "A TensorFlow cluster is a group of TensorFlow processes running in parallel, usually on different machines, and talking to each other to complete some work, for example training or executing a neural network. Each TF process in the cluster is called a \"task\" (or a \"TF server\"). It has an IP address, a port, and a type (also called its role or its job). The type can be `\"worker\"`, `\"chief\"`, `\"ps\"` (parameter server) or `\"evaluator\"`:\n", "* Each **worker** performs computations, usually on a machine with one or more GPUs.\n", "* The **chief** performs computations as well, but it also handles extra work such as writing TensorBoard logs or saving checkpoints. There is a single chief in a cluster, typically the first worker (i.e., worker #0).\n", "* A **parameter server** (ps) only keeps track of variable values; it usually runs on a CPU-only machine.\n", "* The **evaluator** takes care of evaluation. There is usually a single evaluator in a cluster.\n", "\n", "The set of tasks that share the same type is often called a \"job\". For example, the \"worker\" job is the set of all workers.\n", "\n", "To start a TensorFlow cluster, you must first define it. This means specifying all the tasks (IP address, TCP port, and type). For example, the following cluster specification defines a cluster with 3 tasks (2 workers and 1 parameter server). 
It's a dictionary with one key per job, and the values are lists of task addresses:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "cluster_spec = {\n", "    \"worker\": [\n", "        \"machine-a.example.com:2222\",  # /job:worker/task:0\n", "        \"machine-b.example.com:2222\"   # /job:worker/task:1\n", "    ],\n", "    \"ps\": [\"machine-c.example.com:2222\"]  # /job:ps/task:0\n", "}" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Every task in the cluster may communicate with every other task in the cluster, so make sure to configure your firewall to authorize all communications between these machines on these ports (it's usually simpler if you use the same port on every machine).\n", "\n", "When a task is started, it needs to be told which one it is: its type and index (the task index is also called the task id). A common way to specify everything at once (both the cluster spec and the current task's type and id) is to set the `TF_CONFIG` environment variable before starting the program. It must be a JSON-encoded dictionary containing a cluster specification (under the `\"cluster\"` key), and the type and index of the task to start (under the `\"task\"` key). For example, the following `TF_CONFIG` environment variable defines the same cluster as above, with 2 workers and 1 parameter server, and specifies that the task to start is worker #1:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import json\n", "import os\n", "\n", "os.environ[\"TF_CONFIG\"] = json.dumps({\n", "    \"cluster\": cluster_spec,\n", "    \"task\": {\"type\": \"worker\", \"index\": 1}\n", "})\n", "os.environ[\"TF_CONFIG\"]" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Some platforms (e.g., Google Cloud AI Platform) automatically set this environment variable for you."
] }, { "cell_type": "markdown", "metadata": {}, "source": [ "TensorFlow's `TFConfigClusterResolver` class reads the cluster configuration from this environment variable:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import tensorflow as tf\n", "\n", "resolver = tf.distribute.cluster_resolver.TFConfigClusterResolver()\n", "resolver.cluster_spec()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "resolver.task_type" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "resolver.task_id" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Now let's run a simpler cluster with just two worker tasks, both running on the local machine. We will use the `MultiWorkerMirroredStrategy` to train a model across these two tasks.\n", "\n", "The first step is to write the training code. As this code will be used to run both workers, each in its own process, we write this code to a separate Python file, `my_mnist_multiworker_task.py`. The code is relatively straightforward, but there are a couple of important things to note:\n", "* We create the `MultiWorkerMirroredStrategy` before doing anything else with TensorFlow.\n", "* Only one of the workers will take care of logging to TensorBoard and saving checkpoints. As mentioned earlier, this worker is called the *chief*, and by convention it is usually worker #0."
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%writefile my_mnist_multiworker_task.py\n", "\n", "import time\n", "from pathlib import Path\n", "\n", "import numpy as np\n", "import tensorflow as tf\n", "\n", "# At the beginning of the program\n", "distribution = tf.distribute.MultiWorkerMirroredStrategy()\n", "\n", "resolver = tf.distribute.cluster_resolver.TFConfigClusterResolver()\n", "print(f\"Starting task {resolver.task_type} #{resolver.task_id}\")\n", "\n", "# Only worker #0 will write checkpoints and log to TensorBoard\n", "if resolver.task_id == 0:\n", "    root_logdir = Path() / \"my_mnist_multiworker_logs\"\n", "    run_id = time.strftime(\"run_%Y_%m_%d-%H_%M_%S\")\n", "    run_dir = root_logdir / run_id\n", "    callbacks = [\n", "        tf.keras.callbacks.TensorBoard(str(run_dir)),\n", "        tf.keras.callbacks.ModelCheckpoint(\"my_mnist_multiworker_model.h5\",\n", "                                           save_best_only=True),\n", "    ]\n", "else:\n", "    callbacks = []\n", "\n", "# Load and prepare the MNIST dataset\n", "(X_train_full, y_train_full), (X_test, y_test) = tf.keras.datasets.mnist.load_data()\n", "X_train_full = X_train_full[..., np.newaxis].astype(np.float32) / 255.\n", "X_valid, X_train = X_train_full[:5000], X_train_full[5000:]\n", "y_valid, y_train = y_train_full[:5000], y_train_full[5000:]\n", "\n", "with distribution.scope():\n", "    model = tf.keras.Sequential([\n", "        tf.keras.layers.Conv2D(filters=64, kernel_size=7, activation=\"relu\",\n", "                               padding=\"same\", input_shape=[28, 28, 1]),\n", "        tf.keras.layers.MaxPooling2D(pool_size=2),\n", "        tf.keras.layers.Conv2D(filters=128, kernel_size=3, activation=\"relu\",\n", "                               padding=\"same\"),\n", "        tf.keras.layers.Conv2D(filters=128, kernel_size=3, activation=\"relu\",\n", "                               padding=\"same\"),\n", "        tf.keras.layers.MaxPooling2D(pool_size=2),\n", "        tf.keras.layers.Flatten(),\n", "        tf.keras.layers.Dense(units=64, activation='relu'),\n", "        tf.keras.layers.Dropout(0.5),\n", "        tf.keras.layers.Dense(units=10, activation='softmax'),\n", "    ])\n", "    model.compile(loss=\"sparse_categorical_crossentropy\",\n", "                  optimizer=tf.keras.optimizers.SGD(learning_rate=1e-2),\n", "                  metrics=[\"accuracy\"])\n", "\n", "model.fit(X_train, y_train, validation_data=(X_valid, y_valid),\n", "          epochs=10, callbacks=callbacks)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "In a real-world application, there would typically be a single worker per machine, but in this example we're running both workers on the same machine, so they will both try to use all the available GPU RAM (if this machine has a GPU), and this will likely lead to an Out-Of-Memory (OOM) error. To avoid this, we could use the `CUDA_VISIBLE_DEVICES` environment variable to assign a different GPU to each worker. Alternatively, we can simply disable GPU support, like this:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "os.environ[\"CUDA_VISIBLE_DEVICES\"] = \"-1\"" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "We are now ready to start both workers, each in its own process, using Python's `subprocess` module. Before we start each process, we need to set the `TF_CONFIG` environment variable appropriately, changing only the task index:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import subprocess\n", "\n", "cluster_spec = {\"worker\": [\"127.0.0.1:9901\", \"127.0.0.1:9902\"]}\n", "\n", "for index, worker_address in enumerate(cluster_spec[\"worker\"]):\n", "    os.environ[\"TF_CONFIG\"] = json.dumps({\n", "        \"cluster\": cluster_spec,\n", "        \"task\": {\"type\": \"worker\", \"index\": index}\n", "    })\n", "    subprocess.Popen(\"python my_mnist_multiworker_task.py\", shell=True)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "That's it! 
Our TensorFlow cluster is now running, but we can't see it in this notebook because it runs in separate processes (if you are running this notebook in Jupyter, though, you can see the worker logs in Jupyter's server logs).\n", "\n", "Since the chief (worker #0) is writing to TensorBoard, we can use TensorBoard to view the training progress. Run the following cell, then click on the settings button (i.e., the gear icon) in the TensorBoard interface and check the \"Reload data\" box to make TensorBoard automatically refresh every 30s. Once the first epoch of training is finished (which may take a few minutes), and once TensorBoard refreshes, the SCALARS tab will appear. Click on this tab to view the progress of the model's training and validation accuracy." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%load_ext tensorboard\n", "%tensorboard --logdir=./my_mnist_multiworker_logs --port=6006" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Once training is over, the best checkpoint of the model will be available in the `my_mnist_multiworker_model.h5` file. You can load it using `tf.keras.models.load_model()` and use it for predictions, as usual:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "model = tf.keras.models.load_model(\"my_mnist_multiworker_model.h5\")\n", "Y_pred = model.predict(X_new)\n", "Y_pred.argmax(axis=-1)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "And that's all for today! Hope you found this useful. 😊" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Exercise Solutions" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## 1. to 8." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "1. A SavedModel contains a TensorFlow model, including its architecture (a computation graph) and its weights. 
It is stored as a directory containing a _saved_model.pb_ file, which defines the computation graph (represented as a serialized protocol buffer), and a _variables_ subdirectory containing the variable values. For models containing a large number of weights, these variable values may be split across multiple files. A SavedModel may also include an _assets_ subdirectory containing additional data, such as vocabulary files, class names, or some example instances for this model. More precisely, a SavedModel can contain one or more _metagraphs_. A metagraph is a computation graph plus some function signature definitions (including their input and output names, types, and shapes). Each metagraph is identified by a set of tags. To inspect a SavedModel, you can use the command-line tool `saved_model_cli`, or simply load it using `tf.saved_model.load()` and inspect it in Python.\n", "2. TF Serving allows you to deploy multiple TensorFlow models (or multiple versions of the same model) and make them easily accessible to all your applications via a REST API or a gRPC API. Using your models directly in your applications would make it harder to deploy a new version of a model across all applications. Implementing your own microservice to wrap a TF model would require extra work, and it would be hard to match TF Serving's features. TF Serving has many features: it can monitor a directory and autodeploy the models that are placed there, and you won't have to change or even restart any of your applications to benefit from the new model versions; it's fast, well tested, and scales very well; and it supports A/B testing of experimental models and deploying a new model version to just a subset of your users (in this case the model is called a _canary_). TF Serving can also group individual requests into batches to run them jointly on the GPU. To deploy TF Serving, you can install it from source, but it is much simpler to install it using a Docker image. 
To deploy a cluster of TF Serving Docker containers, you can use an orchestration tool such as Kubernetes, or use a fully hosted solution such as Google Cloud AI Platform.\n", "3. To deploy a model across multiple TF Serving instances, all you need to do is configure these TF Serving instances to monitor the same _models_ directory, and then export each new model version as a SavedModel into a new subdirectory of that directory.\n", "4. The gRPC API is more efficient than the REST API. However, its client libraries are not as widely available, and if you activate compression when using the REST API, you can get almost the same performance. So, the gRPC API is most useful when you need the highest possible performance and the clients are not limited to the REST API.\n", "5. To reduce a model's size so it can run on a mobile or embedded device, TFLite uses several techniques:\n", " * It provides a converter that can optimize a SavedModel: it shrinks the model and reduces its latency. To do this, it prunes all the operations that are not needed to make predictions (such as training operations), and it optimizes and fuses operations whenever possible.\n", " * The converter can also perform post-training quantization: this technique dramatically reduces the model’s size, so it’s much faster to download and store.\n", " * It saves the optimized model using the FlatBuffer format, which can be loaded directly into RAM, without parsing. This reduces the loading time and memory footprint.\n", "6. Quantization-aware training consists of adding fake quantization operations to the model during training. This allows the model to learn to ignore the quantization noise, so the final weights are more robust to quantization.\n", "7. Model parallelism means chopping your model into multiple parts and running them in parallel across multiple devices, hopefully speeding up the model during training or inference. 
Data parallelism means creating multiple exact replicas of your model and deploying them across multiple devices. At each iteration during training, each replica is given a different batch of data, and it computes the gradients of the loss with regard to the model parameters. In synchronous data parallelism, the gradients from all replicas are then aggregated and the optimizer performs a Gradient Descent step. The parameters may be centralized (e.g., on parameter servers) or replicated across all replicas and kept in sync using AllReduce. In asynchronous data parallelism, the parameters are centralized and the replicas run independently from each other, each updating the central parameters directly at the end of each training iteration, without having to wait for the other replicas. In general, data parallelism speeds up training more than model parallelism does, mostly because it requires less communication across devices. Moreover, it is much easier to implement, and it works the same way for any model, whereas model parallelism requires analyzing the model to determine the best way to chop it into pieces.\n", "8. When training a model across multiple servers, you can use the following distribution strategies:\n", " * The `MultiWorkerMirroredStrategy` performs mirrored data parallelism. The model is replicated across all available servers and devices, and each replica gets a different batch of data at each training iteration and computes its own gradients. The mean of the gradients is computed and shared across all replicas using a distributed AllReduce implementation (NCCL by default), and all replicas perform the same Gradient Descent step. This strategy is the simplest to use since all servers and devices are treated in exactly the same way, and it performs fairly well. In general, you should use this strategy. 
Its main limitation is that it requires the model to fit in RAM on every replica.\n", " * The `ParameterServerStrategy` performs asynchronous data parallelism. The model is replicated across all devices on all workers, and the parameters are sharded across all parameter servers. Each worker has its own training loop, running asynchronously from the other workers; at each training iteration, each worker gets its own batch of data and fetches the latest version of the model parameters from the parameter servers, then computes the gradients of the loss with regard to these parameters and sends them to the parameter servers. Lastly, the parameter servers perform a Gradient Descent step using these gradients. This strategy is generally slower than the previous strategy, and a bit harder to deploy, since it requires managing parameter servers. However, it can be useful in some situations, especially when you can take advantage of the asynchronous updates, for example to reduce I/O bottlenecks. Which strategy performs best depends on many factors, including hardware, network topology, number of servers, model size, and more, so your mileage may vary." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## 9.\n", "_Exercise: Train a model (any model you like) and deploy it to TF Serving or Google Cloud AI Platform. Write the client code to query it using the REST API or the gRPC API. Update the model and deploy the new version. Your client code will now query the new version. Roll back to the first version._" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Please follow the steps in the \"Deploying TensorFlow models to TensorFlow Serving\" section above." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## 10.\n", "_Exercise: Train any model across multiple GPUs on the same machine using the `MirroredStrategy` (if you do not have access to GPUs, you can use Colaboratory with a GPU Runtime and create two virtual GPUs). 
Train the model again using the `CentralStorageStrategy` and compare the training time._" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Please follow the steps in the [Distributed Training](#Distributed-Training) section above." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## 11.\n", "_Exercise: Train a small model on Google Cloud AI Platform, using black box hyperparameter tuning._" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Please follow the instructions on pages 716-717 of the book. You can also read [this documentation page](https://cloud.google.com/ai-platform/training/docs/hyperparameter-tuning-overview) and go through the example in this nice [blog post](https://towardsdatascience.com/how-to-do-bayesian-hyper-parameter-tuning-on-a-blackbox-model-882009552c6d) by Lak Lakshmanan." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [] } ], "metadata": { "kernelspec": { "display_name": "Python 3", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.8.12" } }, "nbformat": 4, "nbformat_minor": 4 }