
766 lines
22 KiB
Raw Blame History

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

"cells": [
"cell_type": "markdown",
"metadata": {},
"source": [
"**Chapter 12 Distributed TensorFlow**"
"cell_type": "markdown",
"metadata": {},
"source": [
"_This notebook contains all the sample code and solutions to the exercises in chapter 12._"
"cell_type": "markdown",
"metadata": {},
"source": [
"# Setup"
"cell_type": "markdown",
"metadata": {},
"source": [
"First, let's make sure this notebook works well in both python 2 and 3, import a few common modules, ensure MatplotLib plots figures inline and prepare a function to save the figures:"
"cell_type": "code",
"execution_count": 1,
"metadata": {},
"outputs": [],
"source": [
"# To support both python 2 and python 3\n",
"from __future__ import division, print_function, unicode_literals\n",
"# Common imports\n",
"import numpy as np\n",
"import os\n",
"# to make this notebook's output stable across runs\n",
"def reset_graph(seed=42):\n",
" tf.reset_default_graph()\n",
" tf.set_random_seed(seed)\n",
" np.random.seed(seed)\n",
"# To plot pretty figures\n",
"%matplotlib inline\n",
"import matplotlib\n",
"import matplotlib.pyplot as plt\n",
"plt.rcParams['axes.labelsize'] = 14\n",
"plt.rcParams['xtick.labelsize'] = 12\n",
"plt.rcParams['ytick.labelsize'] = 12\n",
"# Where to save the figures\n",
"PROJECT_ROOT_DIR = \".\"\n",
"CHAPTER_ID = \"distributed\"\n",
"def save_fig(fig_id, tight_layout=True):\n",
" path = os.path.join(PROJECT_ROOT_DIR, \"images\", CHAPTER_ID, fig_id + \".png\")\n",
" print(\"Saving figure\", fig_id)\n",
" if tight_layout:\n",
" plt.tight_layout()\n",
" plt.savefig(path, format='png', dpi=300)"
"cell_type": "markdown",
"metadata": {},
"source": [
"# Local server"
"cell_type": "code",
"execution_count": 2,
"metadata": {},
"outputs": [],
"source": [
"import tensorflow as tf"
"cell_type": "code",
"execution_count": 3,
"metadata": {},
"outputs": [],
"source": [
"c = tf.constant(\"Hello distributed TensorFlow!\")\n",
"server = tf.train.Server.create_local_server()"
"cell_type": "code",
"execution_count": 4,
"metadata": {},
"outputs": [],
"source": [
"with tf.Session(server.target) as sess:\n",
" print(sess.run(c))"
"cell_type": "markdown",
"metadata": {},
"source": [
"# Cluster"
"cell_type": "code",
"execution_count": 5,
"metadata": {},
"outputs": [],
"source": [
"cluster_spec = tf.train.ClusterSpec({\n",
" \"ps\": [\n",
" \"\", # /job:ps/task:0\n",
" \"\", # /job:ps/task:1\n",
" ],\n",
" \"worker\": [\n",
" \"\", # /job:worker/task:0\n",
" \"\", # /job:worker/task:1\n",
" \"\", # /job:worker/task:2\n",
" ]})"
"cell_type": "code",
"execution_count": 6,
"metadata": {},
"outputs": [],
"source": [
"task_ps0 = tf.train.Server(cluster_spec, job_name=\"ps\", task_index=0)\n",
"task_ps1 = tf.train.Server(cluster_spec, job_name=\"ps\", task_index=1)\n",
"task_worker0 = tf.train.Server(cluster_spec, job_name=\"worker\", task_index=0)\n",
"task_worker1 = tf.train.Server(cluster_spec, job_name=\"worker\", task_index=1)\n",
"task_worker2 = tf.train.Server(cluster_spec, job_name=\"worker\", task_index=2)"
"cell_type": "markdown",
"metadata": {},
"source": [
"# Pinning operations across devices and servers"
"cell_type": "code",
"execution_count": 7,
"metadata": {},
"outputs": [],
"source": [
"with tf.device(\"/job:ps\"):\n",
" a = tf.Variable(1.0, name=\"a\")\n",
"with tf.device(\"/job:worker\"):\n",
" b = a + 2\n",
"with tf.device(\"/job:worker/task:1\"):\n",
" c = a + b"
"cell_type": "code",
"execution_count": 8,
"metadata": {},
"outputs": [],
"source": [
"with tf.Session(\"grpc://\") as sess:\n",
" sess.run(a.initializer)\n",
" print(c.eval())"
"cell_type": "code",
"execution_count": 9,
"metadata": {},
"outputs": [],
"source": [
"with tf.device(tf.train.replica_device_setter(\n",
" ps_tasks=2,\n",
" ps_device=\"/job:ps\",\n",
" worker_device=\"/job:worker\")):\n",
" v1 = tf.Variable(1.0, name=\"v1\") # pinned to /job:ps/task:0 (defaults to /cpu:0)\n",
" v2 = tf.Variable(2.0, name=\"v2\") # pinned to /job:ps/task:1 (defaults to /cpu:0)\n",
" v3 = tf.Variable(3.0, name=\"v3\") # pinned to /job:ps/task:0 (defaults to /cpu:0)\n",
" s = v1 + v2 # pinned to /job:worker (defaults to task:0/cpu:0)\n",
" with tf.device(\"/task:1\"):\n",
" p1 = 2 * s # pinned to /job:worker/task:1 (defaults to /cpu:0)\n",
" with tf.device(\"/cpu:0\"):\n",
" p2 = 3 * s # pinned to /job:worker/task:1/cpu:0\n",
"config = tf.ConfigProto()\n",
"config.log_device_placement = True\n",
"with tf.Session(\"grpc://\", config=config) as sess:\n",
" v1.initializer.run()"
"cell_type": "markdown",
"metadata": {},
"source": [
"# Readers the old way"
"cell_type": "code",
"execution_count": 10,
"metadata": {},
"outputs": [],
"source": [
"cell_type": "code",
"execution_count": 11,
"metadata": {},
"outputs": [],
"source": [
"default1 = tf.constant([5.])\n",
"default2 = tf.constant([6])\n",
"default3 = tf.constant([7])\n",
"dec = tf.decode_csv(tf.constant(\"1.,,44\"),\n",
" record_defaults=[default1, default2, default3])\n",
"with tf.Session() as sess:\n",
" print(sess.run(dec))"
"cell_type": "code",
"execution_count": 12,
"metadata": {},
"outputs": [],
"source": [
"test_csv = open(\"my_test.csv\", \"w\")\n",
"test_csv.write(\"x1, x2 , target\\n\")\n",
"test_csv.write(\"1.,, 0\\n\")\n",
"test_csv.write(\"4., 5. , 1\\n\")\n",
"test_csv.write(\"7., 8. , 0\\n\")\n",
"filename_queue = tf.FIFOQueue(capacity=10, dtypes=[tf.string], shapes=[()])\n",
"filename = tf.placeholder(tf.string)\n",
"enqueue_filename = filename_queue.enqueue([filename])\n",
"close_filename_queue = filename_queue.close()\n",
"reader = tf.TextLineReader(skip_header_lines=1)\n",
"key, value = reader.read(filename_queue)\n",
"x1, x2, target = tf.decode_csv(value, record_defaults=[[-1.], [-1.], [-1]])\n",
"features = tf.stack([x1, x2])\n",
"instance_queue = tf.RandomShuffleQueue(\n",
" capacity=10, min_after_dequeue=2,\n",
" dtypes=[tf.float32, tf.int32], shapes=[[2],[]],\n",
" name=\"instance_q\", shared_name=\"shared_instance_q\")\n",
"enqueue_instance = instance_queue.enqueue([features, target])\n",
"close_instance_queue = instance_queue.close()\n",
"minibatch_instances, minibatch_targets = instance_queue.dequeue_up_to(2)\n",
"with tf.Session() as sess:\n",
" sess.run(enqueue_filename, feed_dict={filename: \"my_test.csv\"})\n",
" sess.run(close_filename_queue)\n",
" try:\n",
" while True:\n",
" sess.run(enqueue_instance)\n",
" except tf.errors.OutOfRangeError as ex:\n",
" print(\"No more files to read\")\n",
" sess.run(close_instance_queue)\n",
" try:\n",
" while True:\n",
" print(sess.run([minibatch_instances, minibatch_targets]))\n",
" except tf.errors.OutOfRangeError as ex:\n",
" print(\"No more training instances\")"
"cell_type": "code",
"execution_count": 13,
"metadata": {},
"outputs": [],
"source": [
"#coord = tf.train.Coordinator()\n",
"#threads = tf.train.start_queue_runners(coord=coord)\n",
"#filename_queue = tf.train.string_input_producer([\"test.csv\"])\n",
"cell_type": "markdown",
"metadata": {},
"source": [
"# Queue runners and coordinators"
"cell_type": "code",
"execution_count": 14,
"metadata": {},
"outputs": [],
"source": [
"filename_queue = tf.FIFOQueue(capacity=10, dtypes=[tf.string], shapes=[()])\n",
"filename = tf.placeholder(tf.string)\n",
"enqueue_filename = filename_queue.enqueue([filename])\n",
"close_filename_queue = filename_queue.close()\n",
"reader = tf.TextLineReader(skip_header_lines=1)\n",
"key, value = reader.read(filename_queue)\n",
"x1, x2, target = tf.decode_csv(value, record_defaults=[[-1.], [-1.], [-1]])\n",
"features = tf.stack([x1, x2])\n",
"instance_queue = tf.RandomShuffleQueue(\n",
" capacity=10, min_after_dequeue=2,\n",
" dtypes=[tf.float32, tf.int32], shapes=[[2],[]],\n",
" name=\"instance_q\", shared_name=\"shared_instance_q\")\n",
"enqueue_instance = instance_queue.enqueue([features, target])\n",
"close_instance_queue = instance_queue.close()\n",
"minibatch_instances, minibatch_targets = instance_queue.dequeue_up_to(2)\n",
"n_threads = 5\n",
"queue_runner = tf.train.QueueRunner(instance_queue, [enqueue_instance] * n_threads)\n",
"coord = tf.train.Coordinator()\n",
"with tf.Session() as sess:\n",
" sess.run(enqueue_filename, feed_dict={filename: \"my_test.csv\"})\n",
" sess.run(close_filename_queue)\n",
" enqueue_threads = queue_runner.create_threads(sess, coord=coord, start=True)\n",
" try:\n",
" while True:\n",
" print(sess.run([minibatch_instances, minibatch_targets]))\n",
" except tf.errors.OutOfRangeError as ex:\n",
" print(\"No more training instances\")"
"cell_type": "code",
"execution_count": 15,
"metadata": {},
"outputs": [],
"source": [
"def read_and_push_instance(filename_queue, instance_queue):\n",
" reader = tf.TextLineReader(skip_header_lines=1)\n",
" key, value = reader.read(filename_queue)\n",
" x1, x2, target = tf.decode_csv(value, record_defaults=[[-1.], [-1.], [-1]])\n",
" features = tf.stack([x1, x2])\n",
" enqueue_instance = instance_queue.enqueue([features, target])\n",
" return enqueue_instance\n",
"filename_queue = tf.FIFOQueue(capacity=10, dtypes=[tf.string], shapes=[()])\n",
"filename = tf.placeholder(tf.string)\n",
"enqueue_filename = filename_queue.enqueue([filename])\n",
"close_filename_queue = filename_queue.close()\n",
"instance_queue = tf.RandomShuffleQueue(\n",
" capacity=10, min_after_dequeue=2,\n",
" dtypes=[tf.float32, tf.int32], shapes=[[2],[]],\n",
" name=\"instance_q\", shared_name=\"shared_instance_q\")\n",
"minibatch_instances, minibatch_targets = instance_queue.dequeue_up_to(2)\n",
"read_and_enqueue_ops = [read_and_push_instance(filename_queue, instance_queue) for i in range(5)]\n",
"queue_runner = tf.train.QueueRunner(instance_queue, read_and_enqueue_ops)\n",
"with tf.Session() as sess:\n",
" sess.run(enqueue_filename, feed_dict={filename: \"my_test.csv\"})\n",
" sess.run(close_filename_queue)\n",
" coord = tf.train.Coordinator()\n",
" enqueue_threads = queue_runner.create_threads(sess, coord=coord, start=True)\n",
" try:\n",
" while True:\n",
" print(sess.run([minibatch_instances, minibatch_targets]))\n",
" except tf.errors.OutOfRangeError as ex:\n",
" print(\"No more training instances\")\n",
"cell_type": "markdown",
"metadata": {},
"source": [
"# Setting a timeout"
"cell_type": "code",
"execution_count": 16,
"metadata": {},
"outputs": [],
"source": [
"q = tf.FIFOQueue(capacity=10, dtypes=[tf.float32], shapes=[()])\n",
"v = tf.placeholder(tf.float32)\n",
"enqueue = q.enqueue([v])\n",
"dequeue = q.dequeue()\n",
"output = dequeue + 1\n",
"config = tf.ConfigProto()\n",
"config.operation_timeout_in_ms = 1000\n",
"with tf.Session(config=config) as sess:\n",
" sess.run(enqueue, feed_dict={v: 1.0})\n",
" sess.run(enqueue, feed_dict={v: 2.0})\n",
" sess.run(enqueue, feed_dict={v: 3.0})\n",
" print(sess.run(output))\n",
" print(sess.run(output, feed_dict={dequeue: 5}))\n",
" print(sess.run(output))\n",
" print(sess.run(output))\n",
" try:\n",
" print(sess.run(output))\n",
" except tf.errors.DeadlineExceededError as ex:\n",
" print(\"Timed out while dequeuing\")\n"
"cell_type": "markdown",
"metadata": {},
"source": [
"# Data API"
"cell_type": "markdown",
"metadata": {},
"source": [
"The Data API, introduced in TensorFlow 1.4, makes reading data efficiently much easier."
"cell_type": "code",
"execution_count": 17,
"metadata": {},
"outputs": [],
"source": [
"cell_type": "markdown",
"metadata": {},
"source": [
"Let's start with a simple dataset composed of three times the integers 0 to 9, in batches of 7:"
"cell_type": "code",
"execution_count": 18,
"metadata": {},
"outputs": [],
"source": [
"dataset = tf.data.Dataset.from_tensor_slices(np.arange(10))\n",
"dataset = dataset.repeat(3).batch(7)"
"cell_type": "markdown",
"metadata": {},
"source": [
"The first line creates a dataset containing the integers 0 through 9. The second line creates a new dataset based on the first one, repeating its elements three times and creating batches of 7 elements. As you can see, we start with a source dataset, then we chain calls to various methods to apply transformations to the data."
"cell_type": "markdown",
"metadata": {},
"source": [
"Next, we create a one-shot-iterator to go through this dataset just once, and we call its `get_next()` method to get a tensor that represents the next element."
"cell_type": "code",
"execution_count": 19,
"metadata": {},
"outputs": [],
"source": [
"iterator = dataset.make_one_shot_iterator()\n",
"next_element = iterator.get_next()"
"cell_type": "markdown",
"metadata": {},
"source": [
"Let's repeatedly evaluate `next_element` to go through the dataset. When there are not more elements, we get an `OutOfRangeError`:"
"cell_type": "code",
"execution_count": 20,
"metadata": {},
"outputs": [],
"source": [
"with tf.Session() as sess:\n",
" try:\n",
" while True:\n",
" print(next_element.eval())\n",
" except tf.errors.OutOfRangeError:\n",
" print(\"Done\")"
"cell_type": "markdown",
"metadata": {},
"source": [
"Great! It worked fine."
"cell_type": "markdown",
"metadata": {},
"source": [
"Note that, as always, a tensor is only evaluated once each time we run the graph (`sess.run()`): so even if we evaluate multiple tensors that all depend on `next_element`, it is only evaluated once. This is true as well if we ask for `next_element` to be evaluated twice in just one run:"
"cell_type": "code",
"execution_count": 21,
"metadata": {},
"outputs": [],
"source": [
"with tf.Session() as sess:\n",
" try:\n",
" while True:\n",
" print(sess.run([next_element, next_element]))\n",
" except tf.errors.OutOfRangeError:\n",
" print(\"Done\")"
"cell_type": "markdown",
"metadata": {},
"source": [
"The `interleave()` method is powerful but a bit tricky to grasp at first. The easiest way to understand it is to look at an example:"
"cell_type": "code",
"execution_count": 22,
"metadata": {},
"outputs": [],
"source": [
"cell_type": "code",
"execution_count": 23,
"metadata": {},
"outputs": [],
"source": [
"dataset = tf.data.Dataset.from_tensor_slices(np.arange(10))\n",
"dataset = dataset.repeat(3).batch(7)\n",
"dataset = dataset.interleave(\n",
" lambda v: tf.data.Dataset.from_tensor_slices(v),\n",
" cycle_length=3,\n",
" block_length=2)\n",
"iterator = dataset.make_one_shot_iterator()\n",
"next_element = iterator.get_next()"
"cell_type": "code",
"execution_count": 24,
"metadata": {},
"outputs": [],
"source": [
"with tf.Session() as sess:\n",
" try:\n",
" while True:\n",
" print(next_element.eval(), end=\",\")\n",
" except tf.errors.OutOfRangeError:\n",
" print(\"Done\")"
"cell_type": "markdown",
"metadata": {},
"source": [
"Because `cycle_length=3`, the new dataset starts by pulling 3 elements from the previous dataset: that's `[0,1,2,3,4,5,6]`, `[7,8,9,0,1,2,3]` and `[4,5,6,7,8,9,0]`. Then it calls the lambda function we gave it to create one dataset for each of the elements. Since we use `Dataset.from_tensor_slices()`, each dataset is going to return its elements one by one. Next, it pulls two items (since `block_length=2`) from each of these three datasets, and it iterates until all three datasets are out of items: 0,1 (from 1st), 7,8 (from 2nd), 4,5 (from 3rd), 2,3 (from 1st), 9,0 (from 2nd), and so on until 8,9 (from 3rd), 6 (from 1st), 3 (from 2nd), 0 (from 3rd). Next it tries to pull the next 3 elements from the original dataset, but there are just two left: `[1,2,3,4,5,6,7]` and `[8,9]`. Again, it creates datasets from these elements, and it pulls two items from each until both datasets are out of items: 1,2 (from 1st), 8,9 (from 2nd), 3,4 (from 1st), 5,6 (from 1st), 7 (from 1st). Notice that there's no interleaving at the end since the arrays do not have the same length."
"cell_type": "markdown",
"metadata": {},
"source": [
"# Readers the new way"
"cell_type": "markdown",
"metadata": {},
"source": [
"Instead of using a source dataset based on `from_tensor_slices()` or `from_tensor()`, we can use a reader dataset. It handles most of the complexity for us (e.g., threads):"
"cell_type": "code",
"execution_count": 25,
"metadata": {},
"outputs": [],
"source": [
"cell_type": "code",
"execution_count": 26,
"metadata": {},
"outputs": [],
"source": [
"filenames = [\"my_test.csv\"]"
"cell_type": "code",
"execution_count": 27,
"metadata": {},
"outputs": [],
"source": [
"dataset = tf.data.TextLineDataset(filenames)"
"cell_type": "markdown",
"metadata": {},
"source": [
"We still need to tell it how to decode each line:"
"cell_type": "code",
"execution_count": 28,
"metadata": {},
"outputs": [],
"source": [
"def decode_csv_line(line):\n",
" x1, x2, y = tf.decode_csv(\n",
" line, record_defaults=[[-1.], [-1.], [-1.]])\n",
" X = tf.stack([x1, x2])\n",
" return X, y"
"cell_type": "markdown",
"metadata": {},
"source": [
"Next, we can apply this decoding function to each element in the dataset using `map()`:"
"cell_type": "code",
"execution_count": 29,
"metadata": {},
"outputs": [],
"source": [
"dataset = dataset.skip(1).map(decode_csv_line)"
"cell_type": "markdown",
"metadata": {},
"source": [
"Finally, let's create a one-shot iterator:"
"cell_type": "code",
"execution_count": 30,
"metadata": {},
"outputs": [],
"source": [
"it = dataset.make_one_shot_iterator()\n",
"X, y = it.get_next()"
"cell_type": "code",
"execution_count": 31,
"metadata": {},
"outputs": [],
"source": [
"with tf.Session() as sess:\n",
" try:\n",
" while True:\n",
" X_val, y_val = sess.run([X, y])\n",
" print(X_val, y_val)\n",
" except tf.errors.OutOfRangeError as ex:\n",
" print(\"Done\")\n"
"cell_type": "markdown",
"metadata": {
"collapsed": true
"source": [
"# Exercise solutions"
"cell_type": "markdown",
"metadata": {},
"source": [
"**Coming soon**"
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": []
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.6.5"
"nav_menu": {},
"toc": {
"navigate_menu": true,
"number_sections": true,
"sideBar": true,
"threshold": 6,
"toc_cell": false,
"toc_section_display": "block",
"toc_window_display": false
"nbformat": 4,
"nbformat_minor": 1