Update libraries to latest version, including TensorFlow 2.4.1 and Scikit-Learn 0.24.1
parent
749817ccfa
commit
7223978ae6
|
@ -286,12 +286,12 @@
|
|||
"metadata": {},
|
||||
"outputs": [],
|
||||
"source": [
|
||||
"np.round([[1.1739199e-04, 1.1239604e-07, 6.0210604e-04, 2.0804715e-03, 2.5779348e-06,\n",
|
||||
" 6.4079795e-05, 2.7411186e-08, 9.9669880e-01, 3.9654213e-05, 3.9471846e-04],\n",
|
||||
" [1.2294615e-03, 2.9207937e-05, 9.8599273e-01, 9.6755642e-03, 8.8930705e-08,\n",
|
||||
" 2.9156188e-04, 1.5831805e-03, 1.1311053e-09, 1.1980456e-03, 1.1113169e-07],\n",
|
||||
" [6.4066830e-05, 9.6359509e-01, 9.0598064e-03, 2.9872139e-03, 5.9552520e-04,\n",
|
||||
" 3.7478798e-03, 2.5074568e-03, 1.1462728e-02, 5.5553433e-03, 4.2495009e-04]], 2)"
|
||||
"np.round([[1.1347984e-04, 1.5187356e-07, 9.7032893e-04, 2.7640699e-03, 3.7826971e-06,\n",
|
||||
" 7.6876910e-05, 3.9140293e-08, 9.9559116e-01, 5.3502394e-05, 4.2665208e-04],\n",
|
||||
" [8.2443521e-04, 3.5493889e-05, 9.8826385e-01, 7.0466995e-03, 1.2957400e-07,\n",
|
||||
" 2.3389691e-04, 2.5639210e-03, 9.5886099e-10, 1.0314899e-03, 8.7952529e-08],\n",
|
||||
" [4.4693781e-05, 9.7028232e-01, 9.0526715e-03, 2.2641101e-03, 4.8766597e-04,\n",
|
||||
" 2.8800720e-03, 2.2714981e-03, 8.3753867e-03, 4.0439744e-03, 2.9759688e-04]], 2)"
|
||||
]
|
||||
},
|
||||
{
|
||||
|
@ -682,13 +682,21 @@
|
|||
"# Using GPUs"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "markdown",
|
||||
"metadata": {},
|
||||
"source": [
|
||||
"**Note**: `tf.test.is_gpu_available()` is deprecated. Instead, please use `tf.config.list_physical_devices('GPU')`."
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "code",
|
||||
"execution_count": 41,
|
||||
"metadata": {},
|
||||
"outputs": [],
|
||||
"source": [
|
||||
"tf.test.is_gpu_available()"
|
||||
"#tf.test.is_gpu_available() # deprecated\n",
|
||||
"tf.config.list_physical_devices('GPU')"
|
||||
]
|
||||
},
|
||||
{
|
||||
|
@ -799,7 +807,12 @@
|
|||
"# Use the central storage strategy instead:\n",
|
||||
"#distribution = tf.distribute.experimental.CentralStorageStrategy()\n",
|
||||
"\n",
|
||||
"#resolver = tf.distribute.cluster_resolver.TPUClusterResolver()\n",
|
||||
"#if IS_COLAB and \"COLAB_TPU_ADDR\" in os.environ:\n",
|
||||
"# tpu_address = \"grpc://\" + os.environ[\"COLAB_TPU_ADDR\"]\n",
|
||||
"#else:\n",
|
||||
"# tpu_address = \"\"\n",
|
||||
"#resolver = tf.distribute.cluster_resolver.TPUClusterResolver(tpu_address)\n",
|
||||
"#tf.config.experimental_connect_to_cluster(resolver)\n",
|
||||
"#tf.tpu.experimental.initialize_tpu_system(resolver)\n",
|
||||
"#distribution = tf.distribute.experimental.TPUStrategy(resolver)\n",
|
||||
"\n",
|
||||
|
@ -886,17 +899,6 @@
|
|||
" print()"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "code",
|
||||
"execution_count": 52,
|
||||
"metadata": {},
|
||||
"outputs": [],
|
||||
"source": [
|
||||
"batch_size = 100 # must be divisible by the number of workers\n",
|
||||
"model.fit(X_train, y_train, epochs=10,\n",
|
||||
" validation_data=(X_valid, y_valid), batch_size=batch_size)"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "markdown",
|
||||
"metadata": {},
|
||||
|
@ -910,24 +912,37 @@
|
|||
"source": [
|
||||
"A TensorFlow cluster is a group of TensorFlow processes running in parallel, usually on different machines, and talking to each other to complete some work, for example training or executing a neural network. Each TF process in the cluster is called a \"task\" (or a \"TF server\"). It has an IP address, a port, and a type (also called its role or its job). The type can be `\"worker\"`, `\"chief\"`, `\"ps\"` (parameter server) or `\"evaluator\"`:\n",
|
||||
"* Each **worker** performs computations, usually on a machine with one or more GPUs.\n",
|
||||
"* The **chief** performs computations as well, but it also handles extra work such as writing TensorBoard logs or saving checkpoints. There is a single chief in a cluster. If no chief is specified, then the first worker is the chief.\n",
|
||||
"* The **chief** performs computations as well, but it also handles extra work such as writing TensorBoard logs or saving checkpoints. There is a single chief in a cluster, typically the first worker (i.e., worker #0).\n",
|
||||
"* A **parameter server** (ps) only keeps track of variable values, it is usually on a CPU-only machine.\n",
|
||||
"* The **evaluator** obviously takes care of evaluation. There is usually a single evaluator in a cluster.\n",
|
||||
"\n",
|
||||
"The set of tasks that share the same type is often called a \"job\". For example, the \"worker\" job is the set of all workers.\n",
|
||||
"\n",
|
||||
"To start a TensorFlow cluster, you must first specify it. This means defining all the tasks (IP address, TCP port, and type). For example, the following cluster specification defines a cluster with 3 tasks (2 workers and 1 parameter server). It's a dictionary with one key per job, and the values are lists of task addresses:\n",
|
||||
"\n",
|
||||
"```\n",
|
||||
"{\n",
|
||||
" \"worker\": [\"my-worker0.example.com:9876\", \"my-worker1.example.com:9876\"],\n",
|
||||
" \"ps\": [\"my-ps0.example.com:9876\"]\n",
|
||||
"}\n",
|
||||
"```\n",
|
||||
"\n",
|
||||
"To start a TensorFlow cluster, you must first define it. This means specifying all the tasks (IP address, TCP port, and type). For example, the following cluster specification defines a cluster with 3 tasks (2 workers and 1 parameter server). It's a dictionary with one key per job, and the values are lists of task addresses:"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "code",
|
||||
"execution_count": 52,
|
||||
"metadata": {},
|
||||
"outputs": [],
|
||||
"source": [
|
||||
"cluster_spec = {\n",
|
||||
" \"worker\": [\n",
|
||||
" \"machine-a.example.com:2222\", # /job:worker/task:0\n",
|
||||
" \"machine-b.example.com:2222\" # /job:worker/task:1\n",
|
||||
" ],\n",
|
||||
" \"ps\": [\"machine-c.example.com:2222\"] # /job:ps/task:0\n",
|
||||
"}"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "markdown",
|
||||
"metadata": {},
|
||||
"source": [
|
||||
"Every task in the cluster may communicate with every other task in the server, so make sure to configure your firewall to authorize all communications between these machines on these ports (it's usually simpler if you use the same port on every machine).\n",
|
||||
"\n",
|
||||
"When a task is started, it needs to be told which one it is: its type and index (the task index is also called the task id). A common way to specify everything at once (both the cluster spec and the current task's type and id) is to set the `TF_CONFIG` environment variable before starting the program. It must be a JSON-encoded dictionary containing a cluster specification (under the `\"cluster\"` key), and the type and index of the task to start (under the `\"task\"` key). For example, the following `TF_CONFIG` environment variable defines a simple cluster with 2 workers and 1 parameter server, and specifies that the task to start is the first worker:"
|
||||
"When a task is started, it needs to be told which one it is: its type and index (the task index is also called the task id). A common way to specify everything at once (both the cluster spec and the current task's type and id) is to set the `TF_CONFIG` environment variable before starting the program. It must be a JSON-encoded dictionary containing a cluster specification (under the `\"cluster\"` key), and the type and index of the task to start (under the `\"task\"` key). For example, the following `TF_CONFIG` environment variable defines the same cluster as above, with 2 workers and 1 parameter server, and specifies that the task to start is worker #1:"
|
||||
]
|
||||
},
|
||||
{
|
||||
|
@ -940,13 +955,10 @@
|
|||
"import json\n",
|
||||
"\n",
|
||||
"os.environ[\"TF_CONFIG\"] = json.dumps({\n",
|
||||
" \"cluster\": {\n",
|
||||
" \"worker\": [\"my-work0.example.com:9876\", \"my-work1.example.com:9876\"],\n",
|
||||
" \"ps\": [\"my-ps0.example.com:9876\"]\n",
|
||||
" },\n",
|
||||
" \"task\": {\"type\": \"worker\", \"index\": 0}\n",
|
||||
" \"cluster\": cluster_spec,\n",
|
||||
" \"task\": {\"type\": \"worker\", \"index\": 1}\n",
|
||||
"})\n",
|
||||
"print(\"TF_CONFIG='{}'\".format(os.environ[\"TF_CONFIG\"]))"
|
||||
"os.environ[\"TF_CONFIG\"]"
|
||||
]
|
||||
},
|
||||
{
|
||||
|
@ -960,7 +972,7 @@
|
|||
"cell_type": "markdown",
|
||||
"metadata": {},
|
||||
"source": [
|
||||
"Then you would write a short Python script to start a task. The same script can be used on every machine, since it will load the `TF_CONFIG` variable, which will tell it which task to start:"
|
||||
"TensorFlow's `TFConfigClusterResolver` class reads the cluster configuration from this environment variable:"
|
||||
]
|
||||
},
|
||||
{
|
||||
|
@ -972,16 +984,7 @@
|
|||
"import tensorflow as tf\n",
|
||||
"\n",
|
||||
"resolver = tf.distribute.cluster_resolver.TFConfigClusterResolver()\n",
|
||||
"worker0 = tf.distribute.Server(resolver.cluster_spec(),\n",
|
||||
" job_name=resolver.task_type,\n",
|
||||
" task_index=resolver.task_id)"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "markdown",
|
||||
"metadata": {},
|
||||
"source": [
|
||||
"Another way to specify the cluster specification is directly in Python, rather than through an environment variable:"
|
||||
"resolver.cluster_spec()"
|
||||
]
|
||||
},
|
||||
{
|
||||
|
@ -990,17 +993,7 @@
|
|||
"metadata": {},
|
||||
"outputs": [],
|
||||
"source": [
|
||||
"cluster_spec = tf.train.ClusterSpec({\n",
|
||||
" \"worker\": [\"127.0.0.1:9901\", \"127.0.0.1:9902\"],\n",
|
||||
" \"ps\": [\"127.0.0.1:9903\"]\n",
|
||||
"})"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "markdown",
|
||||
"metadata": {},
|
||||
"source": [
|
||||
"You can then start a server simply by passing it the cluster spec and indicating its type and index. Let's start the two remaining tasks (remember that in general you would only start a single task per machine; we are starting 3 tasks on the localhost just for the purpose of this code example):"
|
||||
"resolver.task_type"
|
||||
]
|
||||
},
|
||||
{
|
||||
|
@ -1009,8 +1002,18 @@
|
|||
"metadata": {},
|
||||
"outputs": [],
|
||||
"source": [
|
||||
"#worker1 = tf.distribute.Server(cluster_spec, job_name=\"worker\", task_index=1)\n",
|
||||
"ps0 = tf.distribute.Server(cluster_spec, job_name=\"ps\", task_index=0)"
|
||||
"resolver.task_id"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "markdown",
|
||||
"metadata": {},
|
||||
"source": [
|
||||
"Now let's run a simpler cluster with just two worker tasks, both running on the local machine. We will use the `MultiWorkerMirroredStrategy` to train a model across these two tasks.\n",
|
||||
"\n",
|
||||
"The first step is to write the training code. As this code will be used to run both workers, each in its own process, we write this code to a separate Python file, `my_mnist_multiworker_task.py`. The code is relatively straightforward, but there are a couple important things to note:\n",
|
||||
"* We create the `MultiWorkerMirroredStrategy` before doing anything else with TensorFlow.\n",
|
||||
"* Only one of the workers will take care of logging to TensorBoard and saving checkpoints. As mentioned earlier, this worker is called the *chief*, and by convention it is usually worker #0."
|
||||
]
|
||||
},
|
||||
{
|
||||
|
@ -1019,70 +1022,41 @@
|
|||
"metadata": {},
|
||||
"outputs": [],
|
||||
"source": [
|
||||
"os.environ[\"TF_CONFIG\"] = json.dumps({\n",
|
||||
" \"cluster\": {\n",
|
||||
" \"worker\": [\"127.0.0.1:9901\", \"127.0.0.1:9902\"],\n",
|
||||
" \"ps\": [\"127.0.0.1:9903\"]\n",
|
||||
" },\n",
|
||||
" \"task\": {\"type\": \"worker\", \"index\": 1}\n",
|
||||
"})\n",
|
||||
"print(repr(os.environ[\"TF_CONFIG\"]))"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "code",
|
||||
"execution_count": 58,
|
||||
"metadata": {},
|
||||
"outputs": [],
|
||||
"source": [
|
||||
"distribution = tf.distribute.experimental.MultiWorkerMirroredStrategy()\n",
|
||||
"%%writefile my_mnist_multiworker_task.py\n",
|
||||
"\n",
|
||||
"keras.backend.clear_session()\n",
|
||||
"tf.random.set_seed(42)\n",
|
||||
"np.random.seed(42)\n",
|
||||
"\n",
|
||||
"os.environ[\"TF_CONFIG\"] = json.dumps({\n",
|
||||
" \"cluster\": {\n",
|
||||
" \"worker\": [\"127.0.0.1:9901\", \"127.0.0.1:9902\"],\n",
|
||||
" \"ps\": [\"127.0.0.1:9903\"]\n",
|
||||
" },\n",
|
||||
" \"task\": {\"type\": \"worker\", \"index\": 1}\n",
|
||||
"})\n",
|
||||
"#CUDA_VISIBLE_DEVICES=0 \n",
|
||||
"\n",
|
||||
"with distribution.scope():\n",
|
||||
" model = create_model()\n",
|
||||
" model.compile(loss=\"sparse_categorical_crossentropy\",\n",
|
||||
" optimizer=keras.optimizers.SGD(lr=1e-2),\n",
|
||||
" metrics=[\"accuracy\"])"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "code",
|
||||
"execution_count": 59,
|
||||
"metadata": {},
|
||||
"outputs": [],
|
||||
"source": [
|
||||
"import os\n",
|
||||
"import numpy as np\n",
|
||||
"import tensorflow as tf\n",
|
||||
"from tensorflow import keras\n",
|
||||
"import numpy as np\n",
|
||||
"import time\n",
|
||||
"\n",
|
||||
"# At the beginning of the program (restart the kernel before running this cell)\n",
|
||||
"distribution = tf.distribute.experimental.MultiWorkerMirroredStrategy()\n",
|
||||
"# At the beginning of the program\n",
|
||||
"distribution = tf.distribute.MultiWorkerMirroredStrategy()\n",
|
||||
"\n",
|
||||
"resolver = tf.distribute.cluster_resolver.TFConfigClusterResolver()\n",
|
||||
"print(\"Starting task {}{}\".format(resolver.task_type, resolver.task_id))\n",
|
||||
"\n",
|
||||
"# Only worker #0 will write checkpoints and log to TensorBoard\n",
|
||||
"if resolver.task_id == 0:\n",
|
||||
" root_logdir = os.path.join(os.curdir, \"my_mnist_multiworker_logs\")\n",
|
||||
" run_id = time.strftime(\"run_%Y_%m_%d-%H_%M_%S\")\n",
|
||||
" run_dir = os.path.join(root_logdir, run_id)\n",
|
||||
" callbacks = [\n",
|
||||
" keras.callbacks.TensorBoard(run_dir),\n",
|
||||
" keras.callbacks.ModelCheckpoint(\"my_mnist_multiworker_model.h5\",\n",
|
||||
" save_best_only=True),\n",
|
||||
" ]\n",
|
||||
"else:\n",
|
||||
" callbacks = []\n",
|
||||
"\n",
|
||||
"# Load and prepare the MNIST dataset\n",
|
||||
"(X_train_full, y_train_full), (X_test, y_test) = keras.datasets.mnist.load_data()\n",
|
||||
"X_train_full = X_train_full[..., np.newaxis] / 255.\n",
|
||||
"X_test = X_test[..., np.newaxis] / 255.\n",
|
||||
"X_valid, X_train = X_train_full[:5000], X_train_full[5000:]\n",
|
||||
"y_valid, y_train = y_train_full[:5000], y_train_full[5000:]\n",
|
||||
"X_new = X_test[:3]\n",
|
||||
"\n",
|
||||
"n_workers = 2\n",
|
||||
"batch_size = 32 * n_workers\n",
|
||||
"dataset = tf.data.Dataset.from_tensor_slices((X_train[..., np.newaxis], y_train)).repeat().batch(batch_size)\n",
|
||||
" \n",
|
||||
"def create_model():\n",
|
||||
" return keras.models.Sequential([\n",
|
||||
"with distribution.scope():\n",
|
||||
" model = keras.models.Sequential([\n",
|
||||
" keras.layers.Conv2D(filters=64, kernel_size=7, activation=\"relu\",\n",
|
||||
" padding=\"same\", input_shape=[28, 28, 1]),\n",
|
||||
" keras.layers.MaxPooling2D(pool_size=2),\n",
|
||||
|
@ -1096,14 +1070,62 @@
|
|||
" keras.layers.Dropout(0.5),\n",
|
||||
" keras.layers.Dense(units=10, activation='softmax'),\n",
|
||||
" ])\n",
|
||||
"\n",
|
||||
"with distribution.scope():\n",
|
||||
" model = create_model()\n",
|
||||
" model.compile(loss=\"sparse_categorical_crossentropy\",\n",
|
||||
" optimizer=keras.optimizers.SGD(lr=1e-2),\n",
|
||||
" metrics=[\"accuracy\"])\n",
|
||||
"\n",
|
||||
"model.fit(dataset, steps_per_epoch=len(X_train)//batch_size, epochs=10)"
|
||||
"model.fit(X_train, y_train, validation_data=(X_valid, y_valid),\n",
|
||||
" epochs=10, callbacks=callbacks)"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "markdown",
|
||||
"metadata": {},
|
||||
"source": [
|
||||
"In a real world application, there would typically be a single worker per machine, but in this example we're running both workers on the same machine, so they will both try to use all the available GPU RAM (if this machine has a GPU), and this will likely lead to an Out-Of-Memory (OOM) error. To avoid this, we could use the `CUDA_VISIBLE_DEVICES` environment variable to assign a different GPU to each worker. Alternatively, we can simply disable GPU support, like this:"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "code",
|
||||
"execution_count": 58,
|
||||
"metadata": {},
|
||||
"outputs": [],
|
||||
"source": [
|
||||
"os.environ[\"CUDA_VISIBLE_DEVICES\"] = \"-1\""
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "markdown",
|
||||
"metadata": {},
|
||||
"source": [
|
||||
"We are now ready to start both workers, each in its own process, using Python's `subprocess` module. Before we start each process, we need to set the `TF_CONFIG` environment variable appropriately, changing only the task index:"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "code",
|
||||
"execution_count": 59,
|
||||
"metadata": {},
|
||||
"outputs": [],
|
||||
"source": [
|
||||
"import subprocess\n",
|
||||
"\n",
|
||||
"cluster_spec = {\"worker\": [\"127.0.0.1:9901\", \"127.0.0.1:9902\"]}\n",
|
||||
"\n",
|
||||
"for index, worker_address in enumerate(cluster_spec[\"worker\"]):\n",
|
||||
" os.environ[\"TF_CONFIG\"] = json.dumps({\n",
|
||||
" \"cluster\": cluster_spec,\n",
|
||||
" \"task\": {\"type\": \"worker\", \"index\": index}\n",
|
||||
" })\n",
|
||||
" subprocess.Popen(\"python my_mnist_multiworker_task.py\", shell=True)"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "markdown",
|
||||
"metadata": {},
|
||||
"source": [
|
||||
"That's it! Our TensorFlow cluster is now running, but we can't see it in this notebook because it's running in separate processes (but if you are running this notebook in Jupyter, you can see the worker logs in Jupyter's server logs).\n",
|
||||
"\n",
|
||||
"Since the chief (worker #0) is writing to TensorBoard, we use TensorBoard to view the training progress. Run the following cell, then click on the settings button (i.e., the gear icon) in the TensorBoard interface and check the \"Reload data\" box to make TensorBoard automatically refresh every 30s. Once the first epoch of training is finished (which may take a few minutes), and once TensorBoard refreshes, the SCALARS tab will appear. Click on this tab to view the progress of the model's training and validation accuracy."
|
||||
]
|
||||
},
|
||||
{
|
||||
|
@ -1112,12 +1134,15 @@
|
|||
"metadata": {},
|
||||
"outputs": [],
|
||||
"source": [
|
||||
"# Hyperparameter tuning\n",
|
||||
"\n",
|
||||
"# Only talk to ps server\n",
|
||||
"config_proto = tf.ConfigProto(device_filters=['/job:ps', '/job:worker/task:%d' % tf_config['task']['index']])\n",
|
||||
"config = tf.estimator.RunConfig(session_config=config_proto)\n",
|
||||
"# default since 1.10"
|
||||
"%load_ext tensorboard\n",
|
||||
"%tensorboard --logdir=./my_mnist_multiworker_logs --port=6006"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "markdown",
|
||||
"metadata": {},
|
||||
"source": [
|
||||
"That's it! Once training is over, the best checkpoint of the model will be available in the `my_mnist_multiworker_model.h5` file. You can load it using `keras.models.load_model()` and use it for predictions, as usual:"
|
||||
]
|
||||
},
|
||||
{
|
||||
|
@ -1126,7 +1151,18 @@
|
|||
"metadata": {},
|
||||
"outputs": [],
|
||||
"source": [
|
||||
"strategy.num_replicas_in_sync"
|
||||
"from tensorflow import keras\n",
|
||||
"\n",
|
||||
"model = keras.models.load_model(\"my_mnist_multiworker_model.h5\")\n",
|
||||
"Y_pred = model.predict(X_new)\n",
|
||||
"np.argmax(Y_pred, axis=-1)"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "markdown",
|
||||
"metadata": {},
|
||||
"source": [
|
||||
"And that's all for today! Hope you found this useful. 😊"
|
||||
]
|
||||
}
|
||||
],
|
||||
|
@ -1146,7 +1182,7 @@
|
|||
"name": "python",
|
||||
"nbconvert_exporter": "python",
|
||||
"pygments_lexer": "ipython3",
|
||||
"version": "3.7.6"
|
||||
"version": "3.7.9"
|
||||
}
|
||||
},
|
||||
"nbformat": 4,
|
||||
|
|
Loading…
Reference in New Issue