diff --git a/12_distributed_tensorflow.ipynb b/12_distributed_tensorflow.ipynb index 9d51b19..e457831 100644 --- a/12_distributed_tensorflow.ipynb +++ b/12_distributed_tensorflow.ipynb @@ -2,40 +2,28 @@ "cells": [ { "cell_type": "markdown", - "metadata": { - "deletable": true, - "editable": true - }, + "metadata": {}, "source": [ "**Chapter 12 – Distributed TensorFlow**" ] }, { "cell_type": "markdown", - "metadata": { - "deletable": true, - "editable": true - }, + "metadata": {}, "source": [ "_This notebook contains all the sample code and solutions to the exercises in chapter 12._" ] }, { "cell_type": "markdown", - "metadata": { - "deletable": true, - "editable": true - }, + "metadata": {}, "source": [ "# Setup" ] }, { "cell_type": "markdown", - "metadata": { - "deletable": true, - "editable": true - }, + "metadata": {}, "source": [ "First, let's make sure this notebook works well in both python 2 and 3, import a few common modules, ensure MatplotLib plots figures inline and prepare a function to save the figures:" ] @@ -43,11 +31,7 @@ { "cell_type": "code", "execution_count": 1, - "metadata": { - "collapsed": true, - "deletable": true, - "editable": true - }, + "metadata": {}, "outputs": [], "source": [ "# To support both python 2 and python 3\n", @@ -85,10 +69,7 @@ }, { "cell_type": "markdown", - "metadata": { - "deletable": true, - "editable": true - }, + "metadata": {}, "source": [ "# Local server" ] @@ -96,11 +77,7 @@ { "cell_type": "code", "execution_count": 2, - "metadata": { - "collapsed": true, - "deletable": true, - "editable": true - }, + "metadata": {}, "outputs": [], "source": [ "import tensorflow as tf" @@ -109,11 +86,7 @@ { "cell_type": "code", "execution_count": 3, - "metadata": { - "collapsed": true, - "deletable": true, - "editable": true - }, + "metadata": {}, "outputs": [], "source": [ "c = tf.constant(\"Hello distributed TensorFlow!\")\n", @@ -123,11 +96,7 @@ { "cell_type": "code", "execution_count": 4, - "metadata": { - "collapsed": false, - 
"deletable": true, - "editable": true - }, + "metadata": {}, "outputs": [], "source": [ "with tf.Session(server.target) as sess:\n", @@ -136,10 +105,7 @@ }, { "cell_type": "markdown", - "metadata": { - "deletable": true, - "editable": true - }, + "metadata": {}, "source": [ "# Cluster" ] @@ -147,11 +113,7 @@ { "cell_type": "code", "execution_count": 5, - "metadata": { - "collapsed": true, - "deletable": true, - "editable": true - }, + "metadata": {}, "outputs": [], "source": [ "cluster_spec = tf.train.ClusterSpec({\n", @@ -169,11 +131,7 @@ { "cell_type": "code", "execution_count": 6, - "metadata": { - "collapsed": false, - "deletable": true, - "editable": true - }, + "metadata": {}, "outputs": [], "source": [ "task_ps0 = tf.train.Server(cluster_spec, job_name=\"ps\", task_index=0)\n", @@ -185,10 +143,7 @@ }, { "cell_type": "markdown", - "metadata": { - "deletable": true, - "editable": true - }, + "metadata": {}, "source": [ "# Pinning operations across devices and servers" ] @@ -196,11 +151,7 @@ { "cell_type": "code", "execution_count": 7, - "metadata": { - "collapsed": true, - "deletable": true, - "editable": true - }, + "metadata": {}, "outputs": [], "source": [ "reset_graph()\n", @@ -218,11 +169,7 @@ { "cell_type": "code", "execution_count": 8, - "metadata": { - "collapsed": false, - "deletable": true, - "editable": true - }, + "metadata": {}, "outputs": [], "source": [ "with tf.Session(\"grpc://127.0.0.1:2221\") as sess:\n", @@ -233,11 +180,7 @@ { "cell_type": "code", "execution_count": 9, - "metadata": { - "collapsed": false, - "deletable": true, - "editable": true - }, + "metadata": {}, "outputs": [], "source": [ "reset_graph()\n", @@ -264,29 +207,46 @@ }, { "cell_type": "markdown", - "metadata": { - "deletable": true, - "editable": true - }, + "metadata": {}, "source": [ - "# Readers" + "# Readers – the old way" ] }, { "cell_type": "code", "execution_count": 10, - "metadata": { - "collapsed": false, - "deletable": true, - "editable": true - }, + "metadata": 
{}, + "outputs": [], + "source": [ + "reset_graph()" + ] + }, + { + "cell_type": "code", + "execution_count": 11, + "metadata": {}, + "outputs": [], + "source": [ + "default1 = tf.constant([5.])\n", + "default2 = tf.constant([6])\n", + "default3 = tf.constant([7])\n", + "dec = tf.decode_csv(tf.constant(\"1.,,44\"),\n", + " record_defaults=[default1, default2, default3])\n", + "with tf.Session() as sess:\n", + " print(sess.run(dec))" + ] + }, + { + "cell_type": "code", + "execution_count": 12, + "metadata": {}, "outputs": [], "source": [ "reset_graph()\n", "\n", "test_csv = open(\"my_test.csv\", \"w\")\n", "test_csv.write(\"x1, x2 , target\\n\")\n", - "test_csv.write(\"1., , 0\\n\")\n", + "test_csv.write(\"1.,, 0\\n\")\n", "test_csv.write(\"4., 5. , 1\\n\")\n", "test_csv.write(\"7., 8. , 0\\n\")\n", "test_csv.close()\n", @@ -329,12 +289,8 @@ }, { "cell_type": "code", - "execution_count": 11, - "metadata": { - "collapsed": true, - "deletable": true, - "editable": true - }, + "execution_count": 13, + "metadata": {}, "outputs": [], "source": [ "#coord = tf.train.Coordinator()\n", @@ -346,22 +302,15 @@ }, { "cell_type": "markdown", - "metadata": { - "deletable": true, - "editable": true - }, + "metadata": {}, "source": [ "# Queue runners and coordinators" ] }, { "cell_type": "code", - "execution_count": 12, - "metadata": { - "collapsed": false, - "deletable": true, - "editable": true - }, + "execution_count": 14, + "metadata": {}, "outputs": [], "source": [ "reset_graph()\n", @@ -403,12 +352,8 @@ }, { "cell_type": "code", - "execution_count": 13, - "metadata": { - "collapsed": false, - "deletable": true, - "editable": true - }, + "execution_count": 15, + "metadata": {}, "outputs": [], "source": [ "reset_graph()\n", @@ -451,22 +396,15 @@ }, { "cell_type": "markdown", - "metadata": { - "deletable": true, - "editable": true - }, + "metadata": {}, "source": [ "# Setting a timeout" ] }, { "cell_type": "code", - "execution_count": 14, - "metadata": { - "collapsed": false, - 
"deletable": true, - "editable": true - }, + "execution_count": 16, + "metadata": {}, "outputs": [], "source": [ "reset_graph()\n", @@ -494,12 +432,285 @@ " print(\"Timed out while dequeuing\")\n" ] }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Data API" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "The Data API, introduced in TensorFlow 1.4, makes reading data efficiently much easier." + ] + }, + { + "cell_type": "code", + "execution_count": 17, + "metadata": {}, + "outputs": [], + "source": [ + "tf.reset_default_graph()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Let's start with a simple dataset composed of three times the integers 0 to 9, in batches of 7:" + ] + }, + { + "cell_type": "code", + "execution_count": 18, + "metadata": {}, + "outputs": [], + "source": [ + "dataset = tf.data.Dataset.from_tensor_slices(np.arange(10))\n", + "dataset = dataset.repeat(3).batch(7)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "The first line creates a dataset containing the integers 0 through 9. The second line creates a new dataset based on the first one, repeating its elements three times and creating batches of 7 elements. As you can see, we start with a source dataset, then we chain calls to various methods to apply transformations to the data." + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Next, we create a one-shot-iterator to go through this dataset just once, and we call its `get_next()` method to get a tensor that represents the next element." + ] + }, + { + "cell_type": "code", + "execution_count": 19, + "metadata": {}, + "outputs": [], + "source": [ + "iterator = dataset.make_one_shot_iterator()\n", + "next_element = iterator.get_next()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Let's repeatedly evaluate `next_element` to go through the dataset. 
When there are no more elements, we get an `OutOfRangeError`:" + ] + }, + { + "cell_type": "code", + "execution_count": 20, + "metadata": {}, + "outputs": [], + "source": [ + "with tf.Session() as sess:\n", + " try:\n", + " while True:\n", + " print(next_element.eval())\n", + " except tf.errors.OutOfRangeError:\n", + " print(\"Done\")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Great! It worked fine." + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Note that, as always, a tensor is only evaluated once each time we run the graph (`sess.run()`): so even if we evaluate multiple tensors that all depend on `next_element`, it is only evaluated once. This is true as well if we ask for `next_element` to be evaluated twice in just one run:" + ] + }, + { + "cell_type": "code", + "execution_count": 21, + "metadata": {}, + "outputs": [], + "source": [ + "with tf.Session() as sess:\n", + " try:\n", + " while True:\n", + " print(sess.run([next_element, next_element]))\n", + " except tf.errors.OutOfRangeError:\n", + " print(\"Done\")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "The `interleave()` method is powerful but a bit tricky to grasp at first. 
The easiest way to understand it is to look at an example:" + ] + }, + { + "cell_type": "code", + "execution_count": 22, + "metadata": {}, + "outputs": [], + "source": [ + "tf.reset_default_graph()" + ] + }, + { + "cell_type": "code", + "execution_count": 23, + "metadata": {}, + "outputs": [], + "source": [ + "dataset = tf.data.Dataset.from_tensor_slices(np.arange(10))\n", + "dataset = dataset.repeat(3).batch(7)\n", + "dataset = dataset.interleave(\n", + " lambda v: tf.data.Dataset.from_tensor_slices(v),\n", + " cycle_length=3,\n", + " block_length=2)\n", + "iterator = dataset.make_one_shot_iterator()\n", + "next_element = iterator.get_next()" + ] + }, + { + "cell_type": "code", + "execution_count": 24, + "metadata": {}, + "outputs": [], + "source": [ + "with tf.Session() as sess:\n", + " try:\n", + " while True:\n", + " print(next_element.eval(), end=\",\")\n", + " except tf.errors.OutOfRangeError:\n", + " print(\"Done\")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Because `cycle_length=3`, the new dataset starts by pulling 3 elements from the previous dataset: that's `[0,1,2,3,4,5,6]`, `[7,8,9,0,1,2,3]` and `[4,5,6,7,8,9,0]`. Then it calls the lambda function we gave it to create one dataset for each of the elements. Since we use `Dataset.from_tensor_slices()`, each dataset is going to return its elements one by one. Next, it pulls two items (since `block_length=2`) from each of these three datasets, and it iterates until all three datasets are out of items: 0,1 (from 1st), 7,8 (from 2nd), 4,5 (from 3rd), 2,3 (from 1st), 9,0 (from 2nd), and so on until 8,9 (from 3rd), 6 (from 1st), 3 (from 2nd), 0 (from 3rd). Next it tries to pull the next 3 elements from the original dataset, but there are just two left: `[1,2,3,4,5,6,7]` and `[8,9]`. Again, it creates datasets from these elements, and it pulls two items from each until both datasets are out of items: 1,2 (from 1st), 8,9 (from 2nd), 3,4 (from 1st), 5,6 (from 1st), 7 (from 1st). 
Notice that there's no interleaving at the end since the arrays do not have the same length." + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Readers – the new way" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Instead of using a source dataset based on `from_tensor_slices()` or `from_tensors()`, we can use a reader dataset. It handles most of the complexity for us (e.g., threads):" + ] + }, + { + "cell_type": "code", + "execution_count": 25, + "metadata": {}, + "outputs": [], + "source": [ + "tf.reset_default_graph()" + ] + }, + { + "cell_type": "code", + "execution_count": 26, + "metadata": {}, + "outputs": [], + "source": [ + "filenames = [\"my_test.csv\"]" + ] + }, + { + "cell_type": "code", + "execution_count": 27, + "metadata": {}, + "outputs": [], + "source": [ + "dataset = tf.data.TextLineDataset(filenames)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "We still need to tell it how to decode each line:" + ] + }, + { + "cell_type": "code", + "execution_count": 28, + "metadata": {}, + "outputs": [], + "source": [ + "def decode_csv_line(line):\n", + " x1, x2, y = tf.decode_csv(\n", + " line, record_defaults=[[-1.], [-1.], [-1.]])\n", + " X = tf.stack([x1, x2])\n", + " return X, y" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Next, we can apply this decoding function to each element in the dataset using `map()`:" + ] + }, + { + "cell_type": "code", + "execution_count": 29, + "metadata": {}, + "outputs": [], + "source": [ + "dataset = dataset.skip(1).map(decode_csv_line)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Finally, let's create a one-shot iterator:" + ] + }, + { + "cell_type": "code", + "execution_count": 30, + "metadata": {}, + "outputs": [], + "source": [ + "it = dataset.make_one_shot_iterator()\n", + "X, y = it.get_next()" + ] + }, + { + "cell_type": "code", + "execution_count": 31, + "metadata": {}, + 
"outputs": [], + "source": [ + "with tf.Session() as sess:\n", + " try:\n", + " while True:\n", + " X_val, y_val = sess.run([X, y])\n", + " print(X_val, y_val)\n", + " except tf.errors.OutOfRangeError as ex:\n", + " print(\"Done\")\n" + ] + }, { "cell_type": "markdown", "metadata": { - "collapsed": true, - "deletable": true, - "editable": true + "collapsed": true }, "source": [ "# Exercise solutions" @@ -507,22 +718,15 @@ }, { "cell_type": "markdown", - "metadata": { - "deletable": true, - "editable": true - }, + "metadata": {}, "source": [ "**Coming soon**" ] }, { "cell_type": "code", - "execution_count": 15, - "metadata": { - "collapsed": true, - "deletable": true, - "editable": true - }, + "execution_count": null, + "metadata": {}, "outputs": [], "source": [] } @@ -543,7 +747,7 @@ "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", - "version": "3.5.3" + "version": "3.6.5" }, "nav_menu": {}, "toc": { @@ -557,5 +761,5 @@ } }, "nbformat": 4, - "nbformat_minor": 0 + "nbformat_minor": 1 }