{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "**Chapter 12 – Distributed TensorFlow**" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "_This notebook contains all the sample code and solutions to the exercises in chapter 12._" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Setup" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "First, let's make sure this notebook works well in both python 2 and 3, import a few common modules, ensure MatplotLib plots figures inline and prepare a function to save the figures:" ] }, { "cell_type": "code", "execution_count": 1, "metadata": {}, "outputs": [], "source": [ "# To support both python 2 and python 3\n", "from __future__ import division, print_function, unicode_literals\n", "\n", "# Common imports\n", "import numpy as np\n", "import os\n", "\n", "# to make this notebook's output stable across runs\n", "def reset_graph(seed=42):\n", " tf.reset_default_graph()\n", " tf.set_random_seed(seed)\n", " np.random.seed(seed)\n", "\n", "# To plot pretty figures\n", "%matplotlib inline\n", "import matplotlib\n", "import matplotlib.pyplot as plt\n", "plt.rcParams['axes.labelsize'] = 14\n", "plt.rcParams['xtick.labelsize'] = 12\n", "plt.rcParams['ytick.labelsize'] = 12\n", "\n", "# Where to save the figures\n", "PROJECT_ROOT_DIR = \".\"\n", "CHAPTER_ID = \"distributed\"\n", "\n", "def save_fig(fig_id, tight_layout=True):\n", " path = os.path.join(PROJECT_ROOT_DIR, \"images\", CHAPTER_ID, fig_id + \".png\")\n", " print(\"Saving figure\", fig_id)\n", " if tight_layout:\n", " plt.tight_layout()\n", " plt.savefig(path, format='png', dpi=300)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Local server" ] }, { "cell_type": "code", "execution_count": 2, "metadata": {}, "outputs": [], "source": [ "import tensorflow as tf" ] }, { "cell_type": "code", "execution_count": 3, "metadata": {}, "outputs": [], "source": [ "c = tf.constant(\"Hello distributed 
TensorFlow!\")\n", "server = tf.train.Server.create_local_server()" ] }, { "cell_type": "code", "execution_count": 4, "metadata": {}, "outputs": [], "source": [ "with tf.Session(server.target) as sess:\n", " print(sess.run(c))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Cluster" ] }, { "cell_type": "code", "execution_count": 5, "metadata": {}, "outputs": [], "source": [ "cluster_spec = tf.train.ClusterSpec({\n", " \"ps\": [\n", " \"127.0.0.1:2221\", # /job:ps/task:0\n", " \"127.0.0.1:2222\", # /job:ps/task:1\n", " ],\n", " \"worker\": [\n", " \"127.0.0.1:2223\", # /job:worker/task:0\n", " \"127.0.0.1:2224\", # /job:worker/task:1\n", " \"127.0.0.1:2225\", # /job:worker/task:2\n", " ]})" ] }, { "cell_type": "code", "execution_count": 6, "metadata": {}, "outputs": [], "source": [ "task_ps0 = tf.train.Server(cluster_spec, job_name=\"ps\", task_index=0)\n", "task_ps1 = tf.train.Server(cluster_spec, job_name=\"ps\", task_index=1)\n", "task_worker0 = tf.train.Server(cluster_spec, job_name=\"worker\", task_index=0)\n", "task_worker1 = tf.train.Server(cluster_spec, job_name=\"worker\", task_index=1)\n", "task_worker2 = tf.train.Server(cluster_spec, job_name=\"worker\", task_index=2)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Pinning operations across devices and servers" ] }, { "cell_type": "code", "execution_count": 7, "metadata": {}, "outputs": [], "source": [ "reset_graph()\n", "\n", "with tf.device(\"/job:ps\"):\n", " a = tf.Variable(1.0, name=\"a\")\n", "\n", "with tf.device(\"/job:worker\"):\n", " b = a + 2\n", "\n", "with tf.device(\"/job:worker/task:1\"):\n", " c = a + b" ] }, { "cell_type": "code", "execution_count": 8, "metadata": {}, "outputs": [], "source": [ "with tf.Session(\"grpc://127.0.0.1:2221\") as sess:\n", " sess.run(a.initializer)\n", " print(c.eval())" ] }, { "cell_type": "code", "execution_count": 9, "metadata": {}, "outputs": [], "source": [ "reset_graph()\n", "\n", "with 
tf.device(tf.train.replica_device_setter(\n", " ps_tasks=2,\n", " ps_device=\"/job:ps\",\n", " worker_device=\"/job:worker\")):\n", " v1 = tf.Variable(1.0, name=\"v1\") # pinned to /job:ps/task:0 (defaults to /cpu:0)\n", " v2 = tf.Variable(2.0, name=\"v2\") # pinned to /job:ps/task:1 (defaults to /cpu:0)\n", " v3 = tf.Variable(3.0, name=\"v3\") # pinned to /job:ps/task:0 (defaults to /cpu:0)\n", " s = v1 + v2 # pinned to /job:worker (defaults to task:0/cpu:0)\n", " with tf.device(\"/task:1\"):\n", " p1 = 2 * s # pinned to /job:worker/task:1 (defaults to /cpu:0)\n", " with tf.device(\"/cpu:0\"):\n", " p2 = 3 * s # pinned to /job:worker/task:1/cpu:0\n", "\n", "config = tf.ConfigProto()\n", "config.log_device_placement = True\n", "\n", "with tf.Session(\"grpc://127.0.0.1:2221\", config=config) as sess:\n", " v1.initializer.run()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Readers – the old way" ] }, { "cell_type": "code", "execution_count": 10, "metadata": {}, "outputs": [], "source": [ "reset_graph()" ] }, { "cell_type": "code", "execution_count": 11, "metadata": {}, "outputs": [], "source": [ "default1 = tf.constant([5.])\n", "default2 = tf.constant([6])\n", "default3 = tf.constant([7])\n", "dec = tf.decode_csv(tf.constant(\"1.,,44\"),\n", " record_defaults=[default1, default2, default3])\n", "with tf.Session() as sess:\n", " print(sess.run(dec))" ] }, { "cell_type": "code", "execution_count": 12, "metadata": {}, "outputs": [], "source": [ "reset_graph()\n", "\n", "test_csv = open(\"my_test.csv\", \"w\")\n", "test_csv.write(\"x1, x2 , target\\n\")\n", "test_csv.write(\"1.,, 0\\n\")\n", "test_csv.write(\"4., 5. , 1\\n\")\n", "test_csv.write(\"7., 8. 
, 0\\n\")\n", "test_csv.close()\n", "\n", "filename_queue = tf.FIFOQueue(capacity=10, dtypes=[tf.string], shapes=[()])\n", "filename = tf.placeholder(tf.string)\n", "enqueue_filename = filename_queue.enqueue([filename])\n", "close_filename_queue = filename_queue.close()\n", "\n", "reader = tf.TextLineReader(skip_header_lines=1)\n", "key, value = reader.read(filename_queue)\n", "\n", "x1, x2, target = tf.decode_csv(value, record_defaults=[[-1.], [-1.], [-1]])\n", "features = tf.stack([x1, x2])\n", "\n", "instance_queue = tf.RandomShuffleQueue(\n", " capacity=10, min_after_dequeue=2,\n", " dtypes=[tf.float32, tf.int32], shapes=[[2],[]],\n", " name=\"instance_q\", shared_name=\"shared_instance_q\")\n", "enqueue_instance = instance_queue.enqueue([features, target])\n", "close_instance_queue = instance_queue.close()\n", "\n", "minibatch_instances, minibatch_targets = instance_queue.dequeue_up_to(2)\n", "\n", "with tf.Session() as sess:\n", " sess.run(enqueue_filename, feed_dict={filename: \"my_test.csv\"})\n", " sess.run(close_filename_queue)\n", " try:\n", " while True:\n", " sess.run(enqueue_instance)\n", " except tf.errors.OutOfRangeError as ex:\n", " print(\"No more files to read\")\n", " sess.run(close_instance_queue)\n", " try:\n", " while True:\n", " print(sess.run([minibatch_instances, minibatch_targets]))\n", " except tf.errors.OutOfRangeError as ex:\n", " print(\"No more training instances\")" ] }, { "cell_type": "code", "execution_count": 13, "metadata": {}, "outputs": [], "source": [ "#coord = tf.train.Coordinator()\n", "#threads = tf.train.start_queue_runners(coord=coord)\n", "#filename_queue = tf.train.string_input_producer([\"test.csv\"])\n", "#coord.request_stop()\n", "#coord.join(threads)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Queue runners and coordinators" ] }, { "cell_type": "code", "execution_count": 14, "metadata": {}, "outputs": [], "source": [ "reset_graph()\n", "\n", "filename_queue = tf.FIFOQueue(capacity=10, 
dtypes=[tf.string], shapes=[()])\n", "filename = tf.placeholder(tf.string)\n", "enqueue_filename = filename_queue.enqueue([filename])\n", "close_filename_queue = filename_queue.close()\n", "\n", "reader = tf.TextLineReader(skip_header_lines=1)\n", "key, value = reader.read(filename_queue)\n", "\n", "x1, x2, target = tf.decode_csv(value, record_defaults=[[-1.], [-1.], [-1]])\n", "features = tf.stack([x1, x2])\n", "\n", "instance_queue = tf.RandomShuffleQueue(\n", " capacity=10, min_after_dequeue=2,\n", " dtypes=[tf.float32, tf.int32], shapes=[[2],[]],\n", " name=\"instance_q\", shared_name=\"shared_instance_q\")\n", "enqueue_instance = instance_queue.enqueue([features, target])\n", "close_instance_queue = instance_queue.close()\n", "\n", "minibatch_instances, minibatch_targets = instance_queue.dequeue_up_to(2)\n", "\n", "n_threads = 5\n", "queue_runner = tf.train.QueueRunner(instance_queue, [enqueue_instance] * n_threads)\n", "coord = tf.train.Coordinator()\n", "\n", "with tf.Session() as sess:\n", " sess.run(enqueue_filename, feed_dict={filename: \"my_test.csv\"})\n", " sess.run(close_filename_queue)\n", " enqueue_threads = queue_runner.create_threads(sess, coord=coord, start=True)\n", " try:\n", " while True:\n", " print(sess.run([minibatch_instances, minibatch_targets]))\n", " except tf.errors.OutOfRangeError as ex:\n", " print(\"No more training instances\")" ] }, { "cell_type": "code", "execution_count": 15, "metadata": {}, "outputs": [], "source": [ "reset_graph()\n", "\n", "def read_and_push_instance(filename_queue, instance_queue):\n", " reader = tf.TextLineReader(skip_header_lines=1)\n", " key, value = reader.read(filename_queue)\n", " x1, x2, target = tf.decode_csv(value, record_defaults=[[-1.], [-1.], [-1]])\n", " features = tf.stack([x1, x2])\n", " enqueue_instance = instance_queue.enqueue([features, target])\n", " return enqueue_instance\n", "\n", "filename_queue = tf.FIFOQueue(capacity=10, dtypes=[tf.string], shapes=[()])\n", "filename = 
tf.placeholder(tf.string)\n", "enqueue_filename = filename_queue.enqueue([filename])\n", "close_filename_queue = filename_queue.close()\n", "\n", "instance_queue = tf.RandomShuffleQueue(\n", " capacity=10, min_after_dequeue=2,\n", " dtypes=[tf.float32, tf.int32], shapes=[[2],[]],\n", " name=\"instance_q\", shared_name=\"shared_instance_q\")\n", "\n", "minibatch_instances, minibatch_targets = instance_queue.dequeue_up_to(2)\n", "\n", "read_and_enqueue_ops = [read_and_push_instance(filename_queue, instance_queue) for i in range(5)]\n", "queue_runner = tf.train.QueueRunner(instance_queue, read_and_enqueue_ops)\n", "\n", "with tf.Session() as sess:\n", " sess.run(enqueue_filename, feed_dict={filename: \"my_test.csv\"})\n", " sess.run(close_filename_queue)\n", " coord = tf.train.Coordinator()\n", " enqueue_threads = queue_runner.create_threads(sess, coord=coord, start=True)\n", " try:\n", " while True:\n", " print(sess.run([minibatch_instances, minibatch_targets]))\n", " except tf.errors.OutOfRangeError as ex:\n", " print(\"No more training instances\")\n", "\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Setting a timeout" ] }, { "cell_type": "code", "execution_count": 16, "metadata": {}, "outputs": [], "source": [ "reset_graph()\n", "\n", "q = tf.FIFOQueue(capacity=10, dtypes=[tf.float32], shapes=[()])\n", "v = tf.placeholder(tf.float32)\n", "enqueue = q.enqueue([v])\n", "dequeue = q.dequeue()\n", "output = dequeue + 1\n", "\n", "config = tf.ConfigProto()\n", "config.operation_timeout_in_ms = 1000\n", "\n", "with tf.Session(config=config) as sess:\n", " sess.run(enqueue, feed_dict={v: 1.0})\n", " sess.run(enqueue, feed_dict={v: 2.0})\n", " sess.run(enqueue, feed_dict={v: 3.0})\n", " print(sess.run(output))\n", " print(sess.run(output, feed_dict={dequeue: 5}))\n", " print(sess.run(output))\n", " print(sess.run(output))\n", " try:\n", " print(sess.run(output))\n", " except tf.errors.DeadlineExceededError as ex:\n", " print(\"Timed out while 
dequeuing\")\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Data API" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "The Data API, introduced in TensorFlow 1.4, makes reading data efficiently much easier." ] }, { "cell_type": "code", "execution_count": 17, "metadata": {}, "outputs": [], "source": [ "tf.reset_default_graph()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Let's start with a simple dataset composed of the integers 0 to 9, repeated three times, in batches of 7:" ] }, { "cell_type": "code", "execution_count": 18, "metadata": {}, "outputs": [], "source": [ "dataset = tf.data.Dataset.from_tensor_slices(np.arange(10))\n", "dataset = dataset.repeat(3).batch(7)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "The first line creates a dataset containing the integers 0 through 9. The second line creates a new dataset based on the first one, repeating its elements three times and creating batches of 7 elements. As you can see, we start with a source dataset, then we chain calls to various methods to apply transformations to the data." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Next, we create a one-shot iterator to go through this dataset just once, and we call its `get_next()` method to get a tensor that represents the next element." ] }, { "cell_type": "code", "execution_count": 19, "metadata": {}, "outputs": [], "source": [ "iterator = dataset.make_one_shot_iterator()\n", "next_element = iterator.get_next()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Let's repeatedly evaluate `next_element` to go through the dataset. 
When there are no more elements, we get an `OutOfRangeError`:" ] }, { "cell_type": "code", "execution_count": 20, "metadata": {}, "outputs": [], "source": [ "with tf.Session() as sess:\n", " try:\n", " while True:\n", " print(next_element.eval())\n", " except tf.errors.OutOfRangeError:\n", " print(\"Done\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Great! It worked fine." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Note that, as always, a tensor is only evaluated once each time we run the graph (`sess.run()`): so even if we evaluate multiple tensors that all depend on `next_element`, it is only evaluated once. This is true as well if we ask for `next_element` to be evaluated twice in just one run:" ] }, { "cell_type": "code", "execution_count": 21, "metadata": {}, "outputs": [], "source": [ "with tf.Session() as sess:\n", " try:\n", " while True:\n", " print(sess.run([next_element, next_element]))\n", " except tf.errors.OutOfRangeError:\n", " print(\"Done\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "The `interleave()` method is powerful but a bit tricky to grasp at first. 
The easiest way to understand it is to look at an example:" ] }, { "cell_type": "code", "execution_count": 22, "metadata": {}, "outputs": [], "source": [ "tf.reset_default_graph()" ] }, { "cell_type": "code", "execution_count": 23, "metadata": {}, "outputs": [], "source": [ "dataset = tf.data.Dataset.from_tensor_slices(np.arange(10))\n", "dataset = dataset.repeat(3).batch(7)\n", "dataset = dataset.interleave(\n", " lambda v: tf.data.Dataset.from_tensor_slices(v),\n", " cycle_length=3,\n", " block_length=2)\n", "iterator = dataset.make_one_shot_iterator()\n", "next_element = iterator.get_next()" ] }, { "cell_type": "code", "execution_count": 24, "metadata": {}, "outputs": [], "source": [ "with tf.Session() as sess:\n", " try:\n", " while True:\n", " print(next_element.eval(), end=\",\")\n", " except tf.errors.OutOfRangeError:\n", " print(\"Done\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Because `cycle_length=3`, the new dataset starts by pulling 3 elements from the previous dataset: that's `[0,1,2,3,4,5,6]`, `[7,8,9,0,1,2,3]` and `[4,5,6,7,8,9,0]`. Then it calls the lambda function we gave it to create one dataset for each of the elements. Since we use `Dataset.from_tensor_slices()`, each dataset is going to return its elements one by one. Next, it pulls two items (since `block_length=2`) from each of these three datasets, and it iterates until all three datasets are out of items: 0,1 (from 1st), 7,8 (from 2nd), 4,5 (from 3rd), 2,3 (from 1st), 9,0 (from 2nd), and so on until 8,9 (from 3rd), 6 (from 1st), 3 (from 2nd), 0 (from 3rd). Next it tries to pull the next 3 elements from the original dataset, but there are just two left: `[1,2,3,4,5,6,7]` and `[8,9]`. Again, it creates datasets from these elements, and it pulls two items from each until both datasets are out of items: 1,2 (from 1st), 8,9 (from 2nd), 3,4 (from 1st), 5,6 (from 1st), 7 (from 1st). Notice that there's no interleaving at the end since the arrays do not have the same length." 
] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Readers – the new way" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Instead of using a source dataset based on `from_tensor_slices()` or `from_tensors()`, we can use a reader dataset. It handles most of the complexity for us (e.g., threads):" ] }, { "cell_type": "code", "execution_count": 25, "metadata": {}, "outputs": [], "source": [ "tf.reset_default_graph()" ] }, { "cell_type": "code", "execution_count": 26, "metadata": {}, "outputs": [], "source": [ "filenames = [\"my_test.csv\"]" ] }, { "cell_type": "code", "execution_count": 27, "metadata": {}, "outputs": [], "source": [ "dataset = tf.data.TextLineDataset(filenames)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "We still need to tell it how to decode each line:" ] }, { "cell_type": "code", "execution_count": 28, "metadata": {}, "outputs": [], "source": [ "def decode_csv_line(line):\n", " x1, x2, y = tf.decode_csv(\n", " line, record_defaults=[[-1.], [-1.], [-1.]])\n", " X = tf.stack([x1, x2])\n", " return X, y" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Next, we can apply this decoding function to each element in the dataset using `map()`:" ] }, { "cell_type": "code", "execution_count": 29, "metadata": {}, "outputs": [], "source": [ "dataset = dataset.skip(1).map(decode_csv_line)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Finally, let's create a one-shot iterator:" ] }, { "cell_type": "code", "execution_count": 30, "metadata": {}, "outputs": [], "source": [ "it = dataset.make_one_shot_iterator()\n", "X, y = it.get_next()" ] }, { "cell_type": "code", "execution_count": 31, "metadata": {}, "outputs": [], "source": [ "with tf.Session() as sess:\n", " try:\n", " while True:\n", " X_val, y_val = sess.run([X, y])\n", " print(X_val, y_val)\n", " except tf.errors.OutOfRangeError as ex:\n", " print(\"Done\")\n" ] }, { "cell_type": "markdown", "metadata": { "collapsed": true }, "source": [ 
"# Exercise solutions" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "**Coming soon**" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [] } ], "metadata": { "kernelspec": { "display_name": "Python 3", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.6.5" }, "nav_menu": {}, "toc": { "navigate_menu": true, "number_sections": true, "sideBar": true, "threshold": 6, "toc_cell": false, "toc_section_display": "block", "toc_window_display": false } }, "nbformat": 4, "nbformat_minor": 1 }