Add a quick intro to the Data API in notebook 12
parent
0a1b040d9d
commit
373a52c39e
|
@ -2,40 +2,28 @@
|
|||
"cells": [
|
||||
{
|
||||
"cell_type": "markdown",
|
||||
"metadata": {
|
||||
"deletable": true,
|
||||
"editable": true
|
||||
},
|
||||
"metadata": {},
|
||||
"source": [
|
||||
"**Chapter 12 – Distributed TensorFlow**"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "markdown",
|
||||
"metadata": {
|
||||
"deletable": true,
|
||||
"editable": true
|
||||
},
|
||||
"metadata": {},
|
||||
"source": [
|
||||
"_This notebook contains all the sample code and solutions to the exercises in chapter 12._"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "markdown",
|
||||
"metadata": {
|
||||
"deletable": true,
|
||||
"editable": true
|
||||
},
|
||||
"metadata": {},
|
||||
"source": [
|
||||
"# Setup"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "markdown",
|
||||
"metadata": {
|
||||
"deletable": true,
|
||||
"editable": true
|
||||
},
|
||||
"metadata": {},
|
||||
"source": [
|
||||
"First, let's make sure this notebook works well in both python 2 and 3, import a few common modules, ensure MatplotLib plots figures inline and prepare a function to save the figures:"
|
||||
]
|
||||
|
@ -43,11 +31,7 @@
|
|||
{
|
||||
"cell_type": "code",
|
||||
"execution_count": 1,
|
||||
"metadata": {
|
||||
"collapsed": true,
|
||||
"deletable": true,
|
||||
"editable": true
|
||||
},
|
||||
"metadata": {},
|
||||
"outputs": [],
|
||||
"source": [
|
||||
"# To support both python 2 and python 3\n",
|
||||
|
@ -85,10 +69,7 @@
|
|||
},
|
||||
{
|
||||
"cell_type": "markdown",
|
||||
"metadata": {
|
||||
"deletable": true,
|
||||
"editable": true
|
||||
},
|
||||
"metadata": {},
|
||||
"source": [
|
||||
"# Local server"
|
||||
]
|
||||
|
@ -96,11 +77,7 @@
|
|||
{
|
||||
"cell_type": "code",
|
||||
"execution_count": 2,
|
||||
"metadata": {
|
||||
"collapsed": true,
|
||||
"deletable": true,
|
||||
"editable": true
|
||||
},
|
||||
"metadata": {},
|
||||
"outputs": [],
|
||||
"source": [
|
||||
"import tensorflow as tf"
|
||||
|
@ -109,11 +86,7 @@
|
|||
{
|
||||
"cell_type": "code",
|
||||
"execution_count": 3,
|
||||
"metadata": {
|
||||
"collapsed": true,
|
||||
"deletable": true,
|
||||
"editable": true
|
||||
},
|
||||
"metadata": {},
|
||||
"outputs": [],
|
||||
"source": [
|
||||
"c = tf.constant(\"Hello distributed TensorFlow!\")\n",
|
||||
|
@ -123,11 +96,7 @@
|
|||
{
|
||||
"cell_type": "code",
|
||||
"execution_count": 4,
|
||||
"metadata": {
|
||||
"collapsed": false,
|
||||
"deletable": true,
|
||||
"editable": true
|
||||
},
|
||||
"metadata": {},
|
||||
"outputs": [],
|
||||
"source": [
|
||||
"with tf.Session(server.target) as sess:\n",
|
||||
|
@ -136,10 +105,7 @@
|
|||
},
|
||||
{
|
||||
"cell_type": "markdown",
|
||||
"metadata": {
|
||||
"deletable": true,
|
||||
"editable": true
|
||||
},
|
||||
"metadata": {},
|
||||
"source": [
|
||||
"# Cluster"
|
||||
]
|
||||
|
@ -147,11 +113,7 @@
|
|||
{
|
||||
"cell_type": "code",
|
||||
"execution_count": 5,
|
||||
"metadata": {
|
||||
"collapsed": true,
|
||||
"deletable": true,
|
||||
"editable": true
|
||||
},
|
||||
"metadata": {},
|
||||
"outputs": [],
|
||||
"source": [
|
||||
"cluster_spec = tf.train.ClusterSpec({\n",
|
||||
|
@ -169,11 +131,7 @@
|
|||
{
|
||||
"cell_type": "code",
|
||||
"execution_count": 6,
|
||||
"metadata": {
|
||||
"collapsed": false,
|
||||
"deletable": true,
|
||||
"editable": true
|
||||
},
|
||||
"metadata": {},
|
||||
"outputs": [],
|
||||
"source": [
|
||||
"task_ps0 = tf.train.Server(cluster_spec, job_name=\"ps\", task_index=0)\n",
|
||||
|
@ -185,10 +143,7 @@
|
|||
},
|
||||
{
|
||||
"cell_type": "markdown",
|
||||
"metadata": {
|
||||
"deletable": true,
|
||||
"editable": true
|
||||
},
|
||||
"metadata": {},
|
||||
"source": [
|
||||
"# Pinning operations across devices and servers"
|
||||
]
|
||||
|
@ -196,11 +151,7 @@
|
|||
{
|
||||
"cell_type": "code",
|
||||
"execution_count": 7,
|
||||
"metadata": {
|
||||
"collapsed": true,
|
||||
"deletable": true,
|
||||
"editable": true
|
||||
},
|
||||
"metadata": {},
|
||||
"outputs": [],
|
||||
"source": [
|
||||
"reset_graph()\n",
|
||||
|
@ -218,11 +169,7 @@
|
|||
{
|
||||
"cell_type": "code",
|
||||
"execution_count": 8,
|
||||
"metadata": {
|
||||
"collapsed": false,
|
||||
"deletable": true,
|
||||
"editable": true
|
||||
},
|
||||
"metadata": {},
|
||||
"outputs": [],
|
||||
"source": [
|
||||
"with tf.Session(\"grpc://127.0.0.1:2221\") as sess:\n",
|
||||
|
@ -233,11 +180,7 @@
|
|||
{
|
||||
"cell_type": "code",
|
||||
"execution_count": 9,
|
||||
"metadata": {
|
||||
"collapsed": false,
|
||||
"deletable": true,
|
||||
"editable": true
|
||||
},
|
||||
"metadata": {},
|
||||
"outputs": [],
|
||||
"source": [
|
||||
"reset_graph()\n",
|
||||
|
@ -264,29 +207,46 @@
|
|||
},
|
||||
{
|
||||
"cell_type": "markdown",
|
||||
"metadata": {
|
||||
"deletable": true,
|
||||
"editable": true
|
||||
},
|
||||
"metadata": {},
|
||||
"source": [
|
||||
"# Readers"
|
||||
"# Readers – the old way"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "code",
|
||||
"execution_count": 10,
|
||||
"metadata": {
|
||||
"collapsed": false,
|
||||
"deletable": true,
|
||||
"editable": true
|
||||
},
|
||||
"metadata": {},
|
||||
"outputs": [],
|
||||
"source": [
|
||||
"reset_graph()"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "code",
|
||||
"execution_count": 11,
|
||||
"metadata": {},
|
||||
"outputs": [],
|
||||
"source": [
|
||||
"default1 = tf.constant([5.])\n",
|
||||
"default2 = tf.constant([6])\n",
|
||||
"default3 = tf.constant([7])\n",
|
||||
"dec = tf.decode_csv(tf.constant(\"1.,,44\"),\n",
|
||||
" record_defaults=[default1, default2, default3])\n",
|
||||
"with tf.Session() as sess:\n",
|
||||
" print(sess.run(dec))"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "code",
|
||||
"execution_count": 12,
|
||||
"metadata": {},
|
||||
"outputs": [],
|
||||
"source": [
|
||||
"reset_graph()\n",
|
||||
"\n",
|
||||
"test_csv = open(\"my_test.csv\", \"w\")\n",
|
||||
"test_csv.write(\"x1, x2 , target\\n\")\n",
|
||||
"test_csv.write(\"1., , 0\\n\")\n",
|
||||
"test_csv.write(\"1.,, 0\\n\")\n",
|
||||
"test_csv.write(\"4., 5. , 1\\n\")\n",
|
||||
"test_csv.write(\"7., 8. , 0\\n\")\n",
|
||||
"test_csv.close()\n",
|
||||
|
@ -329,12 +289,8 @@
|
|||
},
|
||||
{
|
||||
"cell_type": "code",
|
||||
"execution_count": 11,
|
||||
"metadata": {
|
||||
"collapsed": true,
|
||||
"deletable": true,
|
||||
"editable": true
|
||||
},
|
||||
"execution_count": 13,
|
||||
"metadata": {},
|
||||
"outputs": [],
|
||||
"source": [
|
||||
"#coord = tf.train.Coordinator()\n",
|
||||
|
@ -346,22 +302,15 @@
|
|||
},
|
||||
{
|
||||
"cell_type": "markdown",
|
||||
"metadata": {
|
||||
"deletable": true,
|
||||
"editable": true
|
||||
},
|
||||
"metadata": {},
|
||||
"source": [
|
||||
"# Queue runners and coordinators"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "code",
|
||||
"execution_count": 12,
|
||||
"metadata": {
|
||||
"collapsed": false,
|
||||
"deletable": true,
|
||||
"editable": true
|
||||
},
|
||||
"execution_count": 14,
|
||||
"metadata": {},
|
||||
"outputs": [],
|
||||
"source": [
|
||||
"reset_graph()\n",
|
||||
|
@ -403,12 +352,8 @@
|
|||
},
|
||||
{
|
||||
"cell_type": "code",
|
||||
"execution_count": 13,
|
||||
"metadata": {
|
||||
"collapsed": false,
|
||||
"deletable": true,
|
||||
"editable": true
|
||||
},
|
||||
"execution_count": 15,
|
||||
"metadata": {},
|
||||
"outputs": [],
|
||||
"source": [
|
||||
"reset_graph()\n",
|
||||
|
@ -451,22 +396,15 @@
|
|||
},
|
||||
{
|
||||
"cell_type": "markdown",
|
||||
"metadata": {
|
||||
"deletable": true,
|
||||
"editable": true
|
||||
},
|
||||
"metadata": {},
|
||||
"source": [
|
||||
"# Setting a timeout"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "code",
|
||||
"execution_count": 14,
|
||||
"metadata": {
|
||||
"collapsed": false,
|
||||
"deletable": true,
|
||||
"editable": true
|
||||
},
|
||||
"execution_count": 16,
|
||||
"metadata": {},
|
||||
"outputs": [],
|
||||
"source": [
|
||||
"reset_graph()\n",
|
||||
|
@ -494,12 +432,285 @@
|
|||
" print(\"Timed out while dequeuing\")\n"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "markdown",
|
||||
"metadata": {},
|
||||
"source": [
|
||||
"# Data API"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "markdown",
|
||||
"metadata": {},
|
||||
"source": [
|
||||
"The Data API, introduced in TensorFlow 1.4, makes reading data efficiently much easier."
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "code",
|
||||
"execution_count": 17,
|
||||
"metadata": {},
|
||||
"outputs": [],
|
||||
"source": [
|
||||
"tf.reset_default_graph()"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "markdown",
|
||||
"metadata": {},
|
||||
"source": [
|
||||
"Let's start with a simple dataset composed of three times the integers 0 to 9, in batches of 7:"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "code",
|
||||
"execution_count": 18,
|
||||
"metadata": {},
|
||||
"outputs": [],
|
||||
"source": [
|
||||
"dataset = tf.data.Dataset.from_tensor_slices(np.arange(10))\n",
|
||||
"dataset = dataset.repeat(3).batch(7)"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "markdown",
|
||||
"metadata": {},
|
||||
"source": [
|
||||
"The first line creates a dataset containing the integers 0 through 9. The second line creates a new dataset based on the first one, repeating its elements three times and creating batches of 7 elements. As you can see, we start with a source dataset, then we chain calls to various methods to apply transformations to the data."
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "markdown",
|
||||
"metadata": {},
|
||||
"source": [
|
||||
"Next, we create a one-shot-iterator to go through this dataset just once, and we call its `get_next()` method to get a tensor that represents the next element."
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "code",
|
||||
"execution_count": 19,
|
||||
"metadata": {},
|
||||
"outputs": [],
|
||||
"source": [
|
||||
"iterator = dataset.make_one_shot_iterator()\n",
|
||||
"next_element = iterator.get_next()"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "markdown",
|
||||
"metadata": {},
|
||||
"source": [
|
||||
"Let's repeatedly evaluate `next_element` to go through the dataset. When there are not more elements, we get an `OutOfRangeError`:"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "code",
|
||||
"execution_count": 20,
|
||||
"metadata": {},
|
||||
"outputs": [],
|
||||
"source": [
|
||||
"with tf.Session() as sess:\n",
|
||||
" try:\n",
|
||||
" while True:\n",
|
||||
" print(next_element.eval())\n",
|
||||
" except tf.errors.OutOfRangeError:\n",
|
||||
" print(\"Done\")"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "markdown",
|
||||
"metadata": {},
|
||||
"source": [
|
||||
"Great! It worked fine."
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "markdown",
|
||||
"metadata": {},
|
||||
"source": [
|
||||
"Note that, as always, a tensor is only evaluated once each time we run the graph (`sess.run()`): so even if we evaluate multiple tensors that all depend on `next_element`, it is only evaluated once. This is true as well if we ask for `next_element` to be evaluated twice in just one run:"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "code",
|
||||
"execution_count": 21,
|
||||
"metadata": {},
|
||||
"outputs": [],
|
||||
"source": [
|
||||
"with tf.Session() as sess:\n",
|
||||
" try:\n",
|
||||
" while True:\n",
|
||||
" print(sess.run([next_element, next_element]))\n",
|
||||
" except tf.errors.OutOfRangeError:\n",
|
||||
" print(\"Done\")"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "markdown",
|
||||
"metadata": {},
|
||||
"source": [
|
||||
"The `interleave()` method is powerful but a bit tricky to grasp at first. The easiest way to understand it is to look at an example:"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "code",
|
||||
"execution_count": 22,
|
||||
"metadata": {},
|
||||
"outputs": [],
|
||||
"source": [
|
||||
"tf.reset_default_graph()"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "code",
|
||||
"execution_count": 23,
|
||||
"metadata": {},
|
||||
"outputs": [],
|
||||
"source": [
|
||||
"dataset = tf.data.Dataset.from_tensor_slices(np.arange(10))\n",
|
||||
"dataset = dataset.repeat(3).batch(7)\n",
|
||||
"dataset = dataset.interleave(\n",
|
||||
" lambda v: tf.data.Dataset.from_tensor_slices(v),\n",
|
||||
" cycle_length=3,\n",
|
||||
" block_length=2)\n",
|
||||
"iterator = dataset.make_one_shot_iterator()\n",
|
||||
"next_element = iterator.get_next()"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "code",
|
||||
"execution_count": 24,
|
||||
"metadata": {},
|
||||
"outputs": [],
|
||||
"source": [
|
||||
"with tf.Session() as sess:\n",
|
||||
" try:\n",
|
||||
" while True:\n",
|
||||
" print(next_element.eval(), end=\",\")\n",
|
||||
" except tf.errors.OutOfRangeError:\n",
|
||||
" print(\"Done\")"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "markdown",
|
||||
"metadata": {},
|
||||
"source": [
|
||||
"Because `cycle_length=3`, the new dataset starts by pulling 3 elements from the previous dataset: that's `[0,1,2,3,4,5,6]`, `[7,8,9,0,1,2,3]` and `[4,5,6,7,8,9,0]`. Then it calls the lambda function we gave it to create one dataset for each of the elements. Since we use `Dataset.from_tensor_slices()`, each dataset is going to return its elements one by one. Next, it pulls two items (since `block_length=2`) from each of these three datasets, and it iterates until all three datasets are out of items: 0,1 (from 1st), 7,8 (from 2nd), 4,5 (from 3rd), 2,3 (from 1st), 9,0 (from 2nd), and so on until 8,9 (from 3rd), 6 (from 1st), 3 (from 2nd), 0 (from 3rd). Next it tries to pull the next 3 elements from the original dataset, but there are just two left: `[1,2,3,4,5,6,7]` and `[8,9]`. Again, it creates datasets from these elements, and it pulls two items from each until both datasets are out of items: 1,2 (from 1st), 8,9 (from 2nd), 3,4 (from 1st), 5,6 (from 1st), 7 (from 1st). Notice that there's no interleaving at the end since the arrays do not have the same length."
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "markdown",
|
||||
"metadata": {},
|
||||
"source": [
|
||||
"# Readers – the new way"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "markdown",
|
||||
"metadata": {},
|
||||
"source": [
|
||||
"Instead of using a source dataset based on `from_tensor_slices()` or `from_tensor()`, we can use a reader dataset. It handles most of the complexity for us (e.g., threads):"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "code",
|
||||
"execution_count": 25,
|
||||
"metadata": {},
|
||||
"outputs": [],
|
||||
"source": [
|
||||
"tf.reset_default_graph()"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "code",
|
||||
"execution_count": 26,
|
||||
"metadata": {},
|
||||
"outputs": [],
|
||||
"source": [
|
||||
"filenames = [\"my_test.csv\"]"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "code",
|
||||
"execution_count": 27,
|
||||
"metadata": {},
|
||||
"outputs": [],
|
||||
"source": [
|
||||
"dataset = tf.data.TextLineDataset(filenames)"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "markdown",
|
||||
"metadata": {},
|
||||
"source": [
|
||||
"We still need to tell it how to decode each line:"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "code",
|
||||
"execution_count": 28,
|
||||
"metadata": {},
|
||||
"outputs": [],
|
||||
"source": [
|
||||
"def decode_csv_line(line):\n",
|
||||
" x1, x2, y = tf.decode_csv(\n",
|
||||
" line, record_defaults=[[-1.], [-1.], [-1.]])\n",
|
||||
" X = tf.stack([x1, x2])\n",
|
||||
" return X, y"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "markdown",
|
||||
"metadata": {},
|
||||
"source": [
|
||||
"Next, we can apply this decoding function to each element in the dataset using `map()`:"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "code",
|
||||
"execution_count": 29,
|
||||
"metadata": {},
|
||||
"outputs": [],
|
||||
"source": [
|
||||
"dataset = dataset.skip(1).map(decode_csv_line)"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "markdown",
|
||||
"metadata": {},
|
||||
"source": [
|
||||
"Finally, let's create a one-shot iterator:"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "code",
|
||||
"execution_count": 30,
|
||||
"metadata": {},
|
||||
"outputs": [],
|
||||
"source": [
|
||||
"it = dataset.make_one_shot_iterator()\n",
|
||||
"X, y = it.get_next()"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "code",
|
||||
"execution_count": 31,
|
||||
"metadata": {},
|
||||
"outputs": [],
|
||||
"source": [
|
||||
"with tf.Session() as sess:\n",
|
||||
" try:\n",
|
||||
" while True:\n",
|
||||
" X_val, y_val = sess.run([X, y])\n",
|
||||
" print(X_val, y_val)\n",
|
||||
" except tf.errors.OutOfRangeError as ex:\n",
|
||||
" print(\"Done\")\n"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "markdown",
|
||||
"metadata": {
|
||||
"collapsed": true,
|
||||
"deletable": true,
|
||||
"editable": true
|
||||
"collapsed": true
|
||||
},
|
||||
"source": [
|
||||
"# Exercise solutions"
|
||||
|
@ -507,22 +718,15 @@
|
|||
},
|
||||
{
|
||||
"cell_type": "markdown",
|
||||
"metadata": {
|
||||
"deletable": true,
|
||||
"editable": true
|
||||
},
|
||||
"metadata": {},
|
||||
"source": [
|
||||
"**Coming soon**"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "code",
|
||||
"execution_count": 15,
|
||||
"metadata": {
|
||||
"collapsed": true,
|
||||
"deletable": true,
|
||||
"editable": true
|
||||
},
|
||||
"execution_count": null,
|
||||
"metadata": {},
|
||||
"outputs": [],
|
||||
"source": []
|
||||
}
|
||||
|
@ -543,7 +747,7 @@
|
|||
"name": "python",
|
||||
"nbconvert_exporter": "python",
|
||||
"pygments_lexer": "ipython3",
|
||||
"version": "3.5.3"
|
||||
"version": "3.6.5"
|
||||
},
|
||||
"nav_menu": {},
|
||||
"toc": {
|
||||
|
@ -557,5 +761,5 @@
|
|||
}
|
||||
},
|
||||
"nbformat": 4,
|
||||
"nbformat_minor": 0
|
||||
"nbformat_minor": 1
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue