handson-ml/12_distributed_tensorflow.i...

560 lines
15 KiB
Plaintext
Raw Blame History

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

{
"cells": [
{
"cell_type": "markdown",
"metadata": {
"deletable": true,
"editable": true
},
"source": [
"**Chapter 12 Distributed TensorFlow**"
]
},
{
"cell_type": "markdown",
"metadata": {
"deletable": true,
"editable": true
},
"source": [
"_This notebook contains all the sample code and solutions to the exercises in chapter 12._"
]
},
{
"cell_type": "markdown",
"metadata": {
"deletable": true,
"editable": true
},
"source": [
"# Setup"
]
},
{
"cell_type": "markdown",
"metadata": {
"deletable": true,
"editable": true
},
"source": [
"First, let's make sure this notebook works well in both Python 2 and 3, import a few common modules, ensure Matplotlib plots figures inline, and prepare a function to save the figures:"
]
},
{
"cell_type": "code",
"execution_count": 1,
"metadata": {
"collapsed": true,
"deletable": true,
"editable": true
},
"outputs": [],
"source": [
"# To support both python 2 and python 3\n",
"from __future__ import division, print_function, unicode_literals\n",
"\n",
"# Common imports\n",
"import numpy as np\n",
"import numpy.random as rnd\n",
"import os\n",
"\n",
"# Seed NumPy's random generator so that results are stable across runs\n",
"rnd.seed(42)\n",
"\n",
"# Draw figures inline and enlarge the axis and tick labels\n",
"%matplotlib inline\n",
"import matplotlib\n",
"import matplotlib.pyplot as plt\n",
"plt.rcParams.update({\n",
"    'axes.labelsize': 14,\n",
"    'xtick.labelsize': 12,\n",
"    'ytick.labelsize': 12,\n",
"})\n",
"\n",
"# Path components used by save_fig() to decide where figures are written\n",
"PROJECT_ROOT_DIR = \".\"\n",
"CHAPTER_ID = \"distributed\"\n",
"\n",
"def save_fig(fig_id, tight_layout=True):\n",
"    \"\"\"Save the current Matplotlib figure as a 300 dpi PNG file.\n",
"\n",
"    The file is written to PROJECT_ROOT_DIR/images/CHAPTER_ID/<fig_id>.png;\n",
"    the target directory is created first if it does not exist yet.\n",
"\n",
"    Args:\n",
"        fig_id: file name of the figure, without the extension.\n",
"        tight_layout: if True, call plt.tight_layout() before saving.\n",
"    \"\"\"\n",
"    fig_dir = os.path.join(PROJECT_ROOT_DIR, \"images\", CHAPTER_ID)\n",
"    # savefig() does not create missing directories; this explicit check\n",
"    # (rather than exist_ok=True) also works on Python 2\n",
"    if not os.path.isdir(fig_dir):\n",
"        os.makedirs(fig_dir)\n",
"    path = os.path.join(fig_dir, fig_id + \".png\")\n",
"    print(\"Saving figure\", fig_id)\n",
"    if tight_layout:\n",
"        plt.tight_layout()\n",
"    plt.savefig(path, format='png', dpi=300)"
]
},
{
"cell_type": "markdown",
"metadata": {
"deletable": true,
"editable": true
},
"source": [
"# Local server"
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {
"collapsed": true,
"deletable": true,
"editable": true
},
"outputs": [],
"source": [
"import tensorflow as tf"
]
},
{
"cell_type": "code",
"execution_count": 3,
"metadata": {
"collapsed": true,
"deletable": true,
"editable": true
},
"outputs": [],
"source": [
"# Start an in-process, single-task TensorFlow server\n",
"server = tf.train.Server.create_local_server()\n",
"\n",
"# A constant node to evaluate through a session\n",
"c = tf.constant(\"Hello distributed TensorFlow!\")"
]
},
{
"cell_type": "code",
"execution_count": 4,
"metadata": {
"collapsed": false,
"deletable": true,
"editable": true
},
"outputs": [],
"source": [
"# Open a session on the local server and evaluate the constant there\n",
"with tf.Session(server.target) as sess:\n",
"    print(c.eval(session=sess))"
]
},
{
"cell_type": "markdown",
"metadata": {
"deletable": true,
"editable": true
},
"source": [
"# Cluster"
]
},
{
"cell_type": "code",
"execution_count": 5,
"metadata": {
"collapsed": true,
"deletable": true,
"editable": true
},
"outputs": [],
"source": [
"# All five tasks run on localhost; a task's index within its job is its\n",
"# position in the corresponding address list.\n",
"ps_hosts = [\n",
"    \"127.0.0.1:2221\",  # /job:ps/task:0\n",
"    \"127.0.0.1:2222\",  # /job:ps/task:1\n",
"]\n",
"worker_hosts = [\n",
"    \"127.0.0.1:2223\",  # /job:worker/task:0\n",
"    \"127.0.0.1:2224\",  # /job:worker/task:1\n",
"    \"127.0.0.1:2225\",  # /job:worker/task:2\n",
"]\n",
"cluster_spec = tf.train.ClusterSpec({\"ps\": ps_hosts, \"worker\": worker_hosts})"
]
},
{
"cell_type": "code",
"execution_count": 6,
"metadata": {
"collapsed": false,
"deletable": true,
"editable": true
},
"outputs": [],
"source": [
"# Start one in-process server per task declared in cluster_spec\n",
"task_ps0, task_ps1 = [\n",
"    tf.train.Server(cluster_spec, job_name=\"ps\", task_index=index)\n",
"    for index in range(2)]\n",
"task_worker0, task_worker1, task_worker2 = [\n",
"    tf.train.Server(cluster_spec, job_name=\"worker\", task_index=index)\n",
"    for index in range(3)]"
]
},
{
"cell_type": "markdown",
"metadata": {
"deletable": true,
"editable": true
},
"source": [
"# Pinning operations across devices and servers"
]
},
{
"cell_type": "code",
"execution_count": 7,
"metadata": {
"collapsed": true,
"deletable": true,
"editable": true
},
"outputs": [],
"source": [
"tf.reset_default_graph()\n",
"\n",
"# The variable is placed on the parameter-server job\n",
"with tf.device(\"/job:ps\"):\n",
"    a = tf.Variable(1.0, name=\"a\")\n",
"\n",
"# Only the job is specified here, not the task\n",
"with tf.device(\"/job:worker\"):\n",
"    b = a + 2\n",
"\n",
"# This addition is pinned to one specific worker task\n",
"with tf.device(\"/job:worker/task:1\"):\n",
"    c = a + b"
]
},
{
"cell_type": "code",
"execution_count": 8,
"metadata": {
"collapsed": false,
"deletable": true,
"editable": true
},
"outputs": [],
"source": [
"# Connect to the task listening on port 2221, initialize `a`, then evaluate `c`\n",
"with tf.Session(\"grpc://127.0.0.1:2221\") as sess:\n",
"    a.initializer.run()\n",
"    print(sess.run(c))"
]
},
{
"cell_type": "code",
"execution_count": 9,
"metadata": {
"collapsed": false,
"deletable": true,
"editable": true
},
"outputs": [],
"source": [
"tf.reset_default_graph()\n",
"\n",
"# Inside this block the device setter chooses a device for every new op:\n",
"# variables go to the ps tasks, the other ops go to the worker job.\n",
"with tf.device(tf.train.replica_device_setter(\n",
"        ps_tasks=2,\n",
"        ps_device=\"/job:ps\",\n",
"        worker_device=\"/job:worker\")):\n",
"    v1 = tf.Variable(1.0, name=\"v1\")  # pinned to /job:ps/task:0 (defaults to /cpu:0)\n",
"    v2 = tf.Variable(2.0, name=\"v2\")  # pinned to /job:ps/task:1 (defaults to /cpu:0)\n",
"    v3 = tf.Variable(3.0, name=\"v3\")  # pinned to /job:ps/task:0 (defaults to /cpu:0)\n",
"    s = v1 + v2                      # pinned to /job:worker (defaults to task:0/cpu:0)\n",
"    with tf.device(\"/task:1\"):\n",
"        p1 = 2 * s                   # pinned to /job:worker/task:1 (defaults to /cpu:0)\n",
"        # Nested inside /task:1, so both constraints apply to p2\n",
"        with tf.device(\"/cpu:0\"):\n",
"            p2 = 3 * s               # pinned to /job:worker/task:1/cpu:0\n",
"\n",
"# Ask the session to log the device each op is placed on\n",
"config = tf.ConfigProto(log_device_placement=True)\n",
"\n",
"with tf.Session(\"grpc://127.0.0.1:2221\", config=config) as sess:\n",
"    v1.initializer.run()"
]
},
{
"cell_type": "markdown",
"metadata": {
"deletable": true,
"editable": true
},
"source": [
"# Readers"
]
},
{
"cell_type": "code",
"execution_count": 10,
"metadata": {
"collapsed": false,
"deletable": true,
"editable": true
},
"outputs": [],
"source": [
"tf.reset_default_graph()\n",
"\n",
"# Create a small CSV file; the with-block closes it even if a write fails\n",
"with open(\"my_test.csv\", \"w\") as test_csv:\n",
"    test_csv.write(\"x1, x2 , target\\n\")\n",
"    test_csv.write(\"1., , 0\\n\")\n",
"    test_csv.write(\"4., 5. , 1\\n\")\n",
"    test_csv.write(\"7., 8. , 0\\n\")\n",
"\n",
"# Queue of file names, fed through a placeholder\n",
"filename_queue = tf.FIFOQueue(capacity=10, dtypes=[tf.string], shapes=[()])\n",
"filename = tf.placeholder(tf.string)\n",
"enqueue_filename = filename_queue.enqueue([filename])\n",
"close_filename_queue = filename_queue.close()\n",
"\n",
"# Read one line at a time, skipping the header line of each file\n",
"reader = tf.TextLineReader(skip_header_lines=1)\n",
"key, value = reader.read(filename_queue)\n",
"\n",
"# Parse the line; -1 is the default value for a missing field\n",
"x1, x2, target = tf.decode_csv(value, record_defaults=[[-1.], [-1.], [-1]])\n",
"features = tf.stack([x1, x2])\n",
"\n",
"# Shuffling queue holding (features, target) instances\n",
"instance_queue = tf.RandomShuffleQueue(\n",
"    capacity=10, min_after_dequeue=2,\n",
"    dtypes=[tf.float32, tf.int32], shapes=[[2],[]],\n",
"    name=\"instance_q\", shared_name=\"shared_instance_q\")\n",
"enqueue_instance = instance_queue.enqueue([features, target])\n",
"close_instance_queue = instance_queue.close()\n",
"\n",
"minibatch_instances, minibatch_targets = instance_queue.dequeue_up_to(2)\n",
"\n",
"with tf.Session() as sess:\n",
"    sess.run(enqueue_filename, feed_dict={filename: \"my_test.csv\"})\n",
"    sess.run(close_filename_queue)\n",
"    # Push instances until the reader has nothing left to read\n",
"    try:\n",
"        while True:\n",
"            sess.run(enqueue_instance)\n",
"    except tf.errors.OutOfRangeError:\n",
"        print(\"No more files to read\")\n",
"    sess.run(close_instance_queue)\n",
"    # Pull mini-batches of up to 2 instances until the closed queue is empty\n",
"    try:\n",
"        while True:\n",
"            print(sess.run([minibatch_instances, minibatch_targets]))\n",
"    except tf.errors.OutOfRangeError:\n",
"        print(\"No more training instances\")"
]
},
{
"cell_type": "code",
"execution_count": 11,
"metadata": {
"collapsed": true,
"deletable": true,
"editable": true
},
"outputs": [],
"source": [
"#coord = tf.train.Coordinator()\n",
"#threads = tf.train.start_queue_runners(coord=coord)\n",
"#filename_queue = tf.train.string_input_producer([\"test.csv\"])\n",
"#coord.request_stop()\n",
"#coord.join(threads)"
]
},
{
"cell_type": "markdown",
"metadata": {
"deletable": true,
"editable": true
},
"source": [
"# Queue runners and coordinators"
]
},
{
"cell_type": "code",
"execution_count": 12,
"metadata": {
"collapsed": false,
"deletable": true,
"editable": true
},
"outputs": [],
"source": [
"tf.reset_default_graph()\n",
"\n",
"# Queue of file names, fed through a placeholder\n",
"filename_queue = tf.FIFOQueue(capacity=10, dtypes=[tf.string], shapes=[()])\n",
"filename = tf.placeholder(tf.string)\n",
"enqueue_filename = filename_queue.enqueue([filename])\n",
"close_filename_queue = filename_queue.close()\n",
"\n",
"# Read one line at a time, skipping the header line of each file\n",
"reader = tf.TextLineReader(skip_header_lines=1)\n",
"key, value = reader.read(filename_queue)\n",
"\n",
"# Parse the line; -1 is the default value for a missing field\n",
"x1, x2, target = tf.decode_csv(value, record_defaults=[[-1.], [-1.], [-1]])\n",
"features = tf.stack([x1, x2])\n",
"\n",
"# Shuffling queue holding (features, target) instances\n",
"instance_queue = tf.RandomShuffleQueue(\n",
"    capacity=10, min_after_dequeue=2,\n",
"    dtypes=[tf.float32, tf.int32], shapes=[[2],[]],\n",
"    name=\"instance_q\", shared_name=\"shared_instance_q\")\n",
"enqueue_instance = instance_queue.enqueue([features, target])\n",
"close_instance_queue = instance_queue.close()\n",
"\n",
"minibatch_instances, minibatch_targets = instance_queue.dequeue_up_to(2)\n",
"\n",
"# Run the same enqueue op from n_threads threads\n",
"n_threads = 5\n",
"queue_runner = tf.train.QueueRunner(instance_queue, [enqueue_instance] * n_threads)\n",
"coord = tf.train.Coordinator()\n",
"\n",
"with tf.Session() as sess:\n",
"    sess.run(enqueue_filename, feed_dict={filename: \"my_test.csv\"})\n",
"    sess.run(close_filename_queue)\n",
"    enqueue_threads = queue_runner.create_threads(sess, coord=coord, start=True)\n",
"    try:\n",
"        while True:\n",
"            print(sess.run([minibatch_instances, minibatch_targets]))\n",
"    except tf.errors.OutOfRangeError:\n",
"        print(\"No more training instances\")\n",
"    finally:\n",
"        # Stop and wait for the runner threads while the session is still open\n",
"        coord.request_stop()\n",
"        coord.join(enqueue_threads)"
]
},
{
"cell_type": "code",
"execution_count": 13,
"metadata": {
"collapsed": false,
"deletable": true,
"editable": true
},
"outputs": [],
"source": [
"tf.reset_default_graph()\n",
"\n",
"def read_and_push_instance(filename_queue, instance_queue):\n",
"    \"\"\"Build the ops that read one CSV line and enqueue it as an instance.\n",
"\n",
"    Each call creates its own TextLineReader on `filename_queue`.\n",
"\n",
"    Args:\n",
"        filename_queue: queue of CSV file names to read from.\n",
"        instance_queue: queue receiving [features, target] items.\n",
"\n",
"    Returns:\n",
"        The enqueue op on `instance_queue`.\n",
"    \"\"\"\n",
"    line_reader = tf.TextLineReader(skip_header_lines=1)\n",
"    _, csv_line = line_reader.read(filename_queue)\n",
"    x1, x2, target = tf.decode_csv(\n",
"        csv_line, record_defaults=[[-1.], [-1.], [-1]])\n",
"    return instance_queue.enqueue([tf.stack([x1, x2]), target])\n",
"\n",
"# Queue of file names, fed through a placeholder\n",
"filename_queue = tf.FIFOQueue(capacity=10, dtypes=[tf.string], shapes=[()])\n",
"filename = tf.placeholder(tf.string)\n",
"enqueue_filename = filename_queue.enqueue([filename])\n",
"close_filename_queue = filename_queue.close()\n",
"\n",
"# Shuffling queue holding (features, target) instances\n",
"instance_queue = tf.RandomShuffleQueue(\n",
"    capacity=10, min_after_dequeue=2,\n",
"    dtypes=[tf.float32, tf.int32], shapes=[[2],[]],\n",
"    name=\"instance_q\", shared_name=\"shared_instance_q\")\n",
"\n",
"minibatch_instances, minibatch_targets = instance_queue.dequeue_up_to(2)\n",
"\n",
"# Five independent read-and-enqueue ops, one per queue runner thread\n",
"read_and_enqueue_ops = [read_and_push_instance(filename_queue, instance_queue) for i in range(5)]\n",
"queue_runner = tf.train.QueueRunner(instance_queue, read_and_enqueue_ops)\n",
"\n",
"with tf.Session() as sess:\n",
"    sess.run(enqueue_filename, feed_dict={filename: \"my_test.csv\"})\n",
"    sess.run(close_filename_queue)\n",
"    coord = tf.train.Coordinator()\n",
"    enqueue_threads = queue_runner.create_threads(sess, coord=coord, start=True)\n",
"    try:\n",
"        while True:\n",
"            print(sess.run([minibatch_instances, minibatch_targets]))\n",
"    except tf.errors.OutOfRangeError:\n",
"        print(\"No more training instances\")\n",
"    finally:\n",
"        # Stop and wait for the runner threads while the session is still open\n",
"        coord.request_stop()\n",
"        coord.join(enqueue_threads)\n",
"\n"
]
},
{
"cell_type": "markdown",
"metadata": {
"deletable": true,
"editable": true
},
"source": [
"# Setting a timeout"
]
},
{
"cell_type": "code",
"execution_count": 14,
"metadata": {
"collapsed": false,
"deletable": true,
"editable": true
},
"outputs": [],
"source": [
"tf.reset_default_graph()\n",
"\n",
"q = tf.FIFOQueue(capacity=10, dtypes=[tf.float32], shapes=[()])\n",
"v = tf.placeholder(tf.float32)\n",
"enqueue = q.enqueue([v])\n",
"dequeue = q.dequeue()\n",
"output = dequeue + 1\n",
"\n",
"# Session operations are given a timeout of 1000 ms\n",
"config = tf.ConfigProto(operation_timeout_in_ms=1000)\n",
"\n",
"with tf.Session(config=config) as sess:\n",
"    for item in (1.0, 2.0, 3.0):\n",
"        sess.run(enqueue, feed_dict={v: item})\n",
"    print(sess.run(output))\n",
"    print(sess.run(output, feed_dict={dequeue: 5}))  # `dequeue` is fed a value directly\n",
"    print(sess.run(output))\n",
"    print(sess.run(output))\n",
"    # Three items were enqueued, so this extra dequeue is expected to time out\n",
"    try:\n",
"        print(sess.run(output))\n",
"    except tf.errors.DeadlineExceededError:\n",
"        print(\"Timed out while dequeuing\")\n"
]
},
{
"cell_type": "markdown",
"metadata": {
"collapsed": true,
"deletable": true,
"editable": true
},
"source": [
"# Exercise solutions"
]
},
{
"cell_type": "markdown",
"metadata": {
"deletable": true,
"editable": true
},
"source": [
"**Coming soon**"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true,
"deletable": true,
"editable": true
},
"outputs": [],
"source": []
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.5.3"
},
"nav_menu": {},
"toc": {
"navigate_menu": true,
"number_sections": true,
"sideBar": true,
"threshold": 6,
"toc_cell": false,
"toc_section_display": "block",
"toc_window_display": false
}
},
"nbformat": 4,
"nbformat_minor": 0
}