handson-ml/19_training_and_deploying_a...

1090 lines
32 KiB
Plaintext
Raw Normal View History

{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"**Chapter 19 Training and Deploying TensorFlow Models at Scale**"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"_This notebook contains all the sample code in chapter 19._"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Setup\n",
"First, let's import a few common modules, ensure Matplotlib plots figures inline and prepare a function to save the figures. We also check that Python 3.5 or later is installed (although Python 2.x may work, it is deprecated so we strongly recommend you use Python 3 instead), as well as Scikit-Learn ≥0.20 and TensorFlow ≥2.0-preview.\n"
]
},
{
"cell_type": "code",
"execution_count": 1,
"metadata": {},
"outputs": [],
"source": [
"# Python ≥3.5 is required\n",
"import sys\n",
"assert sys.version_info >= (3, 5)\n",
"\n",
"# Scikit-Learn ≥0.20 is required\n",
"import sklearn\n",
"assert sklearn.__version__ >= \"0.20\"\n",
"\n",
"# TensorFlow ≥2.0-preview is required\n",
"import tensorflow as tf\n",
"from tensorflow import keras\n",
"assert tf.__version__ >= \"2.0\"\n",
"\n",
"# Common imports\n",
"import numpy as np\n",
"import os\n",
"\n",
"# to make this notebook's output stable across runs\n",
"np.random.seed(42)\n",
"tf.random.set_seed(42)\n",
"\n",
"# To plot pretty figures\n",
"%matplotlib inline\n",
"import matplotlib as mpl\n",
"import matplotlib.pyplot as plt\n",
"mpl.rc('axes', labelsize=14)\n",
"mpl.rc('xtick', labelsize=12)\n",
"mpl.rc('ytick', labelsize=12)\n",
"\n",
"# Where to save the figures\n",
"PROJECT_ROOT_DIR = \".\"\n",
"CHAPTER_ID = \"deploy\"\n",
"IMAGES_PATH = os.path.join(PROJECT_ROOT_DIR, \"images\", CHAPTER_ID)\n",
"os.makedirs(IMAGES_PATH, exist_ok=True)\n",
"\n",
"def save_fig(fig_id, tight_layout=True, fig_extension=\"png\", resolution=300):\n",
"    \"\"\"Save the current Matplotlib figure under IMAGES_PATH.\n",
"\n",
"    fig_id is the file name without extension, fig_extension is used both as\n",
"    the file extension and the output format, and resolution is the DPI\n",
"    passed to plt.savefig(). If tight_layout is true, plt.tight_layout() is\n",
"    applied before saving.\n",
"    \"\"\"\n",
"    path = os.path.join(IMAGES_PATH, fig_id + \".\" + fig_extension)\n",
"    print(\"Saving figure\", fig_id)\n",
"    if tight_layout:\n",
"        plt.tight_layout()\n",
"    plt.savefig(path, format=fig_extension, dpi=resolution)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Deploying TensorFlow models to TensorFlow Serving (TFS)\n",
"We will use the REST API or the gRPC API."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Save/Load a `SavedModel`"
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {},
"outputs": [],
"source": [
"(X_train_full, y_train_full), (X_test, y_test) = keras.datasets.mnist.load_data()\n",
"X_train_full = X_train_full[..., np.newaxis].astype(np.float32) / 255.\n",
"X_test = X_test[..., np.newaxis].astype(np.float32) / 255.\n",
"X_valid, X_train = X_train_full[:5000], X_train_full[5000:]\n",
"y_valid, y_train = y_train_full[:5000], y_train_full[5000:]\n",
"X_new = X_test[:3]"
]
},
{
"cell_type": "code",
"execution_count": 3,
"metadata": {},
"outputs": [],
"source": [
"np.random.seed(42)\n",
"tf.random.set_seed(42)\n",
"\n",
"model = keras.models.Sequential([\n",
"    keras.layers.Flatten(input_shape=[28, 28, 1]),\n",
"    keras.layers.Dense(100, activation=\"relu\"),\n",
"    keras.layers.Dense(10, activation=\"softmax\")\n",
"])\n",
"# `learning_rate` is the supported argument name; `lr` is only a deprecated alias\n",
"model.compile(loss=\"sparse_categorical_crossentropy\",\n",
"              optimizer=keras.optimizers.SGD(learning_rate=1e-2),\n",
"              metrics=[\"accuracy\"])\n",
"model.fit(X_train, y_train, epochs=10, validation_data=(X_valid, y_valid))"
]
},
{
"cell_type": "code",
"execution_count": 4,
"metadata": {},
"outputs": [],
"source": [
"np.round(model.predict(X_new), 2)"
]
},
{
"cell_type": "code",
"execution_count": 5,
"metadata": {},
"outputs": [],
"source": [
"model_version = \"0001\"\n",
"model_name = \"my_mnist_model\"\n",
"model_path = os.path.join(model_name, model_version)\n",
"model_path"
]
},
{
"cell_type": "code",
"execution_count": 6,
"metadata": {},
"outputs": [],
"source": [
"!rm -rf {model_name}"
]
},
{
"cell_type": "code",
"execution_count": 7,
"metadata": {},
"outputs": [],
"source": [
"tf.saved_model.save(model, model_path)"
]
},
{
"cell_type": "code",
"execution_count": 8,
"metadata": {},
"outputs": [],
"source": [
"# Print the saved model's directory tree: one line per directory, then its files\n",
"for root, dirs, files in os.walk(model_name):\n",
"    indent = ' ' * root.count(os.sep)\n",
"    print('{}{}/'.format(indent, os.path.basename(root)))\n",
"    for filename in files:\n",
"        print('{}{}'.format(indent + ' ', filename))"
]
},
{
"cell_type": "code",
"execution_count": 9,
"metadata": {},
"outputs": [],
"source": [
"!saved_model_cli show --dir {model_path}"
]
},
{
"cell_type": "code",
"execution_count": 10,
"metadata": {},
"outputs": [],
"source": [
"!saved_model_cli show --dir {model_path} --tag_set serve"
]
},
{
"cell_type": "code",
"execution_count": 11,
"metadata": {},
"outputs": [],
"source": [
"!saved_model_cli show --dir {model_path} --tag_set serve \\\n",
" --signature_def serving_default"
]
},
{
"cell_type": "code",
"execution_count": 12,
"metadata": {},
"outputs": [],
"source": [
"!saved_model_cli show --dir {model_path} --all"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Let's write the new instances to a `npy` file so we can pass them easily to our model:"
]
},
{
"cell_type": "code",
"execution_count": 13,
"metadata": {},
"outputs": [],
"source": [
"np.save(\"my_mnist_tests.npy\", X_new)"
]
},
{
"cell_type": "code",
"execution_count": 14,
"metadata": {},
"outputs": [],
"source": [
"input_name = model.input_names[0]\n",
"input_name"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"And now let's use `saved_model_cli` to make predictions for the instances we just saved:"
]
},
{
"cell_type": "code",
"execution_count": 15,
"metadata": {},
"outputs": [],
"source": [
"!saved_model_cli run --dir {model_path} --tag_set serve \\\n",
" --signature_def serving_default \\\n",
" --inputs {input_name}=my_mnist_tests.npy"
]
},
{
"cell_type": "code",
"execution_count": 16,
"metadata": {},
"outputs": [],
"source": [
"np.round([[1.1739199e-04, 1.1239604e-07, 6.0210604e-04, 2.0804715e-03, 2.5779348e-06,\n",
" 6.4079795e-05, 2.7411186e-08, 9.9669880e-01, 3.9654213e-05, 3.9471846e-04],\n",
" [1.2294615e-03, 2.9207937e-05, 9.8599273e-01, 9.6755642e-03, 8.8930705e-08,\n",
" 2.9156188e-04, 1.5831805e-03, 1.1311053e-09, 1.1980456e-03, 1.1113169e-07],\n",
" [6.4066830e-05, 9.6359509e-01, 9.0598064e-03, 2.9872139e-03, 5.9552520e-04,\n",
" 3.7478798e-03, 2.5074568e-03, 1.1462728e-02, 5.5553433e-03, 4.2495009e-04]], 2)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## TensorFlow Serving"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Install [Docker](https://docs.docker.com/install/) if you don't have it already. Then run:\n",
"\n",
"```bash\n",
"docker pull tensorflow/serving\n",
"\n",
"export ML_PATH=$HOME/ml # or wherever this project is\n",
"docker run -it --rm -p 8500:8500 -p 8501:8501 \\\n",
" -v \"$ML_PATH/my_mnist_model:/models/my_mnist_model\" \\\n",
" -e MODEL_NAME=my_mnist_model \\\n",
" tensorflow/serving\n",
"```\n",
"Once you are finished using it, press Ctrl-C to shut down the server."
]
},
{
"cell_type": "code",
"execution_count": 17,
"metadata": {},
"outputs": [],
"source": [
"import json\n",
"\n",
"input_data_json = json.dumps({\n",
" \"signature_name\": \"serving_default\",\n",
" \"instances\": X_new.tolist(),\n",
"})"
]
},
{
"cell_type": "code",
"execution_count": 18,
"metadata": {},
"outputs": [],
"source": [
"repr(input_data_json)[:1500] + \"...\""
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Now let's use TensorFlow Serving's REST API to make predictions:"
]
},
{
"cell_type": "code",
"execution_count": 19,
"metadata": {},
"outputs": [],
"source": [
"import requests\n",
"\n",
"SERVER_URL = 'http://localhost:8501/v1/models/my_mnist_model:predict'\n",
"response = requests.post(SERVER_URL, data=input_data_json)\n",
"response.raise_for_status() # raise an exception in case of error\n",
"response = response.json()"
]
},
{
"cell_type": "code",
"execution_count": 20,
"metadata": {},
"outputs": [],
"source": [
"response.keys()"
]
},
{
"cell_type": "code",
"execution_count": 21,
"metadata": {},
"outputs": [],
"source": [
"y_proba = np.array(response[\"predictions\"])\n",
"y_proba.round(2)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Using the gRPC API"
]
},
{
"cell_type": "code",
"execution_count": 22,
"metadata": {},
"outputs": [],
"source": [
"from tensorflow_serving.apis.predict_pb2 import PredictRequest\n",
"\n",
"request = PredictRequest()\n",
"request.model_spec.name = model_name\n",
"request.model_spec.signature_name = \"serving_default\"\n",
"input_name = model.input_names[0]\n",
"request.inputs[input_name].CopyFrom(tf.make_tensor_proto(X_new))"
]
},
{
"cell_type": "code",
"execution_count": 23,
"metadata": {},
"outputs": [],
"source": [
"import grpc\n",
"from tensorflow_serving.apis import prediction_service_pb2_grpc\n",
"\n",
"channel = grpc.insecure_channel('localhost:8500')\n",
"predict_service = prediction_service_pb2_grpc.PredictionServiceStub(channel)\n",
"response = predict_service.Predict(request, timeout=10.0)"
]
},
{
"cell_type": "code",
"execution_count": 24,
"metadata": {},
"outputs": [],
"source": [
"response"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Convert the response to a NumPy array using `tf.make_ndarray()`:"
]
},
{
"cell_type": "code",
"execution_count": 25,
"metadata": {
"scrolled": true
},
"outputs": [],
"source": [
"output_name = model.output_names[0]\n",
"outputs_proto = response.outputs[output_name]\n",
"# tf.make_ndarray() already returns a NumPy array, so there is no .numpy() to call\n",
"y_proba = tf.make_ndarray(outputs_proto)\n",
"y_proba.round(2)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Or to a NumPy array if your client does not include the TensorFlow library:"
]
},
{
"cell_type": "code",
"execution_count": 26,
"metadata": {},
"outputs": [],
"source": [
"output_name = model.output_names[0]\n",
"outputs_proto = response.outputs[output_name]\n",
"shape = [dim.size for dim in outputs_proto.tensor_shape.dim]\n",
"y_proba = np.array(outputs_proto.float_val).reshape(shape)\n",
"y_proba.round(2)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Deploying a new model version"
]
},
{
"cell_type": "code",
"execution_count": 27,
"metadata": {
"scrolled": true
},
"outputs": [],
"source": [
"np.random.seed(42)\n",
"tf.random.set_seed(42)\n",
"\n",
"model = keras.models.Sequential([\n",
"    # Same input shape as version 0001: X_train and the JSON payload are [28, 28, 1]\n",
"    keras.layers.Flatten(input_shape=[28, 28, 1]),\n",
"    keras.layers.Dense(50, activation=\"relu\"),\n",
"    keras.layers.Dense(50, activation=\"relu\"),\n",
"    keras.layers.Dense(10, activation=\"softmax\")\n",
"])\n",
"# `learning_rate` is the supported argument name; `lr` is only a deprecated alias\n",
"model.compile(loss=\"sparse_categorical_crossentropy\",\n",
"              optimizer=keras.optimizers.SGD(learning_rate=1e-2),\n",
"              metrics=[\"accuracy\"])\n",
"history = model.fit(X_train, y_train, epochs=10, validation_data=(X_valid, y_valid))"
]
},
{
"cell_type": "code",
"execution_count": 28,
"metadata": {},
"outputs": [],
"source": [
"model_version = \"0002\"\n",
"model_name = \"my_mnist_model\"\n",
"model_path = os.path.join(model_name, model_version)\n",
"model_path"
]
},
{
"cell_type": "code",
"execution_count": 29,
"metadata": {},
"outputs": [],
"source": [
"tf.saved_model.save(model, model_path)"
]
},
{
"cell_type": "code",
"execution_count": 30,
"metadata": {},
"outputs": [],
"source": [
"# Print the directory tree again; it now also contains the new version's files\n",
"for root, dirs, files in os.walk(model_name):\n",
"    indent = ' ' * root.count(os.sep)\n",
"    print('{}{}/'.format(indent, os.path.basename(root)))\n",
"    for filename in files:\n",
"        print('{}{}'.format(indent + ' ', filename))"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"**Warning**: You may need to wait a minute before the new model is loaded by TensorFlow Serving."
]
},
{
"cell_type": "code",
"execution_count": 31,
"metadata": {},
"outputs": [],
"source": [
"import requests\n",
"\n",
"SERVER_URL = 'http://localhost:8501/v1/models/my_mnist_model:predict'\n",
" \n",
"response = requests.post(SERVER_URL, data=input_data_json)\n",
"response.raise_for_status()\n",
"response = response.json()"
]
},
{
"cell_type": "code",
"execution_count": 32,
"metadata": {},
"outputs": [],
"source": [
"response.keys()"
]
},
{
"cell_type": "code",
"execution_count": 33,
"metadata": {},
"outputs": [],
"source": [
"y_proba = np.array(response[\"predictions\"])\n",
"y_proba.round(2)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Deploy the model to Google Cloud AI Platform"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Follow the instructions in the book to deploy the model to Google Cloud AI Platform, download the service account's private key and save it as `my_service_account_private_key.json` in the project directory. Also, update the `project_id`:"
]
},
{
"cell_type": "code",
"execution_count": 34,
"metadata": {},
"outputs": [],
"source": [
"project_id = \"onyx-smoke-242003\""
]
},
{
"cell_type": "code",
"execution_count": 35,
"metadata": {},
"outputs": [],
"source": [
"import googleapiclient.discovery\n",
"\n",
"os.environ[\"GOOGLE_APPLICATION_CREDENTIALS\"] = \"my_service_account_private_key.json\"\n",
"model_id = \"my_mnist_model\"\n",
"model_path = \"projects/{}/models/{}\".format(project_id, model_id)\n",
"model_path += \"/versions/v0001/\" # if you want to run a specific version\n",
"ml_resource = googleapiclient.discovery.build(\"ml\", \"v1\").projects()"
]
},
{
"cell_type": "code",
"execution_count": 36,
"metadata": {},
"outputs": [],
"source": [
"def predict(X):\n",
"    \"\"\"Send the instances in X to the deployed model and return its predictions.\n",
"\n",
"    Relies on the notebook globals `ml_resource`, `model_path` and `output_name`\n",
"    (the latter is set in the gRPC section above).\n",
"\n",
"    Returns a NumPy array holding the `output_name` entry of every prediction.\n",
"    Raises RuntimeError if the response contains an \"error\" entry.\n",
"    \"\"\"\n",
"    input_data_json = {\"signature_name\": \"serving_default\",\n",
"                       \"instances\": X.tolist()}\n",
"    request = ml_resource.predict(name=model_path, body=input_data_json)\n",
"    response = request.execute()\n",
"    if \"error\" in response:\n",
"        raise RuntimeError(response[\"error\"])\n",
"    return np.array([pred[output_name] for pred in response[\"predictions\"]])"
]
},
{
"cell_type": "code",
"execution_count": 37,
"metadata": {},
"outputs": [],
"source": [
"Y_probas = predict(X_new)\n",
"np.round(Y_probas, 2)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Using GPUs"
]
},
{
"cell_type": "code",
"execution_count": 38,
"metadata": {},
"outputs": [],
"source": [
"tf.test.is_gpu_available()"
]
},
{
"cell_type": "code",
"execution_count": 39,
"metadata": {},
"outputs": [],
"source": [
"tf.test.gpu_device_name()"
]
},
{
"cell_type": "code",
"execution_count": 40,
"metadata": {},
"outputs": [],
"source": [
"tf.test.is_built_with_cuda()"
]
},
{
"cell_type": "code",
"execution_count": 41,
"metadata": {},
"outputs": [],
"source": [
"from tensorflow.python.client.device_lib import list_local_devices\n",
"\n",
"devices = list_local_devices()\n",
"devices"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Distributed Training"
]
},
{
"cell_type": "code",
"execution_count": 42,
"metadata": {},
"outputs": [],
"source": [
"keras.backend.clear_session()\n",
"tf.random.set_seed(42)\n",
"np.random.seed(42)"
]
},
{
"cell_type": "code",
"execution_count": 43,
"metadata": {},
"outputs": [],
"source": [
"def create_model():\n",
"    \"\"\"Build a fresh, uncompiled CNN taking [28, 28, 1] inputs with a 10-unit softmax output.\"\"\"\n",
"    feature_layers = [\n",
"        keras.layers.Conv2D(filters=64, kernel_size=7, activation=\"relu\",\n",
"                            padding=\"same\", input_shape=[28, 28, 1]),\n",
"        keras.layers.MaxPooling2D(pool_size=2),\n",
"        keras.layers.Conv2D(filters=128, kernel_size=3, activation=\"relu\",\n",
"                            padding=\"same\"),\n",
"        keras.layers.Conv2D(filters=128, kernel_size=3, activation=\"relu\",\n",
"                            padding=\"same\"),\n",
"        keras.layers.MaxPooling2D(pool_size=2),\n",
"    ]\n",
"    classifier_layers = [\n",
"        keras.layers.Flatten(),\n",
"        keras.layers.Dense(units=64, activation='relu'),\n",
"        keras.layers.Dropout(0.5),\n",
"        keras.layers.Dense(units=10, activation='softmax'),\n",
"    ]\n",
"    return keras.models.Sequential(feature_layers + classifier_layers)"
]
},
{
"cell_type": "code",
"execution_count": 44,
"metadata": {},
"outputs": [],
"source": [
"batch_size = 100\n",
"model = create_model()\n",
"# `learning_rate` is the supported argument name; `lr` is only a deprecated alias\n",
"model.compile(loss=\"sparse_categorical_crossentropy\",\n",
"              optimizer=keras.optimizers.SGD(learning_rate=1e-2),\n",
"              metrics=[\"accuracy\"])\n",
"model.fit(X_train, y_train, epochs=10,\n",
"          validation_data=(X_valid, y_valid), batch_size=batch_size)"
]
},
{
"cell_type": "code",
"execution_count": 45,
"metadata": {},
"outputs": [],
"source": [
"keras.backend.clear_session()\n",
"tf.random.set_seed(42)\n",
"np.random.seed(42)\n",
"\n",
"distribution = tf.distribute.MirroredStrategy()\n",
"\n",
"# Change the default all-reduce algorithm:\n",
"#distribution = tf.distribute.MirroredStrategy(\n",
"# cross_device_ops=tf.distribute.HierarchicalCopyAllReduce())\n",
"\n",
"# Specify the list of GPUs to use:\n",
"#distribution = tf.distribute.MirroredStrategy(devices=[\"/gpu:0\", \"/gpu:1\"])\n",
"\n",
"# Use the central storage strategy instead:\n",
"#distribution = tf.distribute.experimental.CentralStorageStrategy()\n",
"\n",
"#resolver = tf.distribute.cluster_resolver.TPUClusterResolver()\n",
"#tf.tpu.experimental.initialize_tpu_system(resolver)\n",
"#distribution = tf.distribute.experimental.TPUStrategy(resolver)\n",
"\n",
"# Create and compile the model inside the strategy's scope\n",
"with distribution.scope():\n",
"    model = create_model()\n",
"    model.compile(loss=\"sparse_categorical_crossentropy\",\n",
"                  optimizer=keras.optimizers.SGD(learning_rate=1e-2),\n",
"                  metrics=[\"accuracy\"])"
]
},
{
"cell_type": "code",
"execution_count": 46,
"metadata": {},
"outputs": [],
"source": [
"batch_size = 100 # must be divisible by the number of workers\n",
"model.fit(X_train, y_train, epochs=10,\n",
" validation_data=(X_valid, y_valid), batch_size=batch_size)"
]
},
{
"cell_type": "code",
"execution_count": 47,
"metadata": {},
"outputs": [],
"source": [
"model.predict(X_new)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Custom training loop:"
]
},
{
"cell_type": "code",
"execution_count": 48,
"metadata": {},
"outputs": [],
"source": [
"keras.backend.clear_session()\n",
"tf.random.set_seed(42)\n",
"np.random.seed(42)\n",
"\n",
"K = keras.backend\n",
"\n",
"distribution = tf.distribute.MirroredStrategy()\n",
"\n",
"with distribution.scope():\n",
" model = create_model()\n",
" optimizer = keras.optimizers.SGD()\n",
"\n",
"with distribution.scope():\n",
" dataset = tf.data.Dataset.from_tensor_slices((X_train, y_train)).repeat().batch(batch_size)\n",
" input_iterator = distribution.make_dataset_iterator(dataset)\n",
" \n",
"@tf.function\n",
"def train_step():\n",
"    \"\"\"Run one distributed training step and return the loss summed over replicas.\n",
"\n",
"    Uses the notebook globals `model`, `optimizer`, `distribution`,\n",
"    `input_iterator` and `batch_size`.\n",
"    \"\"\"\n",
"    def step_fn(inputs):\n",
"        X, y = inputs\n",
"        with tf.GradientTape() as tape:\n",
"            Y_proba = model(X)\n",
"            # Each replica divides its summed loss by the global batch size, so\n",
"            # adding the per-replica values below yields the mean over the batch\n",
"            loss = K.sum(keras.losses.sparse_categorical_crossentropy(y, Y_proba)) / batch_size\n",
"\n",
"        grads = tape.gradient(loss, model.trainable_variables)\n",
"        optimizer.apply_gradients(zip(grads, model.trainable_variables))\n",
"        return loss\n",
"\n",
"    per_replica_losses = distribution.experimental_run(step_fn, input_iterator)\n",
"    mean_loss = distribution.reduce(tf.distribute.ReduceOp.SUM,\n",
"                                    per_replica_losses, axis=None)\n",
"    return mean_loss\n",
"\n",
"n_epochs = 10\n",
"with distribution.scope():\n",
"    input_iterator.initialize()\n",
"    for epoch in range(n_epochs):\n",
"        print(\"Epoch {}/{}\".format(epoch + 1, n_epochs))\n",
"        for iteration in range(len(X_train) // batch_size):\n",
"            # \\r rewrites the same output line, so only the latest loss is shown\n",
"            print(\"\\rLoss: {:.3f}\".format(train_step().numpy()), end=\"\")\n",
"        print()"
]
},
{
"cell_type": "code",
"execution_count": 49,
"metadata": {},
"outputs": [],
"source": [
"batch_size = 100 # must be divisible by the number of workers\n",
"model.fit(X_train, y_train, epochs=10,\n",
" validation_data=(X_valid, y_valid), batch_size=batch_size)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Training across multiple servers"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"A TensorFlow cluster is a group of TensorFlow processes running in parallel, usually on different machines, and talking to each other to complete some work, for example training or executing a neural network. Each TF process in the cluster is called a \"task\" (or a \"TF server\"). It has an IP address, a port, and a type (also called its role or its job). The type can be `\"worker\"`, `\"chief\"`, `\"ps\"` (parameter server) or `\"evaluator\"`:\n",
"* Each **worker** performs computations, usually on a machine with one or more GPUs.\n",
"* The **chief** performs computations as well, but it also handles extra work such as writing TensorBoard logs or saving checkpoints. There is a single chief in a cluster. If no chief is specified, then the first worker is the chief.\n",
"* A **parameter server** (ps) only keeps track of variable values; it is usually on a CPU-only machine.\n",
"* The **evaluator** obviously takes care of evaluation. There is usually a single evaluator in a cluster.\n",
"\n",
"The set of tasks that share the same type is often called a \"job\". For example, the \"worker\" job is the set of all workers.\n",
"\n",
"To start a TensorFlow cluster, you must first specify it. This means defining all the tasks (IP address, TCP port, and type). For example, the following cluster specification defines a cluster with 3 tasks (2 workers and 1 parameter server). It's a dictionary with one key per job, and the values are lists of task addresses:\n",
"\n",
"```\n",
"{\n",
" \"worker\": [\"my-worker0.example.com:9876\", \"my-worker1.example.com:9876\"],\n",
" \"ps\": [\"my-ps0.example.com:9876\"]\n",
"}\n",
"```\n",
"\n",
"Every task in the cluster may communicate with every other task in the cluster, so make sure to configure your firewall to authorize all communications between these machines on these ports (it's usually simpler if you use the same port on every machine).\n",
"\n",
"When a task is started, it needs to be told which one it is: its type and index (the task index is also called the task id). A common way to specify everything at once (both the cluster spec and the current task's type and id) is to set the `TF_CONFIG` environment variable before starting the program. It must be a JSON-encoded dictionary containing a cluster specification (under the `\"cluster\"` key), and the type and index of the task to start (under the `\"task\"` key). For example, the following `TF_CONFIG` environment variable defines a simple cluster with 2 workers and 1 parameter server, and specifies that the task to start is the first worker:"
]
},
{
"cell_type": "code",
"execution_count": 50,
"metadata": {},
"outputs": [],
"source": [
"import os\n",
"import json\n",
"\n",
"os.environ[\"TF_CONFIG\"] = json.dumps({\n",
" \"cluster\": {\n",
" \"worker\": [\"my-work0.example.com:9876\", \"my-work1.example.com:9876\"],\n",
" \"ps\": [\"my-ps0.example.com:9876\"]\n",
" },\n",
" \"task\": {\"type\": \"worker\", \"index\": 0}\n",
"})\n",
"print(\"TF_CONFIG='{}'\".format(os.environ[\"TF_CONFIG\"]))"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Some platforms (e.g., Google Cloud ML Engine) automatically set this environment variable for you."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Then you would write a short Python script to start a task. The same script can be used on every machine, since it will load the `TF_CONFIG` variable, which will tell it which task to start:"
]
},
{
"cell_type": "code",
"execution_count": 51,
"metadata": {},
"outputs": [],
"source": [
"import tensorflow as tf\n",
"\n",
"resolver = tf.distribute.cluster_resolver.TFConfigClusterResolver()\n",
"worker0 = tf.distribute.Server(resolver.cluster_spec(),\n",
" job_name=resolver.task_type,\n",
" task_index=resolver.task_id)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Another way to specify the cluster specification is directly in Python, rather than through an environment variable:"
]
},
{
"cell_type": "code",
"execution_count": 52,
"metadata": {},
"outputs": [],
"source": [
"cluster_spec = tf.train.ClusterSpec({\n",
" \"worker\": [\"127.0.0.1:9901\", \"127.0.0.1:9902\"],\n",
" \"ps\": [\"127.0.0.1:9903\"]\n",
"})"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"You can then start a server simply by passing it the cluster spec and indicating its type and index. Let's start the two remaining tasks (remember that in general you would only start a single task per machine; we are starting 3 tasks on the localhost just for the purpose of this code example):"
]
},
{
"cell_type": "code",
"execution_count": 53,
"metadata": {},
"outputs": [],
"source": [
"#worker1 = tf.distribute.Server(cluster_spec, job_name=\"worker\", task_index=1)\n",
"ps0 = tf.distribute.Server(cluster_spec, job_name=\"ps\", task_index=0)"
]
},
{
"cell_type": "code",
"execution_count": 54,
"metadata": {},
"outputs": [],
"source": [
"os.environ[\"TF_CONFIG\"] = json.dumps({\n",
" \"cluster\": {\n",
" \"worker\": [\"127.0.0.1:9901\", \"127.0.0.1:9902\"],\n",
" \"ps\": [\"127.0.0.1:9903\"]\n",
" },\n",
" \"task\": {\"type\": \"worker\", \"index\": 1}\n",
"})\n",
"print(repr(os.environ[\"TF_CONFIG\"]))"
]
},
{
"cell_type": "code",
"execution_count": 55,
"metadata": {},
"outputs": [],
"source": [
"distribution = tf.distribute.experimental.MultiWorkerMirroredStrategy()\n",
"\n",
"keras.backend.clear_session()\n",
"tf.random.set_seed(42)\n",
"np.random.seed(42)\n",
"\n",
"os.environ[\"TF_CONFIG\"] = json.dumps({\n",
" \"cluster\": {\n",
" \"worker\": [\"127.0.0.1:9901\", \"127.0.0.1:9902\"],\n",
" \"ps\": [\"127.0.0.1:9903\"]\n",
" },\n",
" \"task\": {\"type\": \"worker\", \"index\": 1}\n",
"})\n",
"#CUDA_VISIBLE_DEVICES=0 \n",
"\n",
"# Create and compile the model inside the multi-worker strategy's scope\n",
"with distribution.scope():\n",
"    model = create_model()\n",
"    model.compile(loss=\"sparse_categorical_crossentropy\",\n",
"                  optimizer=keras.optimizers.SGD(learning_rate=1e-2),\n",
"                  metrics=[\"accuracy\"])"
]
},
{
"cell_type": "code",
"execution_count": 56,
"metadata": {},
"outputs": [],
"source": [
"import tensorflow as tf\n",
"from tensorflow import keras\n",
"import numpy as np\n",
"\n",
"# At the beginning of the program (restart the kernel before running this cell)\n",
"distribution = tf.distribute.experimental.MultiWorkerMirroredStrategy()\n",
"\n",
"(X_train_full, y_train_full), (X_test, y_test) = keras.datasets.mnist.load_data()\n",
"# Add the channel axis exactly once and scale the pixels, as float32 like the first data-loading cell\n",
"X_train_full = X_train_full[..., np.newaxis].astype(np.float32) / 255.\n",
"X_test = X_test[..., np.newaxis].astype(np.float32) / 255.\n",
"X_valid, X_train = X_train_full[:5000], X_train_full[5000:]\n",
"y_valid, y_train = y_train_full[:5000], y_train_full[5000:]\n",
"X_new = X_test[:3]\n",
"\n",
"n_workers = 2\n",
"batch_size = 32 * n_workers\n",
"# X_train already has the channel axis; adding np.newaxis again would give\n",
"# [n, 28, 28, 1, 1], which does not match the model's [28, 28, 1] input\n",
"dataset = tf.data.Dataset.from_tensor_slices((X_train, y_train)).repeat().batch(batch_size)\n",
" \n",
"def create_model():\n",
"    \"\"\"Build a fresh, uncompiled CNN taking [28, 28, 1] inputs with a 10-unit softmax output.\"\"\"\n",
"    return keras.models.Sequential([\n",
"        keras.layers.Conv2D(filters=64, kernel_size=7, activation=\"relu\",\n",
"                            padding=\"same\", input_shape=[28, 28, 1]),\n",
"        keras.layers.MaxPooling2D(pool_size=2),\n",
"        keras.layers.Conv2D(filters=128, kernel_size=3, activation=\"relu\",\n",
"                            padding=\"same\"),\n",
"        keras.layers.Conv2D(filters=128, kernel_size=3, activation=\"relu\",\n",
"                            padding=\"same\"),\n",
"        keras.layers.MaxPooling2D(pool_size=2),\n",
"        keras.layers.Flatten(),\n",
"        keras.layers.Dense(units=64, activation='relu'),\n",
"        keras.layers.Dropout(0.5),\n",
"        keras.layers.Dense(units=10, activation='softmax'),\n",
"    ])\n",
"\n",
"with distribution.scope():\n",
"    model = create_model()\n",
"    # `learning_rate` is the supported argument name; `lr` is only a deprecated alias\n",
"    model.compile(loss=\"sparse_categorical_crossentropy\",\n",
"                  optimizer=keras.optimizers.SGD(learning_rate=1e-2),\n",
"                  metrics=[\"accuracy\"])\n",
"\n",
"# The dataset repeats forever, so the length of an epoch must be given explicitly\n",
"model.fit(dataset, steps_per_epoch=len(X_train)//batch_size, epochs=10)"
]
},
{
"cell_type": "code",
"execution_count": 57,
"metadata": {},
"outputs": [],
"source": [
"# Hyperparameter tuning\n",
"import json\n",
"import os\n",
"\n",
"# Only talk to ps server\n",
"# The task index comes from the TF_CONFIG environment variable described above\n",
"tf_config = json.loads(os.environ[\"TF_CONFIG\"])\n",
"# In TensorFlow 2.x, ConfigProto is only available under tf.compat.v1\n",
"config_proto = tf.compat.v1.ConfigProto(\n",
"    device_filters=['/job:ps', '/job:worker/task:%d' % tf_config['task']['index']])\n",
"config = tf.estimator.RunConfig(session_config=config_proto)\n",
"# default since 1.10"
]
},
{
"cell_type": "code",
"execution_count": 58,
"metadata": {},
"outputs": [],
"source": [
"distribution.num_replicas_in_sync"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.6.8"
}
},
"nbformat": 4,
"nbformat_minor": 2
}