{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"**Chapter 15 Autoencoders**"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"_This notebook contains all the sample code and solutions to the exercises in chapter 15._"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Setup"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"First, let's make sure this notebook works well in both python 2 and 3, import a few common modules, ensure MatplotLib plots figures inline and prepare a function to save the figures:"
]
},
{
"cell_type": "code",
"execution_count": 1,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"# To support both python 2 and python 3\n",
"from __future__ import division, print_function, unicode_literals\n",
"\n",
"# Common imports\n",
"import numpy as np\n",
"import os\n",
"import sys\n",
"\n",
"# to make this notebook's output stable across runs\n",
"def reset_graph(seed=42):\n",
" tf.reset_default_graph()\n",
" tf.set_random_seed(seed)\n",
" np.random.seed(seed)\n",
"\n",
"# To plot pretty figures\n",
"%matplotlib inline\n",
"import matplotlib\n",
"import matplotlib.pyplot as plt\n",
"plt.rcParams['axes.labelsize'] = 14\n",
"plt.rcParams['xtick.labelsize'] = 12\n",
"plt.rcParams['ytick.labelsize'] = 12\n",
"\n",
"# Where to save the figures\n",
"PROJECT_ROOT_DIR = \".\"\n",
"CHAPTER_ID = \"autoencoders\"\n",
"\n",
"def save_fig(fig_id, tight_layout=True):\n",
" path = os.path.join(PROJECT_ROOT_DIR, \"images\", CHAPTER_ID, fig_id + \".png\")\n",
" print(\"Saving figure\", fig_id)\n",
" if tight_layout:\n",
" plt.tight_layout()\n",
" plt.savefig(path, format='png', dpi=300)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"A couple utility functions to plot grayscale 28x28 image:"
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"def plot_image(image, shape=[28, 28]):\n",
" plt.imshow(image.reshape(shape), cmap=\"Greys\", interpolation=\"nearest\")\n",
" plt.axis(\"off\")"
]
},
{
"cell_type": "code",
"execution_count": 3,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"def plot_multiple_images(images, n_rows, n_cols, pad=2):\n",
" images = images - images.min() # make the minimum == 0, so the padding looks white\n",
" w,h = images.shape[1:]\n",
" image = np.zeros(((w+pad)*n_rows+pad, (h+pad)*n_cols+pad))\n",
" for y in range(n_rows):\n",
" for x in range(n_cols):\n",
" image[(y*(h+pad)+pad):(y*(h+pad)+pad+h),(x*(w+pad)+pad):(x*(w+pad)+pad+w)] = images[y*n_cols+x]\n",
" plt.imshow(image, cmap=\"Greys\", interpolation=\"nearest\")\n",
" plt.axis(\"off\")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# PCA with a linear Autoencoder"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Build 3D dataset:"
]
},
{
"cell_type": "code",
"execution_count": 4,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"rnd.seed(4)\n",
"m = 200\n",
"w1, w2 = 0.1, 0.3\n",
"noise = 0.1\n",
"\n",
"angles = rnd.rand(m) * 3 * np.pi / 2 - 0.5\n",
"data = np.empty((m, 3))\n",
"data[:, 0] = np.cos(angles) + np.sin(angles)/2 + noise * rnd.randn(m) / 2\n",
"data[:, 1] = np.sin(angles) * 0.7 + noise * rnd.randn(m) / 2\n",
"data[:, 2] = data[:, 0] * w1 + data[:, 1] * w2 + noise * rnd.randn(m)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Normalize the data:"
]
},
{
"cell_type": "code",
"execution_count": 5,
"metadata": {},
"outputs": [],
"source": [
"from sklearn.preprocessing import StandardScaler\n",
"scaler = StandardScaler()\n",
"X_train = scaler.fit_transform(data[:100])\n",
"X_test = scaler.transform(data[100:])"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Now let's build the Autoencoder..."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Note: instead of using the `fully_connected()` function from the `tensorflow.contrib.layers` module (as in the book), we now use the `dense()` function from the `tf.layers` module, which did not exist when this chapter was written. This is preferable because anything in contrib may change or be deleted without notice, while `tf.layers` is part of the official API. As you will see, the code is mostly the same.\n",
"\n",
"The main differences relevant to this chapter are:\n",
"* the `scope` parameter was renamed to `name`, and the `_fn` suffix was removed in all the parameters that had it (for example the `activation_fn` parameter was renamed to `activation`).\n",
"* the `weights` parameter was renamed to `kernel` and the weights variable is now named `\"kernel\"` rather than `\"weights\"`,\n",
"* the bias variable is now named `\"bias\"` rather than `\"biases\"`,\n",
"* the default activation is `None` instead of `tf.nn.relu`"
]
},
{
"cell_type": "code",
"execution_count": 6,
"metadata": {},
"outputs": [],
"source": [
"import tensorflow as tf\n",
"\n",
"reset_graph()\n",
"\n",
"n_inputs = 3\n",
"n_hidden = 2 # codings\n",
"n_outputs = n_inputs\n",
"\n",
"learning_rate = 0.01\n",
"\n",
"X = tf.placeholder(tf.float32, shape=[None, n_inputs])\n",
"hidden = tf.layers.dense(X, n_hidden)\n",
"outputs = tf.layers.dense(hidden, n_outputs)\n",
"\n",
"reconstruction_loss = tf.reduce_mean(tf.square(outputs - X))\n",
"\n",
"optimizer = tf.train.AdamOptimizer(learning_rate)\n",
"training_op = optimizer.minimize(reconstruction_loss)\n",
"\n",
"init = tf.global_variables_initializer()"
]
},
{
"cell_type": "code",
"execution_count": 7,
"metadata": {},
"outputs": [],
"source": [
"n_iterations = 1000\n",
"codings = hidden\n",
"\n",
"with tf.Session() as sess:\n",
" init.run()\n",
" for iteration in range(n_iterations):\n",
" training_op.run(feed_dict={X: X_train})\n",
" codings_val = codings.eval(feed_dict={X: X_test})"
]
},
{
"cell_type": "code",
"execution_count": 8,
"metadata": {},
"outputs": [],
"source": [
"fig = plt.figure(figsize=(4,3))\n",
"plt.plot(codings_val[:,0], codings_val[:, 1], \"b.\")\n",
"plt.xlabel(\"$z_1$\", fontsize=18)\n",
"plt.ylabel(\"$z_2$\", fontsize=18, rotation=0)\n",
"save_fig(\"linear_autoencoder_pca_plot\")\n",
"plt.show()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Stacked Autoencoders"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Let's use MNIST:"
]
},
{
"cell_type": "code",
"execution_count": 9,
"metadata": {},
"outputs": [],
"source": [
"from tensorflow.examples.tutorials.mnist import input_data\n",
"mnist = input_data.read_data_sets(\"/tmp/data/\")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Train all layers at once"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Let's build a stacked Autoencoder with 3 hidden layers and 1 output layer (ie. 2 stacked Autoencoders). We will use ELU activation, He initialization and L2 regularization."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Note: since the `tf.layers.dense()` function is incompatible with `tf.contrib.layers.arg_scope()` (which is used in the book), we now use python's `functools.partial()` function instead. It makes it easy to create a `my_dense_layer()` function that just calls `tf.layers.dense()` with the desired parameters automatically set (unless they are overridden when calling `my_dense_layer()`)."
]
},
{
"cell_type": "code",
"execution_count": 10,
"metadata": {},
"outputs": [],
"source": [
"reset_graph()\n",
"\n",
"from functools import partial\n",
"\n",
"n_inputs = 28 * 28\n",
"n_hidden1 = 300\n",
"n_hidden2 = 150 # codings\n",
"n_hidden3 = n_hidden1\n",
"n_outputs = n_inputs\n",
"\n",
"learning_rate = 0.01\n",
"l2_reg = 0.0001\n",
"\n",
"X = tf.placeholder(tf.float32, shape=[None, n_inputs])\n",
"\n",
"he_init = tf.contrib.layers.variance_scaling_initializer() # He initialization\n",
"#Equivalent to:\n",
"#he_init = lambda shape, dtype=tf.float32: tf.truncated_normal(shape, 0., stddev=np.sqrt(2/shape[0]))\n",
"l2_regularizer = tf.contrib.layers.l2_regularizer(l2_reg)\n",
"my_dense_layer = partial(tf.layers.dense,\n",
" activation=tf.nn.elu,\n",
" kernel_initializer=he_init,\n",
" kernel_regularizer=l2_regularizer)\n",
"\n",
"hidden1 = my_dense_layer(X, n_hidden1)\n",
"hidden2 = my_dense_layer(hidden1, n_hidden2)\n",
"hidden3 = my_dense_layer(hidden2, n_hidden3)\n",
"outputs = my_dense_layer(hidden3, n_outputs, activation=None)\n",
"\n",
"reconstruction_loss = tf.reduce_mean(tf.square(outputs - X))\n",
"\n",
"reg_losses = tf.get_collection(tf.GraphKeys.REGULARIZATION_LOSSES)\n",
"loss = tf.add_n([reconstruction_loss] + reg_losses)\n",
"\n",
"optimizer = tf.train.AdamOptimizer(learning_rate)\n",
"training_op = optimizer.minimize(loss)\n",
"\n",
"init = tf.global_variables_initializer()\n",
"saver = tf.train.Saver() # not shown in the book"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Now let's train it! Note that we don't feed target values (`y_batch` is not used). This is unsupervised training."
]
},
{
"cell_type": "code",
"execution_count": 11,
"metadata": {},
"outputs": [],
"source": [
"n_epochs = 5\n",
"batch_size = 150\n",
"\n",
"with tf.Session() as sess:\n",
" init.run()\n",
" for epoch in range(n_epochs):\n",
" n_batches = mnist.train.num_examples // batch_size\n",
" for iteration in range(n_batches):\n",
" print(\"\\r{}%\".format(100 * iteration // n_batches), end=\"\") # not shown in the book\n",
" sys.stdout.flush() # not shown\n",
" X_batch, y_batch = mnist.train.next_batch(batch_size)\n",
" sess.run(training_op, feed_dict={X: X_batch})\n",
" loss_train = reconstruction_loss.eval(feed_dict={X: X_batch}) # not shown\n",
" print(\"\\r{}\".format(epoch), \"Train MSE:\", loss_train) # not shown\n",
" saver.save(sess, \"./my_model_all_layers.ckpt\") # not shown"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"This function loads the model, evaluates it on the test set (it measures the reconstruction error), then it displays the original image and its reconstruction:"
]
},
{
"cell_type": "code",
"execution_count": 12,
"metadata": {},
"outputs": [],
"source": [
"def show_reconstructed_digits(X, outputs, model_path = None, n_test_digits = 2):\n",
" with tf.Session() as sess:\n",
" if model_path:\n",
" saver.restore(sess, model_path)\n",
" X_test = mnist.test.images[:n_test_digits]\n",
" outputs_val = outputs.eval(feed_dict={X: X_test})\n",
"\n",
" fig = plt.figure(figsize=(8, 3 * n_test_digits))\n",
" for digit_index in range(n_test_digits):\n",
" plt.subplot(n_test_digits, 2, digit_index * 2 + 1)\n",
" plot_image(X_test[digit_index])\n",
" plt.subplot(n_test_digits, 2, digit_index * 2 + 2)\n",
" plot_image(outputs_val[digit_index])"
]
},
{
"cell_type": "code",
"execution_count": 13,
"metadata": {},
"outputs": [],
"source": [
"show_reconstructed_digits(X, outputs, \"./my_model_all_layers.ckpt\")\n",
"save_fig(\"reconstruction_plot\")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Tying weights"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"It is common to tie the weights of the encoder and the decoder (`weights_decoder = tf.transpose(weights_encoder)`). Unfortunately this makes it impossible (or very tricky) to use the `tf.layers.dense()` function, so we need to build the Autoencoder manually:"
]
},
{
"cell_type": "code",
"execution_count": 14,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"reset_graph()\n",
"\n",
"n_inputs = 28 * 28\n",
"n_hidden1 = 300\n",
"n_hidden2 = 150 # codings\n",
"n_hidden3 = n_hidden1\n",
"n_outputs = n_inputs\n",
"\n",
"learning_rate = 0.01\n",
"l2_reg = 0.0005"
]
},
{
"cell_type": "code",
"execution_count": 15,
"metadata": {},
"outputs": [],
"source": [
"activation = tf.nn.elu\n",
"regularizer = tf.contrib.layers.l2_regularizer(l2_reg)\n",
"initializer = tf.contrib.layers.variance_scaling_initializer()\n",
"\n",
"X = tf.placeholder(tf.float32, shape=[None, n_inputs])\n",
"\n",
"weights1_init = initializer([n_inputs, n_hidden1])\n",
"weights2_init = initializer([n_hidden1, n_hidden2])\n",
"\n",
"weights1 = tf.Variable(weights1_init, dtype=tf.float32, name=\"weights1\")\n",
"weights2 = tf.Variable(weights2_init, dtype=tf.float32, name=\"weights2\")\n",
"weights3 = tf.transpose(weights2, name=\"weights3\") # tied weights\n",
"weights4 = tf.transpose(weights1, name=\"weights4\") # tied weights\n",
"\n",
"biases1 = tf.Variable(tf.zeros(n_hidden1), name=\"biases1\")\n",
"biases2 = tf.Variable(tf.zeros(n_hidden2), name=\"biases2\")\n",
"biases3 = tf.Variable(tf.zeros(n_hidden3), name=\"biases3\")\n",
"biases4 = tf.Variable(tf.zeros(n_outputs), name=\"biases4\")\n",
"\n",
"hidden1 = activation(tf.matmul(X, weights1) + biases1)\n",
"hidden2 = activation(tf.matmul(hidden1, weights2) + biases2)\n",
"hidden3 = activation(tf.matmul(hidden2, weights3) + biases3)\n",
"outputs = tf.matmul(hidden3, weights4) + biases4\n",
"\n",
"reconstruction_loss = tf.reduce_mean(tf.square(outputs - X))\n",
"reg_loss = regularizer(weights1) + regularizer(weights2)\n",
"loss = reconstruction_loss + reg_loss\n",
"\n",
"optimizer = tf.train.AdamOptimizer(learning_rate)\n",
"training_op = optimizer.minimize(loss)\n",
"\n",
"init = tf.global_variables_initializer()"
]
},
{
"cell_type": "code",
"execution_count": 16,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"saver = tf.train.Saver()"
]
},
{
"cell_type": "code",
"execution_count": 17,
"metadata": {},
"outputs": [],
"source": [
"n_epochs = 5\n",
"batch_size = 150\n",
"\n",
"with tf.Session() as sess:\n",
" init.run()\n",
" for epoch in range(n_epochs):\n",
" n_batches = mnist.train.num_examples // batch_size\n",
" for iteration in range(n_batches):\n",
" print(\"\\r{}%\".format(100 * iteration // n_batches), end=\"\")\n",
" sys.stdout.flush()\n",
" X_batch, y_batch = mnist.train.next_batch(batch_size)\n",
" sess.run(training_op, feed_dict={X: X_batch})\n",
" loss_train = reconstruction_loss.eval(feed_dict={X: X_batch})\n",
" print(\"\\r{}\".format(epoch), \"Train MSE:\", loss_train)\n",
" saver.save(sess, \"./my_model_tying_weights.ckpt\")"
]
},
{
"cell_type": "code",
"execution_count": 18,
"metadata": {
"scrolled": true
},
"outputs": [],
"source": [
"show_reconstructed_digits(X, outputs, \"./my_model_tying_weights.ckpt\")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Training one Autoencoder at a time in multiple graphs"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"There are many ways to train one Autoencoder at a time. The first approach it to train each Autoencoder using a different graph, then we create the Stacked Autoencoder by simply initializing it with the weights and biases copied from these Autoencoders."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Let's create a function that will train one autoencoder and return the transformed training set (i.e., the output of the hidden layer) and the model parameters."
]
},
{
"cell_type": "code",
"execution_count": 19,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"reset_graph()\n",
"\n",
"from functools import partial\n",
"\n",
"def train_autoencoder(X_train, n_neurons, n_epochs, batch_size,\n",
" learning_rate = 0.01, l2_reg = 0.0005,\n",
" activation=tf.nn.elu, seed=42):\n",
" graph = tf.Graph()\n",
" with graph.as_default():\n",
" tf.set_random_seed(seed)\n",
"\n",
" n_inputs = X_train.shape[1]\n",
"\n",
" X = tf.placeholder(tf.float32, shape=[None, n_inputs])\n",
" \n",
" my_dense_layer = partial(\n",
" tf.layers.dense,\n",
" activation=activation,\n",
" kernel_initializer=tf.contrib.layers.variance_scaling_initializer(),\n",
" kernel_regularizer=tf.contrib.layers.l2_regularizer(l2_reg))\n",
"\n",
" hidden = my_dense_layer(X, n_neurons, name=\"hidden\")\n",
" outputs = my_dense_layer(hidden, n_inputs, activation=None, name=\"outputs\")\n",
"\n",
" reconstruction_loss = tf.reduce_mean(tf.square(outputs - X))\n",
"\n",
" reg_losses = tf.get_collection(tf.GraphKeys.REGULARIZATION_LOSSES)\n",
" loss = tf.add_n([reconstruction_loss] + reg_losses)\n",
"\n",
" optimizer = tf.train.AdamOptimizer(learning_rate)\n",
" training_op = optimizer.minimize(loss)\n",
"\n",
" init = tf.global_variables_initializer()\n",
"\n",
" with tf.Session(graph=graph) as sess:\n",
" init.run()\n",
" for epoch in range(n_epochs):\n",
" n_batches = len(X_train) // batch_size\n",
" for iteration in range(n_batches):\n",
" print(\"\\r{}%\".format(100 * iteration // n_batches), end=\"\")\n",
" sys.stdout.flush()\n",
" indices = rnd.permutation(len(X_train))[:batch_size]\n",
" X_batch = X_train[indices]\n",
" sess.run(training_op, feed_dict={X: X_batch})\n",
" loss_train = reconstruction_loss.eval(feed_dict={X: X_batch})\n",
" print(\"\\r{}\".format(epoch), \"Train MSE:\", loss_train)\n",
" params = dict([(var.name, var.eval()) for var in tf.get_collection(tf.GraphKeys.TRAINABLE_VARIABLES)])\n",
" hidden_val = hidden.eval(feed_dict={X: X_train})\n",
" return hidden_val, params[\"hidden/kernel:0\"], params[\"hidden/bias:0\"], params[\"outputs/kernel:0\"], params[\"outputs/bias:0\"]"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Now let's train two Autoencoders. The first one is trained on the training data, and the second is trained on the previous Autoencoder's hidden layer output:"
]
},
{
"cell_type": "code",
"execution_count": 20,
"metadata": {},
"outputs": [],
"source": [
"hidden_output, W1, b1, W4, b4 = train_autoencoder(mnist.train.images, n_neurons=300, n_epochs=4, batch_size=150)\n",
"_, W2, b2, W3, b3 = train_autoencoder(hidden_output, n_neurons=150, n_epochs=4, batch_size=150)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Finally, we can create a Stacked Autoencoder by simply reusing the weights and biases from the Autoencoders we just trained:"
]
},
{
"cell_type": "code",
"execution_count": 21,
"metadata": {},
"outputs": [],
"source": [
"reset_graph()\n",
"\n",
"n_inputs = 28*28\n",
"\n",
"X = tf.placeholder(tf.float32, shape=[None, n_inputs])\n",
"hidden1 = tf.nn.elu(tf.matmul(X, W1) + b1)\n",
"hidden2 = tf.nn.elu(tf.matmul(hidden1, W2) + b2)\n",
"hidden3 = tf.nn.elu(tf.matmul(hidden2, W3) + b3)\n",
"outputs = tf.matmul(hidden3, W4) + b4"
]
},
{
"cell_type": "code",
"execution_count": 22,
"metadata": {},
"outputs": [],
"source": [
"show_reconstructed_digits(X, outputs)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Training one Autoencoder at a time in a single graph"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Another approach is to use a single graph. To do this, we create the graph for the full Stacked Autoencoder, but then we also add operations to train each Autoencoder independently: phase 1 trains the bottom and top layer (ie. the first Autoencoder) and phase 2 trains the two middle layers (ie. the second Autoencoder)."
]
},
{
"cell_type": "code",
"execution_count": 23,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"reset_graph()\n",
"\n",
"n_inputs = 28 * 28\n",
"n_hidden1 = 300\n",
"n_hidden2 = 150 # codings\n",
"n_hidden3 = n_hidden1\n",
"n_outputs = n_inputs\n",
"\n",
"learning_rate = 0.01\n",
"l2_reg = 0.0001\n",
"\n",
"activation = tf.nn.elu\n",
"regularizer = tf.contrib.layers.l2_regularizer(l2_reg)\n",
"initializer = tf.contrib.layers.variance_scaling_initializer()\n",
"\n",
"X = tf.placeholder(tf.float32, shape=[None, n_inputs])\n",
"\n",
"weights1_init = initializer([n_inputs, n_hidden1])\n",
"weights2_init = initializer([n_hidden1, n_hidden2])\n",
"weights3_init = initializer([n_hidden2, n_hidden3])\n",
"weights4_init = initializer([n_hidden3, n_outputs])\n",
"\n",
"weights1 = tf.Variable(weights1_init, dtype=tf.float32, name=\"weights1\")\n",
"weights2 = tf.Variable(weights2_init, dtype=tf.float32, name=\"weights2\")\n",
"weights3 = tf.Variable(weights3_init, dtype=tf.float32, name=\"weights3\")\n",
"weights4 = tf.Variable(weights4_init, dtype=tf.float32, name=\"weights4\")\n",
"\n",
"biases1 = tf.Variable(tf.zeros(n_hidden1), name=\"biases1\")\n",
"biases2 = tf.Variable(tf.zeros(n_hidden2), name=\"biases2\")\n",
"biases3 = tf.Variable(tf.zeros(n_hidden3), name=\"biases3\")\n",
"biases4 = tf.Variable(tf.zeros(n_outputs), name=\"biases4\")\n",
"\n",
"hidden1 = activation(tf.matmul(X, weights1) + biases1)\n",
"hidden2 = activation(tf.matmul(hidden1, weights2) + biases2)\n",
"hidden3 = activation(tf.matmul(hidden2, weights3) + biases3)\n",
"outputs = tf.matmul(hidden3, weights4) + biases4\n",
"\n",
"reconstruction_loss = tf.reduce_mean(tf.square(outputs - X))"
]
},
{
"cell_type": "code",
"execution_count": 24,
"metadata": {},
"outputs": [],
"source": [
"optimizer = tf.train.AdamOptimizer(learning_rate)\n",
"\n",
"with tf.name_scope(\"phase1\"):\n",
" phase1_outputs = tf.matmul(hidden1, weights4) + biases4 # bypass hidden2 and hidden3\n",
" phase1_reconstruction_loss = tf.reduce_mean(tf.square(phase1_outputs - X))\n",
" phase1_reg_loss = regularizer(weights1) + regularizer(weights4)\n",
" phase1_loss = phase1_reconstruction_loss + phase1_reg_loss\n",
" phase1_training_op = optimizer.minimize(phase1_loss)\n",
"\n",
"with tf.name_scope(\"phase2\"):\n",
" phase2_reconstruction_loss = tf.reduce_mean(tf.square(hidden3 - hidden1))\n",
" phase2_reg_loss = regularizer(weights2) + regularizer(weights3)\n",
" phase2_loss = phase2_reconstruction_loss + phase2_reg_loss\n",
" train_vars = [weights2, biases2, weights3, biases3]\n",
" phase2_training_op = optimizer.minimize(phase2_loss, var_list=train_vars) # freeze hidden1"
]
},
{
"cell_type": "code",
"execution_count": 25,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"init = tf.global_variables_initializer()\n",
"saver = tf.train.Saver()"
]
},
{
"cell_type": "code",
"execution_count": 26,
"metadata": {},
"outputs": [],
"source": [
"training_ops = [phase1_training_op, phase2_training_op]\n",
"reconstruction_losses = [phase1_reconstruction_loss, phase2_reconstruction_loss]\n",
"n_epochs = [4, 4]\n",
"batch_sizes = [150, 150]\n",
"\n",
"with tf.Session() as sess:\n",
" init.run()\n",
" for phase in range(2):\n",
" print(\"Training phase #{}\".format(phase + 1))\n",
" for epoch in range(n_epochs[phase]):\n",
" n_batches = mnist.train.num_examples // batch_sizes[phase]\n",
" for iteration in range(n_batches):\n",
" print(\"\\r{}%\".format(100 * iteration // n_batches), end=\"\")\n",
" sys.stdout.flush()\n",
" X_batch, y_batch = mnist.train.next_batch(batch_sizes[phase])\n",
" sess.run(training_ops[phase], feed_dict={X: X_batch})\n",
" loss_train = reconstruction_losses[phase].eval(feed_dict={X: X_batch})\n",
" print(\"\\r{}\".format(epoch), \"Train MSE:\", loss_train)\n",
" saver.save(sess, \"./my_model_one_at_a_time.ckpt\")\n",
" loss_test = reconstruction_loss.eval(feed_dict={X: mnist.test.images})\n",
" print(\"Test MSE:\", loss_test)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Cache the frozen layer outputs"
]
},
{
"cell_type": "code",
"execution_count": 27,
"metadata": {
"scrolled": true
},
"outputs": [],
"source": [
"training_ops = [phase1_training_op, phase2_training_op]\n",
"reconstruction_losses = [phase1_reconstruction_loss, phase2_reconstruction_loss]\n",
"n_epochs = [4, 4]\n",
"batch_sizes = [150, 150]\n",
"\n",
"with tf.Session() as sess:\n",
" init.run()\n",
" for phase in range(2):\n",
" print(\"Training phase #{}\".format(phase + 1))\n",
" if phase == 1:\n",
" hidden1_cache = hidden1.eval(feed_dict={X: mnist.train.images})\n",
" for epoch in range(n_epochs[phase]):\n",
" n_batches = mnist.train.num_examples // batch_sizes[phase]\n",
" for iteration in range(n_batches):\n",
" print(\"\\r{}%\".format(100 * iteration // n_batches), end=\"\")\n",
" sys.stdout.flush()\n",
" if phase == 1:\n",
" indices = rnd.permutation(mnist.train.num_examples)\n",
" hidden1_batch = hidden1_cache[indices[:batch_sizes[phase]]]\n",
" feed_dict = {hidden1: hidden1_batch}\n",
" sess.run(training_ops[phase], feed_dict=feed_dict)\n",
" else:\n",
" X_batch, y_batch = mnist.train.next_batch(batch_sizes[phase])\n",
" feed_dict = {X: X_batch}\n",
" sess.run(training_ops[phase], feed_dict=feed_dict)\n",
" loss_train = reconstruction_losses[phase].eval(feed_dict=feed_dict)\n",
" print(\"\\r{}\".format(epoch), \"Train MSE:\", loss_train)\n",
" saver.save(sess, \"./my_model_cache_frozen.ckpt\")\n",
" loss_test = reconstruction_loss.eval(feed_dict={X: mnist.test.images})\n",
" print(\"Test MSE:\", loss_test)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Visualizing the Reconstructions"
]
},
{
"cell_type": "code",
"execution_count": 28,
"metadata": {},
"outputs": [],
"source": [
"n_test_digits = 2\n",
"X_test = mnist.test.images[:n_test_digits]\n",
"\n",
"with tf.Session() as sess:\n",
" saver.restore(sess, \"./my_model_one_at_a_time.ckpt\") # not shown in the book\n",
" outputs_val = outputs.eval(feed_dict={X: X_test})\n",
"\n",
"def plot_image(image, shape=[28, 28]):\n",
" plt.imshow(image.reshape(shape), cmap=\"Greys\", interpolation=\"nearest\")\n",
" plt.axis(\"off\")\n",
"\n",
"for digit_index in range(n_test_digits):\n",
" plt.subplot(n_test_digits, 2, digit_index * 2 + 1)\n",
" plot_image(X_test[digit_index])\n",
" plt.subplot(n_test_digits, 2, digit_index * 2 + 2)\n",
" plot_image(outputs_val[digit_index])"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Visualizing the extracted features"
]
},
{
"cell_type": "code",
"execution_count": 29,
"metadata": {},
"outputs": [],
"source": [
"with tf.Session() as sess:\n",
" saver.restore(sess, \"./my_model_one_at_a_time.ckpt\") # not shown in the book\n",
" weights1_val = weights1.eval()\n",
"\n",
"for i in range(5):\n",
" plt.subplot(1, 5, i + 1)\n",
" plot_image(weights1_val.T[i])\n",
"\n",
"save_fig(\"extracted_features_plot\") # not shown\n",
"plt.show() # not shown"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Unsupervised pretraining"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Let's create a small neural network for MNIST classification:"
]
},
{
"cell_type": "code",
"execution_count": 30,
"metadata": {},
"outputs": [],
"source": [
"reset_graph()\n",
"\n",
"n_inputs = 28 * 28\n",
"n_hidden1 = 300\n",
"n_hidden2 = 150\n",
"n_outputs = 10\n",
"\n",
"learning_rate = 0.01\n",
"l2_reg = 0.0005\n",
"\n",
"activation = tf.nn.elu\n",
"regularizer = tf.contrib.layers.l2_regularizer(l2_reg)\n",
"initializer = tf.contrib.layers.variance_scaling_initializer()\n",
"\n",
"X = tf.placeholder(tf.float32, shape=[None, n_inputs])\n",
"y = tf.placeholder(tf.int32, shape=[None])\n",
"\n",
"weights1_init = initializer([n_inputs, n_hidden1])\n",
"weights2_init = initializer([n_hidden1, n_hidden2])\n",
"weights3_init = initializer([n_hidden2, n_outputs])\n",
"\n",
"weights1 = tf.Variable(weights1_init, dtype=tf.float32, name=\"weights1\")\n",
"weights2 = tf.Variable(weights2_init, dtype=tf.float32, name=\"weights2\")\n",
"weights3 = tf.Variable(weights3_init, dtype=tf.float32, name=\"weights3\")\n",
"\n",
"biases1 = tf.Variable(tf.zeros(n_hidden1), name=\"biases1\")\n",
"biases2 = tf.Variable(tf.zeros(n_hidden2), name=\"biases2\")\n",
"biases3 = tf.Variable(tf.zeros(n_outputs), name=\"biases3\")\n",
"\n",
"hidden1 = activation(tf.matmul(X, weights1) + biases1)\n",
"hidden2 = activation(tf.matmul(hidden1, weights2) + biases2)\n",
"logits = tf.matmul(hidden2, weights3) + biases3\n",
"\n",
"cross_entropy = tf.nn.sparse_softmax_cross_entropy_with_logits(labels=y, logits=logits)\n",
"reg_loss = regularizer(weights1) + regularizer(weights2) + regularizer(weights3)\n",
"loss = cross_entropy + reg_loss\n",
"optimizer = tf.train.AdamOptimizer(learning_rate)\n",
"training_op = optimizer.minimize(loss)\n",
"\n",
"correct = tf.nn.in_top_k(logits, y, 1)\n",
"accuracy = tf.reduce_mean(tf.cast(correct, tf.float32))\n",
"\n",
"init = tf.global_variables_initializer()\n",
"pretrain_saver = tf.train.Saver([weights1, weights2, biases1, biases2])\n",
"saver = tf.train.Saver()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Regular training (without pretraining):"
]
},
{
"cell_type": "code",
"execution_count": 31,
"metadata": {},
"outputs": [],
"source": [
"n_epochs = 4\n",
"batch_size = 150\n",
"n_labeled_instances = 20000\n",
"\n",
"with tf.Session() as sess:\n",
" init.run()\n",
" for epoch in range(n_epochs):\n",
" n_batches = n_labeled_instances // batch_size\n",
" for iteration in range(n_batches):\n",
" print(\"\\r{}%\".format(100 * iteration // n_batches), end=\"\")\n",
" sys.stdout.flush()\n",
" indices = rnd.permutation(n_labeled_instances)[:batch_size]\n",
" X_batch, y_batch = mnist.train.images[indices], mnist.train.labels[indices]\n",
" sess.run(training_op, feed_dict={X: X_batch, y: y_batch})\n",
" accuracy_val = accuracy.eval(feed_dict={X: X_batch, y: y_batch})\n",
" print(\"\\r{}\".format(epoch), \"Train accuracy:\", accuracy_val, end=\" \")\n",
" saver.save(sess, \"./my_model_supervised.ckpt\")\n",
" accuracy_val = accuracy.eval(feed_dict={X: mnist.test.images, y: mnist.test.labels})\n",
" print(\"Test accuracy:\", accuracy_val)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Now reusing the first two layers of the autoencoder we pretrained:"
]
},
{
"cell_type": "code",
"execution_count": 32,
"metadata": {},
"outputs": [],
"source": [
"n_epochs = 4\n",
"batch_size = 150\n",
"n_labeled_instances = 20000\n",
"\n",
"#training_op = optimizer.minimize(loss, var_list=[weights3, biases3]) # Freeze layers 1 and 2 (optional)\n",
"\n",
"with tf.Session() as sess:\n",
" init.run()\n",
" pretrain_saver.restore(sess, \"./my_model_cache_frozen.ckpt\")\n",
" for epoch in range(n_epochs):\n",
" n_batches = n_labeled_instances // batch_size\n",
" for iteration in range(n_batches):\n",
" print(\"\\r{}%\".format(100 * iteration // n_batches), end=\"\")\n",
" sys.stdout.flush()\n",
" indices = rnd.permutation(n_labeled_instances)[:batch_size]\n",
" X_batch, y_batch = mnist.train.images[indices], mnist.train.labels[indices]\n",
" sess.run(training_op, feed_dict={X: X_batch, y: y_batch})\n",
" accuracy_val = accuracy.eval(feed_dict={X: X_batch, y: y_batch})\n",
" print(\"\\r{}\".format(epoch), \"Train accuracy:\", accuracy_val, end=\"\\t\")\n",
" saver.save(sess, \"./my_model_supervised_pretrained.ckpt\")\n",
" accuracy_val = accuracy.eval(feed_dict={X: mnist.test.images, y: mnist.test.labels})\n",
" print(\"Test accuracy:\", accuracy_val)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Stacked denoising Autoencoder"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Note: the book uses `tf.contrib.layers.dropout()` rather than `tf.layers.dropout()` (which did not exist when this chapter was written). It is now preferable to use `tf.layers.dropout()`, because anything in the contrib module may change or be deleted without notice. The `tf.layers.dropout()` function is almost identical to the `tf.contrib.layers.dropout()` function, except for a few minor differences. Most importantly:\n",
"* you must specify the dropout rate (`rate`) rather than the keep probability (`keep_prob`), where `rate` is simply equal to `1 - keep_prob`,\n",
"* the `is_training` parameter is renamed to `training`."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Using Gaussian noise:"
]
},
{
"cell_type": "code",
"execution_count": 33,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"reset_graph()\n",
"\n",
"n_inputs = 28 * 28\n",
"n_hidden1 = 300\n",
"n_hidden2 = 150 # codings\n",
"n_hidden3 = n_hidden1\n",
"n_outputs = n_inputs\n",
"\n",
"learning_rate = 0.01"
]
},
{
"cell_type": "code",
"execution_count": 34,
"metadata": {},
"outputs": [],
"source": [
"noise_level = 1.0\n",
"\n",
"X = tf.placeholder(tf.float32, shape=[None, n_inputs])\n",
"X_noisy = X + noise_level * tf.random_normal(tf.shape(X))\n",
"\n",
"hidden1 = tf.layers.dense(X_noisy, n_hidden1, activation=tf.nn.relu,\n",
" name=\"hidden1\")\n",
"hidden2 = tf.layers.dense(hidden1, n_hidden2, activation=tf.nn.relu, # not shown in the book\n",
" name=\"hidden2\") # not shown\n",
"hidden3 = tf.layers.dense(hidden2, n_hidden3, activation=tf.nn.relu, # not shown\n",
" name=\"hidden3\") # not shown\n",
"outputs = tf.layers.dense(hidden3, n_outputs, name=\"outputs\") # not shown\n",
"\n",
"reconstruction_loss = tf.reduce_mean(tf.square(outputs - X)) # MSE"
]
},
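{
"cell_type": "markdown",
"metadata": {},
"source": [
"In equation form, the graph above computes roughly the following (a sketch; $f$ stands for the whole encoder-decoder stack, $\\alpha$ for `noise_level`, $\\epsilon$ for the sampled noise, $m$ for the batch size and $n$ for `n_inputs`, all notation introduced here):\n",
"\n",
"$$\n",
"\\tilde{x} = x + \\alpha \\, \\epsilon, \\qquad \\epsilon \\sim \\mathcal{N}(0, I), \\qquad\n",
"L = \\frac{1}{m \\, n} \\sum_{i=1}^{m} \\sum_{j=1}^{n} \\left( f(\\tilde{x}^{(i)})_j - x^{(i)}_j \\right)^2\n",
"$$\n",
"\n",
"The loss compares the reconstruction with the clean input $x$, not with the noisy $\\tilde{x}$, which is what forces the network to remove the noise."
]
},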
{
"cell_type": "code",
"execution_count": 35,
"metadata": {},
"outputs": [],
"source": [
"optimizer = tf.train.AdamOptimizer(learning_rate)\n",
"training_op = optimizer.minimize(reconstruction_loss)\n",
" \n",
"init = tf.global_variables_initializer()\n",
"saver = tf.train.Saver()"
]
},
{
"cell_type": "code",
"execution_count": 36,
"metadata": {},
"outputs": [],
"source": [
"n_epochs = 10\n",
"batch_size = 150\n",
"\n",
"with tf.Session() as sess:\n",
"    init.run()\n",
"    for epoch in range(n_epochs):\n",
"        n_batches = mnist.train.num_examples // batch_size\n",
"        for iteration in range(n_batches):\n",
"            print(\"\\r{}%\".format(100 * iteration // n_batches), end=\"\")\n",
"            sys.stdout.flush()\n",
"            X_batch, y_batch = mnist.train.next_batch(batch_size)\n",
"            sess.run(training_op, feed_dict={X: X_batch})\n",
"        loss_train = reconstruction_loss.eval(feed_dict={X: X_batch})\n",
"        print(\"\\r{}\".format(epoch), \"Train MSE:\", loss_train)\n",
"        saver.save(sess, \"./my_model_stacked_denoising_gaussian.ckpt\")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Using dropout:"
]
},
{
"cell_type": "code",
"execution_count": 37,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"reset_graph()\n",
"\n",
"n_inputs = 28 * 28\n",
"n_hidden1 = 300\n",
"n_hidden2 = 150 # codings\n",
"n_hidden3 = n_hidden1\n",
"n_outputs = n_inputs\n",
"\n",
"learning_rate = 0.01"
]
},
{
"cell_type": "code",
"execution_count": 38,
"metadata": {},
"outputs": [],
"source": [
"dropout_rate = 0.3\n",
"\n",
"training = tf.placeholder_with_default(False, shape=(), name='training')\n",
"\n",
"X = tf.placeholder(tf.float32, shape=[None, n_inputs])\n",
"X_drop = tf.layers.dropout(X, dropout_rate, training=training)\n",
"\n",
"hidden1 = tf.layers.dense(X_drop, n_hidden1, activation=tf.nn.relu,\n",
" name=\"hidden1\")\n",
"hidden2 = tf.layers.dense(hidden1, n_hidden2, activation=tf.nn.relu, # not shown in the book\n",
" name=\"hidden2\") # not shown\n",
"hidden3 = tf.layers.dense(hidden2, n_hidden3, activation=tf.nn.relu, # not shown\n",
" name=\"hidden3\") # not shown\n",
"outputs = tf.layers.dense(hidden3, n_outputs, name=\"outputs\") # not shown\n",
"\n",
"reconstruction_loss = tf.reduce_mean(tf.square(outputs - X)) # MSE"
]
},
{
"cell_type": "code",
"execution_count": 39,
"metadata": {},
"outputs": [],
"source": [
"optimizer = tf.train.AdamOptimizer(learning_rate)\n",
"training_op = optimizer.minimize(reconstruction_loss)\n",
" \n",
"init = tf.global_variables_initializer()\n",
"saver = tf.train.Saver()"
]
},
{
"cell_type": "code",
"execution_count": 40,
"metadata": {},
"outputs": [],
"source": [
"n_epochs = 10\n",
"batch_size = 150\n",
"\n",
"with tf.Session() as sess:\n",
"    init.run()\n",
"    for epoch in range(n_epochs):\n",
"        n_batches = mnist.train.num_examples // batch_size\n",
"        for iteration in range(n_batches):\n",
"            print(\"\\r{}%\".format(100 * iteration // n_batches), end=\"\")\n",
"            sys.stdout.flush()\n",
"            X_batch, y_batch = mnist.train.next_batch(batch_size)\n",
"            sess.run(training_op, feed_dict={X: X_batch, training: True})\n",
"        loss_train = reconstruction_loss.eval(feed_dict={X: X_batch})\n",
"        print(\"\\r{}\".format(epoch), \"Train MSE:\", loss_train)\n",
"        saver.save(sess, \"./my_model_stacked_denoising_dropout.ckpt\")"
]
},
{
"cell_type": "code",
"execution_count": 41,
"metadata": {},
"outputs": [],
"source": [
"show_reconstructed_digits(X, outputs, \"./my_model_stacked_denoising_dropout.ckpt\")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Sparse Autoencoder"
]
},
{
"cell_type": "code",
"execution_count": 42,
"metadata": {},
"outputs": [],
"source": [
"p = 0.1\n",
"q = np.linspace(0.001, 0.999, 500)\n",
"kl_div = p * np.log(p / q) + (1 - p) * np.log((1 - p) / (1 - q))\n",
"mse = (p - q)**2\n",
"plt.plot([p, p], [0, 0.3], \"k:\")\n",
"plt.text(0.05, 0.32, \"Target\\nsparsity\", fontsize=14)\n",
"plt.plot(q, kl_div, \"b-\", label=\"KL divergence\")\n",
"plt.plot(q, mse, \"r--\", label=\"MSE\")\n",
"plt.legend(loc=\"upper left\")\n",
"plt.xlabel(\"Actual sparsity\")\n",
"plt.ylabel(\"Cost\", rotation=0)\n",
"plt.axis([0, 1, 0, 0.95])\n",
"save_fig(\"sparsity_loss_plot\")"
]
},
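{
"cell_type": "markdown",
"metadata": {},
"source": [
"For reference, the two curves plotted above can be written out as follows (a sketch; $p$ is the target sparsity and $q$ the actual sparsity, as in the code, both treated as Bernoulli parameters):\n",
"\n",
"$$\n",
"D_{\\mathrm{KL}}(p \\parallel q) = p \\log \\frac{p}{q} + (1 - p) \\log \\frac{1 - p}{1 - q}, \\qquad\n",
"\\mathrm{MSE}(p, q) = (p - q)^2\n",
"$$\n",
"\n",
"For example, with $p = 0.1$ and $q = 0.3$, the KL divergence is $0.1 \\log(0.1 / 0.3) + 0.9 \\log(0.9 / 0.7) \\approx 0.116$, while the MSE is only $0.04$. The KL divergence also grows without bound as $q \\to 1$, whereas the MSE can never exceed $(1 - p)^2 = 0.81$, so the KL penalty reacts much more strongly when the actual sparsity is far from the target."
]
},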
{
"cell_type": "code",
"execution_count": 43,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"reset_graph()\n",
"\n",
"n_inputs = 28 * 28\n",
"n_hidden1 = 1000 # sparse codings\n",
"n_outputs = n_inputs"
]
},
{
"cell_type": "code",
"execution_count": 44,
"metadata": {},
"outputs": [],
"source": [
"def kl_divergence(p, q):\n",
"    # Kullback-Leibler divergence between Bernoulli(p) and Bernoulli(q)\n",
"    return p * tf.log(p / q) + (1 - p) * tf.log((1 - p) / (1 - q))\n",
"\n",
"learning_rate = 0.01\n",
"sparsity_target = 0.1\n",
"sparsity_weight = 0.2\n",
"\n",
"X = tf.placeholder(tf.float32, shape=[None, n_inputs]) # not shown in the book\n",
"\n",
"hidden1 = tf.layers.dense(X, n_hidden1, activation=tf.nn.sigmoid) # not shown\n",
"outputs = tf.layers.dense(hidden1, n_outputs) # not shown\n",
"\n",
"hidden1_mean = tf.reduce_mean(hidden1, axis=0) # batch mean\n",
"sparsity_loss = tf.reduce_sum(kl_divergence(sparsity_target, hidden1_mean))\n",
"reconstruction_loss = tf.reduce_mean(tf.square(outputs - X)) # MSE\n",
"loss = reconstruction_loss + sparsity_weight * sparsity_loss\n",
"\n",
"optimizer = tf.train.AdamOptimizer(learning_rate)\n",
"training_op = optimizer.minimize(loss)"
]
},
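{
"cell_type": "markdown",
"metadata": {},
"source": [
"Written as an equation, the loss built above is roughly the following (a sketch; $\\lambda$ denotes `sparsity_weight`, $p$ denotes `sparsity_target`, $a^{(i)}_j$ the activation of hidden unit $j$ for instance $i$, $m$ the batch size and $n_h$ the value of `n_hidden1`, all notation introduced here):\n",
"\n",
"$$\n",
"q_j = \\frac{1}{m} \\sum_{i=1}^{m} a^{(i)}_j, \\qquad\n",
"L = \\mathrm{MSE}(\\hat{X}, X) + \\lambda \\sum_{j=1}^{n_h} D_{\\mathrm{KL}}(p \\parallel q_j)\n",
"$$\n",
"\n",
"Here $D_{\\mathrm{KL}}(p \\parallel q_j)$ is the value returned by `kl_divergence()` for unit $j$. Because each $q_j$ is estimated from a single batch, the estimate is only meaningful when the batch is reasonably large."
]
},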
{
"cell_type": "code",
"execution_count": 45,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"init = tf.global_variables_initializer()\n",
"saver = tf.train.Saver()"
]
},
{
"cell_type": "code",
"execution_count": 46,
"metadata": {},
"outputs": [],
"source": [
"n_epochs = 100\n",
"batch_size = 1000\n",
"\n",
"with tf.Session() as sess:\n",
"    init.run()\n",
"    for epoch in range(n_epochs):\n",
"        n_batches = mnist.train.num_examples // batch_size\n",
"        for iteration in range(n_batches):\n",
"            print(\"\\r{}%\".format(100 * iteration // n_batches), end=\"\")\n",
"            sys.stdout.flush()\n",
"            X_batch, y_batch = mnist.train.next_batch(batch_size)\n",
"            sess.run(training_op, feed_dict={X: X_batch})\n",
"        reconstruction_loss_val, sparsity_loss_val, loss_val = sess.run([reconstruction_loss, sparsity_loss, loss], feed_dict={X: X_batch})\n",
"        print(\"\\r{}\".format(epoch), \"Train MSE:\", reconstruction_loss_val, \"\\tSparsity loss:\", sparsity_loss_val, \"\\tTotal loss:\", loss_val)\n",
"        saver.save(sess, \"./my_model_sparse.ckpt\")"
]
},
{
"cell_type": "code",
"execution_count": 47,
"metadata": {},
"outputs": [],
"source": [
"show_reconstructed_digits(X, outputs, \"./my_model_sparse.ckpt\")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Note that the coding layer must output values from 0 to 1, which is why we use the sigmoid activation function:"
]
},
{
"cell_type": "code",
"execution_count": 48,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"hidden1 = tf.layers.dense(X, n_hidden1, activation=tf.nn.sigmoid)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"To speed up training, you can normalize the inputs between 0 and 1, and use the cross entropy instead of the MSE for the cost function:"
]
},
{
"cell_type": "code",
"execution_count": 49,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"logits = tf.layers.dense(hidden1, n_outputs)\n",
"outputs = tf.nn.sigmoid(logits)\n",
"\n",
"xentropy = tf.nn.sigmoid_cross_entropy_with_logits(labels=X, logits=logits)\n",
"reconstruction_loss = tf.reduce_mean(xentropy)"
]
},
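{
"cell_type": "markdown",
"metadata": {},
"source": [
"As a sketch of what this loss computes (with $x$ a normalized input pixel, $z$ the corresponding logit and $\\sigma$ the sigmoid function; the symbols are notation introduced here), the per-pixel cross entropy is:\n",
"\n",
"$$\n",
"\\ell(x, z) = -x \\log \\sigma(z) - (1 - x) \\log\\bigl(1 - \\sigma(z)\\bigr)\n",
"= \\max(z, 0) - x z + \\log\\bigl(1 + e^{-\\lvert z \\rvert}\\bigr)\n",
"$$\n",
"\n",
"The right-hand form is the numerically stable expression given in the TensorFlow documentation for `tf.nn.sigmoid_cross_entropy_with_logits()`, which is why the function takes the logits rather than the sigmoid outputs."
]
},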
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Variational Autoencoder"
]
},
{
"cell_type": "code",
"execution_count": 50,
"metadata": {},
"outputs": [],
"source": [
"reset_graph()\n",
"\n",
"from functools import partial\n",
"\n",
"n_inputs = 28 * 28\n",
"n_hidden1 = 500\n",
"n_hidden2 = 500\n",
"n_hidden3 = 20 # codings\n",
"n_hidden4 = n_hidden2\n",
"n_hidden5 = n_hidden1\n",
"n_outputs = n_inputs\n",
"learning_rate = 0.001\n",
"\n",
"initializer = tf.contrib.layers.variance_scaling_initializer()\n",
"\n",
"my_dense_layer = partial(\n",
" tf.layers.dense,\n",
" activation=tf.nn.elu,\n",
" kernel_initializer=initializer)\n",
"\n",
"X = tf.placeholder(tf.float32, [None, n_inputs])\n",
"hidden1 = my_dense_layer(X, n_hidden1)\n",
"hidden2 = my_dense_layer(hidden1, n_hidden2)\n",
"hidden3_mean = my_dense_layer(hidden2, n_hidden3, activation=None)\n",
"hidden3_sigma = my_dense_layer(hidden2, n_hidden3, activation=None)\n",
"noise = tf.random_normal(tf.shape(hidden3_sigma), dtype=tf.float32)\n",
"hidden3 = hidden3_mean + hidden3_sigma * noise\n",
"hidden4 = my_dense_layer(hidden3, n_hidden4)\n",
"hidden5 = my_dense_layer(hidden4, n_hidden5)\n",
"logits = my_dense_layer(hidden5, n_outputs, activation=None)\n",
"outputs = tf.sigmoid(logits)\n",
"\n",
"xentropy = tf.nn.sigmoid_cross_entropy_with_logits(labels=X, logits=logits)\n",
"reconstruction_loss = tf.reduce_sum(xentropy)"
]
},
{
"cell_type": "code",
"execution_count": 51,
"metadata": {},
"outputs": [],
"source": [
"eps = 1e-10 # smoothing term to avoid computing log(0), which is -inf\n",
"latent_loss = 0.5 * tf.reduce_sum(\n",
" tf.square(hidden3_sigma) + tf.square(hidden3_mean)\n",
" - 1 - tf.log(eps + tf.square(hidden3_sigma)))"
]
},
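{
"cell_type": "markdown",
"metadata": {},
"source": [
"The sampling step and the latent loss above correspond to the following equations (a sketch; $\\mu$ and $\\sigma$ stand for `hidden3_mean` and `hidden3_sigma`, $\\epsilon$ for `noise`, and the sum runs over all codings of all instances in the batch; the smoothing term `eps` is omitted):\n",
"\n",
"$$\n",
"z = \\mu + \\sigma \\odot \\epsilon, \\qquad \\epsilon \\sim \\mathcal{N}(0, I)\n",
"$$\n",
"\n",
"$$\n",
"L_{\\text{latent}} = D_{\\mathrm{KL}}\\bigl(\\mathcal{N}(\\mu, \\sigma^2) \\parallel \\mathcal{N}(0, 1)\\bigr)\n",
"= \\frac{1}{2} \\sum_i \\left( \\sigma_i^2 + \\mu_i^2 - 1 - \\log \\sigma_i^2 \\right)\n",
"$$\n",
"\n",
"Each term is zero when $\\mu_i = 0$ and $\\sigma_i^2 = 1$ and positive otherwise, so the loss pushes the codings toward a standard Gaussian. For example, $\\mu_i = 0$ and $\\sigma_i = 2$ gives $\\frac{1}{2}(4 + 0 - 1 - \\log 4) \\approx 0.81$."
]
},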
{
"cell_type": "code",
"execution_count": 52,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"loss = reconstruction_loss + latent_loss\n",
"\n",
"optimizer = tf.train.AdamOptimizer(learning_rate=learning_rate)\n",
"training_op = optimizer.minimize(loss)\n",
"\n",
"init = tf.global_variables_initializer()\n",
"saver = tf.train.Saver()"
]
},
{
"cell_type": "code",
"execution_count": 53,
"metadata": {},
"outputs": [],
"source": [
"n_epochs = 50\n",
"batch_size = 150\n",
"\n",
"with tf.Session() as sess:\n",
"    init.run()\n",
"    for epoch in range(n_epochs):\n",
"        n_batches = mnist.train.num_examples // batch_size\n",
"        for iteration in range(n_batches):\n",
"            print(\"\\r{}%\".format(100 * iteration // n_batches), end=\"\")\n",
"            sys.stdout.flush()\n",
"            X_batch, y_batch = mnist.train.next_batch(batch_size)\n",
"            sess.run(training_op, feed_dict={X: X_batch})\n",
"        loss_val, reconstruction_loss_val, latent_loss_val = sess.run([loss, reconstruction_loss, latent_loss], feed_dict={X: X_batch})\n",
"        print(\"\\r{}\".format(epoch), \"Train total loss:\", loss_val, \"\\tReconstruction loss:\", reconstruction_loss_val, \"\\tLatent loss:\", latent_loss_val)\n",
"        saver.save(sess, \"./my_model_variational.ckpt\")"
]
},
{
"cell_type": "code",
"execution_count": 54,
"metadata": {},
"outputs": [],
"source": [
"reset_graph()\n",
"\n",
"from functools import partial\n",
"\n",
"n_inputs = 28 * 28\n",
"n_hidden1 = 500\n",
"n_hidden2 = 500\n",
"n_hidden3 = 20 # codings\n",
"n_hidden4 = n_hidden2\n",
"n_hidden5 = n_hidden1\n",
"n_outputs = n_inputs\n",
"learning_rate = 0.001\n",
"\n",
"initializer = tf.contrib.layers.variance_scaling_initializer()\n",
"my_dense_layer = partial(\n",
" tf.layers.dense,\n",
" activation=tf.nn.elu,\n",
" kernel_initializer=initializer)\n",
"\n",
"X = tf.placeholder(tf.float32, [None, n_inputs])\n",
"hidden1 = my_dense_layer(X, n_hidden1)\n",
"hidden2 = my_dense_layer(hidden1, n_hidden2)\n",
"hidden3_mean = my_dense_layer(hidden2, n_hidden3, activation=None)\n",
"hidden3_gamma = my_dense_layer(hidden2, n_hidden3, activation=None)\n",
"noise = tf.random_normal(tf.shape(hidden3_gamma), dtype=tf.float32)\n",
"hidden3 = hidden3_mean + tf.exp(0.5 * hidden3_gamma) * noise\n",
"hidden4 = my_dense_layer(hidden3, n_hidden4)\n",
"hidden5 = my_dense_layer(hidden4, n_hidden5)\n",
"logits = my_dense_layer(hidden5, n_outputs, activation=None)\n",
"outputs = tf.sigmoid(logits)\n",
"\n",
"xentropy = tf.nn.sigmoid_cross_entropy_with_logits(labels=X, logits=logits)\n",
"reconstruction_loss = tf.reduce_sum(xentropy)\n",
"latent_loss = 0.5 * tf.reduce_sum(\n",
" tf.exp(hidden3_gamma) + tf.square(hidden3_mean) - 1 - hidden3_gamma)\n",
"loss = reconstruction_loss + latent_loss\n",
"\n",
"optimizer = tf.train.AdamOptimizer(learning_rate=learning_rate)\n",
"training_op = optimizer.minimize(loss)\n",
"\n",
"init = tf.global_variables_initializer()\n",
"saver = tf.train.Saver()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Generate digits"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Let's train the model and generate a few random digits:"
]
},
{
"cell_type": "code",
"execution_count": 55,
"metadata": {},
"outputs": [],
"source": [
"import numpy as np\n",
"\n",
"n_digits = 60\n",
"n_epochs = 50\n",
"batch_size = 150\n",
"\n",
"with tf.Session() as sess:\n",
"    init.run()\n",
"    for epoch in range(n_epochs):\n",
"        n_batches = mnist.train.num_examples // batch_size\n",
"        for iteration in range(n_batches):\n",
"            print(\"\\r{}%\".format(100 * iteration // n_batches), end=\"\") # not shown in the book\n",
"            sys.stdout.flush() # not shown\n",
"            X_batch, y_batch = mnist.train.next_batch(batch_size)\n",
"            sess.run(training_op, feed_dict={X: X_batch})\n",
"        loss_val, reconstruction_loss_val, latent_loss_val = sess.run([loss, reconstruction_loss, latent_loss], feed_dict={X: X_batch}) # not shown\n",
"        print(\"\\r{}\".format(epoch), \"Train total loss:\", loss_val, \"\\tReconstruction loss:\", reconstruction_loss_val, \"\\tLatent loss:\", latent_loss_val) # not shown\n",
"        saver.save(sess, \"./my_model_variational.ckpt\") # not shown\n",
"\n",
"    codings_rnd = np.random.normal(size=[n_digits, n_hidden3])\n",
"    outputs_val = outputs.eval(feed_dict={hidden3: codings_rnd})"
]
},
{
"cell_type": "code",
"execution_count": 56,
"metadata": {},
"outputs": [],
"source": [
"plt.figure(figsize=(8,50)) # not shown in the book\n",
"for iteration in range(n_digits):\n",
" plt.subplot(n_digits, 10, iteration + 1)\n",
" plot_image(outputs_val[iteration])"
]
},
{
"cell_type": "code",
"execution_count": 57,
"metadata": {},
"outputs": [],
"source": [
"n_rows = 6\n",
"n_cols = 10\n",
"plot_multiple_images(outputs_val.reshape(-1, 28, 28), n_rows, n_cols)\n",
"save_fig(\"generated_digits_plot\")\n",
"plt.show()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Note that the latent loss is computed differently in this second variant:"
]
},
{
"cell_type": "code",
"execution_count": 58,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"latent_loss = 0.5 * tf.reduce_sum(\n",
" tf.exp(hidden3_gamma) + tf.square(hidden3_mean) - 1 - hidden3_gamma)"
]
},
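{
"cell_type": "markdown",
"metadata": {},
"source": [
"This form follows from substituting $\\gamma = \\log \\sigma^2$ into the Gaussian latent loss $\\frac{1}{2} \\sum_i (\\sigma_i^2 + \\mu_i^2 - 1 - \\log \\sigma_i^2)$ (a sketch; $\\gamma$ stands for `hidden3_gamma` and $\\mu$ for `hidden3_mean`):\n",
"\n",
"$$\n",
"\\sigma = \\exp\\left(\\frac{\\gamma}{2}\\right), \\qquad\n",
"L_{\\text{latent}} = \\frac{1}{2} \\sum_i \\left( \\exp(\\gamma_i) + \\mu_i^2 - 1 - \\gamma_i \\right)\n",
"$$\n",
"\n",
"Since $\\gamma$ can take any real value, no smoothing term is needed to avoid $\\log(0)$, and $\\sigma$ is positive by construction."
]
},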
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Encode & Decode"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Encode:"
]
},
{
"cell_type": "code",
"execution_count": 59,
"metadata": {},
"outputs": [],
"source": [
"n_digits = 3\n",
"X_test, y_test = mnist.test.next_batch(batch_size)\n",
"codings = hidden3\n",
"\n",
"with tf.Session() as sess:\n",
" saver.restore(sess, \"./my_model_variational.ckpt\")\n",
" codings_val = codings.eval(feed_dict={X: X_test})"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Decode:"
]
},
{
"cell_type": "code",
"execution_count": 60,
"metadata": {},
"outputs": [],
"source": [
"with tf.Session() as sess:\n",
" saver.restore(sess, \"./my_model_variational.ckpt\")\n",
" outputs_val = outputs.eval(feed_dict={codings: codings_val})"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Let's plot the reconstructions:"
]
},
{
"cell_type": "code",
"execution_count": 61,
"metadata": {},
"outputs": [],
"source": [
"fig = plt.figure(figsize=(8, 2.5 * n_digits))\n",
"for iteration in range(n_digits):\n",
" plt.subplot(n_digits, 2, 1 + 2 * iteration)\n",
" plot_image(X_test[iteration])\n",
" plt.subplot(n_digits, 2, 2 + 2 * iteration)\n",
" plot_image(outputs_val[iteration])"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Interpolate digits"
]
},
{
"cell_type": "code",
"execution_count": 62,
"metadata": {
"scrolled": true
},
"outputs": [],
"source": [
"n_iterations = 3\n",
"n_digits = 6\n",
"codings_rnd = np.random.normal(size=[n_digits, n_hidden3])\n",
"\n",
"with tf.Session() as sess:\n",
"    saver.restore(sess, \"./my_model_variational.ckpt\")\n",
"    target_codings = np.roll(codings_rnd, -1, axis=0)\n",
"    for iteration in range(n_iterations + 1):\n",
"        codings_interpolate = codings_rnd + (target_codings - codings_rnd) * iteration / n_iterations\n",
"        outputs_val = outputs.eval(feed_dict={codings: codings_interpolate})\n",
"        plt.figure(figsize=(11, 1.5*n_iterations))\n",
"        for digit_index in range(n_digits):\n",
"            plt.subplot(1, n_digits, digit_index + 1)\n",
"            plot_image(outputs_val[digit_index])\n",
"        plt.show()"
]
},
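{
"cell_type": "markdown",
"metadata": {},
"source": [
"The interpolation above is linear in the coding space (a sketch; $c_a$ and $c_b$ denote a row of `codings_rnd` and the matching row of `target_codings`, $k$ denotes `iteration` and $n$ denotes `n_iterations`, all notation introduced here):\n",
"\n",
"$$\n",
"c_k = c_a + \\frac{k}{n} (c_b - c_a) = \\left(1 - \\frac{k}{n}\\right) c_a + \\frac{k}{n} \\, c_b, \\qquad k = 0, 1, \\dots, n\n",
"$$\n",
"\n",
"With `n_iterations = 3`, the weights on $c_b$ are $0$, $1/3$, $2/3$ and $1$. Because `np.roll(codings_rnd, -1, axis=0)` shifts the rows up by one, digit $i$ is morphed into digit $i + 1$, and the last digit into the first."
]
},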
{
"cell_type": "markdown",
"metadata": {
"collapsed": true
},
"source": [
"# Exercise solutions"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Coming soon..."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": []
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.5.2"
},
"nav_menu": {
"height": "381px",
"width": "453px"
},
"toc": {
"navigate_menu": true,
"number_sections": true,
"sideBar": true,
"threshold": 6,
"toc_cell": false,
"toc_section_display": "block",
"toc_window_display": false
}
},
"nbformat": 4,
"nbformat_minor": 1
}