{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"**Chapter 19: Training and Deploying TensorFlow Models at Scale**"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"_This notebook contains all the sample code and solutions to the exercises in chapter 19._"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"<table align=\"left\">\n",
" <td>\n",
" <a href=\"https://colab.research.google.com/github/ageron/handson-ml3/blob/main/19_training_and_deploying_at_scale.ipynb\" target=\"_parent\"><img src=\"https://colab.research.google.com/assets/colab-badge.svg\" alt=\"Open In Colab\"/></a>\n",
" </td>\n",
" <td>\n",
" <a target=\"_blank\" href=\"https://kaggle.com/kernels/welcome?src=https://github.com/ageron/handson-ml3/blob/main/19_training_and_deploying_at_scale.ipynb\"><img src=\"https://kaggle.com/static/images/open-in-kaggle.svg\" /></a>\n",
" </td>\n",
"</table>"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "dFXIv9qNpKzt",
"tags": []
},
"source": [
"# Setup"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "8IPbJEmZpKzu"
},
"source": [
"This project requires Python 3.7 or above:"
]
},
{
"cell_type": "code",
"execution_count": 1,
"metadata": {
"id": "TFSU3FCOpKzu"
},
"outputs": [],
"source": [
"import sys\n",
"\n",
"assert sys.version_info >= (3, 7)"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "GJtVEqxfpKzw"
},
"source": [
"And TensorFlow ≥ 2.8:"
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {
"id": "0Piq5se2pKzx"
},
"outputs": [],
"source": [
"from packaging import version\n",
"import tensorflow as tf\n",
"\n",
"assert version.parse(tf.__version__) >= version.parse(\"2.8.0\")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"If running on Colab or Kaggle, you need to install the Google AI Platform client library, which will be used later in this notebook. You can ignore the warnings about version incompatibilities.\n",
"\n",
"* **Warning**: On Colab, you must restart the Runtime after the installation, and continue with the next cells."
]
},
{
"cell_type": "code",
"execution_count": 3,
"metadata": {},
"outputs": [],
"source": [
"import sys\n",
"if \"google.colab\" in sys.modules or \"kaggle_secrets\" in sys.modules:\n",
"    %pip install -q -U google-cloud-aiplatform"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "YTsawKlapKzy"
},
"source": [
"This chapter discusses how to run or train a model on one or more GPUs, so let's make sure there's at least one, or else issue a warning:"
]
},
{
"cell_type": "code",
"execution_count": 4,
"metadata": {
"id": "Ekxzo6pOpKzy"
},
"outputs": [],
"source": [
"if not tf.config.list_physical_devices('GPU'):\n",
"    print(\"No GPU was detected. Neural nets can be very slow without a GPU.\")\n",
"    if \"google.colab\" in sys.modules:\n",
"        print(\"Go to Runtime > Change runtime and select a GPU hardware \"\n",
"              \"accelerator.\")\n",
"    if \"kaggle_secrets\" in sys.modules:\n",
"        print(\"Go to Settings > Accelerator and select GPU.\")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Serving a TensorFlow Model"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Let's start by deploying a model using TF Serving, then we'll deploy to Google Vertex AI."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Using TensorFlow Serving"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"The first thing we need to do is to build and train a model, and export it to the SavedModel format."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Exporting SavedModels"
]
},
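{
"cell_type": "markdown",
"metadata": {},
"source": [
"Let's load the MNIST dataset and split it, then build and train a model that handles the pixel scaling itself, and finally export it to the SavedModel format:"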
]
},
{
"cell_type": "code",
"execution_count": 5,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Epoch 1/10\n",
"1719/1719 [==============================] - 2s 1ms/step - loss: 0.7012 - accuracy: 0.8241 - val_loss: 0.3715 - val_accuracy: 0.9024\n",
"Epoch 2/10\n",
"1719/1719 [==============================] - 2s 943us/step - loss: 0.3536 - accuracy: 0.9020 - val_loss: 0.2990 - val_accuracy: 0.9144\n",
"Epoch 3/10\n",
"1719/1719 [==============================] - 2s 933us/step - loss: 0.3036 - accuracy: 0.9145 - val_loss: 0.2651 - val_accuracy: 0.9272\n",
"Epoch 4/10\n",
"1719/1719 [==============================] - 2s 965us/step - loss: 0.2736 - accuracy: 0.9231 - val_loss: 0.2436 - val_accuracy: 0.9334\n",
"Epoch 5/10\n",
"1719/1719 [==============================] - 2s 946us/step - loss: 0.2509 - accuracy: 0.9296 - val_loss: 0.2257 - val_accuracy: 0.9364\n",
"Epoch 6/10\n",
"1719/1719 [==============================] - 2s 974us/step - loss: 0.2322 - accuracy: 0.9350 - val_loss: 0.2121 - val_accuracy: 0.9396\n",
"Epoch 7/10\n",
"1719/1719 [==============================] - 2s 959us/step - loss: 0.2161 - accuracy: 0.9400 - val_loss: 0.1970 - val_accuracy: 0.9452\n",
"Epoch 8/10\n",
"1719/1719 [==============================] - 2s 944us/step - loss: 0.2021 - accuracy: 0.9432 - val_loss: 0.1880 - val_accuracy: 0.9476\n",
"Epoch 9/10\n",
"1719/1719 [==============================] - 2s 945us/step - loss: 0.1898 - accuracy: 0.9470 - val_loss: 0.1778 - val_accuracy: 0.9524\n",
"Epoch 10/10\n",
"1719/1719 [==============================] - 2s 940us/step - loss: 0.1793 - accuracy: 0.9494 - val_loss: 0.1685 - val_accuracy: 0.9544\n",
"INFO:tensorflow:Assets written to: my_mnist_model/0001/assets\n"
]
}
],
"source": [
"from pathlib import Path\n",
"import tensorflow as tf\n",
"\n",
"# extra code: load and split the MNIST dataset\n",
"mnist = tf.keras.datasets.mnist.load_data()\n",
"(X_train_full, y_train_full), (X_test, y_test) = mnist\n",
"X_valid, X_train = X_train_full[:5000], X_train_full[5000:]\n",
"y_valid, y_train = y_train_full[:5000], y_train_full[5000:]\n",
"\n",
"# extra code: build & train an MNIST model (also handles image preprocessing)\n",
"tf.random.set_seed(42)\n",
"tf.keras.backend.clear_session()\n",
"model = tf.keras.Sequential([\n",
"    tf.keras.layers.Flatten(input_shape=[28, 28], dtype=tf.uint8),\n",
"    tf.keras.layers.Rescaling(scale=1 / 255),\n",
"    tf.keras.layers.Dense(100, activation=\"relu\"),\n",
"    tf.keras.layers.Dense(10, activation=\"softmax\")\n",
"])\n",
"model.compile(loss=\"sparse_categorical_crossentropy\",\n",
"              optimizer=tf.keras.optimizers.SGD(learning_rate=1e-2),\n",
"              metrics=[\"accuracy\"])\n",
"model.fit(X_train, y_train, epochs=10, validation_data=(X_valid, y_valid))\n",
"\n",
"model_name = \"my_mnist_model\"\n",
"model_version = \"0001\"\n",
"model_path = Path(model_name) / model_version\n",
"model.save(model_path, save_format=\"tf\")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Let's take a look at the file tree (we discussed what each of these files is used for in chapter 10):"
]
},
{
"cell_type": "code",
"execution_count": 6,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"['my_mnist_model/0001',\n",
" 'my_mnist_model/0001/assets',\n",
" 'my_mnist_model/0001/keras_metadata.pb',\n",
" 'my_mnist_model/0001/saved_model.pb',\n",
" 'my_mnist_model/0001/variables',\n",
" 'my_mnist_model/0001/variables/variables.data-00000-of-00001',\n",
" 'my_mnist_model/0001/variables/variables.index']"
]
},
"execution_count": 6,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"sorted([str(path) for path in model_path.parent.glob(\"**/*\")]) # extra code"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Let's inspect the SavedModel:"
]
},
{
"cell_type": "code",
"execution_count": 7,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"The given SavedModel contains the following tag-sets:\n",
"'serve'\n"
]
}
],
"source": [
"!saved_model_cli show --dir '{model_path}'"
]
},
{
"cell_type": "code",
"execution_count": 8,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"The given SavedModel MetaGraphDef contains SignatureDefs with the following keys:\n",
"SignatureDef key: \"__saved_model_init_op\"\n",
"SignatureDef key: \"serving_default\"\n"
]
}
],
"source": [
"!saved_model_cli show --dir '{model_path}' --tag_set serve"
]
},
{
"cell_type": "code",
"execution_count": 9,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"The given SavedModel SignatureDef contains the following input(s):\n",
" inputs['flatten_input'] tensor_info:\n",
" dtype: DT_UINT8\n",
" shape: (-1, 28, 28)\n",
" name: serving_default_flatten_input:0\n",
"The given SavedModel SignatureDef contains the following output(s):\n",
" outputs['dense_1'] tensor_info:\n",
" dtype: DT_FLOAT\n",
" shape: (-1, 10)\n",
" name: StatefulPartitionedCall:0\n",
"Method name is: tensorflow/serving/predict\n"
]
}
],
"source": [
"!saved_model_cli show --dir '{model_path}' --tag_set serve \\\n",
" --signature_def serving_default"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"For even more details, you can run the following command:\n",
"\n",
"```ipython\n",
"!saved_model_cli show --dir '{model_path}' --all\n",
"```"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Installing and Starting TensorFlow Serving"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"If you are running this notebook in Colab or Kaggle, TensorFlow Serving needs to be installed:"
]
},
{
"cell_type": "code",
"execution_count": 10,
"metadata": {},
"outputs": [],
"source": [
"if \"google.colab\" in sys.modules or \"kaggle_secrets\" in sys.modules:\n",
"    url = \"https://storage.googleapis.com/tensorflow-serving-apt\"\n",
"    src = \"stable tensorflow-model-server tensorflow-model-server-universal\"\n",
"    !echo 'deb {url} {src}' > /etc/apt/sources.list.d/tensorflow-serving.list\n",
"    !curl '{url}/tensorflow-serving.release.pub.gpg' | apt-key add -\n",
"    !apt update -q && apt-get install -y tensorflow-model-server\n",
"    %pip install -q -U tensorflow-serving-api"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"If `tensorflow_model_server` is installed (e.g., if you are running this notebook in Colab), then the following 2 cells will start the server. If your OS is Windows, you may need to run the `tensorflow_model_server` command in a terminal, and replace `${MODEL_DIR}` with the full path to the `my_mnist_model` directory."
]
},
{
"cell_type": "code",
"execution_count": 11,
"metadata": {},
"outputs": [],
"source": [
"import os\n",
"\n",
"os.environ[\"MODEL_DIR\"] = str(model_path.parent.absolute())"
]
},
{
"cell_type": "code",
"execution_count": 12,
"metadata": {},
"outputs": [],
"source": [
"%%bash --bg\n",
"tensorflow_model_server \\\n",
" --port=8500 \\\n",
" --rest_api_port=8501 \\\n",
" --model_name=my_mnist_model \\\n",
" --model_base_path=\"${MODEL_DIR}\" >my_server.log 2>&1"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"If you are running this notebook on your own machine, and you prefer to install TF Serving using Docker, first make sure [Docker](https://docs.docker.com/install/) is installed, then run the following commands in a terminal. You must replace `/path/to/my_mnist_model` with the appropriate absolute path to the `my_mnist_model` directory, but do not modify the container path `/models/my_mnist_model`.\n",
"\n",
"```bash\n",
"docker pull tensorflow/serving # downloads the latest TF Serving image\n",
"\n",
"docker run -it --rm -v \"/path/to/my_mnist_model:/models/my_mnist_model\" \\\n",
" -p 8500:8500 -p 8501:8501 -e MODEL_NAME=my_mnist_model tensorflow/serving\n",
"```"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Querying TF Serving through the REST API"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Next, let's send a REST query to TF Serving:"
]
},
{
"cell_type": "code",
"execution_count": 13,
"metadata": {},
"outputs": [],
"source": [
"import json\n",
"\n",
"X_new = X_test[:3]  # pretend we have 3 new digit images to classify\n",
"request_json = json.dumps({\n",
"    \"signature_name\": \"serving_default\",\n",
"    \"instances\": X_new.tolist(),\n",
"})"
]
},
{
"cell_type": "code",
"execution_count": 14,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"'{\"signature_name\": \"serving_default\", \"instances\": [[[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0..., 0, 0]]]}'"
]
},
"execution_count": 14,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"request_json[:100] + \"...\" + request_json[-10:]"
]
},
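{
"cell_type": "markdown",
"metadata": {},
"source": [
"The request above uses TF Serving's row format (the `\"instances\"` key). As a hedged aside, the REST API also accepts a columnar format under the `\"inputs\"` key, in which case the reply comes back under `\"outputs\"` rather than `\"predictions\"`. The sketch below only builds the two payloads and does not contact a server; `toy_batch` is a made-up stand-in for `X_new.tolist()`:\n",
"\n",
"```python\n",
"import json\n",
"\n",
"# Hypothetical 2x2x2 batch standing in for X_new.tolist()\n",
"toy_batch = [[[0, 1], [2, 3]], [[4, 5], [6, 7]]]\n",
"\n",
"# Row format: one list entry per instance (what this notebook uses)\n",
"row_request = json.dumps({\"signature_name\": \"serving_default\",\n",
"                          \"instances\": toy_batch})\n",
"\n",
"# Columnar format: the whole batch is sent as a single tensor\n",
"col_request = json.dumps({\"signature_name\": \"serving_default\",\n",
"                          \"inputs\": toy_batch})\n",
"\n",
"print(sorted(json.loads(row_request)))\n",
"print(sorted(json.loads(col_request)))\n",
"```\n",
"\n",
"For a model with a single input, such as this one, the two formats carry the same data, so the row format is kept throughout this notebook."
]
},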
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Now let's use TensorFlow Serving's REST API to make predictions:"
]
},
{
"cell_type": "code",
"execution_count": 15,
"metadata": {},
"outputs": [],
"source": [
"import requests\n",
"\n",
"server_url = \"http://localhost:8501/v1/models/my_mnist_model:predict\"\n",
"response = requests.post(server_url, data=request_json)\n",
"response.raise_for_status() # raise an exception in case of error\n",
"response = response.json()"
]
},
{
"cell_type": "code",
"execution_count": 16,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"array([[0. , 0. , 0. , 0. , 0. , 0. , 0. , 1. , 0. , 0. ],\n",
" [0. , 0. , 0.99, 0.01, 0. , 0. , 0. , 0. , 0. , 0. ],\n",
" [0. , 0.97, 0.01, 0. , 0. , 0. , 0. , 0.01, 0. , 0. ]])"
]
},
"execution_count": 16,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"import numpy as np\n",
"\n",
"y_proba = np.array(response[\"predictions\"])\n",
"y_proba.round(2)"
]
},
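{
"cell_type": "markdown",
"metadata": {},
"source": [
"To turn estimated probabilities like these into predicted classes, a common approach is to take the index of the largest value in each row. This is a minimal sketch that hard-codes the rounded values shown above as `toy_proba` (an illustrative name), so that it runs without a server:\n",
"\n",
"```python\n",
"import numpy as np\n",
"\n",
"# Rounded probabilities copied from the output above (illustration only)\n",
"toy_proba = np.array([\n",
"    [0., 0., 0., 0., 0., 0., 0., 1., 0., 0.],\n",
"    [0., 0., 0.99, 0.01, 0., 0., 0., 0., 0., 0.],\n",
"    [0., 0.97, 0.01, 0., 0., 0., 0., 0.01, 0., 0.],\n",
"])\n",
"\n",
"y_pred = toy_proba.argmax(axis=1)  # most likely class for each image\n",
"print(y_pred)  # [7 2 1]\n",
"```"
]
},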
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Querying TF Serving through the gRPC API"
]
},
{
"cell_type": "code",
"execution_count": 17,
"metadata": {},
"outputs": [],
"source": [
"from tensorflow_serving.apis.predict_pb2 import PredictRequest\n",
"\n",
"request = PredictRequest()\n",
"request.model_spec.name = model_name\n",
"request.model_spec.signature_name = \"serving_default\"\n",
"input_name = model.input_names[0] # == \"flatten_input\"\n",
"request.inputs[input_name].CopyFrom(tf.make_tensor_proto(X_new))"
]
},
{
"cell_type": "code",
"execution_count": 18,
"metadata": {},
"outputs": [],
"source": [
"import grpc\n",
"from tensorflow_serving.apis import prediction_service_pb2_grpc\n",
"\n",
"channel = grpc.insecure_channel('localhost:8500')\n",
"predict_service = prediction_service_pb2_grpc.PredictionServiceStub(channel)\n",
"response = predict_service.Predict(request, timeout=10.0)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Convert the response to a NumPy array:"
]
},
{
"cell_type": "code",
"execution_count": 19,
"metadata": {
"scrolled": true
},
"outputs": [],
"source": [
"output_name = model.output_names[0]\n",
"outputs_proto = response.outputs[output_name]\n",
"y_proba = tf.make_ndarray(outputs_proto)"
]
},
{
"cell_type": "code",
"execution_count": 20,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"array([[0. , 0. , 0. , 0. , 0. , 0. , 0. , 1. , 0. , 0. ],\n",
" [0. , 0. , 0.99, 0.01, 0. , 0. , 0. , 0. , 0. , 0. ],\n",
" [0. , 0.97, 0.01, 0. , 0. , 0. , 0. , 0.01, 0. , 0. ]],\n",
" dtype=float32)"
]
},
"execution_count": 20,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"y_proba.round(2)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"If your client does not include the TensorFlow library, you can convert the response to a NumPy array like this:"
]
},
{
"cell_type": "code",
"execution_count": 21,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"array([[0. , 0. , 0. , 0. , 0. , 0. , 0. , 1. , 0. , 0. ],\n",
" [0. , 0. , 0.99, 0.01, 0. , 0. , 0. , 0. , 0. , 0. ],\n",
" [0. , 0.97, 0.01, 0. , 0. , 0. , 0. , 0.01, 0. , 0. ]])"
]
},
"execution_count": 21,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"# extra code: shows how to avoid using tf.make_ndarray()\n",
"output_name = model.output_names[0]\n",
"outputs_proto = response.outputs[output_name]\n",
"shape = [dim.size for dim in outputs_proto.tensor_shape.dim]\n",
"y_proba = np.array(outputs_proto.float_val).reshape(shape)\n",
"y_proba.round(2)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Deploying a new model version"
]
},
{
"cell_type": "code",
"execution_count": 22,
"metadata": {
"scrolled": true
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Epoch 1/10\n",
"1719/1719 [==============================] - 2s 931us/step - loss: 0.7039 - accuracy: 0.8056 - val_loss: 0.3418 - val_accuracy: 0.9042\n",
"Epoch 2/10\n",
"1719/1719 [==============================] - 1s 855us/step - loss: 0.3204 - accuracy: 0.9082 - val_loss: 0.2674 - val_accuracy: 0.9242\n",
"Epoch 3/10\n",
"1719/1719 [==============================] - 2s 883us/step - loss: 0.2650 - accuracy: 0.9235 - val_loss: 0.2227 - val_accuracy: 0.9368\n",
"Epoch 4/10\n",
"1719/1719 [==============================] - 1s 869us/step - loss: 0.2319 - accuracy: 0.9329 - val_loss: 0.2032 - val_accuracy: 0.9432\n",
"Epoch 5/10\n",
"1719/1719 [==============================] - 1s 870us/step - loss: 0.2089 - accuracy: 0.9399 - val_loss: 0.1833 - val_accuracy: 0.9482\n",
"Epoch 6/10\n",
"1719/1719 [==============================] - 1s 871us/step - loss: 0.1908 - accuracy: 0.9446 - val_loss: 0.1740 - val_accuracy: 0.9498\n",
"Epoch 7/10\n",
"1719/1719 [==============================] - 2s 873us/step - loss: 0.1756 - accuracy: 0.9490 - val_loss: 0.1605 - val_accuracy: 0.9540\n",
"Epoch 8/10\n",
"1719/1719 [==============================] - 2s 877us/step - loss: 0.1631 - accuracy: 0.9524 - val_loss: 0.1543 - val_accuracy: 0.9558\n",
"Epoch 9/10\n",
"1719/1719 [==============================] - 2s 879us/step - loss: 0.1517 - accuracy: 0.9567 - val_loss: 0.1460 - val_accuracy: 0.9570\n",
"Epoch 10/10\n",
"1719/1719 [==============================] - 1s 872us/step - loss: 0.1429 - accuracy: 0.9584 - val_loss: 0.1358 - val_accuracy: 0.9618\n"
]
}
],
"source": [
"# extra code: build and train a new MNIST model version\n",
"np.random.seed(42)\n",
"tf.random.set_seed(42)\n",
"model = tf.keras.Sequential([\n",
"    tf.keras.layers.Flatten(input_shape=[28, 28], dtype=tf.uint8),\n",
"    tf.keras.layers.Rescaling(scale=1 / 255),\n",
"    tf.keras.layers.Dense(50, activation=\"relu\"),\n",
"    tf.keras.layers.Dense(50, activation=\"relu\"),\n",
"    tf.keras.layers.Dense(10, activation=\"softmax\")\n",
"])\n",
"model.compile(loss=\"sparse_categorical_crossentropy\",\n",
"              optimizer=tf.keras.optimizers.SGD(learning_rate=1e-2),\n",
"              metrics=[\"accuracy\"])\n",
"history = model.fit(X_train, y_train, epochs=10,\n",
"                    validation_data=(X_valid, y_valid))"
]
},
{
"cell_type": "code",
"execution_count": 23,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"INFO:tensorflow:Assets written to: my_mnist_model/0002/assets\n"
]
}
],
"source": [
"model_version = \"0002\"\n",
"model_path = Path(model_name) / model_version\n",
"model.save(model_path, save_format=\"tf\")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Let's take a look at the file tree again:"
]
},
{
"cell_type": "code",
"execution_count": 24,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"['my_mnist_model/0001',\n",
" 'my_mnist_model/0001/assets',\n",
" 'my_mnist_model/0001/keras_metadata.pb',\n",
" 'my_mnist_model/0001/saved_model.pb',\n",
" 'my_mnist_model/0001/variables',\n",
" 'my_mnist_model/0001/variables/variables.data-00000-of-00001',\n",
" 'my_mnist_model/0001/variables/variables.index',\n",
" 'my_mnist_model/0002',\n",
" 'my_mnist_model/0002/assets',\n",
" 'my_mnist_model/0002/keras_metadata.pb',\n",
" 'my_mnist_model/0002/saved_model.pb',\n",
" 'my_mnist_model/0002/variables',\n",
" 'my_mnist_model/0002/variables/variables.data-00000-of-00001',\n",
" 'my_mnist_model/0002/variables/variables.index']"
]
},
"execution_count": 24,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"sorted([str(path) for path in model_path.parent.glob(\"**/*\")]) # extra code"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"**Warning**: You may need to wait a minute before the new model is loaded by TensorFlow Serving."
]
},
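{
"cell_type": "markdown",
"metadata": {},
"source": [
"Rather than waiting a fixed amount of time, you could poll TF Serving's model status endpoint (`GET /v1/models/my_mnist_model`) and check which versions are reported as available. This is only a sketch: `available_versions()` is a hypothetical helper, and `sample_status` is a hand-written example of the reply's general shape, not output captured from this server:\n",
"\n",
"```python\n",
"def available_versions(status):\n",
"    \"\"\"Return the versions whose state is AVAILABLE in a model status reply.\"\"\"\n",
"    return sorted(int(entry[\"version\"])\n",
"                  for entry in status.get(\"model_version_status\", [])\n",
"                  if entry.get(\"state\") == \"AVAILABLE\")\n",
"\n",
"# Hand-written example reply (assumed shape, for illustration only)\n",
"sample_status = {\"model_version_status\": [\n",
"    {\"version\": \"2\", \"state\": \"AVAILABLE\"},\n",
"    {\"version\": \"1\", \"state\": \"END\"},\n",
"]}\n",
"print(available_versions(sample_status))  # [2]\n",
"\n",
"# Against a running server (assumes the REST port used in this notebook):\n",
"# status = requests.get(\"http://localhost:8501/v1/models/my_mnist_model\").json()\n",
"```"
]
},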
{
"cell_type": "code",
"execution_count": 25,
"metadata": {},
"outputs": [],
"source": [
"import requests\n",
"\n",
"server_url = \"http://localhost:8501/v1/models/my_mnist_model:predict\"\n",
" \n",
"response = requests.post(server_url, data=request_json)\n",
"response.raise_for_status()\n",
"response = response.json()"
]
},
{
"cell_type": "code",
"execution_count": 26,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"dict_keys(['predictions'])"
]
},
"execution_count": 26,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"response.keys()"
]
},
{
"cell_type": "code",
"execution_count": 27,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"array([[0. , 0. , 0. , 0. , 0. , 0. , 0. , 1. , 0. , 0. ],\n",
" [0. , 0. , 0.99, 0.01, 0. , 0. , 0. , 0. , 0. , 0. ],\n",
" [0. , 0.99, 0. , 0. , 0. , 0. , 0. , 0. , 0. , 0. ]])"
]
},
"execution_count": 27,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"y_proba = np.array(response[\"predictions\"])\n",
"y_proba.round(2)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Creating a Prediction Service on Vertex AI"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Follow the instructions in the book to create a Google Cloud Platform account and activate the Vertex AI and Cloud Storage APIs. Then, if you're running this notebook in Colab, you can run the following cell to authenticate using the same Google account as you used with Google Cloud Platform, and authorize this Colab to access your data.\n",
"\n",
"**WARNING: only do this if you trust this notebook!**\n",
"* Be extra careful if this is not the official notebook from https://github.com/ageron/handson-ml3: the Colab URL should start with https://colab.research.google.com/github/ageron/handson-ml3. Or else, the code could do whatever it wants with your data.\n",
"\n",
"If you are not running this notebook in Colab, you must follow the instructions in the book to create a service account and generate a key for it, download it to this notebook's directory, and name it `my_service_account_key.json` (or make sure the `GOOGLE_APPLICATION_CREDENTIALS` environment variable points to your key)."
]
},
{
"cell_type": "code",
"execution_count": 28,
"metadata": {},
"outputs": [],
"source": [
"project_id = \"my_project\"  ##### CHANGE THIS TO YOUR PROJECT ID #####\n",
"\n",
"if \"google.colab\" in sys.modules:\n",
"    from google.colab import auth\n",
"    auth.authenticate_user()\n",
"elif \"kaggle_secrets\" in sys.modules:\n",
"    from kaggle_secrets import UserSecretsClient\n",
"    UserSecretsClient().set_gcloud_credentials(project=project_id)\n",
"else:\n",
"    os.environ[\"GOOGLE_APPLICATION_CREDENTIALS\"] = \"my_service_account_key.json\""
]
},
{
"cell_type": "code",
"execution_count": 29,
"metadata": {},
"outputs": [],
"source": [
"from google.cloud import storage\n",
"\n",
"bucket_name = \"my_bucket\" ##### CHANGE THIS TO A UNIQUE BUCKET NAME #####\n",
"location = \"us-central1\"\n",
"\n",
"storage_client = storage.Client(project=project_id)\n",
"bucket = storage_client.create_bucket(bucket_name, location=location)\n",
"#bucket = storage_client.bucket(bucket_name) # to reuse a bucket instead"
]
},
{
"cell_type": "code",
"execution_count": 30,
"metadata": {},
"outputs": [],
"source": [
"def upload_directory(bucket, dirpath):\n",
"    dirpath = Path(dirpath)\n",
"    for filepath in dirpath.glob(\"**/*\"):\n",
"        if filepath.is_file():\n",
"            blob = bucket.blob(filepath.relative_to(dirpath.parent).as_posix())\n",
"            blob.upload_from_filename(filepath)\n",
"\n",
"upload_directory(bucket, \"my_mnist_model\")"
]
},
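{
"cell_type": "markdown",
"metadata": {},
"source": [
"To see which object names `upload_directory()` produces, here is a small sketch of its path logic alone. It uses `PurePosixPath` so that it runs anywhere without touching the filesystem or the bucket; the `/tmp/work` location is an arbitrary example:\n",
"\n",
"```python\n",
"from pathlib import PurePosixPath\n",
"\n",
"# Arbitrary example location of the exported model directory\n",
"dirpath = PurePosixPath(\"/tmp/work/my_mnist_model\")\n",
"filepath = dirpath / \"0001\" / \"saved_model.pb\"\n",
"\n",
"# Same expression as in upload_directory(): the path relative to the\n",
"# parent of the uploaded directory, so the directory name itself is kept\n",
"blob_name = filepath.relative_to(dirpath.parent).as_posix()\n",
"print(blob_name)  # my_mnist_model/0001/saved_model.pb\n",
"```\n",
"\n",
"Keeping the directory name in the object name is what lets the model be referenced later as `gs://{bucket_name}/my_mnist_model/0001`."
]
},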
{
"cell_type": "code",
"execution_count": 31,
"metadata": {},
"outputs": [],
"source": [
"# extra code: a much faster multithreaded implementation of upload_directory()\n",
"#             which also accepts a prefix for the target path, and prints stuff\n",
"\n",
"from concurrent import futures\n",
"\n",
"def upload_file(bucket, filepath, blob_path):\n",
"    blob = bucket.blob(blob_path)\n",
"    blob.upload_from_filename(filepath)\n",
"\n",
"def upload_directory(bucket, dirpath, prefix=None, max_workers=50):\n",
"    dirpath = Path(dirpath)\n",
"    prefix = prefix or dirpath.name\n",
"    with futures.ThreadPoolExecutor(max_workers=max_workers) as executor:\n",
"        future_to_filepath = {\n",
"            executor.submit(\n",
"                upload_file,\n",
"                bucket, filepath,\n",
"                f\"{prefix}/{filepath.relative_to(dirpath).as_posix()}\"\n",
"            ): filepath\n",
"            for filepath in sorted(dirpath.glob(\"**/*\"))\n",
"            if filepath.is_file()\n",
"        }\n",
"        for future in futures.as_completed(future_to_filepath):\n",
"            filepath = future_to_filepath[future]\n",
"            try:\n",
"                result = future.result()\n",
"            except Exception as ex:\n",
"                print(f\"Error uploading {filepath!s:60}: {ex}\")  # f!s is str(f)\n",
"            else:\n",
"                print(f\"Uploaded {filepath!s:60}\", end=\"\\r\")\n",
"\n",
"    print(f\"Uploaded {dirpath!s:60}\")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Alternatively, if you installed Google Cloud CLI (it's preinstalled on Colab), then you can use the following `gsutil` command:"
]
},
{
"cell_type": "code",
"execution_count": 32,
"metadata": {},
"outputs": [],
"source": [
"#!gsutil -m cp -r my_mnist_model gs://{bucket_name}/"
]
},
{
"cell_type": "code",
"execution_count": 33,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Creating Model\n",
"Create Model backing LRO: projects/522977795627/locations/us-central1/models/4798114811986575360/operations/53403898236370944\n",
"Model created. Resource name: projects/522977795627/locations/us-central1/models/4798114811986575360\n",
"To use this Model in another session:\n",
"model = aiplatform.Model('projects/522977795627/locations/us-central1/models/4798114811986575360')\n"
]
}
],
"source": [
"from google.cloud import aiplatform\n",
"\n",
"server_image = \"gcr.io/cloud-aiplatform/prediction/tf2-gpu.2-8:latest\"\n",
"\n",
"aiplatform.init(project=project_id, location=location)\n",
"mnist_model = aiplatform.Model.upload(\n",
"    display_name=\"mnist\",\n",
"    artifact_uri=f\"gs://{bucket_name}/my_mnist_model/0001\",\n",
"    serving_container_image_uri=server_image,\n",
")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"**Warning**: this cell may take several minutes to run, as it waits for Vertex AI to provision the compute nodes:"
]
},
{
"cell_type": "code",
"execution_count": 34,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Creating Endpoint\n",
"Create Endpoint backing LRO: projects/522977795627/locations/us-central1/endpoints/5133373499481522176/operations/4135354010494304256\n",
"Endpoint created. Resource name: projects/522977795627/locations/us-central1/endpoints/5133373499481522176\n",
"To use this Endpoint in another session:\n",
"endpoint = aiplatform.Endpoint('projects/522977795627/locations/us-central1/endpoints/5133373499481522176')\n",
"Deploying Model projects/522977795627/locations/us-central1/models/4798114811986575360 to Endpoint : projects/522977795627/locations/us-central1/endpoints/5133373499481522176\n",
"Deploy Endpoint model backing LRO: projects/522977795627/locations/us-central1/endpoints/5133373499481522176/operations/388359120522051584\n",
"Endpoint model deployed. Resource name: projects/522977795627/locations/us-central1/endpoints/5133373499481522176\n"
]
}
],
"source": [
"endpoint = aiplatform.Endpoint.create(display_name=\"mnist-endpoint\")\n",
"\n",
"endpoint.deploy(\n",
"    mnist_model,\n",
"    min_replica_count=1,\n",
"    max_replica_count=5,\n",
"    machine_type=\"n1-standard-4\",\n",
"    accelerator_type=\"NVIDIA_TESLA_K80\",\n",
"    accelerator_count=1\n",
")"
]
},
{
"cell_type": "code",
"execution_count": 35,
"metadata": {},
"outputs": [],
"source": [
"response = endpoint.predict(instances=X_new.tolist())"
]
},
{
"cell_type": "code",
"execution_count": 36,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"array([[0. , 0. , 0. , 0. , 0. , 0. , 0. , 1. , 0. , 0. ],\n",
" [0. , 0. , 0.99, 0.01, 0. , 0. , 0. , 0. , 0. , 0. ],\n",
" [0. , 0.97, 0.01, 0. , 0. , 0. , 0. , 0.01, 0. , 0. ]])"
]
},
"execution_count": 36,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"import numpy as np\n",
"\n",
"np.round(response.predictions, 2)"
]
},
{
"cell_type": "code",
"execution_count": 37,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Undeploying Endpoint model: projects/522977795627/locations/us-central1/endpoints/5133373499481522176\n",
"Undeploy Endpoint model backing LRO: projects/522977795627/locations/us-central1/endpoints/5133373499481522176/operations/3579722406467469312\n",
"Endpoint model undeployed. Resource name: projects/522977795627/locations/us-central1/endpoints/5133373499481522176\n",
"Deleting Endpoint : projects/522977795627/locations/us-central1/endpoints/5133373499481522176\n",
"Delete Endpoint backing LRO: projects/522977795627/locations/us-central1/operations/4738836360561950720\n",
"Endpoint deleted. . Resource name: projects/522977795627/locations/us-central1/endpoints/5133373499481522176\n"
]
}
],
"source": [
"endpoint.undeploy_all() # undeploy all models from the endpoint\n",
"endpoint.delete()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Running Batch Prediction Jobs on Vertex AI"
]
},
{
"cell_type": "code",
"execution_count": 38,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Uploaded my_mnist_batch \n"
]
}
],
"source": [
"batch_path = Path(\"my_mnist_batch\")\n",
"batch_path.mkdir(exist_ok=True)\n",
"with open(batch_path / \"my_mnist_batch.jsonl\", \"w\") as jsonl_file:\n",
"    for image in X_test[:100].tolist():\n",
"        jsonl_file.write(json.dumps(image))\n",
"        jsonl_file.write(\"\\n\")\n",
"\n",
"upload_directory(bucket, batch_path)"
]
},
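{
"cell_type": "markdown",
"metadata": {},
"source": [
"The batch file written above uses the JSON Lines layout: one JSON-encoded instance per line. The following sketch round-trips two tiny made-up instances through an in-memory buffer to illustrate that layout; `toy_images` is hypothetical and much smaller than real MNIST images:\n",
"\n",
"```python\n",
"import io\n",
"import json\n",
"\n",
"# Two made-up 2x2 \"images\" (illustration only)\n",
"toy_images = [[[0, 255], [17, 3]], [[1, 2], [3, 4]]]\n",
"\n",
"# Write one JSON document per line, as in the cell above\n",
"buffer = io.StringIO()\n",
"for image in toy_images:\n",
"    buffer.write(json.dumps(image))\n",
"    buffer.write(\"\\n\")\n",
"\n",
"# Each line decodes back to exactly one instance\n",
"decoded = [json.loads(line) for line in buffer.getvalue().splitlines()]\n",
"print(decoded == toy_images)  # True\n",
"```"
]
},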
{
"cell_type": "code",
"execution_count": 39,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Creating BatchPredictionJob\n",
"BatchPredictionJob created. Resource name: projects/522977795627/locations/us-central1/batchPredictionJobs/4346926367237996544\n",
"To use this BatchPredictionJob in another session:\n",
"bpj = aiplatform.BatchPredictionJob('projects/522977795627/locations/us-central1/batchPredictionJobs/4346926367237996544')\n",
"View Batch Prediction Job:\n",
"https://console.cloud.google.com/ai/platform/locations/us-central1/batch-predictions/4346926367237996544?project=522977795627\n",
"BatchPredictionJob projects/522977795627/locations/us-central1/batchPredictionJobs/4346926367237996544 current state:\n",
"JobState.JOB_STATE_PENDING\n",
"BatchPredictionJob projects/522977795627/locations/us-central1/batchPredictionJobs/4346926367237996544 current state:\n",
"JobState.JOB_STATE_RUNNING\n",
"BatchPredictionJob projects/522977795627/locations/us-central1/batchPredictionJobs/4346926367237996544 current state:\n",
"JobState.JOB_STATE_RUNNING\n",
"BatchPredictionJob projects/522977795627/locations/us-central1/batchPredictionJobs/4346926367237996544 current state:\n",
"JobState.JOB_STATE_RUNNING\n",
"BatchPredictionJob projects/522977795627/locations/us-central1/batchPredictionJobs/4346926367237996544 current state:\n",
"JobState.JOB_STATE_RUNNING\n",
"BatchPredictionJob projects/522977795627/locations/us-central1/batchPredictionJobs/4346926367237996544 current state:\n",
"JobState.JOB_STATE_RUNNING\n",
"BatchPredictionJob projects/522977795627/locations/us-central1/batchPredictionJobs/4346926367237996544 current state:\n",
"JobState.JOB_STATE_RUNNING\n",
"BatchPredictionJob projects/522977795627/locations/us-central1/batchPredictionJobs/4346926367237996544 current state:\n",
"JobState.JOB_STATE_RUNNING\n",
"BatchPredictionJob projects/522977795627/locations/us-central1/batchPredictionJobs/4346926367237996544 current state:\n",
"JobState.JOB_STATE_RUNNING\n",
"BatchPredictionJob projects/522977795627/locations/us-central1/batchPredictionJobs/4346926367237996544 current state:\n",
"JobState.JOB_STATE_RUNNING\n",
"BatchPredictionJob projects/522977795627/locations/us-central1/batchPredictionJobs/4346926367237996544 current state:\n",
"JobState.JOB_STATE_SUCCEEDED\n",
"BatchPredictionJob run completed. Resource name: projects/522977795627/locations/us-central1/batchPredictionJobs/4346926367237996544\n"
]
}
],
"source": [
"batch_prediction_job = mnist_model.batch_predict(\n",
" job_display_name=\"my_batch_prediction_job\",\n",
" machine_type=\"n1-standard-4\",\n",
" starting_replica_count=1,\n",
" max_replica_count=5,\n",
" accelerator_type=\"NVIDIA_TESLA_K80\",\n",
" accelerator_count=1,\n",
" gcs_source=[f\"gs://{bucket_name}/{batch_path.name}/my_mnist_batch.jsonl\"],\n",
" gcs_destination_prefix=f\"gs://{bucket_name}/my_mnist_predictions/\",\n",
" sync=True # set to False if you don't want to wait for completion\n",
")"
]
},
{
"cell_type": "code",
"execution_count": 40,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"gcs_output_directory: \"gs://my_bucket/my_mnist_predictions/prediction-mnist-2022_04_12T21_30_08_071Z\""
]
},
"execution_count": 40,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"batch_prediction_job.output_info # extra code shows the output directory"
]
},
{
"cell_type": "code",
"execution_count": 41,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"my_mnist_predictions/prediction-mnist-2022_04_12T21_30_08_071Z/prediction.errors_stats-00000-of-00001\n",
"my_mnist_predictions/prediction-mnist-2022_04_12T21_30_08_071Z/prediction.results-00000-of-00002\n",
"my_mnist_predictions/prediction-mnist-2022_04_12T21_30_08_071Z/prediction.results-00001-of-00002\n"
]
}
],
"source": [
"y_probas = []\n",
"for blob in batch_prediction_job.iter_outputs():\n",
"    print(blob.name)  # extra code\n",
"    if \"prediction.results\" in blob.name:\n",
"        for line in blob.download_as_text().splitlines():\n",
"            y_proba = json.loads(line)[\"prediction\"]\n",
"            y_probas.append(y_proba)"
]
},
{
"cell_type": "code",
"execution_count": 42,
"metadata": {},
"outputs": [],
"source": [
"y_pred = np.argmax(y_probas, axis=1)\n",
"accuracy = np.sum(y_pred == y_test[:100]) / 100"
]
},
{
"cell_type": "code",
"execution_count": 43,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"0.98"
]
},
"execution_count": 43,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"accuracy"
]
},
{
"cell_type": "code",
"execution_count": 44,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Deleting Model : projects/522977795627/locations/us-central1/models/4798114811986575360\n",
"Delete Model backing LRO: projects/522977795627/locations/us-central1/operations/598902403101622272\n",
"Model deleted. . Resource name: projects/522977795627/locations/us-central1/models/4798114811986575360\n"
]
}
],
"source": [
"mnist_model.delete()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Let's delete all the directories we created on GCS (i.e., all the blobs with these prefixes), as well as the batch prediction job:"
]
},
{
"cell_type": "code",
"execution_count": 45,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Deleting BatchPredictionJob : projects/522977795627/locations/us-central1/batchPredictionJobs/4346926367237996544\n",
"Delete BatchPredictionJob backing LRO: projects/522977795627/locations/us-central1/operations/6699028098374959104\n",
"BatchPredictionJob deleted. . Resource name: projects/522977795627/locations/us-central1/batchPredictionJobs/4346926367237996544\n"
]
}
],
"source": [
"for prefix in [\"my_mnist_model/\", \"my_mnist_batch/\", \"my_mnist_predictions/\"]:\n",
"    blobs = bucket.list_blobs(prefix=prefix)\n",
"    for blob in blobs:\n",
"        blob.delete()\n",
"\n",
"#bucket.delete()  # uncomment and run if you want to delete the bucket itself\n",
"batch_prediction_job.delete()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Deploying a Model to a Mobile or Embedded Device"
]
},
{
"cell_type": "code",
"execution_count": 46,
"metadata": {},
"outputs": [
{
"name": "stderr",
"output_type": "stream",
"text": [
"2022-04-10 09:03:52.237094: W tensorflow/compiler/mlir/lite/python/tf_tfl_flatbuffer_helpers.cc:357] Ignored output_format.\n",
"2022-04-10 09:03:52.237108: W tensorflow/compiler/mlir/lite/python/tf_tfl_flatbuffer_helpers.cc:360] Ignored drop_control_dependency.\n",
"WARNING:absl:Buffer deduplication procedure will be skipped when flatbuffer library is not properly loaded\n",
"2022-04-10 09:03:52.237830: I tensorflow/cc/saved_model/reader.cc:43] Reading SavedModel from: my_mnist_model/0001\n",
"2022-04-10 09:03:52.238869: I tensorflow/cc/saved_model/reader.cc:78] Reading meta graph with tags { serve }\n",
"2022-04-10 09:03:52.238881: I tensorflow/cc/saved_model/reader.cc:119] Reading SavedModel debug info (if present) from: my_mnist_model/0001\n",
"2022-04-10 09:03:52.242108: I tensorflow/cc/saved_model/loader.cc:228] Restoring SavedModel bundle.\n",
"2022-04-10 09:03:52.263868: I tensorflow/cc/saved_model/loader.cc:212] Running initialization op on SavedModel bundle at path: my_mnist_model/0001\n",
"2022-04-10 09:03:52.271298: I tensorflow/cc/saved_model/loader.cc:301] SavedModel load for tags { serve }; Status: success: OK. Took 33470 microseconds.\n",
"2022-04-10 09:03:52.281694: I tensorflow/compiler/mlir/tensorflow/utils/dump_mlir_util.cc:237] disabling MLIR crash reproducer, set env var `MLIR_CRASH_REPRODUCER_DIRECTORY` to enable.\n"
]
}
],
"source": [
"converter = tf.lite.TFLiteConverter.from_saved_model(str(model_path))\n",
"tflite_model = converter.convert()\n",
"with open(\"my_converted_savedmodel.tflite\", \"wb\") as f:\n",
"    f.write(tflite_model)"
]
},
{
"cell_type": "code",
"execution_count": 47,
"metadata": {},
"outputs": [],
"source": [
"# extra code shows how to convert a Keras model\n",
"converter = tf.lite.TFLiteConverter.from_keras_model(model)"
]
},
{
"cell_type": "code",
"execution_count": 48,
"metadata": {},
"outputs": [],
"source": [
"converter.optimizations = [tf.lite.Optimize.DEFAULT]"
]
},
{
"cell_type": "code",
"execution_count": 49,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"INFO:tensorflow:Assets written to: /var/folders/wy/h39t6kb11pnbb0pzhksd_fqh0000gq/T/tmp6ffbc1qs/assets\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"INFO:tensorflow:Assets written to: /var/folders/wy/h39t6kb11pnbb0pzhksd_fqh0000gq/T/tmp6ffbc1qs/assets\n",
"WARNING:absl:Buffer deduplication procedure will be skipped when flatbuffer library is not properly loaded\n",
"2022-04-10 09:26:30.319286: W tensorflow/compiler/mlir/lite/python/tf_tfl_flatbuffer_helpers.cc:357] Ignored output_format.\n",
"2022-04-10 09:26:30.319301: W tensorflow/compiler/mlir/lite/python/tf_tfl_flatbuffer_helpers.cc:360] Ignored drop_control_dependency.\n",
"2022-04-10 09:26:30.319417: I tensorflow/cc/saved_model/reader.cc:43] Reading SavedModel from: /var/folders/wy/h39t6kb11pnbb0pzhksd_fqh0000gq/T/tmp6ffbc1qs\n",
"2022-04-10 09:26:30.320420: I tensorflow/cc/saved_model/reader.cc:78] Reading meta graph with tags { serve }\n",
"2022-04-10 09:26:30.320431: I tensorflow/cc/saved_model/reader.cc:119] Reading SavedModel debug info (if present) from: /var/folders/wy/h39t6kb11pnbb0pzhksd_fqh0000gq/T/tmp6ffbc1qs\n",
"2022-04-10 09:26:30.323773: I tensorflow/cc/saved_model/loader.cc:228] Restoring SavedModel bundle.\n",
"2022-04-10 09:26:30.345416: I tensorflow/cc/saved_model/loader.cc:212] Running initialization op on SavedModel bundle at path: /var/folders/wy/h39t6kb11pnbb0pzhksd_fqh0000gq/T/tmp6ffbc1qs\n",
"2022-04-10 09:26:30.354270: I tensorflow/cc/saved_model/loader.cc:301] SavedModel load for tags { serve }; Status: success: OK. Took 34852 microseconds.\n",
"2022-04-10 09:26:30.392352: I tensorflow/lite/tools/optimize/quantize_weights.cc:225] Skipping quantization of tensor sequential/dense_1/MatMul because it has fewer than 1024 elements (1000).\n"
]
}
],
"source": [
"tflite_model = converter.convert()\n",
"with open(\"my_converted_keras_model.tflite\", \"wb\") as f:\n",
"    f.write(tflite_model)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Running a Model in a Web Page"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Code examples for this section are hosted on glitch.com, a website that lets you create web apps for free.\n",
"\n",
"* https://homl.info/tfjscode: a simple TFJS web app that loads a pretrained model and classifies an image.\n",
"* https://homl.info/tfjswpa: the same web app set up as a progressive web app (PWA). Try opening this link on various platforms, including mobile devices.\n",
"  * https://homl.info/wpacode: this PWA's source code.\n",
"* https://tensorflow.org/js: the TFJS library.\n",
"  * https://www.tensorflow.org/js/demos: some fun demos."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Using GPUs to Speed Up Computations"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Let's check that TensorFlow can see the GPU:"
]
},
{
"cell_type": "code",
"execution_count": 50,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"[PhysicalDevice(name='/physical_device:GPU:0', device_type='GPU')]\n"
]
}
],
"source": [
"physical_gpus = tf.config.list_physical_devices(\"GPU\")\n",
"physical_gpus"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"If you want your TensorFlow script to use only GPUs \\#0 and \\#1 (based on PCI order), then you can set the environment variables `CUDA_DEVICE_ORDER=PCI_BUS_ID` and `CUDA_VISIBLE_DEVICES=0,1` before starting your script, or in the script itself before using TensorFlow."
]
},
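{
"cell_type": "markdown",
"metadata": {},
"source": [
"As a minimal sketch of the in-script option (assuming a machine with at least two GPUs; the values are the ones mentioned above), the variables can be set through `os.environ`, as long as this happens before TensorFlow initializes the GPUs:\n",
"\n",
"```python\n",
"import os\n",
"\n",
"# Set these before TensorFlow touches CUDA, so do it before the import\n",
"os.environ[\"CUDA_DEVICE_ORDER\"] = \"PCI_BUS_ID\"  # number GPUs by PCI bus order\n",
"os.environ[\"CUDA_VISIBLE_DEVICES\"] = \"0,1\"  # expose only GPUs #0 and #1\n",
"\n",
"import tensorflow as tf\n",
"\n",
"print(tf.config.list_physical_devices(\"GPU\"))\n",
"```\n",
"\n",
"In a notebook where TensorFlow has already been imported and has used the GPU, changing these variables typically has no effect until the kernel is restarted."
]
},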
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Managing the GPU RAM"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"To limit the amount of GPU RAM that TensorFlow can use to 2 GB per GPU (the `memory_limit` argument is expressed in megabytes):"
]
},
{
"cell_type": "code",
"execution_count": 51,
"metadata": {},
"outputs": [],
"source": [
"#for gpu in physical_gpus:\n",
"#    tf.config.set_logical_device_configuration(\n",
"#        gpu,\n",
"#        [tf.config.LogicalDeviceConfiguration(memory_limit=2048)]\n",
"#    )"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"To make TensorFlow grab memory as it needs it (only releasing it when the process shuts down):"
]
},
{
"cell_type": "code",
"execution_count": 52,
"metadata": {},
"outputs": [],
"source": [
"#for gpu in physical_gpus:\n",
"#    tf.config.experimental.set_memory_growth(gpu, True)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Equivalently, you can set the `TF_FORCE_GPU_ALLOW_GROWTH` environment variable to `true` before using TensorFlow."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"To split a physical GPU into two logical GPUs:"
]
},
{
"cell_type": "code",
"execution_count": 53,
"metadata": {},
"outputs": [],
"source": [
"#tf.config.set_logical_device_configuration(\n",
"#    physical_gpus[0],\n",
"#    [tf.config.LogicalDeviceConfiguration(memory_limit=2048),\n",
"#     tf.config.LogicalDeviceConfiguration(memory_limit=2048)]\n",
"#)"
]
},
{
"cell_type": "code",
"execution_count": 54,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"[LogicalDevice(name='/device:GPU:0', device_type='GPU')]\n"
]
}
],
"source": [
"logical_gpus = tf.config.list_logical_devices(\"GPU\")\n",
"logical_gpus"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Placing Operations and Variables on Devices"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"To log every variable and operation placement (this must be run just after importing TensorFlow):"
]
},
{
"cell_type": "code",
"execution_count": 55,
"metadata": {},
"outputs": [],
"source": [
"#tf.get_logger().setLevel(\"DEBUG\") # log level is INFO by default\n",
"#tf.debugging.set_log_device_placement(True)"
]
},
{
"cell_type": "code",
"execution_count": 56,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"'/job:localhost/replica:0/task:0/device:GPU:0'"
]
},
"execution_count": 56,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"a = tf.Variable([1., 2., 3.]) # float32 variable goes to the GPU\n",
"a.device"
]
},
{
"cell_type": "code",
"execution_count": 57,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"'/job:localhost/replica:0/task:0/device:CPU:0'"
]
},
"execution_count": 57,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"b = tf.Variable([1, 2, 3]) # int32 variable goes to the CPU\n",
"b.device"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"You can place variables and operations manually on the desired device using a `tf.device()` context:"
]
},
{
"cell_type": "code",
"execution_count": 58,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"'/job:localhost/replica:0/task:0/device:CPU:0'"
]
},
"execution_count": 58,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"with tf.device(\"/cpu:0\"):\n",
"    c = tf.Variable([1., 2., 3.])\n",
"\n",
"c.device"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"If you specify a device that does not exist, or for which there is no kernel, TensorFlow will silently fall back to the default placement:"
]
},
{
"cell_type": "code",
"execution_count": 59,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"\"'/job:localhost/replica:0/task:0/device:GPU:0'\""
]
},
"execution_count": 59,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"# extra code\n",
"\n",
"with tf.device(\"/gpu:1234\"):\n",
"    d = tf.Variable([1., 2., 3.])\n",
"\n",
"d.device"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"If you want TensorFlow to throw an exception when you try to use a device that does not exist, instead of falling back to the default device:"
]
},
{
"cell_type": "code",
"execution_count": 60,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Could not satisfy device specification '/job:localhost/replica:0/task:0/device:GPU:1000'. enable_soft_placement=0. Supported device types [CPU]. All available devices [/job:localhost/replica:0/task:0/device:CPU:0].\n"
]
}
],
"source": [
"tf.config.set_soft_device_placement(False)\n",
"\n",
"# extra code\n",
"try:\n",
"    with tf.device(\"/gpu:1000\"):\n",
"        d = tf.Variable([1., 2., 3.])\n",
"except tf.errors.InvalidArgumentError as ex:\n",
"    print(ex)\n",
"\n",
"tf.config.set_soft_device_placement(True)  # extra code: back to soft placement"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Parallel Execution Across Multiple Devices"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"If you want to set the number of inter-op or intra-op threads (this may be useful if you want to avoid saturating the CPU, or if you want to make TensorFlow single-threaded, to run a perfectly reproducible test case):"
]
},
{
"cell_type": "code",
"execution_count": 61,
"metadata": {},
"outputs": [],
"source": [
"#tf.config.threading.set_inter_op_parallelism_threads(10)\n",
"#tf.config.threading.set_intra_op_parallelism_threads(10)"
]
},
{
"cell_type": "markdown",
"metadata": {
"tags": []
},
"source": [
"# Training Models Across Multiple Devices"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Training at Scale Using the Distribution Strategies API"
]
},
{
"cell_type": "code",
"execution_count": 62,
"metadata": {},
"outputs": [],
"source": [
"# extra code: creates a CNN model for MNIST using Keras\n",
"def create_model():\n",
"    return tf.keras.Sequential([\n",
"        tf.keras.layers.Reshape([28, 28, 1], input_shape=[28, 28],\n",
"                                dtype=tf.uint8),\n",
"        tf.keras.layers.Rescaling(scale=1 / 255),\n",
"        tf.keras.layers.Conv2D(filters=64, kernel_size=7, activation=\"relu\",\n",
"                               padding=\"same\"),\n",
"        tf.keras.layers.MaxPooling2D(pool_size=2),\n",
"        tf.keras.layers.Conv2D(filters=128, kernel_size=3, activation=\"relu\",\n",
"                               padding=\"same\"),\n",
"        tf.keras.layers.Conv2D(filters=128, kernel_size=3, activation=\"relu\",\n",
"                               padding=\"same\"),\n",
"        tf.keras.layers.MaxPooling2D(pool_size=2),\n",
"        tf.keras.layers.Flatten(),\n",
"        tf.keras.layers.Dense(units=64, activation=\"relu\"),\n",
"        tf.keras.layers.Dropout(0.5),\n",
"        tf.keras.layers.Dense(units=10, activation=\"softmax\"),\n",
"    ])"
]
},
{
"cell_type": "code",
"execution_count": 63,
"metadata": {},
"outputs": [],
"source": [
"tf.random.set_seed(42)\n",
"\n",
"strategy = tf.distribute.MirroredStrategy()\n",
"\n",
"with strategy.scope():\n",
"    model = create_model()  # create a Keras model normally\n",
"    model.compile(loss=\"sparse_categorical_crossentropy\",\n",
"                  optimizer=tf.keras.optimizers.SGD(learning_rate=1e-2),\n",
"                  metrics=[\"accuracy\"])  # compile the model normally\n",
"\n",
"batch_size = 100  # preferably divisible by the number of replicas\n",
"model.fit(X_train, y_train, epochs=10,\n",
"          validation_data=(X_valid, y_valid), batch_size=batch_size)"
]
},
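{
"cell_type": "markdown",
"metadata": {},
"source": [
"The comment on `batch_size` matters because `fit()` treats it as the global batch size and splits each batch across the replicas. One possible way to keep it divisible (a sketch only; `per_replica_batch_size` and its value are hypothetical) is to derive it from `strategy.num_replicas_in_sync`:\n",
"\n",
"```python\n",
"import tensorflow as tf\n",
"\n",
"strategy = tf.distribute.MirroredStrategy()\n",
"\n",
"per_replica_batch_size = 50  # hypothetical value, tune it for your hardware\n",
"# The global batch size is then a multiple of the number of replicas\n",
"batch_size = per_replica_batch_size * strategy.num_replicas_in_sync\n",
"print(f\"{strategy.num_replicas_in_sync} replica(s), global batch size {batch_size}\")\n",
"```\n",
"\n",
"With this approach the global batch size grows with the number of devices, so the learning rate may need to be adjusted accordingly."
]
},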
{
"cell_type": "code",
"execution_count": 64,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"tensorflow.python.distribute.values.MirroredVariable"
]
},
"execution_count": 64,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"type(model.weights[0])"
]
},
{
"cell_type": "code",
"execution_count": 65,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"array([[0., 0., 0., 0., 0., 0., 0., 1., 0., 0.],\n",
" [0., 0., 1., 0., 0., 0., 0., 0., 0., 0.],\n",
" [0., 1., 0., 0., 0., 0., 0., 0., 0., 0.]], dtype=float32)\n"
]
}
],
"source": [
"model.predict(X_new).round(2) # extra code the batch is split across all replicas"
]
},
{
"cell_type": "code",
"execution_count": 66,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"INFO:tensorflow:Assets written to: my_mirrored_model/assets\n"
]
},
{
"data": {
"text/plain": [
"tensorflow.python.ops.resource_variable_ops.ResourceVariable"
]
},
"execution_count": 66,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"# extra code shows that saving a model does not preserve its distribution\n",
"# strategy\n",
"model.save(\"my_mirrored_model\", save_format=\"tf\")\n",
"model = tf.keras.models.load_model(\"my_mirrored_model\")\n",
"type(model.weights[0])"
]
},
{
"cell_type": "code",
"execution_count": 67,
"metadata": {},
"outputs": [],
"source": [
"with strategy.scope():\n",
"    model = tf.keras.models.load_model(\"my_mirrored_model\")"
]
},
{
"cell_type": "code",
"execution_count": 68,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"tensorflow.python.distribute.values.MirroredVariable\n"
]
}
],
"source": [
"type(model.weights[0])"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"If you want to specify the list of GPUs to use:"
]
},
{
"cell_type": "code",
"execution_count": 69,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"WARNING:tensorflow:Some requested devices in `tf.distribute.Strategy` are not visible to TensorFlow: /job:localhost/replica:0/task:0/device:GPU:0,/job:localhost/replica:0/task:0/device:GPU:1\n",
"INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0', '/job:localhost/replica:0/task:0/device:GPU:1')\n"
]
}
],
"source": [
"strategy = tf.distribute.MirroredStrategy(devices=[\"/gpu:0\", \"/gpu:1\"])"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"If you want to change the default all-reduce algorithm:"
]
},
{
"cell_type": "code",
"execution_count": 70,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:CPU:0',)\n"
]
}
],
"source": [
"strategy = tf.distribute.MirroredStrategy(\n",
"    cross_device_ops=tf.distribute.HierarchicalCopyAllReduce())"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"If you want to use the `CentralStorageStrategy`:"
]
},
{
"cell_type": "code",
"execution_count": 71,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"INFO:tensorflow:ParameterServerStrategy (CentralStorageStrategy if you are using a single machine) with compute_devices = ['/job:localhost/replica:0/task:0/device:CPU:0'], variable_device = '/job:localhost/replica:0/task:0/device:CPU:0'\n"
]
}
],
"source": [
"strategy = tf.distribute.experimental.CentralStorageStrategy()"
]
},
{
"cell_type": "code",
"execution_count": 72,
"metadata": {},
"outputs": [],
"source": [
"# To train on a TPU in Google Colab:\n",
"#if \"google.colab\" in sys.modules and \"COLAB_TPU_ADDR\" in os.environ:\n",
"#    tpu_address = \"grpc://\" + os.environ[\"COLAB_TPU_ADDR\"]\n",
"#else:\n",
"#    tpu_address = \"\"\n",
"#resolver = tf.distribute.cluster_resolver.TPUClusterResolver(tpu_address)\n",
"#tf.config.experimental_connect_to_cluster(resolver)\n",
"#tf.tpu.experimental.initialize_tpu_system(resolver)\n",
"#strategy = tf.distribute.TPUStrategy(resolver)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Training a Model on a TensorFlow Cluster"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"A TensorFlow cluster is a group of TensorFlow processes running in parallel, usually on different machines, and talking to each other to complete some work, for example training or executing a neural network. Each TF process in the cluster is called a \"task\" (or a \"TF server\"). It has an IP address, a port, and a type (also called its role or its job). The type can be `\"worker\"`, `\"chief\"`, `\"ps\"` (parameter server) or `\"evaluator\"`:\n",
"* Each **worker** performs computations, usually on a machine with one or more GPUs.\n",
"* The **chief** performs computations as well, but it also handles extra work such as writing TensorBoard logs or saving checkpoints. There is a single chief in a cluster. If it is not defined, then it is worker #0.\n",
"* A **parameter server** (ps) only keeps track of variable values; it is usually on a CPU-only machine.\n",
"* The **evaluator** takes care of evaluation. There is usually a single evaluator in a cluster.\n",
"\n",
"The set of tasks that share the same type is often called a \"job\". For example, the \"worker\" job is the set of all workers.\n",
"\n",
"To start a TensorFlow cluster, you must first define it. This means specifying all the tasks (IP address, TCP port, and type). For example, the following cluster specification defines a cluster with 3 tasks (2 workers and 1 parameter server). It's a dictionary with one key per job, and the values are lists of task addresses:"
]
},
{
"cell_type": "code",
"execution_count": 73,
"metadata": {},
"outputs": [],
"source": [
"cluster_spec = {\n",
"    \"worker\": [\n",
"        \"machine-a.example.com:2222\",  # /job:worker/task:0\n",
"        \"machine-b.example.com:2222\"  # /job:worker/task:1\n",
"    ],\n",
"    \"ps\": [\"machine-a.example.com:2221\"]  # /job:ps/task:0\n",
"}"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Every task in the cluster may communicate with every other task in the cluster, so make sure to configure your firewall to authorize all communications between these machines on these ports (it's usually simpler if you use the same port on every machine).\n",
"\n",
"When a task is started, it needs to be told which one it is: its type and index (the task index is also called the task id). A common way to specify everything at once (both the cluster spec and the current task's type and id) is to set the `TF_CONFIG` environment variable before starting the program. It must be a JSON-encoded dictionary containing a cluster specification (under the `\"cluster\"` key), and the type and index of the task to start (under the `\"task\"` key). For example, the following `TF_CONFIG` environment variable defines the same cluster as above, with 2 workers and 1 parameter server, and specifies that the task to start is worker \\#0:"
]
},
{
"cell_type": "code",
"execution_count": 74,
"metadata": {},
"outputs": [],
"source": [
"os.environ[\"TF_CONFIG\"] = json.dumps({\n",
"    \"cluster\": cluster_spec,\n",
"    \"task\": {\"type\": \"worker\", \"index\": 0}\n",
"})"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Some platforms (e.g., Google Vertex AI) automatically set this environment variable for you."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"TensorFlow's `TFConfigClusterResolver` class reads the cluster configuration from this environment variable:"
]
},
{
"cell_type": "code",
"execution_count": 75,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"ClusterSpec({'ps': ['machine-a.example.com:2221'], 'worker': ['machine-a.example.com:2222', 'machine-b.example.com:2222']})"
]
},
"execution_count": 75,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"resolver = tf.distribute.cluster_resolver.TFConfigClusterResolver()\n",
"resolver.cluster_spec()"
]
},
{
"cell_type": "code",
"execution_count": 76,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"'worker'"
]
},
"execution_count": 76,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"resolver.task_type"
]
},
{
"cell_type": "code",
"execution_count": 77,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"0"
]
},
"execution_count": 77,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"resolver.task_id"
]
},
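{
"cell_type": "markdown",
"metadata": {},
"source": [
"Since `TF_CONFIG` is plain JSON, it can also be inspected without TensorFlow, which may help when debugging a launch script. The following is only a sketch using the standard library (the variable names are illustrative, and it falls back to an empty configuration if the variable is not set):\n",
"\n",
"```python\n",
"import json\n",
"import os\n",
"\n",
"# Parse TF_CONFIG, or use an empty configuration if it is not set\n",
"tf_config = json.loads(os.environ.get(\"TF_CONFIG\", \"{}\"))\n",
"cluster = tf_config.get(\"cluster\", {})\n",
"task = tf_config.get(\"task\", {})\n",
"\n",
"for job, addresses in cluster.items():\n",
"    print(f\"job {job!r}: {len(addresses)} task(s)\")\n",
"print(f\"this task: {task.get('type')} #{task.get('index')}\")\n",
"```"
]
},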
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Now let's run a simpler cluster with just two worker tasks, both running on the local machine. We will use the `MultiWorkerMirroredStrategy` to train a model across these two tasks.\n",
"\n",
"The first step is to write the training code. As this code will be used to run both workers, each in its own process, we write this code to a separate Python file, `my_mnist_multiworker_task.py`. The code is relatively straightforward, but there are a couple of important things to note:\n",
"* We create the `MultiWorkerMirroredStrategy` before doing anything else with TensorFlow.\n",
"* Only one of the workers takes care of work such as logging to TensorBoard or saving the model to its final location. As mentioned earlier, this worker is called the *chief*. When it is not defined explicitly, then by convention it is worker #0."
]
},
{
"cell_type": "code",
"execution_count": 78,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Writing my_mnist_multiworker_task.py\n"
]
}
],
"source": [
"%%writefile my_mnist_multiworker_task.py\n",
"\n",
"import tempfile\n",
"import tensorflow as tf\n",
"\n",
"strategy = tf.distribute.MultiWorkerMirroredStrategy()  # at the start!\n",
"resolver = tf.distribute.cluster_resolver.TFConfigClusterResolver()\n",
"print(f\"Starting task {resolver.task_type} #{resolver.task_id}\")\n",
"\n",
"# extra code: load and split the MNIST dataset\n",
"mnist = tf.keras.datasets.mnist.load_data()\n",
"(X_train_full, y_train_full), (X_test, y_test) = mnist\n",
"X_valid, X_train = X_train_full[:5000], X_train_full[5000:]\n",
"y_valid, y_train = y_train_full[:5000], y_train_full[5000:]\n",
"\n",
"with strategy.scope():\n",
"    model = tf.keras.Sequential([\n",
"        tf.keras.layers.Reshape([28, 28, 1], input_shape=[28, 28],\n",
"                                dtype=tf.uint8),\n",
"        tf.keras.layers.Rescaling(scale=1 / 255),\n",
"        tf.keras.layers.Conv2D(filters=64, kernel_size=7, activation=\"relu\",\n",
"                               padding=\"same\", input_shape=[28, 28, 1]),\n",
"        tf.keras.layers.MaxPooling2D(pool_size=2),\n",
"        tf.keras.layers.Conv2D(filters=128, kernel_size=3, activation=\"relu\",\n",
"                               padding=\"same\"),\n",
"        tf.keras.layers.Conv2D(filters=128, kernel_size=3, activation=\"relu\",\n",
"                               padding=\"same\"),\n",
"        tf.keras.layers.MaxPooling2D(pool_size=2),\n",
"        tf.keras.layers.Flatten(),\n",
"        tf.keras.layers.Dense(units=64, activation=\"relu\"),\n",
"        tf.keras.layers.Dropout(0.5),\n",
"        tf.keras.layers.Dense(units=10, activation=\"softmax\"),\n",
"    ])\n",
"    model.compile(loss=\"sparse_categorical_crossentropy\",\n",
"                  optimizer=tf.keras.optimizers.SGD(learning_rate=1e-2),\n",
"                  metrics=[\"accuracy\"])\n",
"\n",
"model.fit(X_train, y_train, validation_data=(X_valid, y_valid), epochs=10)\n",
"\n",
"if resolver.task_id == 0:  # the chief saves the model to the right location\n",
"    model.save(\"my_mnist_multiworker_model\", save_format=\"tf\")\n",
"else:\n",
"    tmpdir = tempfile.mkdtemp()  # other workers save to a temporary directory\n",
"    model.save(tmpdir, save_format=\"tf\")\n",
"    tf.io.gfile.rmtree(tmpdir)  # and we can delete this directory at the end!"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"In a real-world application, there would typically be a single worker per machine, but in this example we're running both workers on the same machine, so they will both try to use all the available GPU RAM (if this machine has a GPU), and this will likely lead to an Out-Of-Memory (OOM) error. To avoid this, we could use the `CUDA_VISIBLE_DEVICES` environment variable to assign a different GPU to each worker. Alternatively, we can simply disable GPU support by setting `CUDA_VISIBLE_DEVICES` to an empty string."
]
},
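{
"cell_type": "markdown",
"metadata": {},
"source": [
"To illustrate the first option, here is a sketch of how a launcher script could build one environment per worker, each with its own GPU. The `make_worker_env()` helper, the addresses, and the GPU ids are all hypothetical; passing `gpu_id=None` hides every GPU instead:\n",
"\n",
"```python\n",
"import json\n",
"import os\n",
"\n",
"worker_addresses = [\"127.0.0.1:9901\", \"127.0.0.1:9902\"]  # example addresses\n",
"\n",
"def make_worker_env(index, gpu_id=None):\n",
"    \"\"\"Return a copy of the environment for worker #index (hypothetical helper).\"\"\"\n",
"    env = dict(os.environ)\n",
"    env[\"TF_CONFIG\"] = json.dumps({\n",
"        \"cluster\": {\"worker\": worker_addresses},\n",
"        \"task\": {\"type\": \"worker\", \"index\": index},\n",
"    })\n",
"    # One GPU per worker, or an empty string to hide all GPUs\n",
"    env[\"CUDA_VISIBLE_DEVICES\"] = \"\" if gpu_id is None else str(gpu_id)\n",
"    return env\n",
"\n",
"envs = [make_worker_env(index, gpu_id=index) for index in range(2)]\n",
"# Each environment could then be passed to subprocess.Popen(..., env=env)\n",
"```"
]
},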
{
"cell_type": "markdown",
"metadata": {},
"source": [
"We are now ready to start both workers, each in its own process. Notice that we change the task index:"
]
},
{
"cell_type": "code",
"execution_count": 79,
"metadata": {},
"outputs": [],
"source": [
"%%bash --bg\n",
"\n",
"export CUDA_VISIBLE_DEVICES=''\n",
"export TF_CONFIG='{\"cluster\": {\"worker\": [\"127.0.0.1:9901\", \"127.0.0.1:9902\"]},\n",
" \"task\": {\"type\": \"worker\", \"index\": 0}}'\n",
"python my_mnist_multiworker_task.py > my_worker_0.log 2>&1"
]
},
{
"cell_type": "code",
"execution_count": 80,
"metadata": {},
"outputs": [],
"source": [
"%%bash --bg\n",
"\n",
"export CUDA_VISIBLE_DEVICES=''\n",
"export TF_CONFIG='{\"cluster\": {\"worker\": [\"127.0.0.1:9901\", \"127.0.0.1:9902\"]},\n",
" \"task\": {\"type\": \"worker\", \"index\": 1}}'\n",
"python my_mnist_multiworker_task.py > my_worker_1.log 2>&1"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"**Note**: if you get warnings about `AutoShardPolicy`, you can safely ignore them. See [TF issue #42146](https://github.com/tensorflow/tensorflow/issues/42146) for more details."
]
},
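{
"cell_type": "markdown",
"metadata": {},
"source": [
"If you would rather avoid these warnings, one option is to set the sharding policy explicitly. This is only a sketch: it assumes the data is fed as a `tf.data.Dataset` rather than as NumPy arrays (unlike the script above), and the toy dataset is hypothetical:\n",
"\n",
"```python\n",
"import tensorflow as tf\n",
"\n",
"# Hypothetical toy dataset, only here to make the sketch self-contained\n",
"dataset = tf.data.Dataset.from_tensor_slices(tf.range(8)).batch(4)\n",
"\n",
"# Ask tf.data to shard by element instead of trying to shard by file first\n",
"options = tf.data.Options()\n",
"options.experimental_distribute.auto_shard_policy = (\n",
"    tf.data.experimental.AutoShardPolicy.DATA)\n",
"dataset = dataset.with_options(options)\n",
"```\n",
"\n",
"The resulting dataset can then be passed to `fit()` in place of the arrays."
]
},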
{
"cell_type": "markdown",
"metadata": {},
"source": [
"That's it! Our TensorFlow cluster is now running, but we can't see it in this notebook because it's running in separate processes (but you can see the progress in `my_worker_*.log`).\n",
"\n",
"Since the chief (worker #0) is writing to TensorBoard, we use TensorBoard to view the training progress. Run the following cell, then click on the settings button (i.e., the gear icon) in the TensorBoard interface and check the \"Reload data\" box to make TensorBoard automatically refresh every 30s. Once the first epoch of training is finished (which may take a few minutes), and once TensorBoard refreshes, the SCALARS tab will appear. Click on this tab to view the progress of the model's training and validation accuracy."
]
},
{
"cell_type": "code",
"execution_count": 81,
"metadata": {},
"outputs": [],
"source": [
"%load_ext tensorboard\n",
"%tensorboard --logdir=./my_mnist_multiworker_logs --port=6006"
]
},
{
"cell_type": "code",
"execution_count": 82,
"metadata": {},
"outputs": [],
"source": [
"# strategy = tf.distribute.MultiWorkerMirroredStrategy(\n",
"# communication_options=tf.distribute.experimental.CommunicationOptions(\n",
"# implementation=tf.distribute.experimental.CollectiveCommunication.NCCL))"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Running Large Training Jobs on Vertex AI"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Let's copy the training script, but add `import os` and change the save path to be the GCS path that the `AIP_MODEL_DIR` environment variable will point to:"
]
},
{
"cell_type": "code",
"execution_count": 83,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Writing my_vertex_ai_training_task.py\n"
]
}
],
"source": [
"%%writefile my_vertex_ai_training_task.py\n",
"\n",
"import os\n",
"from pathlib import Path\n",
"import tempfile\n",
"import tensorflow as tf\n",
"\n",
"strategy = tf.distribute.MultiWorkerMirroredStrategy() # at the start!\n",
"resolver = tf.distribute.cluster_resolver.TFConfigClusterResolver()\n",
"\n",
"if resolver.task_type == \"chief\":\n",
" model_dir = os.getenv(\"AIP_MODEL_DIR\") # paths provided by Vertex AI\n",
" tensorboard_log_dir = os.getenv(\"AIP_TENSORBOARD_LOG_DIR\")\n",
" checkpoint_dir = os.getenv(\"AIP_CHECKPOINT_DIR\")\n",
"else:\n",
" tmp_dir = Path(tempfile.mkdtemp()) # other workers use a temporary dirs\n",
" model_dir = tmp_dir / \"model\"\n",
" tensorboard_log_dir = tmp_dir / \"logs\"\n",
" checkpoint_dir = tmp_dir / \"ckpt\"\n",
"\n",
"callbacks = [tf.keras.callbacks.TensorBoard(tensorboard_log_dir),\n",
" tf.keras.callbacks.ModelCheckpoint(checkpoint_dir)]\n",
"\n",
"# extra code Load and prepare the MNIST dataset\n",
"mnist = tf.keras.datasets.mnist.load_data()\n",
"(X_train_full, y_train_full), (X_test, y_test) = mnist\n",
"X_valid, X_train = X_train_full[:5000], X_train_full[5000:]\n",
"y_valid, y_train = y_train_full[:5000], y_train_full[5000:]\n",
"\n",
"# extra code build and compile the Keras model using the distribution strategy\n",
"with strategy.scope():\n",
" model = tf.keras.Sequential([\n",
" tf.keras.layers.Reshape([28, 28, 1], input_shape=[28, 28],\n",
" dtype=tf.uint8),\n",
" tf.keras.layers.Lambda(lambda X: X / 255),\n",
" tf.keras.layers.Conv2D(filters=64, kernel_size=7, activation=\"relu\",\n",
" padding=\"same\", input_shape=[28, 28, 1]),\n",
" tf.keras.layers.MaxPooling2D(pool_size=2),\n",
" tf.keras.layers.Conv2D(filters=128, kernel_size=3, activation=\"relu\",\n",
" padding=\"same\"), \n",
" tf.keras.layers.Conv2D(filters=128, kernel_size=3, activation=\"relu\",\n",
" padding=\"same\"),\n",
" tf.keras.layers.MaxPooling2D(pool_size=2),\n",
" tf.keras.layers.Flatten(),\n",
" tf.keras.layers.Dense(units=64, activation=\"relu\"),\n",
" tf.keras.layers.Dropout(0.5),\n",
" tf.keras.layers.Dense(units=10, activation=\"softmax\"),\n",
" ])\n",
" model.compile(loss=\"sparse_categorical_crossentropy\",\n",
" optimizer=tf.keras.optimizers.SGD(learning_rate=1e-2),\n",
" metrics=[\"accuracy\"])\n",
"\n",
"model.fit(X_train, y_train, validation_data=(X_valid, y_valid), epochs=10,\n",
" callbacks=callbacks)\n",
"model.save(model_dir, save_format=\"tf\")"
]
},
{
"cell_type": "code",
"execution_count": 84,
"metadata": {},
"outputs": [],
"source": [
"custom_training_job = aiplatform.CustomTrainingJob(\n",
" display_name=\"my_custom_training_job\",\n",
" script_path=\"my_vertex_ai_training_task.py\",\n",
" container_uri=\"gcr.io/cloud-aiplatform/training/tf-gpu.2-4:latest\",\n",
" model_serving_container_image_uri=server_image,\n",
" requirements=[\"gcsfs==2022.3.0\"], # not needed, this is just an example\n",
" staging_bucket=f\"gs://{bucket_name}/staging\"\n",
")"
]
},
{
"cell_type": "code",
"execution_count": 85,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Training script copied to:\n",
"gs://my_bucket/aiplatform-2022-04-14-10:08:24.124-aiplatform_custom_trainer_script-0.1.tar.gz.\n",
"Training Output directory:\n",
"gs://my_bucket/aiplatform-custom-training-2022-04-14-10:08:25.226 \n",
"View Training:\n",
"https://console.cloud.google.com/ai/platform/locations/us-central1/training/5407999068506947584?project=522977795627\n",
"CustomTrainingJob projects/522977795627/locations/us-central1/trainingPipelines/5407999068506947584 current state:\n",
"PipelineState.PIPELINE_STATE_PENDING\n",
"CustomTrainingJob projects/522977795627/locations/us-central1/trainingPipelines/5407999068506947584 current state:\n",
"PipelineState.PIPELINE_STATE_RUNNING\n",
"View backing custom job:\n",
"https://console.cloud.google.com/ai/platform/locations/us-central1/training/6685701948726837248?project=522977795627\n",
"CustomTrainingJob projects/522977795627/locations/us-central1/trainingPipelines/5407999068506947584 current state:\n",
"PipelineState.PIPELINE_STATE_RUNNING\n",
"CustomTrainingJob projects/522977795627/locations/us-central1/trainingPipelines/5407999068506947584 current state:\n",
"PipelineState.PIPELINE_STATE_RUNNING\n",
"CustomTrainingJob projects/522977795627/locations/us-central1/trainingPipelines/5407999068506947584 current state:\n",
"PipelineState.PIPELINE_STATE_RUNNING\n",
"CustomTrainingJob projects/522977795627/locations/us-central1/trainingPipelines/5407999068506947584 current state:\n",
"PipelineState.PIPELINE_STATE_RUNNING\n",
"CustomTrainingJob projects/522977795627/locations/us-central1/trainingPipelines/5407999068506947584 current state:\n",
"PipelineState.PIPELINE_STATE_RUNNING\n",
"CustomTrainingJob projects/522977795627/locations/us-central1/trainingPipelines/5407999068506947584 current state:\n",
"PipelineState.PIPELINE_STATE_RUNNING\n",
"CustomTrainingJob run completed. Resource name: projects/522977795627/locations/us-central1/trainingPipelines/5407999068506947584\n",
"Model available at projects/522977795627/locations/us-central1/models/9094548856498028544\n"
]
}
],
"source": [
"mnist_model2 = custom_training_job.run(\n",
" machine_type=\"n1-standard-4\",\n",
" replica_count=2,\n",
" accelerator_type=\"NVIDIA_TESLA_K80\",\n",
" accelerator_count=2,\n",
")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Let's clean up:"
]
},
{
"cell_type": "code",
"execution_count": 86,
"metadata": {},
"outputs": [],
"source": [
"mnist_model2.delete()\n",
"custom_training_job.delete()\n",
"blobs = bucket.list_blobs(prefix=f\"gs://{bucket_name}/staging/\")\n",
"for blob in blobs:\n",
" blob.delete()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Hyperparameter Tuning on Vertex AI"
]
},
{
"cell_type": "code",
"execution_count": 87,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Writing my_vertex_ai_trial.py\n"
]
}
],
"source": [
"%%writefile my_vertex_ai_trial.py\n",
"\n",
"import argparse\n",
"\n",
"parser = argparse.ArgumentParser()\n",
"parser.add_argument(\"--n_hidden\", type=int, default=2)\n",
"parser.add_argument(\"--n_neurons\", type=int, default=256)\n",
"parser.add_argument(\"--learning_rate\", type=float, default=1e-2)\n",
"parser.add_argument(\"--optimizer\", default=\"adam\")\n",
"args = parser.parse_args()\n",
"\n",
"import tensorflow as tf\n",
"\n",
"def build_model(args):\n",
" with tf.distribute.MirroredStrategy().scope():\n",
" model = tf.keras.Sequential()\n",
" model.add(tf.keras.layers.Flatten(input_shape=[28, 28], dtype=tf.uint8))\n",
" for _ in range(args.n_hidden):\n",
" model.add(tf.keras.layers.Dense(args.n_neurons, activation=\"relu\"))\n",
" model.add(tf.keras.layers.Dense(10, activation=\"softmax\"))\n",
" opt = tf.keras.optimizers.get(args.optimizer)\n",
" opt.learning_rate = args.learning_rate\n",
" model.compile(loss=\"sparse_categorical_crossentropy\", optimizer=opt,\n",
" metrics=[\"accuracy\"])\n",
" return model\n",
"\n",
"# extra code loads and splits the dataset\n",
"mnist = tf.keras.datasets.mnist.load_data()\n",
"(X_train_full, y_train_full), (X_test, y_test) = mnist\n",
"X_valid, X_train = X_train_full[:5000], X_train_full[5000:]\n",
"y_valid, y_train = y_train_full[:5000], y_train_full[5000:]\n",
"\n",
"# extra code use the AIP_* environment variable and create the callbacks\n",
"import os\n",
"model_dir = os.getenv(\"AIP_MODEL_DIR\")\n",
"tensorboard_log_dir = os.getenv(\"AIP_TENSORBOARD_LOG_DIR\")\n",
"checkpoint_dir = os.getenv(\"AIP_CHECKPOINT_DIR\")\n",
"trial_id = os.getenv(\"CLOUD_ML_TRIAL_ID\")\n",
"tensorboard_cb = tf.keras.callbacks.TensorBoard(tensorboard_log_dir)\n",
"early_stopping_cb = tf.keras.callbacks.EarlyStopping(patience=5)\n",
"callbacks = [tensorboard_cb, early_stopping_cb]\n",
"\n",
"model = build_model(args)\n",
"history = model.fit(X_train, y_train, validation_data=(X_valid, y_valid),\n",
" epochs=10, callbacks=callbacks)\n",
"model.save(model_dir, save_format=\"tf\") # extra code\n",
"\n",
"import hypertune\n",
"\n",
"hypertune = hypertune.HyperTune()\n",
"hypertune.report_hyperparameter_tuning_metric(\n",
" hyperparameter_metric_tag=\"accuracy\", # name of the reported metric\n",
" metric_value=max(history.history[\"val_accuracy\"]), # max accuracy value\n",
" global_step=model.optimizer.iterations.numpy(),\n",
")"
]
},
{
"cell_type": "code",
"execution_count": 88,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Training script copied to:\n",
"gs://homl3-mybucket5/staging/aiplatform-2022-04-18-18:14:02.860-aiplatform_custom_trainer_script-0.1.tar.gz.\n"
]
}
],
"source": [
"trial_job = aiplatform.CustomJob.from_local_script(\n",
" display_name=\"my_search_trial_job\",\n",
" script_path=\"my_vertex_ai_trial.py\", # path to your training script\n",
" container_uri=\"gcr.io/cloud-aiplatform/training/tf-gpu.2-4:latest\",\n",
" staging_bucket=f\"gs://{bucket_name}/staging\",\n",
" accelerator_type=\"NVIDIA_TESLA_K80\",\n",
" accelerator_count=2, # in this example, each trial will have 2 GPUs\n",
")"
]
},
{
"cell_type": "code",
"execution_count": 89,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Creating HyperparameterTuningJob\n",
"HyperparameterTuningJob created. Resource name: projects/522977795627/locations/us-central1/hyperparameterTuningJobs/5825136187899117568\n",
"To use this HyperparameterTuningJob in another session:\n",
"hpt_job = aiplatform.HyperparameterTuningJob.get('projects/522977795627/locations/us-central1/hyperparameterTuningJobs/5825136187899117568')\n",
"View HyperparameterTuningJob:\n",
"https://console.cloud.google.com/ai/platform/locations/us-central1/training/5825136187899117568?project=522977795627\n",
"HyperparameterTuningJob projects/522977795627/locations/us-central1/hyperparameterTuningJobs/5825136187899117568 current state:\n",
"JobState.JOB_STATE_RUNNING\n",
"HyperparameterTuningJob projects/522977795627/locations/us-central1/hyperparameterTuningJobs/5825136187899117568 current state:\n",
"JobState.JOB_STATE_RUNNING\n",
"HyperparameterTuningJob projects/522977795627/locations/us-central1/hyperparameterTuningJobs/5825136187899117568 current state:\n",
"JobState.JOB_STATE_RUNNING\n",
"HyperparameterTuningJob projects/522977795627/locations/us-central1/hyperparameterTuningJobs/5825136187899117568 current state:\n",
"JobState.JOB_STATE_RUNNING\n",
"HyperparameterTuningJob projects/522977795627/locations/us-central1/hyperparameterTuningJobs/5825136187899117568 current state:\n",
"JobState.JOB_STATE_RUNNING\n",
"HyperparameterTuningJob projects/522977795627/locations/us-central1/hyperparameterTuningJobs/5825136187899117568 current state:\n",
"JobState.JOB_STATE_RUNNING\n",
"HyperparameterTuningJob projects/522977795627/locations/us-central1/hyperparameterTuningJobs/5825136187899117568 current state:\n",
"JobState.JOB_STATE_RUNNING\n",
"HyperparameterTuningJob projects/522977795627/locations/us-central1/hyperparameterTuningJobs/5825136187899117568 current state:\n",
"JobState.JOB_STATE_RUNNING\n",
"HyperparameterTuningJob projects/522977795627/locations/us-central1/hyperparameterTuningJobs/5825136187899117568 current state:\n",
"JobState.JOB_STATE_SUCCEEDED\n",
"HyperparameterTuningJob run completed. Resource name: projects/522977795627/locations/us-central1/hyperparameterTuningJobs/5825136187899117568\n"
]
}
],
"source": [
"from google.cloud.aiplatform import hyperparameter_tuning as hpt\n",
"\n",
"hp_job = aiplatform.HyperparameterTuningJob(\n",
" display_name=\"my_hp_search_job\",\n",
" custom_job=trial_job,\n",
" metric_spec={\"accuracy\": \"maximize\"},\n",
" parameter_spec={\n",
" \"learning_rate\": hpt.DoubleParameterSpec(min=1e-3, max=10, scale=\"log\"),\n",
" \"n_neurons\": hpt.IntegerParameterSpec(min=1, max=300, scale=\"linear\"),\n",
" \"n_hidden\": hpt.IntegerParameterSpec(min=1, max=10, scale=\"linear\"),\n",
" \"optimizer\": hpt.CategoricalParameterSpec([\"sgd\", \"adam\"]),\n",
" },\n",
" max_trial_count=100,\n",
" parallel_trial_count=20,\n",
")\n",
"hp_job.run()"
]
},
{
"cell_type": "code",
"execution_count": 90,
"metadata": {},
"outputs": [],
"source": [
"def get_final_metric(trial, metric_id):\n",
" for metric in trial.final_measurement.metrics:\n",
" if metric.metric_id == metric_id:\n",
" return metric.value\n",
"\n",
"trials = hp_job.trials\n",
"trial_accuracies = [get_final_metric(trial, \"accuracy\") for trial in trials]\n",
"best_trial = trials[np.argmax(trial_accuracies)]"
]
},
{
"cell_type": "code",
"execution_count": 91,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"0.977400004863739"
]
},
"execution_count": 91,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"max(trial_accuracies)"
]
},
{
"cell_type": "code",
"execution_count": 92,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"'98'"
]
},
"execution_count": 92,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"best_trial.id"
]
},
{
"cell_type": "code",
"execution_count": 93,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"[parameter_id: \"learning_rate\"\n",
"value {\n",
" number_value: 0.001\n",
"}\n",
", parameter_id: \"n_hidden\"\n",
"value {\n",
" number_value: 8.0\n",
"}\n",
", parameter_id: \"n_neurons\"\n",
"value {\n",
" number_value: 216.0\n",
"}\n",
", parameter_id: \"optimizer\"\n",
"value {\n",
" string_value: \"adam\"\n",
"}\n",
"]"
]
},
"execution_count": 93,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"best_trial.parameters"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Extra Material Distributed Keras Tuner on Vertex AI"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Instead of using Vertex AI's hyperparameter tuning service, you can use [Keras Tuner](https://keras.io/keras_tuner/) (introduced in Chapter 10) and run it on Vertex AI VMs. Keras Tuner provides a simple way to scale hyperparameter search by distributing it across multiple machines: it only requires setting three environment variables on each machine, then running your regular Keras Tuner code on each machine. You can use the exact same script on all machines. One of the machines acts as the chief, and the others act as workers. Each worker asks the chief which hyperparameter values to try—it acts as the oracle—then the worker trains the model using these hyperparameter values, and finally it reports the model's performance back to the chief, which can then decide which hyperparameter values the worker should try next.\n",
"\n",
"The three environment variables you need to set on each machine are:\n",
"\n",
"* `KERASTUNER_TUNER_ID`: equal to `\"chief\"` on the chief machine, or a unique identifier on each worker machine, such as `\"worker0\"`, `\"worker1\"`, etc.\n",
"* `KERASTUNER_ORACLE_IP`: the IP address or hostname of the chief machine. The chief itself should generally use `\"0.0.0.0\"` to listen on every IP address on the machine.\n",
"* `KERASTUNER_ORACLE_PORT`: the TCP port that the chief will be listening on.\n",
"\n",
"You can use distributed Keras Tuner on any set of machines. If you want to run it on Vertex AI machines, then you can spawn a regular training job, and just modify the training script to set the three environment variables properly before using Keras Tuner.\n",
"\n",
"For example, the script below starts by parsing the `TF_CONFIG` environment variable, which will be automatically set by Vertex AI, just like earlier. It finds the address of the task of type `\"chief\"`, and it extracts the IP address or hostname, and the TCP port. It then defines the tuner ID as the task type followed by the task index, for example `\"worker0\"`. If the tuner ID is `\"chief0\"`, it changes it to `\"chief\"`, and it sets the IP to `\"0.0.0.0\"`: this will make it listen on all IPv4 address on its machine. Then it defines the environment variables for Keras Tuner. Next, the script creates a tuner, just like in Chapter 10, the it runs the search, and finally it saves the best model to the location given by Vertex AI:"
]
},
{
"cell_type": "code",
"execution_count": 94,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Writing my_keras_tuner_search.py\n"
]
}
],
"source": [
"%%writefile my_keras_tuner_search.py\n",
"\n",
"import json\n",
"import os\n",
"\n",
"tf_config = json.loads(os.environ[\"TF_CONFIG\"])\n",
"\n",
"chief_ip, chief_port = tf_config[\"cluster\"][\"chief\"][0].rsplit(\":\", 1)\n",
"tuner_id = f'{tf_config[\"task\"][\"type\"]}{tf_config[\"task\"][\"index\"]}'\n",
"if tuner_id == \"chief0\":\n",
" tuner_id = \"chief\"\n",
" chief_ip = \"0.0.0.0\"\n",
" # extra code since the chief doesn't work much, you can optimize compute\n",
" # resources by running a worker on the same machine. To do this, you can\n",
" # just make the chief start another process, after tweaking the TF_CONFIG\n",
" # environment variable to set the task type to \"worker\" and the task index\n",
" # to a unique value. Uncomment the next few lines to give this a try:\n",
" # import subprocess\n",
" # import sys\n",
" # tf_config[\"task\"][\"type\"] = \"workerX\" # the worker on the chief's machine\n",
" # os.environ[\"TF_CONFIG\"] = json.dumps(tf_config)\n",
" # subprocess.Popen([sys.executable] + sys.argv,\n",
" # stdout=sys.stdout, stderr=sys.stderr)\n",
"\n",
"os.environ[\"KERASTUNER_TUNER_ID\"] = tuner_id\n",
"os.environ[\"KERASTUNER_ORACLE_IP\"] = chief_ip\n",
"os.environ[\"KERASTUNER_ORACLE_PORT\"] = chief_port\n",
"\n",
"from pathlib import Path\n",
"import keras_tuner as kt\n",
"import tensorflow as tf\n",
"\n",
"gcs_path = \"/gcs/my_bucket/my_hp_search\" # replace with your bucket's name\n",
"\n",
"def build_model(hp):\n",
" n_hidden = hp.Int(\"n_hidden\", min_value=0, max_value=8, default=2)\n",
" n_neurons = hp.Int(\"n_neurons\", min_value=16, max_value=256)\n",
" learning_rate = hp.Float(\"learning_rate\", min_value=1e-4, max_value=1e-2,\n",
" sampling=\"log\")\n",
" optimizer = hp.Choice(\"optimizer\", values=[\"sgd\", \"adam\"])\n",
" if optimizer == \"sgd\":\n",
" optimizer = tf.keras.optimizers.SGD(learning_rate=learning_rate)\n",
" else:\n",
" optimizer = tf.keras.optimizers.Adam(learning_rate=learning_rate)\n",
"\n",
" model = tf.keras.Sequential()\n",
" model.add(tf.keras.layers.Flatten(input_shape=[28, 28], dtype=tf.uint8))\n",
" for _ in range(n_hidden):\n",
" model.add(tf.keras.layers.Dense(n_neurons, activation=\"relu\"))\n",
" model.add(tf.keras.layers.Dense(10, activation=\"softmax\"))\n",
" model.compile(loss=\"sparse_categorical_crossentropy\",\n",
" optimizer=optimizer,\n",
" metrics=[\"accuracy\"])\n",
" return model\n",
"\n",
"hyperband_tuner = kt.Hyperband(\n",
" build_model, objective=\"val_accuracy\", seed=42,\n",
" max_epochs=10, factor=3, hyperband_iterations=2,\n",
" distribution_strategy=tf.distribute.MirroredStrategy(),\n",
" directory=gcs_path, project_name=\"mnist\")\n",
"\n",
"# extra code Load and split the MNIST dataset\n",
"mnist = tf.keras.datasets.mnist.load_data()\n",
"(X_train_full, y_train_full), (X_test, y_test) = mnist\n",
"X_valid, X_train = X_train_full[:5000], X_train_full[5000:]\n",
"y_valid, y_train = y_train_full[:5000], y_train_full[5000:]\n",
"\n",
"tensorboard_log_dir = os.environ[\"AIP_TENSORBOARD_LOG_DIR\"] + \"/\" + tuner_id\n",
"tensorboard_cb = tf.keras.callbacks.TensorBoard(tensorboard_log_dir)\n",
"early_stopping_cb = tf.keras.callbacks.EarlyStopping(patience=5)\n",
"hyperband_tuner.search(X_train, y_train, epochs=10,\n",
" validation_data=(X_valid, y_valid),\n",
" callbacks=[tensorboard_cb, early_stopping_cb])\n",
"\n",
"if tuner_id == \"chief\":\n",
" best_hp = hyperband_tuner.get_best_hyperparameters()[0]\n",
" best_model = hyperband_tuner.hypermodel.build(best_hp)\n",
" best_model.save(os.getenv(\"AIP_MODEL_DIR\"), save_format=\"tf\")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Note that Vertex AI automatically mounts the `/gcs` directory to GCS, using the open source [GCS Fuse adapter](https://cloud.google.com/storage/docs/gcs-fuse). This gives us a shared directory across the workers and the chief, which is required by Keras Tuner. Also note that we set the distribution strategy to a `MirroredStrategy`. This will allow each worker to use all the GPUs on its machine, if there's more than one.\n"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Replace `/gcs/my_bucket/` with <code>/gcs/<i>{bucket_name}</i>/</code>:"
]
},
{
"cell_type": "code",
"execution_count": 95,
"metadata": {},
"outputs": [],
"source": [
"with open(\"my_keras_tuner_search.py\") as f:\n",
" script = f.read()\n",
"\n",
"with open(\"my_keras_tuner_search.py\", \"w\") as f:\n",
" f.write(script.replace(\"/gcs/my_bucket/\", f\"/gcs/{bucket_name}/\"))"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Now all we need to do is to start a custom training job based on this script, exactly like in the previous section. Don't forget to add `keras-tuner` to the list of `requirements`:"
]
},
{
"cell_type": "code",
"execution_count": 96,
"metadata": {},
"outputs": [],
"source": [
"hp_search_job = aiplatform.CustomTrainingJob(\n",
" display_name=\"my_hp_search_job\",\n",
" script_path=\"my_keras_tuner_search.py\",\n",
" container_uri=\"gcr.io/cloud-aiplatform/training/tf-gpu.2-4:latest\",\n",
" model_serving_container_image_uri=server_image,\n",
" requirements=[\"keras-tuner~=1.1.2\"],\n",
" staging_bucket=f\"gs://{bucket_name}/staging\",\n",
")"
]
},
{
"cell_type": "code",
"execution_count": 97,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Training script copied to:\n",
"gs://my_bucket/staging/aiplatform-2022-04-15-13:34:32.591-aiplatform_custom_trainer_script-0.1.tar.gz.\n",
"Training Output directory:\n",
"gs://my_bucket/staging/aiplatform-custom-training-2022-04-15-13:34:34.453 \n",
"View Training:\n",
"https://console.cloud.google.com/ai/platform/locations/us-central1/training/8601543785521872896?project=522977795627\n",
"View backing custom job:\n",
"https://console.cloud.google.com/ai/platform/locations/us-central1/training/5022607048831926272?project=522977795627\n",
"CustomTrainingJob projects/522977795627/locations/us-central1/trainingPipelines/8601543785521872896 current state:\n",
"PipelineState.PIPELINE_STATE_RUNNING\n",
"CustomTrainingJob projects/522977795627/locations/us-central1/trainingPipelines/8601543785521872896 current state:\n",
"PipelineState.PIPELINE_STATE_RUNNING\n",
"CustomTrainingJob projects/522977795627/locations/us-central1/trainingPipelines/8601543785521872896 current state:\n",
"PipelineState.PIPELINE_STATE_RUNNING\n",
"CustomTrainingJob projects/522977795627/locations/us-central1/trainingPipelines/8601543785521872896 current state:\n",
"PipelineState.PIPELINE_STATE_RUNNING\n",
"CustomTrainingJob projects/522977795627/locations/us-central1/trainingPipelines/8601543785521872896 current state:\n",
"PipelineState.PIPELINE_STATE_RUNNING\n",
"CustomTrainingJob projects/522977795627/locations/us-central1/trainingPipelines/8601543785521872896 current state:\n",
"PipelineState.PIPELINE_STATE_RUNNING\n",
"CustomTrainingJob projects/522977795627/locations/us-central1/trainingPipelines/8601543785521872896 current state:\n",
"PipelineState.PIPELINE_STATE_RUNNING\n",
"CustomTrainingJob projects/522977795627/locations/us-central1/trainingPipelines/8601543785521872896 current state:\n",
"PipelineState.PIPELINE_STATE_RUNNING\n",
"CustomTrainingJob projects/522977795627/locations/us-central1/trainingPipelines/8601543785521872896 current state:\n",
"PipelineState.PIPELINE_STATE_RUNNING\n",
"CustomTrainingJob projects/522977795627/locations/us-central1/trainingPipelines/8601543785521872896 current state:\n",
"PipelineState.PIPELINE_STATE_RUNNING\n",
"CustomTrainingJob projects/522977795627/locations/us-central1/trainingPipelines/8601543785521872896 current state:\n",
"PipelineState.PIPELINE_STATE_RUNNING\n",
"CustomTrainingJob projects/522977795627/locations/us-central1/trainingPipelines/8601543785521872896 current state:\n",
"PipelineState.PIPELINE_STATE_RUNNING\n",
"CustomTrainingJob projects/522977795627/locations/us-central1/trainingPipelines/8601543785521872896 current state:\n",
"PipelineState.PIPELINE_STATE_RUNNING\n",
"CustomTrainingJob projects/522977795627/locations/us-central1/trainingPipelines/8601543785521872896 current state:\n",
"PipelineState.PIPELINE_STATE_RUNNING\n",
"CustomTrainingJob projects/522977795627/locations/us-central1/trainingPipelines/8601543785521872896 current state:\n",
"PipelineState.PIPELINE_STATE_RUNNING\n",
"CustomTrainingJob projects/522977795627/locations/us-central1/trainingPipelines/8601543785521872896 current state:\n",
"PipelineState.PIPELINE_STATE_RUNNING\n",
"CustomTrainingJob projects/522977795627/locations/us-central1/trainingPipelines/8601543785521872896 current state:\n",
"PipelineState.PIPELINE_STATE_RUNNING\n",
"CustomTrainingJob projects/522977795627/locations/us-central1/trainingPipelines/8601543785521872896 current state:\n",
"PipelineState.PIPELINE_STATE_RUNNING\n",
"CustomTrainingJob projects/522977795627/locations/us-central1/trainingPipelines/8601543785521872896 current state:\n",
"PipelineState.PIPELINE_STATE_RUNNING\n",
"CustomTrainingJob projects/522977795627/locations/us-central1/trainingPipelines/8601543785521872896 current state:\n",
"PipelineState.PIPELINE_STATE_RUNNING\n",
"CustomTrainingJob projects/522977795627/locations/us-central1/trainingPipelines/8601543785521872896 current state:\n",
"PipelineState.PIPELINE_STATE_RUNNING\n",
"CustomTrainingJob projects/522977795627/locations/us-central1/trainingPipelines/8601543785521872896 current state:\n",
"PipelineState.PIPELINE_STATE_RUNNING\n",
"CustomTrainingJob run completed. Resource name: projects/522977795627/locations/us-central1/trainingPipelines/8601543785521872896\n",
"Model available at projects/522977795627/locations/us-central1/models/8176544832480168612\n",
"\n"
]
}
],
"source": [
"mnist_model3 = hp_search_job.run(\n",
" machine_type=\"n1-standard-4\",\n",
" replica_count=3,\n",
" accelerator_type=\"NVIDIA_TESLA_K80\",\n",
" accelerator_count=2,\n",
")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"And we have a model!"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Let's clean up:"
]
},
{
"cell_type": "code",
"execution_count": 98,
"metadata": {},
"outputs": [],
"source": [
"mnist_model3.delete()\n",
"hp_search_job.delete()\n",
"blobs = bucket.list_blobs(prefix=f\"gs://{bucket_name}/staging/\")\n",
"for blob in blobs:\n",
" blob.delete()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Extra Material Using AutoML to Train a Model"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Let's start by exporting the MNIST dataset to PNG images, and prepare an `import.csv` pointing to each image, and indicating the split (training, validation, or test) and the label:"
]
},
{
"cell_type": "code",
"execution_count": 99,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"70000/70000"
]
}
],
"source": [
"import matplotlib.pyplot as plt\n",
"\n",
"mnist_path = Path(\"datasets/mnist\")\n",
"mnist_path.mkdir(parents=True, exist_ok=True)\n",
"idx = 0\n",
"with open(mnist_path / \"import.csv\", \"w\") as import_csv:\n",
" for split, X, y in zip((\"training\", \"validation\", \"test\"),\n",
" (X_train, X_valid, X_test),\n",
" (y_train, y_valid, y_test)):\n",
" for image, label in zip(X, y):\n",
" print(f\"\\r{idx + 1}/70000\", end=\"\")\n",
" filename = f\"{idx:05d}.png\"\n",
" plt.imsave(mnist_path / filename, np.tile(image, 3))\n",
" line = f\"{split},gs://{bucket_name}/mnist/{filename},{label}\\n\"\n",
" import_csv.write(line)\n",
" idx += 1"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Let's upload this dataset to GCS:"
]
},
{
"cell_type": "code",
"execution_count": 100,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Uploaded datasets/mnist \n"
]
}
],
"source": [
"upload_directory(bucket, mnist_path)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Now let's create a managed image dataset on Vertex AI:"
]
},
{
"cell_type": "code",
"execution_count": 101,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Creating ImageDataset\n",
"Create ImageDataset backing LRO: projects/522977795627/locations/us-central1/datasets/7532459492777132032/operations/3812233931370004480\n",
"ImageDataset created. Resource name: projects/522977795627/locations/us-central1/datasets/7532459492777132032\n",
"To use this ImageDataset in another session:\n",
"ds = aiplatform.ImageDataset('projects/522977795627/locations/us-central1/datasets/7532459492777132032')\n",
"Importing ImageDataset data: projects/522977795627/locations/us-central1/datasets/7532459492777132032\n",
"Import ImageDataset data backing LRO: projects/522977795627/locations/us-central1/datasets/7532459492777132032/operations/3010593197698056192\n",
"ImageDataset data imported. Resource name: projects/522977795627/locations/us-central1/datasets/7532459492777132032\n"
]
}
],
"source": [
"from aiplatform.schema.dataset.ioformat.image import single_label_classification\n",
"\n",
"mnist_dataset = aiplatform.ImageDataset.create(\n",
" display_name=\"mnist-dataset\",\n",
" gcs_source=[f\"gs://{bucket_name}/mnist/import.csv\"],\n",
" project=project_id,\n",
" import_schema_uri=single_label_classification,\n",
" sync=True,\n",
")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Create an AutoML training job on this dataset:"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"**TODO**"
]
},
{
"cell_type": "markdown",
"metadata": {
"tags": []
},
"source": [
"# Exercise Solutions"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## 1. to 8."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"1. A SavedModel contains a TensorFlow model, including its architecture (a computation graph) and its weights. It is stored as a directory containing a _saved_model.pb_ file, which defines the computation graph (represented as a serialized protocol buffer), and a _variables_ subdirectory containing the variable values. For models containing a large number of weights, these variable values may be split across multiple files. A SavedModel also includes an _assets_ subdirectory that may contain additional data, such as vocabulary files, class names, or some example instances for this model. To be more accurate, a SavedModel can contain one or more _metagraphs_. A metagraph is a computation graph plus some function signature definitions (including their input and output names, types, and shapes). Each metagraph is identified by a set of tags. To inspect a SavedModel, you can use the command-line tool `saved_model_cli` or just load it using `tf.saved_model.load()` and inspect it in Python.\n",
"2. TF Serving allows you to deploy multiple TensorFlow models (or multiple versions of the same model) and make them accessible to all your applications easily via a REST API or a gRPC API. Using your models directly in your applications would make it harder to deploy a new version of a model across all applications. Implementing your own microservice to wrap a TF model would require extra work, and it would be hard to match TF Serving's features. TF Serving has many features: it can monitor a directory and autodeploy the models that are placed there, and you won't have to change or even restart any of your applications to benefit from the new model versions; it's fast, well tested, and scales very well; and it supports A/B testing of experimental models and deploying a new model version to just a subset of your users (in this case the model is called a _canary_). TF Serving is also capable of grouping individual requests into batches to run them jointly on the GPU. To deploy TF Serving, you can install it from source, but it is much simpler to install it using a Docker image. To deploy a cluster of TF Serving Docker images, you can use an orchestration tool such as Kubernetes, or use a fully hosted solution such as Google Vertex AI.\n",
"3. To deploy a model across multiple TF Serving instances, all you need to do is configure these TF Serving instances to monitor the same _models_ directory, and then export your new model as a SavedModel into a subdirectory.\n",
"4. The gRPC API is more efficient than the REST API. However, its client libraries are not as widely available, and if you activate compression when using the REST API, you can get almost the same performance. So, the gRPC API is most useful when you need the highest possible performance and the clients are not limited to the REST API.\n",
"5. To reduce a model's size so it can run on a mobile or embedded device, TFLite uses several techniques:\n",
"    * It provides a converter which can optimize a SavedModel: it shrinks the model and reduces its latency. To do this, it prunes all the operations that are not needed to make predictions (such as training operations), and it optimizes and fuses operations whenever possible.\n",
"    * The converter can also perform post-training quantization: this technique dramatically reduces the model's size, so it's much faster to download and store.\n",
"    * It saves the optimized model using the FlatBuffer format, which can be loaded to RAM directly, without parsing. This reduces the loading time and memory footprint.\n",
"6. Quantization-aware training consists of adding fake quantization operations to the model during training. This allows the model to learn to ignore the quantization noise; the final weights will be more robust to quantization.\n",
"7. Model parallelism means chopping your model into multiple parts and running them in parallel across multiple devices, hopefully speeding up the model during training or inference. Data parallelism means creating multiple exact replicas of your model and deploying them across multiple devices. At each iteration during training, each replica is given a different batch of data, and it computes the gradients of the loss with regard to the model parameters. In synchronous data parallelism, the gradients from all replicas are then aggregated and the optimizer performs a Gradient Descent step. The parameters may be centralized (e.g., on parameter servers) or replicated across all replicas and kept in sync using AllReduce. In asynchronous data parallelism, the parameters are centralized and the replicas run independently from each other, each updating the central parameters directly at the end of each training iteration, without having to wait for the other replicas. To speed up training, data parallelism turns out to work better than model parallelism, in general. This is mostly because it requires less communication across devices. Moreover, it is much easier to implement, and it works the same way for any model, whereas model parallelism requires analyzing the model to determine the best way to chop it into pieces. That said, research in this domain is making quick progress (e.g., PipeDream or Pathways), so a mix of model parallelism and data parallelism is probably the way forward.\n",
"8. When training a model across multiple servers, you can use the following distribution strategies:\n",
"    * The `MultiWorkerMirroredStrategy` performs mirrored data parallelism. The model is replicated across all available servers and devices, and each replica gets a different batch of data at each training iteration and computes its own gradients. The mean of the gradients is computed and shared across all replicas using a distributed AllReduce implementation (NCCL by default), and all replicas perform the same Gradient Descent step. This strategy is the simplest to use since all servers and devices are treated in exactly the same way, and it performs fairly well. In general, you should use this strategy. Its main limitation is that it requires the model to fit in RAM on every replica.\n",
"    * The `ParameterServerStrategy` performs asynchronous data parallelism. The model is replicated across all devices on all workers, and the parameters are sharded across all parameter servers. Each worker has its own training loop, running asynchronously with the other workers; at each training iteration, each worker gets its own batch of data and fetches the latest version of the model parameters from the parameter servers, then it computes the gradients of the loss with regard to these parameters, and it sends them to the parameter servers. Lastly, the parameter servers perform a Gradient Descent step using these gradients. This strategy is generally slower than the previous strategy, and a bit harder to deploy, since it requires managing parameter servers. However, it can be useful in some situations, especially when you can take advantage of the asynchronous updates, for example to reduce I/O bottlenecks. This depends on many factors, including hardware, network topology, number of servers, model size, and more, so your mileage may vary."
]
},
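{
"cell_type": "markdown",
"metadata": {},
"source": [
"To make the post-training quantization mentioned in answer 5 a bit more concrete, here is a minimal pure-Python sketch of symmetric 8-bit weight quantization. The helper names `quantize_symmetric` and `dequantize` are hypothetical and the weights are made up. TFLite's converter is considerably more sophisticated, so treat this as an illustration of the idea under these assumptions, not as its actual implementation.\n",
"\n",
"```python\n",
"def quantize_symmetric(weights, num_bits=8):\n",
"    # Hypothetical helper: map floats in [-m, +m] to integers in [-127, +127]\n",
"    max_int = 2 ** (num_bits - 1) - 1  # 127 for 8 bits\n",
"    m = max(abs(w) for w in weights)   # largest absolute weight\n",
"    scale = m / max_int if m > 0 else 1.0\n",
"    return [round(w / scale) for w in weights], scale\n",
"\n",
"def dequantize(quantized, scale):\n",
"    # Recover approximate float weights from the stored integers\n",
"    return [q * scale for q in quantized]\n",
"\n",
"weights = [-1.27, -0.5, 0.0, 0.25, 1.0]  # made-up example weights\n",
"quantized, scale = quantize_symmetric(weights)\n",
"restored = dequantize(quantized, scale)\n",
"max_error = max(abs(w - r) for w, r in zip(weights, restored))\n",
"print(quantized, scale, max_error)\n",
"```\n",
"\n",
"Each weight is stored as a single byte instead of a 32-bit float, which is where the roughly fourfold size reduction comes from. The rounding error per weight is at most half of `scale`, which is the kind of quantization noise that quantization-aware training (answer 6) teaches the model to tolerate."
]
},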
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## 9.\n",
"_Exercise: Train a model (any model you like) and deploy it to TF Serving or Google Vertex AI. Write the client code to query it using the REST API or the gRPC API. Update the model and deploy the new version. Your client code will now query the new version. Roll back to the first version._"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Please follow the steps in the <a href=\"#Deploying-TensorFlow-models-to-TensorFlow-Serving-(TFS)\">Deploying TensorFlow models to TensorFlow Serving</a> section above."
]
},
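{
"cell_type": "markdown",
"metadata": {},
"source": [
"As a rough sketch of the client side of this exercise, the snippet below builds a TF Serving REST prediction request using only the standard library. The helper name `build_predict_request`, the model name `my_model`, the host, and port 8501 are all assumptions made for illustration, so adapt them to your own deployment. The request is only constructed, not sent, so the code runs without a server.\n",
"\n",
"```python\n",
"import json\n",
"import urllib.request\n",
"\n",
"def build_predict_request(instances, model_name='my_model', version=None,\n",
"                          host='localhost', port=8501):\n",
"    # Hypothetical helper: builds (but does not send) a REST predict request\n",
"    path = f'/v1/models/{model_name}'\n",
"    if version is not None:\n",
"        path += f'/versions/{version}'  # pin a specific model version\n",
"    url = f'http://{host}:{port}{path}:predict'\n",
"    body = json.dumps({'signature_name': 'serving_default',\n",
"                       'instances': instances}).encode('utf-8')\n",
"    return urllib.request.Request(url, data=body, method='POST',\n",
"                                  headers={'Content-Type': 'application/json'})\n",
"\n",
"request = build_predict_request([[0.0, 1.0]], version=1)\n",
"print(request.full_url)\n",
"# With a server running, you could send the request like this:\n",
"# with urllib.request.urlopen(request) as response:\n",
"#     predictions = json.loads(response.read())['predictions']\n",
"```\n",
"\n",
"Leaving `version=None` omits the version from the URL, in which case TF Serving answers with whichever version it is currently serving by default. This is why the same client code picks up a newly deployed version without any change."
]
},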
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## 10.\n",
"_Exercise: Train any model across multiple GPUs on the same machine using the `MirroredStrategy` (if you do not have access to GPUs, you can use Colaboratory with a GPU Runtime and create two virtual GPUs). Train the model again using the `CentralStorageStrategy` and compare the training time._"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Please follow the steps in the [Distributed Training](#Distributed-Training) section above."
]
},
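{
"cell_type": "markdown",
"metadata": {},
"source": [
"For intuition about what a mirrored strategy does at each training step, here is a toy pure-Python simulation of synchronous data parallelism. It does not use TensorFlow: the one-parameter model `y = w * x`, the made-up data, and the helper names `gradient` and `sync_step` are all assumptions made for illustration, and the plain average stands in for the AllReduce operation.\n",
"\n",
"```python\n",
"def gradient(w, batch):\n",
"    # Gradient of the mean squared error of the model y = w * x, w.r.t. w\n",
"    return sum(2 * (w * x - y) * x for x, y in batch) / len(batch)\n",
"\n",
"def sync_step(w, replica_batches, learning_rate):\n",
"    # Each simulated replica computes the gradient on its own batch...\n",
"    grads = [gradient(w, batch) for batch in replica_batches]\n",
"    # ...then the gradients are averaged (the role AllReduce plays)...\n",
"    mean_grad = sum(grads) / len(grads)\n",
"    # ...and every replica applies the same update, so they stay in sync\n",
"    return w - learning_rate * mean_grad\n",
"\n",
"data = [(1.0, 2.0), (2.0, 4.0), (3.0, 6.0), (4.0, 8.0)]  # made-up data, y = 2x\n",
"replica_batches = [data[:2], data[2:]]  # two simulated replicas\n",
"w = 0.0\n",
"for _ in range(100):\n",
"    w = sync_step(w, replica_batches, learning_rate=0.05)\n",
"print(w)\n",
"```\n",
"\n",
"Because the two batches have the same size, averaging the per-replica gradients gives the same update as a single device processing all four instances at once. The replicas therefore behave like one model trained on a larger batch."
]
},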
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## 11.\n",
"_Exercise: Train a small model on Google Vertex AI, using TensorFlow Cloud Tuner for hyperparameter tuning._"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Please follow the instructions in the _Hyperparameter Tuning using TensorFlow Cloud Tuner_ section in the book."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Congratulations!"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"You've reached the end of the book! I hope you found it useful. 😊"
]
}
],
"metadata": {
"accelerator": "GPU",
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.10.6"
}
},
"nbformat": 4,
"nbformat_minor": 4
}