{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "**Chapter 15 – Autoencoders**" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "_This notebook contains all the sample code and solutions to the exercises in chapter 15._" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Setup" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "First, let's make sure this notebook works well in both Python 2 and 3, import a few common modules, ensure Matplotlib plots figures inline, and prepare a function to save the figures:" ] }, { "cell_type": "code", "execution_count": 1, "metadata": { "collapsed": true }, "outputs": [], "source": [ "# To support both python 2 and python 3\n", "from __future__ import division, print_function, unicode_literals\n", "\n", "# Common imports\n", "import numpy as np\n", "import os\n", "import sys\n", "\n", "# to make this notebook's output stable across runs\n", "def reset_graph(seed=42):\n", "    tf.reset_default_graph()\n", "    tf.set_random_seed(seed)\n", "    np.random.seed(seed)\n", "\n", "# To plot pretty figures\n", "%matplotlib inline\n", "import matplotlib\n", "import matplotlib.pyplot as plt\n", "plt.rcParams['axes.labelsize'] = 14\n", "plt.rcParams['xtick.labelsize'] = 12\n", "plt.rcParams['ytick.labelsize'] = 12\n", "\n", "# Where to save the figures\n", "PROJECT_ROOT_DIR = \".\"\n", "CHAPTER_ID = \"autoencoders\"\n", "\n", "def save_fig(fig_id, tight_layout=True):\n", "    path = os.path.join(PROJECT_ROOT_DIR, \"images\", CHAPTER_ID, fig_id + \".png\")\n", "    print(\"Saving figure\", fig_id)\n", "    if tight_layout:\n", "        plt.tight_layout()\n", "    plt.savefig(path, format='png', dpi=300)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "A couple of utility functions to plot grayscale 28x28 images:" ] }, { "cell_type": "code", "execution_count": 2, "metadata": { "collapsed": true }, "outputs": [], "source": [ "def plot_image(image, shape=[28, 28]):\n", " plt.imshow(image.reshape(shape), 
cmap=\"Greys\", interpolation=\"nearest\")\n", " plt.axis(\"off\")" ] }, { "cell_type": "code", "execution_count": 3, "metadata": { "collapsed": true }, "outputs": [], "source": [ "def plot_multiple_images(images, n_rows, n_cols, pad=2):\n", "    images = images - images.min() # make the minimum == 0, so the padding looks white\n", "    h,w = images.shape[1:] # each image is (height, width)\n", "    image = np.zeros(((h+pad)*n_rows+pad, (w+pad)*n_cols+pad))\n", "    for y in range(n_rows):\n", "        for x in range(n_cols):\n", "            image[(y*(h+pad)+pad):(y*(h+pad)+pad+h),(x*(w+pad)+pad):(x*(w+pad)+pad+w)] = images[y*n_cols+x]\n", "    plt.imshow(image, cmap=\"Greys\", interpolation=\"nearest\")\n", "    plt.axis(\"off\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# PCA with a linear Autoencoder" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Build 3D dataset:" ] }, { "cell_type": "code", "execution_count": 4, "metadata": { "collapsed": true }, "outputs": [], "source": [ "import numpy.random as rnd\n", "\n", "rnd.seed(4)\n", "m = 200\n", "w1, w2 = 0.1, 0.3\n", "noise = 0.1\n", "\n", "angles = rnd.rand(m) * 3 * np.pi / 2 - 0.5\n", "data = np.empty((m, 3))\n", "data[:, 0] = np.cos(angles) + np.sin(angles)/2 + noise * rnd.randn(m) / 2\n", "data[:, 1] = np.sin(angles) * 0.7 + noise * rnd.randn(m) / 2\n", "data[:, 2] = data[:, 0] * w1 + data[:, 1] * w2 + noise * rnd.randn(m)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Normalize the data:" ] }, { "cell_type": "code", "execution_count": 5, "metadata": {}, "outputs": [], "source": [ "from sklearn.preprocessing import StandardScaler\n", "scaler = StandardScaler()\n", "X_train = scaler.fit_transform(data[:100])\n", "X_test = scaler.transform(data[100:])" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Now let's build the Autoencoder..." 
] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Note: instead of using the `fully_connected()` function from the `tensorflow.contrib.layers` module (as in the book), we now use the `dense()` function from the `tf.layers` module, which did not exist when this chapter was written. This is preferable because anything in contrib may change or be deleted without notice, while `tf.layers` is part of the official API. As you will see, the code is mostly the same.\n", "\n", "The main differences relevant to this chapter are:\n", "* the `scope` parameter was renamed to `name`, and the `_fn` suffix was removed in all the parameters that had it (for example the `activation_fn` parameter was renamed to `activation`),\n", "* the `weights` parameter was renamed to `kernel` and the weights variable is now named `\"kernel\"` rather than `\"weights\"`,\n", "* the bias variable is now named `\"bias\"` rather than `\"biases\"`,\n", "* the default activation is `None` instead of `tf.nn.relu`." ] }, { "cell_type": "code", "execution_count": 6, "metadata": {}, "outputs": [], "source": [ "import tensorflow as tf\n", "\n", "reset_graph()\n", "\n", "n_inputs = 3\n", "n_hidden = 2 # codings\n", "n_outputs = n_inputs\n", "\n", "learning_rate = 0.01\n", "\n", "X = tf.placeholder(tf.float32, shape=[None, n_inputs])\n", "hidden = tf.layers.dense(X, n_hidden)\n", "outputs = tf.layers.dense(hidden, n_outputs)\n", "\n", "reconstruction_loss = tf.reduce_mean(tf.square(outputs - X))\n", "\n", "optimizer = tf.train.AdamOptimizer(learning_rate)\n", "training_op = optimizer.minimize(reconstruction_loss)\n", "\n", "init = tf.global_variables_initializer()" ] }, { "cell_type": "code", "execution_count": 7, "metadata": {}, "outputs": [], "source": [ "n_iterations = 1000\n", "codings = hidden\n", "\n", "with tf.Session() as sess:\n", "    init.run()\n", "    for iteration in range(n_iterations):\n", "        training_op.run(feed_dict={X: X_train})\n", "    codings_val = codings.eval(feed_dict={X: X_test})" ] 
}, { "cell_type": "code", "execution_count": 8, "metadata": {}, "outputs": [], "source": [ "fig = plt.figure(figsize=(4,3))\n", "plt.plot(codings_val[:,0], codings_val[:, 1], \"b.\")\n", "plt.xlabel(\"$z_1$\", fontsize=18)\n", "plt.ylabel(\"$z_2$\", fontsize=18, rotation=0)\n", "save_fig(\"linear_autoencoder_pca_plot\")\n", "plt.show()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Stacked Autoencoders" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Let's use MNIST:" ] }, { "cell_type": "code", "execution_count": 9, "metadata": {}, "outputs": [], "source": [ "from tensorflow.examples.tutorials.mnist import input_data\n", "mnist = input_data.read_data_sets(\"/tmp/data/\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Train all layers at once" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Let's build a stacked Autoencoder with 3 hidden layers and 1 output layer (i.e., 2 stacked Autoencoders). We will use ELU activation, He initialization and L2 regularization." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Note: since the `tf.layers.dense()` function is incompatible with `tf.contrib.layers.arg_scope()` (which is used in the book), we now use Python's `functools.partial()` function instead. It makes it easy to create a `my_dense_layer()` function that just calls `tf.layers.dense()` with the desired parameters automatically set (unless they are overridden when calling `my_dense_layer()`)." 
] }, { "cell_type": "code", "execution_count": 10, "metadata": {}, "outputs": [], "source": [ "reset_graph()\n", "\n", "from functools import partial\n", "\n", "n_inputs = 28 * 28\n", "n_hidden1 = 300\n", "n_hidden2 = 150 # codings\n", "n_hidden3 = n_hidden1\n", "n_outputs = n_inputs\n", "\n", "learning_rate = 0.01\n", "l2_reg = 0.0001\n", "\n", "X = tf.placeholder(tf.float32, shape=[None, n_inputs])\n", "\n", "he_init = tf.contrib.layers.variance_scaling_initializer() # He initialization\n", "#Equivalent to:\n", "#he_init = lambda shape, dtype=tf.float32: tf.truncated_normal(shape, 0., stddev=np.sqrt(2/shape[0]))\n", "l2_regularizer = tf.contrib.layers.l2_regularizer(l2_reg)\n", "my_dense_layer = partial(tf.layers.dense,\n", " activation=tf.nn.elu,\n", " kernel_initializer=he_init,\n", " kernel_regularizer=l2_regularizer)\n", "\n", "hidden1 = my_dense_layer(X, n_hidden1)\n", "hidden2 = my_dense_layer(hidden1, n_hidden2)\n", "hidden3 = my_dense_layer(hidden2, n_hidden3)\n", "outputs = my_dense_layer(hidden3, n_outputs, activation=None)\n", "\n", "reconstruction_loss = tf.reduce_mean(tf.square(outputs - X))\n", "\n", "reg_losses = tf.get_collection(tf.GraphKeys.REGULARIZATION_LOSSES)\n", "loss = tf.add_n([reconstruction_loss] + reg_losses)\n", "\n", "optimizer = tf.train.AdamOptimizer(learning_rate)\n", "training_op = optimizer.minimize(loss)\n", "\n", "init = tf.global_variables_initializer()\n", "saver = tf.train.Saver() # not shown in the book" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Now let's train it! Note that we don't feed target values (`y_batch` is not used). This is unsupervised training." 
] }, { "cell_type": "code", "execution_count": 11, "metadata": {}, "outputs": [], "source": [ "n_epochs = 5\n", "batch_size = 150\n", "\n", "with tf.Session() as sess:\n", "    init.run()\n", "    for epoch in range(n_epochs):\n", "        n_batches = mnist.train.num_examples // batch_size\n", "        for iteration in range(n_batches):\n", "            print(\"\\r{}%\".format(100 * iteration // n_batches), end=\"\") # not shown in the book\n", "            sys.stdout.flush() # not shown\n", "            X_batch, y_batch = mnist.train.next_batch(batch_size)\n", "            sess.run(training_op, feed_dict={X: X_batch})\n", "        loss_train = reconstruction_loss.eval(feed_dict={X: X_batch}) # not shown\n", "        print(\"\\r{}\".format(epoch), \"Train MSE:\", loss_train) # not shown\n", "        saver.save(sess, \"./my_model_all_layers.ckpt\") # not shown" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "This function loads the model (if a checkpoint path is given), runs it on a few test images, then displays each original image next to its reconstruction:" ] }, { "cell_type": "code", "execution_count": 12, "metadata": {}, "outputs": [], "source": [ "def show_reconstructed_digits(X, outputs, model_path = None, n_test_digits = 2):\n", "    with tf.Session() as sess:\n", "        if model_path:\n", "            saver.restore(sess, model_path)\n", "        X_test = mnist.test.images[:n_test_digits]\n", "        outputs_val = outputs.eval(feed_dict={X: X_test})\n", "\n", "    fig = plt.figure(figsize=(8, 3 * n_test_digits))\n", "    for digit_index in range(n_test_digits):\n", "        plt.subplot(n_test_digits, 2, digit_index * 2 + 1)\n", "        plot_image(X_test[digit_index])\n", "        plt.subplot(n_test_digits, 2, digit_index * 2 + 2)\n", "        plot_image(outputs_val[digit_index])" ] }, { "cell_type": "code", "execution_count": 13, "metadata": {}, "outputs": [], "source": [ "show_reconstructed_digits(X, outputs, \"./my_model_all_layers.ckpt\")\n", "save_fig(\"reconstruction_plot\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Tying weights" ] }, { "cell_type": 
"markdown", "metadata": {}, "source": [ "It is common to tie the weights of the encoder and the decoder (`weights_decoder = tf.transpose(weights_encoder)`). Unfortunately this makes it impossible (or very tricky) to use the `tf.layers.dense()` function, so we need to build the Autoencoder manually:" ] }, { "cell_type": "code", "execution_count": 14, "metadata": { "collapsed": true }, "outputs": [], "source": [ "reset_graph()\n", "\n", "n_inputs = 28 * 28\n", "n_hidden1 = 300\n", "n_hidden2 = 150 # codings\n", "n_hidden3 = n_hidden1\n", "n_outputs = n_inputs\n", "\n", "learning_rate = 0.01\n", "l2_reg = 0.0005" ] }, { "cell_type": "code", "execution_count": 15, "metadata": {}, "outputs": [], "source": [ "activation = tf.nn.elu\n", "regularizer = tf.contrib.layers.l2_regularizer(l2_reg)\n", "initializer = tf.contrib.layers.variance_scaling_initializer()\n", "\n", "X = tf.placeholder(tf.float32, shape=[None, n_inputs])\n", "\n", "weights1_init = initializer([n_inputs, n_hidden1])\n", "weights2_init = initializer([n_hidden1, n_hidden2])\n", "\n", "weights1 = tf.Variable(weights1_init, dtype=tf.float32, name=\"weights1\")\n", "weights2 = tf.Variable(weights2_init, dtype=tf.float32, name=\"weights2\")\n", "weights3 = tf.transpose(weights2, name=\"weights3\") # tied weights\n", "weights4 = tf.transpose(weights1, name=\"weights4\") # tied weights\n", "\n", "biases1 = tf.Variable(tf.zeros(n_hidden1), name=\"biases1\")\n", "biases2 = tf.Variable(tf.zeros(n_hidden2), name=\"biases2\")\n", "biases3 = tf.Variable(tf.zeros(n_hidden3), name=\"biases3\")\n", "biases4 = tf.Variable(tf.zeros(n_outputs), name=\"biases4\")\n", "\n", "hidden1 = activation(tf.matmul(X, weights1) + biases1)\n", "hidden2 = activation(tf.matmul(hidden1, weights2) + biases2)\n", "hidden3 = activation(tf.matmul(hidden2, weights3) + biases3)\n", "outputs = tf.matmul(hidden3, weights4) + biases4\n", "\n", "reconstruction_loss = tf.reduce_mean(tf.square(outputs - X))\n", "reg_loss = regularizer(weights1) + 
regularizer(weights2)\n", "loss = reconstruction_loss + reg_loss\n", "\n", "optimizer = tf.train.AdamOptimizer(learning_rate)\n", "training_op = optimizer.minimize(loss)\n", "\n", "init = tf.global_variables_initializer()" ] }, { "cell_type": "code", "execution_count": 16, "metadata": { "collapsed": true }, "outputs": [], "source": [ "saver = tf.train.Saver()" ] }, { "cell_type": "code", "execution_count": 17, "metadata": {}, "outputs": [], "source": [ "n_epochs = 5\n", "batch_size = 150\n", "\n", "with tf.Session() as sess:\n", "    init.run()\n", "    for epoch in range(n_epochs):\n", "        n_batches = mnist.train.num_examples // batch_size\n", "        for iteration in range(n_batches):\n", "            print(\"\\r{}%\".format(100 * iteration // n_batches), end=\"\")\n", "            sys.stdout.flush()\n", "            X_batch, y_batch = mnist.train.next_batch(batch_size)\n", "            sess.run(training_op, feed_dict={X: X_batch})\n", "        loss_train = reconstruction_loss.eval(feed_dict={X: X_batch})\n", "        print(\"\\r{}\".format(epoch), \"Train MSE:\", loss_train)\n", "        saver.save(sess, \"./my_model_tying_weights.ckpt\")" ] }, { "cell_type": "code", "execution_count": 18, "metadata": { "scrolled": true }, "outputs": [], "source": [ "show_reconstructed_digits(X, outputs, \"./my_model_tying_weights.ckpt\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Training one Autoencoder at a time in multiple graphs" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "There are many ways to train one Autoencoder at a time. The first approach is to train each Autoencoder using a different graph, then create the Stacked Autoencoder by simply initializing it with the weights and biases copied from these Autoencoders." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Let's create a function that will train one autoencoder and return the transformed training set (i.e., the output of the hidden layer) and the model parameters." 
] }, { "cell_type": "code", "execution_count": 19, "metadata": { "collapsed": true }, "outputs": [], "source": [ "reset_graph()\n", "\n", "from functools import partial\n", "\n", "def train_autoencoder(X_train, n_neurons, n_epochs, batch_size,\n", "                      learning_rate = 0.01, l2_reg = 0.0005,\n", "                      activation=tf.nn.elu, seed=42):\n", "    graph = tf.Graph()\n", "    with graph.as_default():\n", "        tf.set_random_seed(seed)\n", "\n", "        n_inputs = X_train.shape[1]\n", "\n", "        X = tf.placeholder(tf.float32, shape=[None, n_inputs])\n", "\n", "        my_dense_layer = partial(\n", "            tf.layers.dense,\n", "            activation=activation,\n", "            kernel_initializer=tf.contrib.layers.variance_scaling_initializer(),\n", "            kernel_regularizer=tf.contrib.layers.l2_regularizer(l2_reg))\n", "\n", "        hidden = my_dense_layer(X, n_neurons, name=\"hidden\")\n", "        outputs = my_dense_layer(hidden, n_inputs, activation=None, name=\"outputs\")\n", "\n", "        reconstruction_loss = tf.reduce_mean(tf.square(outputs - X))\n", "\n", "        reg_losses = tf.get_collection(tf.GraphKeys.REGULARIZATION_LOSSES)\n", "        loss = tf.add_n([reconstruction_loss] + reg_losses)\n", "\n", "        optimizer = tf.train.AdamOptimizer(learning_rate)\n", "        training_op = optimizer.minimize(loss)\n", "\n", "        init = tf.global_variables_initializer()\n", "\n", "    with tf.Session(graph=graph) as sess:\n", "        init.run()\n", "        for epoch in range(n_epochs):\n", "            n_batches = len(X_train) // batch_size\n", "            for iteration in range(n_batches):\n", "                print(\"\\r{}%\".format(100 * iteration // n_batches), end=\"\")\n", "                sys.stdout.flush()\n", "                indices = rnd.permutation(len(X_train))[:batch_size]\n", "                X_batch = X_train[indices]\n", "                sess.run(training_op, feed_dict={X: X_batch})\n", "            loss_train = reconstruction_loss.eval(feed_dict={X: X_batch})\n", "            print(\"\\r{}\".format(epoch), \"Train MSE:\", loss_train)\n", "        params = dict([(var.name, var.eval()) for var in tf.get_collection(tf.GraphKeys.TRAINABLE_VARIABLES)])\n", "        hidden_val = hidden.eval(feed_dict={X: 
X_train})\n", "        return hidden_val, params[\"hidden/kernel:0\"], params[\"hidden/bias:0\"], params[\"outputs/kernel:0\"], params[\"outputs/bias:0\"]" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Now let's train two Autoencoders. The first one is trained on the training data, and the second is trained on the previous Autoencoder's hidden layer output:" ] }, { "cell_type": "code", "execution_count": 20, "metadata": {}, "outputs": [], "source": [ "hidden_output, W1, b1, W4, b4 = train_autoencoder(mnist.train.images, n_neurons=300, n_epochs=4, batch_size=150)\n", "_, W2, b2, W3, b3 = train_autoencoder(hidden_output, n_neurons=150, n_epochs=4, batch_size=150)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Finally, we can create a Stacked Autoencoder by simply reusing the weights and biases from the Autoencoders we just trained:" ] }, { "cell_type": "code", "execution_count": 21, "metadata": {}, "outputs": [], "source": [ "reset_graph()\n", "\n", "n_inputs = 28*28\n", "\n", "X = tf.placeholder(tf.float32, shape=[None, n_inputs])\n", "hidden1 = tf.nn.elu(tf.matmul(X, W1) + b1)\n", "hidden2 = tf.nn.elu(tf.matmul(hidden1, W2) + b2)\n", "hidden3 = tf.nn.elu(tf.matmul(hidden2, W3) + b3)\n", "outputs = tf.matmul(hidden3, W4) + b4" ] }, { "cell_type": "code", "execution_count": 22, "metadata": {}, "outputs": [], "source": [ "show_reconstructed_digits(X, outputs)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Training one Autoencoder at a time in a single graph" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Another approach is to use a single graph. To do this, we create the graph for the full Stacked Autoencoder, but then we also add operations to train each Autoencoder independently: phase 1 trains the bottom and top layers (i.e., the first Autoencoder) and phase 2 trains the two middle layers (i.e., the second Autoencoder)." 
] }, { "cell_type": "code", "execution_count": 23, "metadata": { "collapsed": true }, "outputs": [], "source": [ "reset_graph()\n", "\n", "n_inputs = 28 * 28\n", "n_hidden1 = 300\n", "n_hidden2 = 150 # codings\n", "n_hidden3 = n_hidden1\n", "n_outputs = n_inputs\n", "\n", "learning_rate = 0.01\n", "l2_reg = 0.0001\n", "\n", "activation = tf.nn.elu\n", "regularizer = tf.contrib.layers.l2_regularizer(l2_reg)\n", "initializer = tf.contrib.layers.variance_scaling_initializer()\n", "\n", "X = tf.placeholder(tf.float32, shape=[None, n_inputs])\n", "\n", "weights1_init = initializer([n_inputs, n_hidden1])\n", "weights2_init = initializer([n_hidden1, n_hidden2])\n", "weights3_init = initializer([n_hidden2, n_hidden3])\n", "weights4_init = initializer([n_hidden3, n_outputs])\n", "\n", "weights1 = tf.Variable(weights1_init, dtype=tf.float32, name=\"weights1\")\n", "weights2 = tf.Variable(weights2_init, dtype=tf.float32, name=\"weights2\")\n", "weights3 = tf.Variable(weights3_init, dtype=tf.float32, name=\"weights3\")\n", "weights4 = tf.Variable(weights4_init, dtype=tf.float32, name=\"weights4\")\n", "\n", "biases1 = tf.Variable(tf.zeros(n_hidden1), name=\"biases1\")\n", "biases2 = tf.Variable(tf.zeros(n_hidden2), name=\"biases2\")\n", "biases3 = tf.Variable(tf.zeros(n_hidden3), name=\"biases3\")\n", "biases4 = tf.Variable(tf.zeros(n_outputs), name=\"biases4\")\n", "\n", "hidden1 = activation(tf.matmul(X, weights1) + biases1)\n", "hidden2 = activation(tf.matmul(hidden1, weights2) + biases2)\n", "hidden3 = activation(tf.matmul(hidden2, weights3) + biases3)\n", "outputs = tf.matmul(hidden3, weights4) + biases4\n", "\n", "reconstruction_loss = tf.reduce_mean(tf.square(outputs - X))" ] }, { "cell_type": "code", "execution_count": 24, "metadata": {}, "outputs": [], "source": [ "optimizer = tf.train.AdamOptimizer(learning_rate)\n", "\n", "with tf.name_scope(\"phase1\"):\n", " phase1_outputs = tf.matmul(hidden1, weights4) + biases4 # bypass hidden2 and hidden3\n", " 
phase1_reconstruction_loss = tf.reduce_mean(tf.square(phase1_outputs - X))\n", " phase1_reg_loss = regularizer(weights1) + regularizer(weights4)\n", " phase1_loss = phase1_reconstruction_loss + phase1_reg_loss\n", " phase1_training_op = optimizer.minimize(phase1_loss)\n", "\n", "with tf.name_scope(\"phase2\"):\n", " phase2_reconstruction_loss = tf.reduce_mean(tf.square(hidden3 - hidden1))\n", " phase2_reg_loss = regularizer(weights2) + regularizer(weights3)\n", " phase2_loss = phase2_reconstruction_loss + phase2_reg_loss\n", " train_vars = [weights2, biases2, weights3, biases3]\n", " phase2_training_op = optimizer.minimize(phase2_loss, var_list=train_vars) # freeze hidden1" ] }, { "cell_type": "code", "execution_count": 25, "metadata": { "collapsed": true }, "outputs": [], "source": [ "init = tf.global_variables_initializer()\n", "saver = tf.train.Saver()" ] }, { "cell_type": "code", "execution_count": 26, "metadata": {}, "outputs": [], "source": [ "training_ops = [phase1_training_op, phase2_training_op]\n", "reconstruction_losses = [phase1_reconstruction_loss, phase2_reconstruction_loss]\n", "n_epochs = [4, 4]\n", "batch_sizes = [150, 150]\n", "\n", "with tf.Session() as sess:\n", "    init.run()\n", "    for phase in range(2):\n", "        print(\"Training phase #{}\".format(phase + 1))\n", "        for epoch in range(n_epochs[phase]):\n", "            n_batches = mnist.train.num_examples // batch_sizes[phase]\n", "            for iteration in range(n_batches):\n", "                print(\"\\r{}%\".format(100 * iteration // n_batches), end=\"\")\n", "                sys.stdout.flush()\n", "                X_batch, y_batch = mnist.train.next_batch(batch_sizes[phase])\n", "                sess.run(training_ops[phase], feed_dict={X: X_batch})\n", "            loss_train = reconstruction_losses[phase].eval(feed_dict={X: X_batch})\n", "            print(\"\\r{}\".format(epoch), \"Train MSE:\", loss_train)\n", "            saver.save(sess, \"./my_model_one_at_a_time.ckpt\")\n", "    loss_test = reconstruction_loss.eval(feed_dict={X: mnist.test.images})\n", "    print(\"Test MSE:\", loss_test)" ] }, { 
"cell_type": "markdown", "metadata": {}, "source": [ "## Cache the frozen layer outputs" ] }, { "cell_type": "code", "execution_count": 27, "metadata": { "scrolled": true }, "outputs": [], "source": [ "training_ops = [phase1_training_op, phase2_training_op]\n", "reconstruction_losses = [phase1_reconstruction_loss, phase2_reconstruction_loss]\n", "n_epochs = [4, 4]\n", "batch_sizes = [150, 150]\n", "\n", "with tf.Session() as sess:\n", " init.run()\n", " for phase in range(2):\n", " print(\"Training phase #{}\".format(phase + 1))\n", " if phase == 1:\n", " hidden1_cache = hidden1.eval(feed_dict={X: mnist.train.images})\n", " for epoch in range(n_epochs[phase]):\n", " n_batches = mnist.train.num_examples // batch_sizes[phase]\n", " for iteration in range(n_batches):\n", " print(\"\\r{}%\".format(100 * iteration // n_batches), end=\"\")\n", " sys.stdout.flush()\n", " if phase == 1:\n", " indices = rnd.permutation(mnist.train.num_examples)\n", " hidden1_batch = hidden1_cache[indices[:batch_sizes[phase]]]\n", " feed_dict = {hidden1: hidden1_batch}\n", " sess.run(training_ops[phase], feed_dict=feed_dict)\n", " else:\n", " X_batch, y_batch = mnist.train.next_batch(batch_sizes[phase])\n", " feed_dict = {X: X_batch}\n", " sess.run(training_ops[phase], feed_dict=feed_dict)\n", " loss_train = reconstruction_losses[phase].eval(feed_dict=feed_dict)\n", " print(\"\\r{}\".format(epoch), \"Train MSE:\", loss_train)\n", " saver.save(sess, \"./my_model_cache_frozen.ckpt\")\n", " loss_test = reconstruction_loss.eval(feed_dict={X: mnist.test.images})\n", " print(\"Test MSE:\", loss_test)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Visualizing the Reconstructions" ] }, { "cell_type": "code", "execution_count": 28, "metadata": {}, "outputs": [], "source": [ "n_test_digits = 2\n", "X_test = mnist.test.images[:n_test_digits]\n", "\n", "with tf.Session() as sess:\n", " saver.restore(sess, \"./my_model_one_at_a_time.ckpt\") # not shown in the book\n", " outputs_val = 
outputs.eval(feed_dict={X: X_test})\n", "\n", "def plot_image(image, shape=[28, 28]):\n", "    plt.imshow(image.reshape(shape), cmap=\"Greys\", interpolation=\"nearest\")\n", "    plt.axis(\"off\")\n", "\n", "for digit_index in range(n_test_digits):\n", "    plt.subplot(n_test_digits, 2, digit_index * 2 + 1)\n", "    plot_image(X_test[digit_index])\n", "    plt.subplot(n_test_digits, 2, digit_index * 2 + 2)\n", "    plot_image(outputs_val[digit_index])" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Visualizing the extracted features" ] }, { "cell_type": "code", "execution_count": 29, "metadata": {}, "outputs": [], "source": [ "with tf.Session() as sess:\n", "    saver.restore(sess, \"./my_model_one_at_a_time.ckpt\") # not shown in the book\n", "    weights1_val = weights1.eval()\n", "\n", "for i in range(5):\n", "    plt.subplot(1, 5, i + 1)\n", "    plot_image(weights1_val.T[i])\n", "\n", "save_fig(\"extracted_features_plot\") # not shown\n", "plt.show() # not shown" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Unsupervised pretraining" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Let's create a small neural network for MNIST classification:" ] }, { "cell_type": "code", "execution_count": 30, "metadata": {}, "outputs": [], "source": [ "reset_graph()\n", "\n", "n_inputs = 28 * 28\n", "n_hidden1 = 300\n", "n_hidden2 = 150\n", "n_outputs = 10\n", "\n", "learning_rate = 0.01\n", "l2_reg = 0.0005\n", "\n", "activation = tf.nn.elu\n", "regularizer = tf.contrib.layers.l2_regularizer(l2_reg)\n", "initializer = tf.contrib.layers.variance_scaling_initializer()\n", "\n", "X = tf.placeholder(tf.float32, shape=[None, n_inputs])\n", "y = tf.placeholder(tf.int32, shape=[None])\n", "\n", "weights1_init = initializer([n_inputs, n_hidden1])\n", "weights2_init = initializer([n_hidden1, n_hidden2])\n", "weights3_init = initializer([n_hidden2, n_outputs])\n", "\n", "weights1 = tf.Variable(weights1_init, dtype=tf.float32, name=\"weights1\")\n", "weights2 = 
tf.Variable(weights2_init, dtype=tf.float32, name=\"weights2\")\n", "weights3 = tf.Variable(weights3_init, dtype=tf.float32, name=\"weights3\")\n", "\n", "biases1 = tf.Variable(tf.zeros(n_hidden1), name=\"biases1\")\n", "biases2 = tf.Variable(tf.zeros(n_hidden2), name=\"biases2\")\n", "biases3 = tf.Variable(tf.zeros(n_outputs), name=\"biases3\")\n", "\n", "hidden1 = activation(tf.matmul(X, weights1) + biases1)\n", "hidden2 = activation(tf.matmul(hidden1, weights2) + biases2)\n", "logits = tf.matmul(hidden2, weights3) + biases3\n", "\n", "cross_entropy = tf.nn.sparse_softmax_cross_entropy_with_logits(labels=y, logits=logits)\n", "reg_loss = regularizer(weights1) + regularizer(weights2) + regularizer(weights3)\n", "loss = cross_entropy + reg_loss\n", "optimizer = tf.train.AdamOptimizer(learning_rate)\n", "training_op = optimizer.minimize(loss)\n", "\n", "correct = tf.nn.in_top_k(logits, y, 1)\n", "accuracy = tf.reduce_mean(tf.cast(correct, tf.float32))\n", "\n", "init = tf.global_variables_initializer()\n", "pretrain_saver = tf.train.Saver([weights1, weights2, biases1, biases2])\n", "saver = tf.train.Saver()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Regular training (without pretraining):" ] }, { "cell_type": "code", "execution_count": 31, "metadata": {}, "outputs": [], "source": [ "n_epochs = 4\n", "batch_size = 150\n", "n_labeled_instances = 20000\n", "\n", "with tf.Session() as sess:\n", "    init.run()\n", "    for epoch in range(n_epochs):\n", "        n_batches = n_labeled_instances // batch_size\n", "        for iteration in range(n_batches):\n", "            print(\"\\r{}%\".format(100 * iteration // n_batches), end=\"\")\n", "            sys.stdout.flush()\n", "            indices = rnd.permutation(n_labeled_instances)[:batch_size]\n", "            X_batch, y_batch = mnist.train.images[indices], mnist.train.labels[indices]\n", "            sess.run(training_op, feed_dict={X: X_batch, y: y_batch})\n", "        accuracy_val = accuracy.eval(feed_dict={X: X_batch, y: y_batch})\n", "        print(\"\\r{}\".format(epoch), \"Train 
accuracy:\", accuracy_val, end=\" \")\n", "        saver.save(sess, \"./my_model_supervised.ckpt\")\n", "        accuracy_val = accuracy.eval(feed_dict={X: mnist.test.images, y: mnist.test.labels})\n", "        print(\"Test accuracy:\", accuracy_val)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Now reusing the first two layers of the autoencoder we pretrained:" ] }, { "cell_type": "code", "execution_count": 32, "metadata": {}, "outputs": [], "source": [ "n_epochs = 4\n", "batch_size = 150\n", "n_labeled_instances = 20000\n", "\n", "#training_op = optimizer.minimize(loss, var_list=[weights3, biases3]) # Freeze layers 1 and 2 (optional)\n", "\n", "with tf.Session() as sess:\n", "    init.run()\n", "    pretrain_saver.restore(sess, \"./my_model_cache_frozen.ckpt\")\n", "    for epoch in range(n_epochs):\n", "        n_batches = n_labeled_instances // batch_size\n", "        for iteration in range(n_batches):\n", "            print(\"\\r{}%\".format(100 * iteration // n_batches), end=\"\")\n", "            sys.stdout.flush()\n", "            indices = rnd.permutation(n_labeled_instances)[:batch_size]\n", "            X_batch, y_batch = mnist.train.images[indices], mnist.train.labels[indices]\n", "            sess.run(training_op, feed_dict={X: X_batch, y: y_batch})\n", "        accuracy_val = accuracy.eval(feed_dict={X: X_batch, y: y_batch})\n", "        print(\"\\r{}\".format(epoch), \"Train accuracy:\", accuracy_val, end=\"\\t\")\n", "        saver.save(sess, \"./my_model_supervised_pretrained.ckpt\")\n", "        accuracy_val = accuracy.eval(feed_dict={X: mnist.test.images, y: mnist.test.labels})\n", "        print(\"Test accuracy:\", accuracy_val)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Stacked denoising Autoencoder" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Note: the book uses `tf.contrib.layers.dropout()` rather than `tf.layers.dropout()` (which did not exist when this chapter was written). It is now preferable to use `tf.layers.dropout()`, because anything in the contrib module may change or be deleted without notice. 
The `tf.layers.dropout()` function is almost identical to the `tf.contrib.layers.dropout()` function, except for a few minor differences. Most importantly:\n", "* you must specify the dropout rate (`rate`) rather than the keep probability (`keep_prob`), where `rate` is simply equal to `1 - keep_prob`,\n", "* the `is_training` parameter is renamed to `training`." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Using Gaussian noise:" ] }, { "cell_type": "code", "execution_count": 33, "metadata": { "collapsed": true }, "outputs": [], "source": [ "reset_graph()\n", "\n", "n_inputs = 28 * 28\n", "n_hidden1 = 300\n", "n_hidden2 = 150 # codings\n", "n_hidden3 = n_hidden1\n", "n_outputs = n_inputs\n", "\n", "learning_rate = 0.01" ] }, { "cell_type": "code", "execution_count": 34, "metadata": {}, "outputs": [], "source": [ "noise_level = 1.0\n", "\n", "X = tf.placeholder(tf.float32, shape=[None, n_inputs])\n", "X_noisy = X + noise_level * tf.random_normal(tf.shape(X))\n", "\n", "hidden1 = tf.layers.dense(X_noisy, n_hidden1, activation=tf.nn.relu,\n", "                          name=\"hidden1\")\n", "hidden2 = tf.layers.dense(hidden1, n_hidden2, activation=tf.nn.relu, # not shown in the book\n", "                          name=\"hidden2\") # not shown\n", "hidden3 = tf.layers.dense(hidden2, n_hidden3, activation=tf.nn.relu, # not shown\n", "                          name=\"hidden3\") # not shown\n", "outputs = tf.layers.dense(hidden3, n_outputs, name=\"outputs\") # not shown\n", "\n", "reconstruction_loss = tf.reduce_mean(tf.square(outputs - X)) # MSE" ] }, { "cell_type": "code", "execution_count": 35, "metadata": {}, "outputs": [], "source": [ "optimizer = tf.train.AdamOptimizer(learning_rate)\n", "training_op = optimizer.minimize(reconstruction_loss)\n", "\n", "init = tf.global_variables_initializer()\n", "saver = tf.train.Saver()" ] }, { "cell_type": "code", "execution_count": 36, "metadata": {}, "outputs": [], "source": [ "n_epochs = 10\n", "batch_size = 150\n", "\n", "with tf.Session() as sess:\n", "    init.run()\n", "    for epoch in 
range(n_epochs):\n", "        n_batches = mnist.train.num_examples // batch_size\n", "        for iteration in range(n_batches):\n", "            print(\"\\r{}%\".format(100 * iteration // n_batches), end=\"\")\n", "            sys.stdout.flush()\n", "            X_batch, y_batch = mnist.train.next_batch(batch_size)\n", "            sess.run(training_op, feed_dict={X: X_batch})\n", "        loss_train = reconstruction_loss.eval(feed_dict={X: X_batch})\n", "        print(\"\\r{}\".format(epoch), \"Train MSE:\", loss_train)\n", "        saver.save(sess, \"./my_model_stacked_denoising_gaussian.ckpt\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Using dropout:" ] }, { "cell_type": "code", "execution_count": 37, "metadata": { "collapsed": true }, "outputs": [], "source": [ "reset_graph()\n", "\n", "n_inputs = 28 * 28\n", "n_hidden1 = 300\n", "n_hidden2 = 150 # codings\n", "n_hidden3 = n_hidden1\n", "n_outputs = n_inputs\n", "\n", "learning_rate = 0.01" ] }, { "cell_type": "code", "execution_count": 38, "metadata": {}, "outputs": [], "source": [ "dropout_rate = 0.3\n", "\n", "training = tf.placeholder_with_default(False, shape=(), name='training')\n", "\n", "X = tf.placeholder(tf.float32, shape=[None, n_inputs])\n", "X_drop = tf.layers.dropout(X, dropout_rate, training=training)\n", "\n", "hidden1 = tf.layers.dense(X_drop, n_hidden1, activation=tf.nn.relu,\n", "                          name=\"hidden1\")\n", "hidden2 = tf.layers.dense(hidden1, n_hidden2, activation=tf.nn.relu, # not shown in the book\n", "                          name=\"hidden2\") # not shown\n", "hidden3 = tf.layers.dense(hidden2, n_hidden3, activation=tf.nn.relu, # not shown\n", "                          name=\"hidden3\") # not shown\n", "outputs = tf.layers.dense(hidden3, n_outputs, name=\"outputs\") # not shown\n", "\n", "reconstruction_loss = tf.reduce_mean(tf.square(outputs - X)) # MSE" ] }, { "cell_type": "code", "execution_count": 39, "metadata": {}, "outputs": [], "source": [ "optimizer = tf.train.AdamOptimizer(learning_rate)\n", "training_op = optimizer.minimize(reconstruction_loss)\n", " \n", "init = 
tf.global_variables_initializer()\n", "saver = tf.train.Saver()" ] }, { "cell_type": "code", "execution_count": 40, "metadata": {}, "outputs": [], "source": [ "n_epochs = 10\n", "batch_size = 150\n", "\n", "with tf.Session() as sess:\n", "    init.run()\n", "    for epoch in range(n_epochs):\n", "        n_batches = mnist.train.num_examples // batch_size\n", "        for iteration in range(n_batches):\n", "            print(\"\\r{}%\".format(100 * iteration // n_batches), end=\"\")\n", "            sys.stdout.flush()\n", "            X_batch, y_batch = mnist.train.next_batch(batch_size)\n", "            sess.run(training_op, feed_dict={X: X_batch, training: True})\n", "        loss_train = reconstruction_loss.eval(feed_dict={X: X_batch})\n", "        print(\"\\r{}\".format(epoch), \"Train MSE:\", loss_train)\n", "        saver.save(sess, \"./my_model_stacked_denoising_dropout.ckpt\")" ] }, { "cell_type": "code", "execution_count": 41, "metadata": {}, "outputs": [], "source": [ "show_reconstructed_digits(X, outputs, \"./my_model_stacked_denoising_dropout.ckpt\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Sparse Autoencoder" ] }, { "cell_type": "code", "execution_count": 42, "metadata": {}, "outputs": [], "source": [ "p = 0.1\n", "q = np.linspace(0.001, 0.999, 500)\n", "kl_div = p * np.log(p / q) + (1 - p) * np.log((1 - p) / (1 - q))\n", "mse = (p - q)**2\n", "plt.plot([p, p], [0, 0.3], \"k:\")\n", "plt.text(0.05, 0.32, \"Target\\nsparsity\", fontsize=14)\n", "plt.plot(q, kl_div, \"b-\", label=\"KL divergence\")\n", "plt.plot(q, mse, \"r--\", label=\"MSE\")\n", "plt.legend(loc=\"upper left\")\n", "plt.xlabel(\"Actual sparsity\")\n", "plt.ylabel(\"Cost\", rotation=0)\n", "plt.axis([0, 1, 0, 0.95])\n", "save_fig(\"sparsity_loss_plot\")" ] }, { "cell_type": "code", "execution_count": 43, "metadata": { "collapsed": true }, "outputs": [], "source": [ "reset_graph()\n", "\n", "n_inputs = 28 * 28\n", "n_hidden1 = 1000 # sparse codings\n", "n_outputs = n_inputs" ] }, { "cell_type": "code", "execution_count": 44, "metadata": {}, 
"outputs": [], "source": [ "def kl_divergence(p, q):\n", " # Kullback Leibler divergence\n", " return p * tf.log(p / q) + (1 - p) * tf.log((1 - p) / (1 - q))\n", "\n", "learning_rate = 0.01\n", "sparsity_target = 0.1\n", "sparsity_weight = 0.2\n", "\n", "X = tf.placeholder(tf.float32, shape=[None, n_inputs]) # not shown in the book\n", "\n", "hidden1 = tf.layers.dense(X, n_hidden1, activation=tf.nn.sigmoid) # not shown\n", "outputs = tf.layers.dense(hidden1, n_outputs) # not shown\n", "\n", "hidden1_mean = tf.reduce_mean(hidden1, axis=0) # batch mean\n", "sparsity_loss = tf.reduce_sum(kl_divergence(sparsity_target, hidden1_mean))\n", "reconstruction_loss = tf.reduce_mean(tf.square(outputs - X)) # MSE\n", "loss = reconstruction_loss + sparsity_weight * sparsity_loss\n", "\n", "optimizer = tf.train.AdamOptimizer(learning_rate)\n", "training_op = optimizer.minimize(loss)" ] }, { "cell_type": "code", "execution_count": 45, "metadata": { "collapsed": true }, "outputs": [], "source": [ "init = tf.global_variables_initializer()\n", "saver = tf.train.Saver()" ] }, { "cell_type": "code", "execution_count": 46, "metadata": {}, "outputs": [], "source": [ "n_epochs = 100\n", "batch_size = 1000\n", "\n", "with tf.Session() as sess:\n", " init.run()\n", " for epoch in range(n_epochs):\n", " n_batches = mnist.train.num_examples // batch_size\n", " for iteration in range(n_batches):\n", " print(\"\\r{}%\".format(100 * iteration // n_batches), end=\"\")\n", " sys.stdout.flush()\n", " X_batch, y_batch = mnist.train.next_batch(batch_size)\n", " sess.run(training_op, feed_dict={X: X_batch})\n", " reconstruction_loss_val, sparsity_loss_val, loss_val = sess.run([reconstruction_loss, sparsity_loss, loss], feed_dict={X: X_batch})\n", " print(\"\\r{}\".format(epoch), \"Train MSE:\", reconstruction_loss_val, \"\\tSparsity loss:\", sparsity_loss_val, \"\\tTotal loss:\", loss_val)\n", " saver.save(sess, \"./my_model_sparse.ckpt\")" ] }, { "cell_type": "code", "execution_count": 47, 
"metadata": {}, "outputs": [], "source": [ "show_reconstructed_digits(X, outputs, \"./my_model_sparse.ckpt\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Note that the coding layer must output values from 0 to 1, which is why we use the sigmoid activation function:" ] }, { "cell_type": "code", "execution_count": 48, "metadata": { "collapsed": true }, "outputs": [], "source": [ "hidden1 = tf.layers.dense(X, n_hidden1, activation=tf.nn.sigmoid)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "To speed up training, you can normalize the inputs between 0 and 1, and use the cross entropy instead of the MSE for the cost function:" ] }, { "cell_type": "code", "execution_count": 49, "metadata": { "collapsed": true }, "outputs": [], "source": [ "logits = tf.layers.dense(hidden1, n_outputs)\n", "outputs = tf.nn.sigmoid(logits)\n", "\n", "xentropy = tf.nn.sigmoid_cross_entropy_with_logits(labels=X, logits=logits)\n", "reconstruction_loss = tf.reduce_mean(xentropy)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Variational Autoencoder" ] }, { "cell_type": "code", "execution_count": 50, "metadata": {}, "outputs": [], "source": [ "reset_graph()\n", "\n", "from functools import partial\n", "\n", "n_inputs = 28 * 28\n", "n_hidden1 = 500\n", "n_hidden2 = 500\n", "n_hidden3 = 20 # codings\n", "n_hidden4 = n_hidden2\n", "n_hidden5 = n_hidden1\n", "n_outputs = n_inputs\n", "learning_rate = 0.001\n", "\n", "initializer = tf.contrib.layers.variance_scaling_initializer()\n", "\n", "my_dense_layer = partial(\n", " tf.layers.dense,\n", " activation=tf.nn.elu,\n", " kernel_initializer=initializer)\n", "\n", "X = tf.placeholder(tf.float32, [None, n_inputs])\n", "hidden1 = my_dense_layer(X, n_hidden1)\n", "hidden2 = my_dense_layer(hidden1, n_hidden2)\n", "hidden3_mean = my_dense_layer(hidden2, n_hidden3, activation=None)\n", "hidden3_sigma = my_dense_layer(hidden2, n_hidden3, activation=None)\n", "noise = 
tf.random_normal(tf.shape(hidden3_sigma), dtype=tf.float32)\n", "hidden3 = hidden3_mean + hidden3_sigma * noise\n", "hidden4 = my_dense_layer(hidden3, n_hidden4)\n", "hidden5 = my_dense_layer(hidden4, n_hidden5)\n", "logits = my_dense_layer(hidden5, n_outputs, activation=None)\n", "outputs = tf.sigmoid(logits)\n", "\n", "xentropy = tf.nn.sigmoid_cross_entropy_with_logits(labels=X, logits=logits)\n", "reconstruction_loss = tf.reduce_sum(xentropy)" ] }, { "cell_type": "code", "execution_count": 51, "metadata": {}, "outputs": [], "source": [ "eps = 1e-10 # smoothing term to avoid computing log(0), which is -inf\n", "latent_loss = 0.5 * tf.reduce_sum(\n", "    tf.square(hidden3_sigma) + tf.square(hidden3_mean)\n", "    - 1 - tf.log(eps + tf.square(hidden3_sigma)))" ] }, { "cell_type": "code", "execution_count": 52, "metadata": { "collapsed": true }, "outputs": [], "source": [ "loss = reconstruction_loss + latent_loss\n", "\n", "optimizer = tf.train.AdamOptimizer(learning_rate=learning_rate)\n", "training_op = optimizer.minimize(loss)\n", "\n", "init = tf.global_variables_initializer()\n", "saver = tf.train.Saver()" ] }, { "cell_type": "code", "execution_count": 53, "metadata": {}, "outputs": [], "source": [ "n_epochs = 50\n", "batch_size = 150\n", "\n", "with tf.Session() as sess:\n", "    init.run()\n", "    for epoch in range(n_epochs):\n", "        n_batches = mnist.train.num_examples // batch_size\n", "        for iteration in range(n_batches):\n", "            print(\"\\r{}%\".format(100 * iteration // n_batches), end=\"\")\n", "            sys.stdout.flush()\n", "            X_batch, y_batch = mnist.train.next_batch(batch_size)\n", "            sess.run(training_op, feed_dict={X: X_batch})\n", "        loss_val, reconstruction_loss_val, latent_loss_val = sess.run([loss, reconstruction_loss, latent_loss], feed_dict={X: X_batch})\n", "        print(\"\\r{}\".format(epoch), \"Train total loss:\", loss_val, \"\\tReconstruction loss:\", reconstruction_loss_val, \"\\tLatent loss:\", latent_loss_val)\n", "        saver.save(sess, 
\"./my_model_variational.ckpt\")" ] }, { "cell_type": "code", "execution_count": 54, "metadata": {}, "outputs": [], "source": [ "reset_graph()\n", "\n", "from functools import partial\n", "\n", "n_inputs = 28 * 28\n", "n_hidden1 = 500\n", "n_hidden2 = 500\n", "n_hidden3 = 20 # codings\n", "n_hidden4 = n_hidden2\n", "n_hidden5 = n_hidden1\n", "n_outputs = n_inputs\n", "learning_rate = 0.001\n", "\n", "initializer = tf.contrib.layers.variance_scaling_initializer()\n", "my_dense_layer = partial(\n", " tf.layers.dense,\n", " activation=tf.nn.elu,\n", " kernel_initializer=initializer)\n", "\n", "X = tf.placeholder(tf.float32, [None, n_inputs])\n", "hidden1 = my_dense_layer(X, n_hidden1)\n", "hidden2 = my_dense_layer(hidden1, n_hidden2)\n", "hidden3_mean = my_dense_layer(hidden2, n_hidden3, activation=None)\n", "hidden3_gamma = my_dense_layer(hidden2, n_hidden3, activation=None)\n", "noise = tf.random_normal(tf.shape(hidden3_gamma), dtype=tf.float32)\n", "hidden3 = hidden3_mean + tf.exp(0.5 * hidden3_gamma) * noise\n", "hidden4 = my_dense_layer(hidden3, n_hidden4)\n", "hidden5 = my_dense_layer(hidden4, n_hidden5)\n", "logits = my_dense_layer(hidden5, n_outputs, activation=None)\n", "outputs = tf.sigmoid(logits)\n", "\n", "xentropy = tf.nn.sigmoid_cross_entropy_with_logits(labels=X, logits=logits)\n", "reconstruction_loss = tf.reduce_sum(xentropy)\n", "latent_loss = 0.5 * tf.reduce_sum(\n", " tf.exp(hidden3_gamma) + tf.square(hidden3_mean) - 1 - hidden3_gamma)\n", "loss = reconstruction_loss + latent_loss\n", "\n", "optimizer = tf.train.AdamOptimizer(learning_rate=learning_rate)\n", "training_op = optimizer.minimize(loss)\n", "\n", "init = tf.global_variables_initializer()\n", "saver = tf.train.Saver()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Generate digits" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Let's train the model and generate a few random digits:" ] }, { "cell_type": "code", "execution_count": 55, "metadata": {}, 
"outputs": [], "source": [ "import numpy as np\n", "\n", "n_digits = 60\n", "n_epochs = 50\n", "batch_size = 150\n", "\n", "with tf.Session() as sess:\n", " init.run()\n", " for epoch in range(n_epochs):\n", " n_batches = mnist.train.num_examples // batch_size\n", " for iteration in range(n_batches):\n", " print(\"\\r{}%\".format(100 * iteration // n_batches), end=\"\") # not shown in the book\n", " sys.stdout.flush() # not shown\n", " X_batch, y_batch = mnist.train.next_batch(batch_size)\n", " sess.run(training_op, feed_dict={X: X_batch})\n", " loss_val, reconstruction_loss_val, latent_loss_val = sess.run([loss, reconstruction_loss, latent_loss], feed_dict={X: X_batch}) # not shown\n", " print(\"\\r{}\".format(epoch), \"Train total loss:\", loss_val, \"\\tReconstruction loss:\", reconstruction_loss_val, \"\\tLatent loss:\", latent_loss_val) # not shown\n", " saver.save(sess, \"./my_model_variational.ckpt\") # not shown\n", " \n", " codings_rnd = np.random.normal(size=[n_digits, n_hidden3])\n", " outputs_val = outputs.eval(feed_dict={hidden3: codings_rnd})" ] }, { "cell_type": "code", "execution_count": 56, "metadata": {}, "outputs": [], "source": [ "plt.figure(figsize=(8,50)) # not shown in the book\n", "for iteration in range(n_digits):\n", " plt.subplot(n_digits, 10, iteration + 1)\n", " plot_image(outputs_val[iteration])" ] }, { "cell_type": "code", "execution_count": 57, "metadata": {}, "outputs": [], "source": [ "n_rows = 6\n", "n_cols = 10\n", "plot_multiple_images(outputs_val.reshape(-1, 28, 28), n_rows, n_cols)\n", "save_fig(\"generated_digits_plot\")\n", "plt.show()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Note that the latent loss is computed differently in this second variant:" ] }, { "cell_type": "code", "execution_count": 58, "metadata": { "collapsed": true }, "outputs": [], "source": [ "latent_loss = 0.5 * tf.reduce_sum(\n", " tf.exp(hidden3_gamma) + tf.square(hidden3_mean) - 1 - hidden3_gamma)" ] }, { "cell_type": "markdown", 
"metadata": {}, "source": [ "## Encode & Decode" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Encode:" ] }, { "cell_type": "code", "execution_count": 59, "metadata": {}, "outputs": [], "source": [ "n_digits = 3\n", "X_test, y_test = mnist.test.next_batch(batch_size)\n", "codings = hidden3\n", "\n", "with tf.Session() as sess:\n", " saver.restore(sess, \"./my_model_variational.ckpt\")\n", " codings_val = codings.eval(feed_dict={X: X_test})" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Decode:" ] }, { "cell_type": "code", "execution_count": 60, "metadata": {}, "outputs": [], "source": [ "with tf.Session() as sess:\n", " saver.restore(sess, \"./my_model_variational.ckpt\")\n", " outputs_val = outputs.eval(feed_dict={codings: codings_val})" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Let's plot the reconstructions:" ] }, { "cell_type": "code", "execution_count": 61, "metadata": {}, "outputs": [], "source": [ "fig = plt.figure(figsize=(8, 2.5 * n_digits))\n", "for iteration in range(n_digits):\n", " plt.subplot(n_digits, 2, 1 + 2 * iteration)\n", " plot_image(X_test[iteration])\n", " plt.subplot(n_digits, 2, 2 + 2 * iteration)\n", " plot_image(outputs_val[iteration])" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Interpolate digits" ] }, { "cell_type": "code", "execution_count": 62, "metadata": { "scrolled": true }, "outputs": [], "source": [ "n_iterations = 3\n", "n_digits = 6\n", "codings_rnd = np.random.normal(size=[n_digits, n_hidden3])\n", "\n", "with tf.Session() as sess:\n", " saver.restore(sess, \"./my_model_variational.ckpt\")\n", " target_codings = np.roll(codings_rnd, -1, axis=0)\n", " for iteration in range(n_iterations + 1):\n", " codings_interpolate = codings_rnd + (target_codings - codings_rnd) * iteration / n_iterations\n", " outputs_val = outputs.eval(feed_dict={codings: codings_interpolate})\n", " plt.figure(figsize=(11, 1.5*n_iterations))\n", " for digit_index in range(n_digits):\n", " 
plt.subplot(1, n_digits, digit_index + 1)\n", "            plot_image(outputs_val[digit_index])\n", "        plt.show()" ] }, { "cell_type": "markdown", "metadata": { "collapsed": true }, "source": [ "# Exercise solutions" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Coming soon..." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": true }, "outputs": [], "source": [] } ], "metadata": { "kernelspec": { "display_name": "Python 3", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.5.2" }, "nav_menu": { "height": "381px", "width": "453px" }, "toc": { "navigate_menu": true, "number_sections": true, "sideBar": true, "threshold": 6, "toc_cell": false, "toc_section_display": "block", "toc_window_display": false } }, "nbformat": 4, "nbformat_minor": 1 }