560 lines
15 KiB
Plaintext
560 lines
15 KiB
Plaintext
{
|
||
"cells": [
|
||
{
|
||
"cell_type": "markdown",
|
||
"metadata": {
|
||
"deletable": true,
|
||
"editable": true
|
||
},
|
||
"source": [
|
||
"**Chapter 12 – Distributed TensorFlow**"
|
||
]
|
||
},
|
||
{
|
||
"cell_type": "markdown",
|
||
"metadata": {
|
||
"deletable": true,
|
||
"editable": true
|
||
},
|
||
"source": [
|
||
"_This notebook contains all the sample code and solutions to the exercises in chapter 12._"
|
||
]
|
||
},
|
||
{
|
||
"cell_type": "markdown",
|
||
"metadata": {
|
||
"deletable": true,
|
||
"editable": true
|
||
},
|
||
"source": [
|
||
"# Setup"
|
||
]
|
||
},
|
||
{
|
||
"cell_type": "markdown",
|
||
"metadata": {
|
||
"deletable": true,
|
||
"editable": true
|
||
},
|
||
"source": [
|
||
"First, let's make sure this notebook works well in both Python 2 and 3, import a few common modules, ensure Matplotlib plots figures inline, and prepare a function to save the figures:"
|
||
]
|
||
},
|
||
{
|
||
"cell_type": "code",
|
||
"execution_count": 1,
|
||
"metadata": {
|
||
"collapsed": true,
|
||
"deletable": true,
|
||
"editable": true
|
||
},
|
||
"outputs": [],
|
||
"source": [
|
||
"# To support both Python 2 and Python 3\n",
|
||
"from __future__ import division, print_function, unicode_literals\n",
|
||
"\n",
|
||
"# Common imports\n",
|
||
"import numpy as np\n",
|
||
"import numpy.random as rnd\n",
|
||
"import os\n",
|
||
"\n",
|
||
"# to make this notebook's output stable across runs\n",
|
||
"rnd.seed(42)\n",
|
||
"\n",
|
||
"# To plot pretty figures\n",
|
||
"%matplotlib inline\n",
|
||
"import matplotlib\n",
|
||
"import matplotlib.pyplot as plt\n",
|
||
"plt.rcParams['axes.labelsize'] = 14\n",
|
||
"plt.rcParams['xtick.labelsize'] = 12\n",
|
||
"plt.rcParams['ytick.labelsize'] = 12\n",
|
||
"\n",
|
||
"# Where to save the figures\n",
|
||
"PROJECT_ROOT_DIR = \".\"\n",
|
||
"CHAPTER_ID = \"distributed\"\n",
|
||
"\n",
|
||
"def save_fig(fig_id, tight_layout=True):\n",
|
||
" path = os.path.join(PROJECT_ROOT_DIR, \"images\", CHAPTER_ID, fig_id + \".png\")\n",
|
||
" print(\"Saving figure\", fig_id)\n",
|
||
" if tight_layout:\n",
|
||
" plt.tight_layout()\n",
|
||
" plt.savefig(path, format='png', dpi=300)"
|
||
]
|
||
},
|
||
{
|
||
"cell_type": "markdown",
|
||
"metadata": {
|
||
"deletable": true,
|
||
"editable": true
|
||
},
|
||
"source": [
|
||
"# Local server"
|
||
]
|
||
},
|
||
{
|
||
"cell_type": "code",
|
||
"execution_count": 2,
|
||
"metadata": {
|
||
"collapsed": true,
|
||
"deletable": true,
|
||
"editable": true
|
||
},
|
||
"outputs": [],
|
||
"source": [
|
||
"import tensorflow as tf"
|
||
]
|
||
},
|
||
{
|
||
"cell_type": "code",
|
||
"execution_count": 3,
|
||
"metadata": {
|
||
"collapsed": true,
|
||
"deletable": true,
|
||
"editable": true
|
||
},
|
||
"outputs": [],
|
||
"source": [
|
||
"c = tf.constant(\"Hello distributed TensorFlow!\")\n",
|
||
"server = tf.train.Server.create_local_server()"
|
||
]
|
||
},
|
||
{
|
||
"cell_type": "code",
|
||
"execution_count": 4,
|
||
"metadata": {
|
||
"collapsed": false,
|
||
"deletable": true,
|
||
"editable": true
|
||
},
|
||
"outputs": [],
|
||
"source": [
|
||
"with tf.Session(server.target) as sess:\n",
|
||
" print(sess.run(c))"
|
||
]
|
||
},
|
||
{
|
||
"cell_type": "markdown",
|
||
"metadata": {
|
||
"deletable": true,
|
||
"editable": true
|
||
},
|
||
"source": [
|
||
"# Cluster"
|
||
]
|
||
},
|
||
{
|
||
"cell_type": "code",
|
||
"execution_count": 5,
|
||
"metadata": {
|
||
"collapsed": true,
|
||
"deletable": true,
|
||
"editable": true
|
||
},
|
||
"outputs": [],
|
||
"source": [
|
||
"cluster_spec = tf.train.ClusterSpec({\n",
|
||
" \"ps\": [\n",
|
||
" \"127.0.0.1:2221\", # /job:ps/task:0\n",
|
||
" \"127.0.0.1:2222\", # /job:ps/task:1\n",
|
||
" ],\n",
|
||
" \"worker\": [\n",
|
||
" \"127.0.0.1:2223\", # /job:worker/task:0\n",
|
||
" \"127.0.0.1:2224\", # /job:worker/task:1\n",
|
||
" \"127.0.0.1:2225\", # /job:worker/task:2\n",
|
||
" ]})"
|
||
]
|
||
},
|
||
{
|
||
"cell_type": "code",
|
||
"execution_count": 6,
|
||
"metadata": {
|
||
"collapsed": false,
|
||
"deletable": true,
|
||
"editable": true
|
||
},
|
||
"outputs": [],
|
||
"source": [
|
||
"task_ps0 = tf.train.Server(cluster_spec, job_name=\"ps\", task_index=0)\n",
|
||
"task_ps1 = tf.train.Server(cluster_spec, job_name=\"ps\", task_index=1)\n",
|
||
"task_worker0 = tf.train.Server(cluster_spec, job_name=\"worker\", task_index=0)\n",
|
||
"task_worker1 = tf.train.Server(cluster_spec, job_name=\"worker\", task_index=1)\n",
|
||
"task_worker2 = tf.train.Server(cluster_spec, job_name=\"worker\", task_index=2)"
|
||
]
|
||
},
|
||
{
|
||
"cell_type": "markdown",
|
||
"metadata": {
|
||
"deletable": true,
|
||
"editable": true
|
||
},
|
||
"source": [
|
||
"# Pinning operations across devices and servers"
|
||
]
|
||
},
|
||
{
|
||
"cell_type": "code",
|
||
"execution_count": 7,
|
||
"metadata": {
|
||
"collapsed": true,
|
||
"deletable": true,
|
||
"editable": true
|
||
},
|
||
"outputs": [],
|
||
"source": [
|
||
"tf.reset_default_graph()\n",
|
||
"\n",
|
||
"with tf.device(\"/job:ps\"):\n",
|
||
" a = tf.Variable(1.0, name=\"a\")\n",
|
||
"\n",
|
||
"with tf.device(\"/job:worker\"):\n",
|
||
" b = a + 2\n",
|
||
"\n",
|
||
"with tf.device(\"/job:worker/task:1\"):\n",
|
||
" c = a + b"
|
||
]
|
||
},
|
||
{
|
||
"cell_type": "code",
|
||
"execution_count": 8,
|
||
"metadata": {
|
||
"collapsed": false,
|
||
"deletable": true,
|
||
"editable": true
|
||
},
|
||
"outputs": [],
|
||
"source": [
|
||
"with tf.Session(\"grpc://127.0.0.1:2221\") as sess:\n",
|
||
" sess.run(a.initializer)\n",
|
||
" print(c.eval())"
|
||
]
|
||
},
|
||
{
|
||
"cell_type": "code",
|
||
"execution_count": 9,
|
||
"metadata": {
|
||
"collapsed": false,
|
||
"deletable": true,
|
||
"editable": true
|
||
},
|
||
"outputs": [],
|
||
"source": [
|
||
"tf.reset_default_graph()\n",
|
||
"\n",
|
||
"with tf.device(tf.train.replica_device_setter(\n",
|
||
" ps_tasks=2,\n",
|
||
" ps_device=\"/job:ps\",\n",
|
||
" worker_device=\"/job:worker\")):\n",
|
||
" v1 = tf.Variable(1.0, name=\"v1\") # pinned to /job:ps/task:0 (defaults to /cpu:0)\n",
|
||
" v2 = tf.Variable(2.0, name=\"v2\") # pinned to /job:ps/task:1 (defaults to /cpu:0)\n",
|
||
" v3 = tf.Variable(3.0, name=\"v3\") # pinned to /job:ps/task:0 (defaults to /cpu:0)\n",
|
||
" s = v1 + v2 # pinned to /job:worker (defaults to task:0/cpu:0)\n",
|
||
" with tf.device(\"/task:1\"):\n",
|
||
" p1 = 2 * s # pinned to /job:worker/task:1 (defaults to /cpu:0)\n",
|
||
" with tf.device(\"/cpu:0\"):\n",
|
||
" p2 = 3 * s # pinned to /job:worker/task:1/cpu:0\n",
|
||
"\n",
|
||
"config = tf.ConfigProto()\n",
|
||
"config.log_device_placement = True\n",
|
||
"\n",
|
||
"with tf.Session(\"grpc://127.0.0.1:2221\", config=config) as sess:\n",
|
||
" v1.initializer.run()"
|
||
]
|
||
},
|
||
{
|
||
"cell_type": "markdown",
|
||
"metadata": {
|
||
"deletable": true,
|
||
"editable": true
|
||
},
|
||
"source": [
|
||
"# Readers"
|
||
]
|
||
},
|
||
{
|
||
"cell_type": "code",
|
||
"execution_count": 10,
|
||
"metadata": {
|
||
"collapsed": false,
|
||
"deletable": true,
|
||
"editable": true
|
||
},
|
||
"outputs": [],
|
||
"source": [
|
||
"tf.reset_default_graph()\n",
|
||
"\n",
|
||
"test_csv = open(\"my_test.csv\", \"w\")\n",
|
||
"test_csv.write(\"x1, x2 , target\\n\")\n",
|
||
"test_csv.write(\"1., , 0\\n\")\n",
|
||
"test_csv.write(\"4., 5. , 1\\n\")\n",
|
||
"test_csv.write(\"7., 8. , 0\\n\")\n",
|
||
"test_csv.close()\n",
|
||
"\n",
|
||
"filename_queue = tf.FIFOQueue(capacity=10, dtypes=[tf.string], shapes=[()])\n",
|
||
"filename = tf.placeholder(tf.string)\n",
|
||
"enqueue_filename = filename_queue.enqueue([filename])\n",
|
||
"close_filename_queue = filename_queue.close()\n",
|
||
"\n",
|
||
"reader = tf.TextLineReader(skip_header_lines=1)\n",
|
||
"key, value = reader.read(filename_queue)\n",
|
||
"\n",
|
||
"x1, x2, target = tf.decode_csv(value, record_defaults=[[-1.], [-1.], [-1]])\n",
|
||
"features = tf.stack([x1, x2])\n",
|
||
"\n",
|
||
"instance_queue = tf.RandomShuffleQueue(\n",
|
||
" capacity=10, min_after_dequeue=2,\n",
|
||
" dtypes=[tf.float32, tf.int32], shapes=[[2],[]],\n",
|
||
" name=\"instance_q\", shared_name=\"shared_instance_q\")\n",
|
||
"enqueue_instance = instance_queue.enqueue([features, target])\n",
|
||
"close_instance_queue = instance_queue.close()\n",
|
||
"\n",
|
||
"minibatch_instances, minibatch_targets = instance_queue.dequeue_up_to(2)\n",
|
||
"\n",
|
||
"with tf.Session() as sess:\n",
|
||
" sess.run(enqueue_filename, feed_dict={filename: \"my_test.csv\"})\n",
|
||
" sess.run(close_filename_queue)\n",
|
||
" try:\n",
|
||
" while True:\n",
|
||
" sess.run(enqueue_instance)\n",
|
||
" except tf.errors.OutOfRangeError as ex:\n",
|
||
" print(\"No more files to read\")\n",
|
||
" sess.run(close_instance_queue)\n",
|
||
" try:\n",
|
||
" while True:\n",
|
||
" print(sess.run([minibatch_instances, minibatch_targets]))\n",
|
||
" except tf.errors.OutOfRangeError as ex:\n",
|
||
" print(\"No more training instances\")"
|
||
]
|
||
},
|
||
{
|
||
"cell_type": "code",
|
||
"execution_count": 11,
|
||
"metadata": {
|
||
"collapsed": true,
|
||
"deletable": true,
|
||
"editable": true
|
||
},
|
||
"outputs": [],
|
||
"source": [
|
||
"#coord = tf.train.Coordinator()\n",
|
||
"#threads = tf.train.start_queue_runners(coord=coord)\n",
|
||
"#filename_queue = tf.train.string_input_producer([\"test.csv\"])\n",
|
||
"#coord.request_stop()\n",
|
||
"#coord.join(threads)"
|
||
]
|
||
},
|
||
{
|
||
"cell_type": "markdown",
|
||
"metadata": {
|
||
"deletable": true,
|
||
"editable": true
|
||
},
|
||
"source": [
|
||
"# Queue runners and coordinators"
|
||
]
|
||
},
|
||
{
|
||
"cell_type": "code",
|
||
"execution_count": 12,
|
||
"metadata": {
|
||
"collapsed": false,
|
||
"deletable": true,
|
||
"editable": true
|
||
},
|
||
"outputs": [],
|
||
"source": [
|
||
"tf.reset_default_graph()\n",
|
||
"\n",
|
||
"filename_queue = tf.FIFOQueue(capacity=10, dtypes=[tf.string], shapes=[()])\n",
|
||
"filename = tf.placeholder(tf.string)\n",
|
||
"enqueue_filename = filename_queue.enqueue([filename])\n",
|
||
"close_filename_queue = filename_queue.close()\n",
|
||
"\n",
|
||
"reader = tf.TextLineReader(skip_header_lines=1)\n",
|
||
"key, value = reader.read(filename_queue)\n",
|
||
"\n",
|
||
"x1, x2, target = tf.decode_csv(value, record_defaults=[[-1.], [-1.], [-1]])\n",
|
||
"features = tf.stack([x1, x2])\n",
|
||
"\n",
|
||
"instance_queue = tf.RandomShuffleQueue(\n",
|
||
" capacity=10, min_after_dequeue=2,\n",
|
||
" dtypes=[tf.float32, tf.int32], shapes=[[2],[]],\n",
|
||
" name=\"instance_q\", shared_name=\"shared_instance_q\")\n",
|
||
"enqueue_instance = instance_queue.enqueue([features, target])\n",
|
||
"close_instance_queue = instance_queue.close()\n",
|
||
"\n",
|
||
"minibatch_instances, minibatch_targets = instance_queue.dequeue_up_to(2)\n",
|
||
"\n",
|
||
"n_threads = 5\n",
|
||
"queue_runner = tf.train.QueueRunner(instance_queue, [enqueue_instance] * n_threads)\n",
|
||
"coord = tf.train.Coordinator()\n",
|
||
"\n",
|
||
"with tf.Session() as sess:\n",
|
||
" sess.run(enqueue_filename, feed_dict={filename: \"my_test.csv\"})\n",
|
||
" sess.run(close_filename_queue)\n",
|
||
" enqueue_threads = queue_runner.create_threads(sess, coord=coord, start=True)\n",
|
||
" try:\n",
|
||
" while True:\n",
|
||
" print(sess.run([minibatch_instances, minibatch_targets]))\n",
|
||
" except tf.errors.OutOfRangeError as ex:\n",
|
||
" print(\"No more training instances\")"
|
||
]
|
||
},
|
||
{
|
||
"cell_type": "code",
|
||
"execution_count": 13,
|
||
"metadata": {
|
||
"collapsed": false,
|
||
"deletable": true,
|
||
"editable": true
|
||
},
|
||
"outputs": [],
|
||
"source": [
|
||
"tf.reset_default_graph()\n",
|
||
"\n",
|
||
"def read_and_push_instance(filename_queue, instance_queue):\n",
|
||
" reader = tf.TextLineReader(skip_header_lines=1)\n",
|
||
" key, value = reader.read(filename_queue)\n",
|
||
" x1, x2, target = tf.decode_csv(value, record_defaults=[[-1.], [-1.], [-1]])\n",
|
||
" features = tf.stack([x1, x2])\n",
|
||
" enqueue_instance = instance_queue.enqueue([features, target])\n",
|
||
" return enqueue_instance\n",
|
||
"\n",
|
||
"filename_queue = tf.FIFOQueue(capacity=10, dtypes=[tf.string], shapes=[()])\n",
|
||
"filename = tf.placeholder(tf.string)\n",
|
||
"enqueue_filename = filename_queue.enqueue([filename])\n",
|
||
"close_filename_queue = filename_queue.close()\n",
|
||
"\n",
|
||
"instance_queue = tf.RandomShuffleQueue(\n",
|
||
" capacity=10, min_after_dequeue=2,\n",
|
||
" dtypes=[tf.float32, tf.int32], shapes=[[2],[]],\n",
|
||
" name=\"instance_q\", shared_name=\"shared_instance_q\")\n",
|
||
"\n",
|
||
"minibatch_instances, minibatch_targets = instance_queue.dequeue_up_to(2)\n",
|
||
"\n",
|
||
"read_and_enqueue_ops = [read_and_push_instance(filename_queue, instance_queue) for i in range(5)]\n",
|
||
"queue_runner = tf.train.QueueRunner(instance_queue, read_and_enqueue_ops)\n",
|
||
"\n",
|
||
"with tf.Session() as sess:\n",
|
||
" sess.run(enqueue_filename, feed_dict={filename: \"my_test.csv\"})\n",
|
||
" sess.run(close_filename_queue)\n",
|
||
" coord = tf.train.Coordinator()\n",
|
||
" enqueue_threads = queue_runner.create_threads(sess, coord=coord, start=True)\n",
|
||
" try:\n",
|
||
" while True:\n",
|
||
" print(sess.run([minibatch_instances, minibatch_targets]))\n",
|
||
" except tf.errors.OutOfRangeError as ex:\n",
|
||
" print(\"No more training instances\")\n",
|
||
"\n"
|
||
]
|
||
},
|
||
{
|
||
"cell_type": "markdown",
|
||
"metadata": {
|
||
"deletable": true,
|
||
"editable": true
|
||
},
|
||
"source": [
|
||
"# Setting a timeout"
|
||
]
|
||
},
|
||
{
|
||
"cell_type": "code",
|
||
"execution_count": 14,
|
||
"metadata": {
|
||
"collapsed": false,
|
||
"deletable": true,
|
||
"editable": true
|
||
},
|
||
"outputs": [],
|
||
"source": [
|
||
"tf.reset_default_graph()\n",
|
||
"\n",
|
||
"q = tf.FIFOQueue(capacity=10, dtypes=[tf.float32], shapes=[()])\n",
|
||
"v = tf.placeholder(tf.float32)\n",
|
||
"enqueue = q.enqueue([v])\n",
|
||
"dequeue = q.dequeue()\n",
|
||
"output = dequeue + 1\n",
|
||
"\n",
|
||
"config = tf.ConfigProto()\n",
|
||
"config.operation_timeout_in_ms = 1000\n",
|
||
"\n",
|
||
"with tf.Session(config=config) as sess:\n",
|
||
" sess.run(enqueue, feed_dict={v: 1.0})\n",
|
||
" sess.run(enqueue, feed_dict={v: 2.0})\n",
|
||
" sess.run(enqueue, feed_dict={v: 3.0})\n",
|
||
" print(sess.run(output))\n",
|
||
" print(sess.run(output, feed_dict={dequeue: 5}))\n",
|
||
" print(sess.run(output))\n",
|
||
" print(sess.run(output))\n",
|
||
" try:\n",
|
||
" print(sess.run(output))\n",
|
||
" except tf.errors.DeadlineExceededError as ex:\n",
|
||
" print(\"Timed out while dequeuing\")\n"
|
||
]
|
||
},
|
||
{
|
||
"cell_type": "markdown",
|
||
"metadata": {
|
||
"collapsed": true,
|
||
"deletable": true,
|
||
"editable": true
|
||
},
|
||
"source": [
|
||
"# Exercise solutions"
|
||
]
|
||
},
|
||
{
|
||
"cell_type": "markdown",
|
||
"metadata": {
|
||
"deletable": true,
|
||
"editable": true
|
||
},
|
||
"source": [
|
||
"**Coming soon**"
|
||
]
|
||
},
|
||
{
|
||
"cell_type": "code",
|
||
"execution_count": null,
|
||
"metadata": {
|
||
"collapsed": true,
|
||
"deletable": true,
|
||
"editable": true
|
||
},
|
||
"outputs": [],
|
||
"source": []
|
||
}
|
||
],
|
||
"metadata": {
|
||
"kernelspec": {
|
||
"display_name": "Python 3",
|
||
"language": "python",
|
||
"name": "python3"
|
||
},
|
||
"language_info": {
|
||
"codemirror_mode": {
|
||
"name": "ipython",
|
||
"version": 3
|
||
},
|
||
"file_extension": ".py",
|
||
"mimetype": "text/x-python",
|
||
"name": "python",
|
||
"nbconvert_exporter": "python",
|
||
"pygments_lexer": "ipython3",
|
||
"version": "3.5.3"
|
||
},
|
||
"nav_menu": {},
|
||
"toc": {
|
||
"navigate_menu": true,
|
||
"number_sections": true,
|
||
"sideBar": true,
|
||
"threshold": 6,
|
||
"toc_cell": false,
|
||
"toc_section_display": "block",
|
||
"toc_window_display": false
|
||
}
|
||
},
|
||
"nbformat": 4,
|
||
"nbformat_minor": 0
|
||
}
|