handson-ml/15_autoencoders.ipynb

1589 lines
50 KiB
Plaintext
Raw Blame History

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"**Chapter 15 Autoencoders**"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"_This notebook contains all the sample code and solutions to the exercises in chapter 15._"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Setup"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"First, let's make sure this notebook works well in both Python 2 and 3, import a few common modules, ensure Matplotlib plots figures inline and prepare a function to save the figures:"
]
},
{
"cell_type": "code",
"execution_count": 1,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"# To support both python 2 and python 3\n",
"from __future__ import division, print_function, unicode_literals\n",
"\n",
"# Common imports\n",
"import numpy as np\n",
"import numpy.random as rnd\n",
"import os\n",
"import sys  # the training loops further down call sys.stdout.flush()\n",
"\n",
"# to make this notebook's output stable across runs\n",
"rnd.seed(42)\n",
"\n",
"# To plot pretty figures\n",
"%matplotlib inline\n",
"import matplotlib\n",
"import matplotlib.pyplot as plt\n",
"plt.rcParams['axes.labelsize'] = 14\n",
"plt.rcParams['xtick.labelsize'] = 12\n",
"plt.rcParams['ytick.labelsize'] = 12\n",
"\n",
"# Where to save the figures\n",
"PROJECT_ROOT_DIR = \".\"\n",
"CHAPTER_ID = \"autoencoders\"\n",
"\n",
"def save_fig(fig_id, tight_layout=True):\n",
"    \"\"\"Save the current figure as PROJECT_ROOT_DIR/images/CHAPTER_ID/<fig_id>.png.\n",
"\n",
"    fig_id: file name (without extension) of the saved image.\n",
"    tight_layout: when true, call plt.tight_layout() before saving.\n",
"    \"\"\"\n",
"    fig_dir = os.path.join(PROJECT_ROOT_DIR, \"images\", CHAPTER_ID)\n",
"    # Create the target directory on first use so savefig does not fail on a fresh checkout\n",
"    # (isdir + makedirs rather than exist_ok=True, to stay python 2 compatible)\n",
"    if not os.path.isdir(fig_dir):\n",
"        os.makedirs(fig_dir)\n",
"    path = os.path.join(fig_dir, fig_id + \".png\")\n",
"    print(\"Saving figure\", fig_id)\n",
"    if tight_layout:\n",
"        plt.tight_layout()\n",
"    plt.savefig(path, format='png', dpi=300)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"A couple of utility functions to plot grayscale 28x28 images:"
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"def plot_image(image, shape=[28, 28]):\n",
"    \"\"\"Draw `image`, reshaped to `shape`, with the \"Greys\" colormap and no axes.\"\"\"\n",
"    pixels = image.reshape(shape)\n",
"    plt.imshow(pixels, cmap=\"Greys\", interpolation=\"nearest\")\n",
"    plt.axis(\"off\")"
]
},
{
"cell_type": "code",
"execution_count": 3,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"def plot_multiple_images(images, n_rows, n_cols, pad=2):\n",
"    \"\"\"Draw the first n_rows * n_cols images of `images` as a padded grid.\n",
"\n",
"    images: array indexed as images[k] -> 2D image; all images share one shape.\n",
"    n_rows, n_cols: grid dimensions.\n",
"    pad: number of blank pixels around and between the images.\n",
"    \"\"\"\n",
"    images = images - images.min()  # make the minimum == 0, so the padding looks white\n",
"    # Axis 1 indexes rows (height) and axis 2 indexes columns (width); keeping them\n",
"    # in that order makes the grid correct for non-square images as well.\n",
"    h, w = images.shape[1:]\n",
"    image = np.zeros(((h + pad) * n_rows + pad, (w + pad) * n_cols + pad))\n",
"    for y in range(n_rows):\n",
"        for x in range(n_cols):\n",
"            top = y * (h + pad) + pad\n",
"            left = x * (w + pad) + pad\n",
"            image[top:top + h, left:left + w] = images[y * n_cols + x]\n",
"    plt.imshow(image, cmap=\"Greys\", interpolation=\"nearest\")\n",
"    plt.axis(\"off\")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# PCA with a linear Autoencoder"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Build 3D dataset:"
]
},
{
"cell_type": "code",
"execution_count": 4,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"rnd.seed(4)\n",
"m = 100\n",
"w1, w2 = 0.1, 0.3\n",
"noise = 0.1\n",
"\n",
"angles = rnd.rand(m) * 3 * np.pi / 2 - 0.5\n",
"X_train = np.empty((m, 3))\n",
"X_train[:, 0] = np.cos(angles) + np.sin(angles)/2 + noise * rnd.randn(m) / 2\n",
"X_train[:, 1] = np.sin(angles) * 0.7 + noise * rnd.randn(m) / 2\n",
"X_train[:, 2] = X_train[:, 0] * w1 + X_train[:, 1] * w2 + noise * rnd.randn(m)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Normalize the data:"
]
},
{
"cell_type": "code",
"execution_count": 5,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"from sklearn.preprocessing import StandardScaler\n",
"scaler = StandardScaler()\n",
"X_train = scaler.fit_transform(X_train)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Going to need TensorFlow..."
]
},
{
"cell_type": "code",
"execution_count": 6,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"import tensorflow as tf"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Now let's build the Autoencoder:"
]
},
{
"cell_type": "code",
"execution_count": 7,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"tf.reset_default_graph()\n",
"\n",
"from tensorflow.contrib.layers import fully_connected\n",
"\n",
"n_inputs = 3\n",
"n_hidden = 2 # codings\n",
"n_outputs = n_inputs\n",
"\n",
"learning_rate = 0.01\n",
"\n",
"X = tf.placeholder(tf.float32, shape=[None, n_inputs])\n",
"# No activation function on either layer: the autoencoder stays purely linear\n",
"hidden = fully_connected(X, n_hidden, activation_fn=None)\n",
"outputs = fully_connected(hidden, n_outputs, activation_fn=None)\n",
"\n",
"# Mean (rather than sum) of the squared reconstruction errors, so that `mse` really is the MSE\n",
"mse = tf.reduce_mean(tf.square(outputs - X))\n",
"\n",
"optimizer = tf.train.AdamOptimizer(learning_rate)\n",
"training_op = optimizer.minimize(mse)\n",
"\n",
"init = tf.initialize_all_variables()"
]
},
{
"cell_type": "code",
"execution_count": 8,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"n_iterations = 10000\n",
"codings = hidden\n",
"\n",
"with tf.Session() as sess:\n",
" init.run()\n",
" for iteration in range(n_iterations):\n",
" training_op.run(feed_dict={X: X_train})\n",
" codings_val = codings.eval(feed_dict={X: X_train})"
]
},
{
"cell_type": "code",
"execution_count": 9,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"fig = plt.figure(figsize=(4,3))\n",
"plt.plot(codings_val[:,0], codings_val[:, 1], \"b.\")\n",
"plt.xlabel(\"$z_1$\", fontsize=18)\n",
"plt.ylabel(\"$z_2$\", fontsize=18, rotation=0)\n",
"save_fig(\"linear_autoencoder_pca_plot\")\n",
"plt.show()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Stacked Autoencoders"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Let's use MNIST:"
]
},
{
"cell_type": "code",
"execution_count": 10,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"from tensorflow.examples.tutorials.mnist import input_data\n",
"mnist = input_data.read_data_sets(\"/tmp/data/\")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Train all layers at once"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Let's build a stacked Autoencoder with 3 hidden layers and 1 output layer (i.e., 2 stacked Autoencoders). We will use ELU activation, He initialization and L2 regularization."
]
},
{
"cell_type": "code",
"execution_count": 11,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"tf.reset_default_graph()\n",
"\n",
"from tensorflow.contrib.layers import fully_connected\n",
"\n",
"n_inputs = 28*28\n",
"n_hidden1 = 300\n",
"n_hidden2 = 150 # codings\n",
"n_hidden3 = n_hidden1\n",
"n_outputs = n_inputs\n",
"\n",
"learning_rate = 0.01\n",
"l2_reg = 0.0001\n",
"\n",
"initializer = tf.contrib.layers.variance_scaling_initializer() # He initialization\n",
"#Equivalent to:\n",
"#initializer = lambda shape, dtype=tf.float32: tf.truncated_normal(shape, 0., stddev=np.sqrt(2/shape[0]))\n",
"\n",
"X = tf.placeholder(tf.float32, shape=[None, n_inputs])\n",
"with tf.contrib.framework.arg_scope(\n",
"        [fully_connected],\n",
"        activation_fn=tf.nn.elu,\n",
"        weights_initializer=initializer,\n",
"        weights_regularizer=tf.contrib.layers.l2_regularizer(l2_reg)):\n",
"    hidden1 = fully_connected(X, n_hidden1)\n",
"    hidden2 = fully_connected(hidden1, n_hidden2)\n",
"    hidden3 = fully_connected(hidden2, n_hidden3)\n",
"    outputs = fully_connected(hidden3, n_outputs, activation_fn=None)\n",
"\n",
"mse = tf.reduce_mean(tf.square(outputs - X))\n",
"\n",
"# reg_losses is a Python list with one scalar tensor per regularized layer.\n",
"# Sum everything into a single scalar: `mse + reg_losses` would broadcast into a\n",
"# vector loss in which the MSE is counted once per regularization term.\n",
"reg_losses = tf.get_collection(tf.GraphKeys.REGULARIZATION_LOSSES)\n",
"loss = tf.add_n([mse] + reg_losses)\n",
"\n",
"optimizer = tf.train.AdamOptimizer(learning_rate)\n",
"training_op = optimizer.minimize(loss)\n",
"\n",
"init = tf.initialize_all_variables()\n",
"saver = tf.train.Saver()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Now let's train it! Note that we don't feed target values (`y_batch` is not used). This is unsupervised training."
]
},
{
"cell_type": "code",
"execution_count": 12,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"n_epochs = 4\n",
"batch_size = 150\n",
"\n",
"with tf.Session() as sess:\n",
" init.run()\n",
" for epoch in range(n_epochs):\n",
" n_batches = mnist.train.num_examples // batch_size\n",
" for iteration in range(n_batches):\n",
" print(\"\\r{}%\".format(100 * iteration // n_batches), end=\"\")\n",
" sys.stdout.flush()\n",
" X_batch, y_batch = mnist.train.next_batch(batch_size)\n",
" sess.run(training_op, feed_dict={X: X_batch})\n",
" mse_train = mse.eval(feed_dict={X: X_batch})\n",
" print(\"\\r{}\".format(epoch), \"Train MSE:\", mse_train)\n",
" saver.save(sess, \"my_model_all_layers.ckpt\")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"This function loads the model, evaluates it on the test set (it measures the reconstruction error), then it displays the original image and its reconstruction:"
]
},
{
"cell_type": "code",
"execution_count": 13,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"def show_reconstructed_digits(X, outputs, model_path = None, n_test_digits = 2):\n",
"    \"\"\"Plot the first test digits next to their reconstructions.\n",
"\n",
"    X: placeholder fed with the test images.\n",
"    outputs: tensor evaluated to obtain the reconstructions.\n",
"    model_path: optional checkpoint restored with the global `saver` before evaluating.\n",
"    n_test_digits: number of images taken from the start of `mnist.test.images`.\n",
"    \"\"\"\n",
"    with tf.Session() as sess:\n",
"        if model_path:\n",
"            saver.restore(sess, model_path)\n",
"        X_test = mnist.test.images[:n_test_digits]\n",
"        outputs_val = outputs.eval(feed_dict={X: X_test})\n",
"\n",
"    # One row per digit: original on the left, reconstruction on the right\n",
"    plt.figure(figsize=(8, 3 * n_test_digits))\n",
"    for row in range(n_test_digits):\n",
"        left_cell = 2 * row + 1\n",
"        plt.subplot(n_test_digits, 2, left_cell)\n",
"        plot_image(X_test[row])\n",
"        plt.subplot(n_test_digits, 2, left_cell + 1)\n",
"        plot_image(outputs_val[row])"
]
},
{
"cell_type": "code",
"execution_count": 14,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"show_reconstructed_digits(X, outputs, \"my_model_all_layers.ckpt\")\n",
"save_fig(\"reconstruction_plot\")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Training one Autoencoder at a time in multiple graphs"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"There are many ways to train one Autoencoder at a time. The first approach is to train each Autoencoder using a different graph, then we create the Stacked Autoencoder by simply initializing it with the weights and biases copied from these Autoencoders."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Let's create a function that will train one autoencoder and return the transformed training set (i.e., the output of the hidden layer) and the model parameters."
]
},
{
"cell_type": "code",
"execution_count": 15,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"def train_autoencoder(X_train, n_neurons, n_epochs, batch_size, learning_rate = 0.01, l2_reg = 0.0005, activation_fn=tf.nn.elu):\n",
"    \"\"\"Train a one-hidden-layer autoencoder on X_train in a graph of its own.\n",
"\n",
"    X_train: 2D array of training instances (one row per instance).\n",
"    n_neurons: size of the hidden layer.\n",
"    n_epochs, batch_size: training schedule; batches are drawn at random from X_train.\n",
"    learning_rate: Adam learning rate.\n",
"    l2_reg: scale of the L2 weight regularizer.\n",
"    activation_fn: activation of the hidden layer (the output layer has none).\n",
"\n",
"    Returns (hidden_val, hidden weights, hidden biases, outputs weights, outputs biases),\n",
"    where hidden_val is the hidden layer's output for the whole of X_train.\n",
"    \"\"\"\n",
"    graph = tf.Graph()\n",
"    with graph.as_default():\n",
"        n_inputs = X_train.shape[1]\n",
"\n",
"        X = tf.placeholder(tf.float32, shape=[None, n_inputs])\n",
"        with tf.contrib.framework.arg_scope(\n",
"                [fully_connected],\n",
"                activation_fn=activation_fn,\n",
"                weights_initializer=tf.contrib.layers.variance_scaling_initializer(),\n",
"                weights_regularizer=tf.contrib.layers.l2_regularizer(l2_reg)):\n",
"            hidden = fully_connected(X, n_neurons, scope=\"hidden\")\n",
"            outputs = fully_connected(hidden, n_inputs, activation_fn=None, scope=\"outputs\")\n",
"\n",
"        mse = tf.reduce_mean(tf.square(outputs - X))\n",
"\n",
"        # reg_losses is a Python list of scalar tensors; add_n yields one scalar loss.\n",
"        # `mse + reg_losses` would give a vector loss that counts the MSE once per term.\n",
"        reg_losses = tf.get_collection(tf.GraphKeys.REGULARIZATION_LOSSES)\n",
"        loss = tf.add_n([mse] + reg_losses)\n",
"\n",
"        optimizer = tf.train.AdamOptimizer(learning_rate)\n",
"        training_op = optimizer.minimize(loss)\n",
"\n",
"        init = tf.initialize_all_variables()\n",
"\n",
"    with tf.Session(graph=graph) as sess:\n",
"        init.run()\n",
"        for epoch in range(n_epochs):\n",
"            n_batches = len(X_train) // batch_size\n",
"            for iteration in range(n_batches):\n",
"                print(\"\\r{}%\".format(100 * iteration // n_batches), end=\"\")\n",
"                sys.stdout.flush()\n",
"                indices = rnd.permutation(len(X_train))[:batch_size]\n",
"                X_batch = X_train[indices]\n",
"                sess.run(training_op, feed_dict={X: X_batch})\n",
"            mse_train = mse.eval(feed_dict={X: X_batch})\n",
"            print(\"\\r{}\".format(epoch), \"Train MSE:\", mse_train)\n",
"        # Copy the trained parameters out as plain arrays, keyed by variable name\n",
"        params = dict([(var.name, var.eval()) for var in tf.get_collection(tf.GraphKeys.TRAINABLE_VARIABLES)])\n",
"        hidden_val = hidden.eval(feed_dict={X: X_train})\n",
"        return hidden_val, params[\"hidden/weights:0\"], params[\"hidden/biases:0\"], params[\"outputs/weights:0\"], params[\"outputs/biases:0\"]"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Now let's train two Autoencoders. The first one is trained on the training data, and the second is trained on the previous Autoencoder's hidden layer output:"
]
},
{
"cell_type": "code",
"execution_count": 16,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"hidden_output, W1, b1, W4, b4 = train_autoencoder(mnist.train.images, n_neurons=300, n_epochs=4, batch_size=150)\n",
"_, W2, b2, W3, b3 = train_autoencoder(hidden_output, n_neurons=150, n_epochs=4, batch_size=150)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Finally, we can create a Stacked Autoencoder by simply reusing the weights and biases from the Autoencoders we just trained:"
]
},
{
"cell_type": "code",
"execution_count": 17,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"tf.reset_default_graph()\n",
"\n",
"n_inputs = 28*28\n",
"\n",
"X = tf.placeholder(tf.float32, shape=[None, n_inputs])\n",
"hidden1 = tf.nn.elu(tf.matmul(X, W1) + b1)\n",
"hidden2 = tf.nn.elu(tf.matmul(hidden1, W2) + b2)\n",
"hidden3 = tf.nn.elu(tf.matmul(hidden2, W3) + b3)\n",
"outputs = tf.matmul(hidden3, W4) + b4"
]
},
{
"cell_type": "code",
"execution_count": 18,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"show_reconstructed_digits(X, outputs)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Training one Autoencoder at a time in a single graph"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Another approach is to use a single graph. To do this, we create the graph for the full Stacked Autoencoder, but then we also add operations to train each Autoencoder independently: phase 1 trains the bottom and top layers (i.e., the first Autoencoder) and phase 2 trains the two middle layers (i.e., the second Autoencoder)."
]
},
{
"cell_type": "code",
"execution_count": 19,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"tf.reset_default_graph()\n",
"\n",
"n_inputs = 28 * 28\n",
"n_hidden1 = 300\n",
"n_hidden2 = 150 # codings\n",
"n_hidden3 = n_hidden1\n",
"n_outputs = n_inputs\n",
"\n",
"learning_rate = 0.01\n",
"l2_reg = 0.0001\n",
"\n",
"activation = tf.nn.elu\n",
"regularizer = tf.contrib.layers.l2_regularizer(l2_reg)\n",
"initializer = tf.contrib.layers.variance_scaling_initializer()\n",
"\n",
"X = tf.placeholder(tf.float32, shape=[None, n_inputs])\n",
"\n",
"weights1_init = initializer([n_inputs, n_hidden1])\n",
"weights2_init = initializer([n_hidden1, n_hidden2])\n",
"weights3_init = initializer([n_hidden2, n_hidden3])\n",
"weights4_init = initializer([n_hidden3, n_outputs])\n",
"\n",
"weights1 = tf.Variable(weights1_init, dtype=tf.float32, name=\"weights1\")\n",
"weights2 = tf.Variable(weights2_init, dtype=tf.float32, name=\"weights2\")\n",
"weights3 = tf.Variable(weights3_init, dtype=tf.float32, name=\"weights3\")\n",
"weights4 = tf.Variable(weights4_init, dtype=tf.float32, name=\"weights4\")\n",
"\n",
"biases1 = tf.Variable(tf.zeros(n_hidden1), name=\"biases1\")\n",
"biases2 = tf.Variable(tf.zeros(n_hidden2), name=\"biases2\")\n",
"biases3 = tf.Variable(tf.zeros(n_hidden3), name=\"biases3\")\n",
"biases4 = tf.Variable(tf.zeros(n_outputs), name=\"biases4\")\n",
"\n",
"hidden1 = activation(tf.matmul(X, weights1) + biases1)\n",
"hidden2 = activation(tf.matmul(hidden1, weights2) + biases2)\n",
"hidden3 = activation(tf.matmul(hidden2, weights3) + biases3)\n",
"outputs = tf.matmul(hidden3, weights4) + biases4\n",
"\n",
"\n",
"with tf.name_scope(\"phase1\"):\n",
" optimizer = tf.train.AdamOptimizer(learning_rate)\n",
" phase1_outputs = tf.matmul(hidden1, weights4) + biases4 # bypass hidden2 and hidden3\n",
" phase1_mse = tf.reduce_mean(tf.square(phase1_outputs - X))\n",
" phase1_reg_loss = regularizer(weights1) + regularizer(weights4)\n",
" phase1_loss = phase1_mse + phase1_reg_loss\n",
" phase1_training_op = optimizer.minimize(phase1_loss)\n",
"\n",
"with tf.name_scope(\"phase2\"):\n",
" optimizer = tf.train.AdamOptimizer(learning_rate)\n",
" phase2_mse = tf.reduce_mean(tf.square(hidden3 - hidden1))\n",
" phase2_reg_loss = regularizer(weights2) + regularizer(weights3)\n",
" phase2_loss = phase2_mse + phase2_reg_loss\n",
" phase2_training_op = optimizer.minimize(phase2_loss, var_list=[weights2, biases2, weights3, biases3]) # freeze hidden1\n",
" \n",
"init = tf.initialize_all_variables()\n",
"saver = tf.train.Saver()"
]
},
{
"cell_type": "code",
"execution_count": 20,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"training_ops = [phase1_training_op, phase2_training_op]\n",
"mses = [phase1_mse, phase2_mse]\n",
"n_epochs = [4, 4]\n",
"batch_sizes = [150, 150]\n",
"\n",
"with tf.Session() as sess:\n",
" init.run()\n",
" for phase in range(2):\n",
" print(\"Training phase #{}\".format(phase + 1))\n",
" for epoch in range(n_epochs[phase]):\n",
" n_batches = mnist.train.num_examples // batch_sizes[phase]\n",
" for iteration in range(n_batches):\n",
" print(\"\\r{}%\".format(100 * iteration // n_batches), end=\"\")\n",
" sys.stdout.flush()\n",
" X_batch, y_batch = mnist.train.next_batch(batch_sizes[phase])\n",
" sess.run(training_ops[phase], feed_dict={X: X_batch})\n",
" mse_train = mses[phase].eval(feed_dict={X: X_batch})\n",
" print(\"\\r{}\".format(epoch), \"Train MSE:\", mse_train)\n",
" saver.save(sess, \"my_model_one_at_a_time.ckpt\")\n",
" mse_test = mses[phase].eval(feed_dict={X: mnist.test.images})\n",
" print(\"Test MSE:\", mse_test)"
]
},
{
"cell_type": "code",
"execution_count": 21,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"show_reconstructed_digits(X, outputs, \"my_model_one_at_a_time.ckpt\")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Cache the frozen layer outputs"
]
},
{
"cell_type": "code",
"execution_count": 22,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"# Only the two per-phase ops of the current graph belong here; the notebook-level\n",
"# `training_op` and `mse` were built in an earlier graph that has since been reset.\n",
"training_ops = [phase1_training_op, phase2_training_op]\n",
"mses = [phase1_mse, phase2_mse]\n",
"n_epochs = [4, 4]\n",
"batch_sizes = [150, 150]\n",
"\n",
"with tf.Session() as sess:\n",
"    init.run()\n",
"    for phase in range(2):\n",
"        print(\"Training phase #{}\".format(phase + 1))\n",
"        if phase == 1:\n",
"            # hidden1 is frozen during the second phase: compute its output once for\n",
"            # the whole training set and feed it directly instead of recomputing it.\n",
"            mnist_hidden1 = hidden1.eval(feed_dict={X: mnist.train.images})\n",
"        for epoch in range(n_epochs[phase]):\n",
"            n_batches = mnist.train.num_examples // batch_sizes[phase]\n",
"            for iteration in range(n_batches):\n",
"                print(\"\\r{}%\".format(100 * iteration // n_batches), end=\"\")\n",
"                sys.stdout.flush()\n",
"                if phase == 1:\n",
"                    indices = rnd.permutation(len(mnist_hidden1))\n",
"                    hidden1_batch = mnist_hidden1[indices[:batch_sizes[phase]]]\n",
"                    feed_dict = {hidden1: hidden1_batch}\n",
"                    sess.run(training_ops[phase], feed_dict=feed_dict)\n",
"                else:\n",
"                    X_batch, y_batch = mnist.train.next_batch(batch_sizes[phase])\n",
"                    feed_dict = {X: X_batch}\n",
"                    sess.run(training_ops[phase], feed_dict=feed_dict)\n",
"            mse_train = mses[phase].eval(feed_dict=feed_dict)\n",
"            print(\"\\r{}\".format(epoch), \"Train MSE:\", mse_train)\n",
"            saver.save(sess, \"my_model_cache_frozen.ckpt\")\n",
"    mse_test = mses[phase].eval(feed_dict={X: mnist.test.images})\n",
"    print(\"Test MSE:\", mse_test)"
]
},
{
"cell_type": "code",
"execution_count": 23,
"metadata": {
"collapsed": false,
"scrolled": true
},
"outputs": [],
"source": [
"show_reconstructed_digits(X, outputs, \"my_model_cache_frozen.ckpt\")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Tying weights"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"It is common to tie the weights of the encoder and the decoder (`weights_decoder = tf.transpose(weights_encoder)`). Unfortunately this makes it impossible (or very tricky) to use the `fully_connected()` function, so we need to build the Autoencoder manually:"
]
},
{
"cell_type": "code",
"execution_count": 24,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"tf.reset_default_graph()\n",
"\n",
"n_inputs = 28 * 28\n",
"n_hidden1 = 300\n",
"n_hidden2 = 150 # codings\n",
"n_hidden3 = n_hidden1\n",
"n_outputs = n_inputs\n",
"\n",
"learning_rate = 0.01\n",
"l2_reg = 0.0005\n",
"\n",
"activation = tf.nn.elu\n",
"regularizer = tf.contrib.layers.l2_regularizer(l2_reg)\n",
"initializer = tf.contrib.layers.variance_scaling_initializer()\n",
"\n",
"X = tf.placeholder(tf.float32, shape=[None, n_inputs])\n",
"\n",
"weights1_init = initializer([n_inputs, n_hidden1])\n",
"weights2_init = initializer([n_hidden1, n_hidden2])\n",
"\n",
"weights1 = tf.Variable(weights1_init, dtype=tf.float32, name=\"weights1\")\n",
"weights2 = tf.Variable(weights2_init, dtype=tf.float32, name=\"weights2\")\n",
"weights3 = tf.transpose(weights2, name=\"weights3\") # tied weights\n",
"weights4 = tf.transpose(weights1, name=\"weights4\") # tied weights\n",
"\n",
"biases1 = tf.Variable(tf.zeros(n_hidden1), name=\"biases1\")\n",
"biases2 = tf.Variable(tf.zeros(n_hidden2), name=\"biases2\")\n",
"biases3 = tf.Variable(tf.zeros(n_hidden3), name=\"biases3\")\n",
"biases4 = tf.Variable(tf.zeros(n_outputs), name=\"biases4\")\n",
"\n",
"hidden1 = activation(tf.matmul(X, weights1) + biases1)\n",
"hidden2 = activation(tf.matmul(hidden1, weights2) + biases2)\n",
"hidden3 = activation(tf.matmul(hidden2, weights3) + biases3)\n",
"outputs = tf.matmul(hidden3, weights4) + biases4\n",
"\n",
"mse = tf.reduce_mean(tf.square(outputs - X))\n",
"reg_loss = regularizer(weights1) + regularizer(weights2)\n",
"loss = mse + reg_loss\n",
"\n",
"optimizer = tf.train.AdamOptimizer(learning_rate)\n",
"training_op = optimizer.minimize(loss)\n",
"\n",
"init = tf.initialize_all_variables()\n",
"saver = tf.train.Saver()"
]
},
{
"cell_type": "code",
"execution_count": 25,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"n_epochs = 5\n",
"batch_size = 150\n",
"\n",
"with tf.Session() as sess:\n",
" init.run()\n",
" for epoch in range(n_epochs):\n",
" n_batches = mnist.train.num_examples // batch_size\n",
" for iteration in range(n_batches):\n",
" print(\"\\r{}%\".format(100 * iteration // n_batches), end=\"\")\n",
" sys.stdout.flush()\n",
" X_batch, y_batch = mnist.train.next_batch(batch_size)\n",
" sess.run(training_op, feed_dict={X: X_batch})\n",
" mse_train = mse.eval(feed_dict={X: X_batch})\n",
" print(\"\\r{}\".format(epoch), \"Train MSE:\", mse_train)\n",
" saver.save(sess, \"my_model_tying_weights.ckpt\")"
]
},
{
"cell_type": "code",
"execution_count": 26,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"show_reconstructed_digits(X, outputs, \"my_model_tying_weights.ckpt\")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Unsupervised pretraining"
]
},
{
"cell_type": "code",
"execution_count": 27,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"tf.reset_default_graph()\n",
"\n",
"n_inputs = 28 * 28\n",
"n_hidden1 = 300\n",
"n_hidden2 = 150\n",
"n_outputs = 10\n",
"\n",
"learning_rate = 0.01\n",
"l2_reg = 0.0005\n",
"\n",
"activation = tf.nn.elu\n",
"regularizer = tf.contrib.layers.l2_regularizer(l2_reg)\n",
"initializer = tf.contrib.layers.variance_scaling_initializer()\n",
"\n",
"X = tf.placeholder(tf.float32, shape=[None, n_inputs])\n",
"y = tf.placeholder(tf.int32, shape=[None])\n",
"\n",
"weights1_init = initializer([n_inputs, n_hidden1])\n",
"weights2_init = initializer([n_hidden1, n_hidden2])\n",
"# The third layer is the output layer: size it with n_outputs. (n_hidden3 is not\n",
"# defined in this cell; it only existed as a leftover from earlier cells.)\n",
"weights3_init = initializer([n_hidden2, n_outputs])\n",
"\n",
"weights1 = tf.Variable(weights1_init, dtype=tf.float32, name=\"weights1\")\n",
"weights2 = tf.Variable(weights2_init, dtype=tf.float32, name=\"weights2\")\n",
"weights3 = tf.Variable(weights3_init, dtype=tf.float32, name=\"weights3\")\n",
"\n",
"biases1 = tf.Variable(tf.zeros(n_hidden1), name=\"biases1\")\n",
"biases2 = tf.Variable(tf.zeros(n_hidden2), name=\"biases2\")\n",
"biases3 = tf.Variable(tf.zeros(n_outputs), name=\"biases3\")\n",
"\n",
"hidden1 = activation(tf.matmul(X, weights1) + biases1)\n",
"hidden2 = activation(tf.matmul(hidden1, weights2) + biases2)\n",
"logits = tf.matmul(hidden2, weights3) + biases3\n",
"\n",
"# Keyword arguments avoid any ambiguity about the (labels, logits) argument order\n",
"cross_entropy = tf.nn.sparse_softmax_cross_entropy_with_logits(labels=y, logits=logits)\n",
"reg_loss = regularizer(weights1) + regularizer(weights2) + regularizer(weights3)\n",
"# cross_entropy holds one value per instance: average it to get a scalar loss\n",
"loss = tf.reduce_mean(cross_entropy) + reg_loss\n",
"optimizer = tf.train.AdamOptimizer(learning_rate)\n",
"training_op = optimizer.minimize(loss)\n",
"\n",
"correct = tf.nn.in_top_k(logits, y, 1)\n",
"accuracy = tf.reduce_mean(tf.cast(correct, tf.float32))\n",
"\n",
"init = tf.initialize_all_variables()\n",
"# pretrain_saver only covers the two hidden layers, so it can restore them from an autoencoder checkpoint\n",
"pretrain_saver = tf.train.Saver([weights1, weights2, biases1, biases2])\n",
"saver = tf.train.Saver()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Regular training (without pretraining):"
]
},
{
"cell_type": "code",
"execution_count": 28,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"n_epochs = 4\n",
"batch_size = 150\n",
"n_labeled_instances = 20000\n",
"\n",
"with tf.Session() as sess:\n",
" init.run()\n",
" for epoch in range(n_epochs):\n",
" n_batches = n_labeled_instances // batch_size\n",
" for iteration in range(n_batches):\n",
" print(\"\\r{}%\".format(100 * iteration // n_batches), end=\"\")\n",
" sys.stdout.flush()\n",
" indices = rnd.permutation(n_labeled_instances)[:batch_size]\n",
" X_batch, y_batch = mnist.train.images[indices], mnist.train.labels[indices]\n",
" sess.run(training_op, feed_dict={X: X_batch, y: y_batch})\n",
" accuracy_val = accuracy.eval(feed_dict={X: X_batch, y: y_batch})\n",
" print(\"\\r{}\".format(epoch), \"Train accuracy:\", accuracy_val, end=\" \")\n",
" saver.save(sess, \"my_model_supervised.ckpt\")\n",
" accuracy_val = accuracy.eval(feed_dict={X: mnist.test.images, y: mnist.test.labels})\n",
" print(\"Test accuracy:\", accuracy_val)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Now reusing the first two layers of the autoencoder we pretrained:"
]
},
{
"cell_type": "code",
"execution_count": 29,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"n_epochs = 4\n",
"batch_size = 150\n",
"n_labeled_instances = 20000\n",
"\n",
"#training_op = optimizer.minimize(loss, var_list=[weights3, biases3]) # Freeze layers 1 and 2 (optional)\n",
"\n",
"with tf.Session() as sess:\n",
" init.run()\n",
" pretrain_saver.restore(sess, \"my_model_cache_frozen.ckpt\")\n",
" for epoch in range(n_epochs):\n",
" n_batches = n_labeled_instances // batch_size\n",
" for iteration in range(n_batches):\n",
" print(\"\\r{}%\".format(100 * iteration // n_batches), end=\"\")\n",
" sys.stdout.flush()\n",
" indices = rnd.permutation(n_labeled_instances)[:batch_size]\n",
" X_batch, y_batch = mnist.train.images[indices], mnist.train.labels[indices]\n",
" sess.run(training_op, feed_dict={X: X_batch, y: y_batch})\n",
" accuracy_val = accuracy.eval(feed_dict={X: X_batch, y: y_batch})\n",
" print(\"\\r{}\".format(epoch), \"Train accuracy:\", accuracy_val, end=\"\\t\")\n",
" saver.save(sess, \"my_model_supervised_pretrained.ckpt\")\n",
" accuracy_val = accuracy.eval(feed_dict={X: mnist.test.images, y: mnist.test.labels})\n",
" print(\"Test accuracy:\", accuracy_val)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Stacked denoising Autoencoder"
]
},
{
"cell_type": "code",
"execution_count": 30,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"tf.reset_default_graph()\n",
"\n",
"from tensorflow.contrib.layers import dropout\n",
"\n",
"n_inputs = 28 * 28\n",
"n_hidden1 = 300\n",
"n_hidden2 = 150 # codings\n",
"n_hidden3 = n_hidden1\n",
"n_outputs = n_inputs\n",
"\n",
"learning_rate = 0.01\n",
"l2_reg = 0.00001\n",
"keep_prob = 0.7\n",
"\n",
"activation = tf.nn.elu\n",
"regularizer = tf.contrib.layers.l2_regularizer(l2_reg)\n",
"initializer = tf.contrib.layers.variance_scaling_initializer()\n",
"\n",
"X = tf.placeholder(tf.float32, shape=[None, n_inputs])\n",
"is_training = tf.placeholder_with_default(False, shape=(), name='is_training')\n",
"\n",
"X_drop = dropout(X, keep_prob, is_training=is_training)\n",
"\n",
"weights1_init = initializer([n_inputs, n_hidden1])\n",
"weights2_init = initializer([n_hidden1, n_hidden2])\n",
"\n",
"weights1 = tf.Variable(weights1_init, dtype=tf.float32, name=\"weights1\")\n",
"weights2 = tf.Variable(weights2_init, dtype=tf.float32, name=\"weights2\")\n",
"weights3 = tf.transpose(weights2, name=\"weights3\") # tied weights\n",
"weights4 = tf.transpose(weights1, name=\"weights4\") # tied weights\n",
"\n",
"biases1 = tf.Variable(tf.zeros(n_hidden1), name=\"biases1\")\n",
"biases2 = tf.Variable(tf.zeros(n_hidden2), name=\"biases2\")\n",
"biases3 = tf.Variable(tf.zeros(n_hidden3), name=\"biases3\")\n",
"biases4 = tf.Variable(tf.zeros(n_outputs), name=\"biases4\")\n",
"\n",
"hidden1 = activation(tf.matmul(X_drop, weights1) + biases1)\n",
"hidden2 = activation(tf.matmul(hidden1, weights2) + biases2)\n",
"hidden3 = activation(tf.matmul(hidden2, weights3) + biases3)\n",
"outputs = tf.matmul(hidden3, weights4) + biases4\n",
"\n",
"optimizer = tf.train.AdamOptimizer(learning_rate)\n",
"mse = tf.reduce_mean(tf.square(outputs - X))\n",
"reg_loss = regularizer(weights1) + regularizer(weights2)\n",
"loss = mse + reg_loss\n",
"training_op = optimizer.minimize(loss)\n",
" \n",
"init = tf.initialize_all_variables()\n",
"saver = tf.train.Saver()"
]
},
{
"cell_type": "code",
"execution_count": 31,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"n_epochs = 10\n",
"batch_size = 150\n",
"\n",
"with tf.Session() as sess:\n",
" init.run()\n",
" for epoch in range(n_epochs):\n",
" n_batches = mnist.train.num_examples // batch_size\n",
" for iteration in range(n_batches):\n",
" print(\"\\r{}%\".format(100 * iteration // n_batches), end=\"\")\n",
" sys.stdout.flush()\n",
" X_batch, y_batch = mnist.train.next_batch(batch_size)\n",
" sess.run(training_op, feed_dict={X: X_batch, is_training: True})\n",
" mse_train = mse.eval(feed_dict={X: X_batch, is_training: False})\n",
" print(\"\\r{}\".format(epoch), \"Train MSE:\", mse_train)\n",
" saver.save(sess, \"my_model_stacked_denoising.ckpt\")"
]
},
{
"cell_type": "code",
"execution_count": 32,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"show_reconstructed_digits(X, outputs, \"my_model_stacked_denoising.ckpt\")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Visualizing the extracted features"
]
},
{
"cell_type": "code",
"execution_count": 33,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"with tf.Session() as sess:\n",
" saver.restore(sess, \"my_model_stacked_denoising.ckpt\")\n",
" weights1_val = weights1.eval()"
]
},
{
"cell_type": "code",
"execution_count": 34,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"for i in range(5):\n",
" plt.subplot(1, 5, i + 1)\n",
" plot_image(weights1_val.T[i])\n",
"\n",
"save_fig(\"extracted_features_plot\")\n",
"plt.show()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Sparse Autoencoder"
]
},
{
"cell_type": "code",
"execution_count": 36,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"p = 0.1\n",
"# Stay strictly inside (0, 1): the KL divergence divides by q and by (1 - q),\n",
"# so q = 0 or q = 1 would trigger divide-by-zero warnings and infinite values.\n",
"q = np.linspace(0.001, 0.999, 500)\n",
"kl_div = p * np.log(p / q) + (1 - p) * np.log((1 - p) / (1 - q))\n",
"mse = (p - q)**2\n",
"plt.plot([p, p], [0, 0.3], \"k:\")\n",
"plt.text(0.05, 0.32, \"Target\\nsparsity\", fontsize=14)\n",
"plt.plot(q, kl_div, \"b-\", label=\"KL divergence\")\n",
"plt.plot(q, mse, \"r--\", label=\"MSE\")\n",
"plt.legend(loc=\"upper left\")\n",
"plt.xlabel(\"Actual sparsity\")\n",
"plt.ylabel(\"Cost\", rotation=0)\n",
"plt.axis([0, 1, 0, 0.95])\n",
"save_fig(\"sparsity_loss_plot\")"
]
},
{
"cell_type": "code",
"execution_count": 37,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"def kl_divergence(p, q):\n",
"    \"\"\"Kullback-Leibler divergence KL(p || q) between two Bernoulli distributions.\n",
"\n",
"    `p` and `q` are the probabilities of the positive outcome. The arithmetic is\n",
"    element-wise, so the result has the broadcast shape of the two arguments.\n",
"    \"\"\"\n",
"    positive_term = p * tf.log(p / q)\n",
"    negative_term = (1 - p) * tf.log((1 - p) / (1 - q))\n",
"    return positive_term + negative_term"
]
},
{
"cell_type": "code",
"execution_count": 38,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"tf.reset_default_graph()\n",
"\n",
"# Layer sizes: one hidden layer that is wider than the input it reconstructs.\n",
"n_inputs = 28 * 28\n",
"n_hidden1 = 1000  # sparse codings\n",
"n_outputs = n_inputs\n",
"\n",
"# Training hyperparameters.\n",
"learning_rate = 0.01\n",
"sparsity_target = 0.1\n",
"sparsity_weight = 0.2\n",
"\n",
"#activation = tf.nn.softplus # soft variant of ReLU\n",
"activation = tf.nn.sigmoid\n",
"initializer = tf.contrib.layers.variance_scaling_initializer()\n",
"\n",
"X = tf.placeholder(tf.float32, shape=[None, n_inputs])\n",
"\n",
"# Encoder: parameters and the hidden layer carrying the sparse codings.\n",
"weights1_init = initializer([n_inputs, n_hidden1])\n",
"weights1 = tf.Variable(weights1_init, dtype=tf.float32, name=\"weights1\")\n",
"biases1 = tf.Variable(tf.zeros(n_hidden1), name=\"biases1\")\n",
"hidden1 = activation(tf.matmul(X, weights1) + biases1)\n",
"\n",
"# Decoder: parameters and the linear reconstruction of the input.\n",
"weights2_init = initializer([n_hidden1, n_outputs])\n",
"weights2 = tf.Variable(weights2_init, dtype=tf.float32, name=\"weights2\")\n",
"biases2 = tf.Variable(tf.zeros(n_outputs), name=\"biases2\")\n",
"outputs = tf.matmul(hidden1, weights2) + biases2\n",
"\n",
"# Total loss = reconstruction error + weighted KL penalty that pulls the mean\n",
"# activation of every hidden unit over the batch toward `sparsity_target`.\n",
"mse = tf.reduce_mean(tf.square(outputs - X))\n",
"hidden1_mean = tf.reduce_mean(hidden1, reduction_indices=0) # batch mean\n",
"sparsity_loss = tf.reduce_sum(kl_divergence(sparsity_target, hidden1_mean))\n",
"loss = mse + sparsity_weight * sparsity_loss\n",
"\n",
"optimizer = tf.train.AdamOptimizer(learning_rate)\n",
"training_op = optimizer.minimize(loss)\n",
"\n",
"init = tf.initialize_all_variables()\n",
"saver = tf.train.Saver()"
]
},
{
"cell_type": "code",
"execution_count": 39,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"n_epochs = 100\n",
"batch_size = 1000\n",
"\n",
"with tf.Session() as sess:\n",
"    init.run()\n",
"    for epoch in range(n_epochs):\n",
"        n_batches = mnist.train.num_examples // batch_size\n",
"        for iteration in range(n_batches):\n",
"            # Rewrite the same console line with the progress through this epoch.\n",
"            progress = 100 * iteration // n_batches\n",
"            print(\"\\r{}%\".format(progress), end=\"\")\n",
"            sys.stdout.flush()\n",
"            X_batch, y_batch = mnist.train.next_batch(batch_size)\n",
"            training_op.run(feed_dict={X: X_batch})\n",
"        # Report the losses measured on the last mini-batch of the epoch, then checkpoint.\n",
"        last_batch_feed = {X: X_batch}\n",
"        mse_val, sparsity_loss_val, loss_val = sess.run(\n",
"            [mse, sparsity_loss, loss], feed_dict=last_batch_feed)\n",
"        print(\"\\r{}\".format(epoch), \"Train MSE:\", mse_val,\n",
"              \"\\tSparsity loss:\", sparsity_loss_val,\n",
"              \"\\tTotal loss:\", loss_val)\n",
"        saver.save(sess, \"my_model_sparse.ckpt\")"
]
},
{
"cell_type": "code",
"execution_count": 40,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"# Hand the input placeholder, the reconstruction tensor and the saved sparse\n",
"# autoencoder checkpoint to the plotting helper defined earlier in the notebook.\n",
"show_reconstructed_digits(X, outputs, \"my_model_sparse.ckpt\")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Variational Autoencoder"
]
},
{
"cell_type": "code",
"execution_count": 41,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"# Variational autoencoder built from explicit weight and bias variables.\n",
"tf.reset_default_graph()\n",
"\n",
"n_inputs = 28*28\n",
"n_hidden1 = 500\n",
"n_hidden2 = 500\n",
"n_hidden3 = 20 # codings\n",
"n_hidden4 = n_hidden2\n",
"n_hidden5 = n_hidden1\n",
"n_outputs = n_inputs\n",
"\n",
"learning_rate = 0.001\n",
"\n",
"activation = tf.nn.elu\n",
"initializer = tf.contrib.layers.variance_scaling_initializer(mode=\"FAN_AVG\",\n",
"                                                             uniform=True)\n",
"\n",
"X = tf.placeholder(tf.float32, [None, n_inputs])\n",
"\n",
"weights1 = tf.Variable(initializer([n_inputs, n_hidden1]))\n",
"weights2 = tf.Variable(initializer([n_hidden1, n_hidden2]))\n",
"weights3_mean = tf.Variable(initializer([n_hidden2, n_hidden3]))\n",
"weights3_log_sigma = tf.Variable(initializer([n_hidden2, n_hidden3]))\n",
"weights4 = tf.Variable(initializer([n_hidden3, n_hidden4]))\n",
"weights5 = tf.Variable(initializer([n_hidden4, n_hidden5]))\n",
"# The output layer is sized with n_outputs (equal to n_inputs here) rather than n_inputs.\n",
"weights6 = tf.Variable(initializer([n_hidden5, n_outputs]))\n",
"\n",
"biases1 = tf.Variable(tf.zeros([n_hidden1], dtype=tf.float32))\n",
"biases2 = tf.Variable(tf.zeros([n_hidden2], dtype=tf.float32))\n",
"biases3_mean = tf.Variable(tf.zeros([n_hidden3], dtype=tf.float32))\n",
"biases3_log_sigma = tf.Variable(tf.zeros([n_hidden3], dtype=tf.float32))\n",
"biases4 = tf.Variable(tf.zeros([n_hidden4], dtype=tf.float32))\n",
"biases5 = tf.Variable(tf.zeros([n_hidden5], dtype=tf.float32))\n",
"biases6 = tf.Variable(tf.zeros([n_outputs], dtype=tf.float32))\n",
"\n",
"# Encoder.\n",
"hidden1 = activation(tf.matmul(X, weights1) + biases1)\n",
"hidden2 = activation(tf.matmul(hidden1, weights2) + biases2)\n",
"\n",
"# NOTE: despite its name, `hidden3_log_sigma` is used as log(sigma^2): the latent\n",
"# loss below takes exp() of it as the variance. The standard deviation is therefore\n",
"# exp(0.5 * hidden3_log_sigma), which equals sqrt(exp(.)) but overflows later.\n",
"hidden3_mean = tf.matmul(hidden2, weights3_mean) + biases3_mean\n",
"hidden3_log_sigma = tf.matmul(hidden2, weights3_log_sigma) + biases3_log_sigma\n",
"noise = tf.random_normal(tf.shape(hidden3_log_sigma), dtype=tf.float32)\n",
"hidden3 = hidden3_mean + tf.exp(0.5 * hidden3_log_sigma) * noise\n",
"\n",
"# Decoder.\n",
"hidden4 = activation(tf.matmul(hidden3, weights4) + biases4)\n",
"hidden5 = activation(tf.matmul(hidden4, weights5) + biases5)\n",
"logits = tf.matmul(hidden5, weights6) + biases6\n",
"outputs = tf.sigmoid(logits)\n",
"\n",
"# Element-wise cross-entropy reconstruction error, summed over the whole batch.\n",
"reconstruction_loss = tf.reduce_sum(tf.nn.sigmoid_cross_entropy_with_logits(logits, X))\n",
"# KL divergence between N(mean, exp(log_sigma)) and the standard normal distribution.\n",
"latent_loss = 0.5 * tf.reduce_sum(tf.exp(hidden3_log_sigma) + tf.square(hidden3_mean) - 1 - hidden3_log_sigma)\n",
"cost = reconstruction_loss + latent_loss\n",
"\n",
"optimizer = tf.train.AdamOptimizer(learning_rate=learning_rate)\n",
"training_op = optimizer.minimize(cost)\n",
"\n",
"init = tf.initialize_all_variables()\n",
"saver = tf.train.Saver()"
]
},
{
"cell_type": "code",
"execution_count": 42,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"tf.reset_default_graph()\n",
"\n",
"# Layer sizes of the mirrored encoder/decoder stack.\n",
"n_inputs = 28*28\n",
"n_hidden1 = 500\n",
"n_hidden2 = 500\n",
"n_hidden3 = 20 # codings\n",
"n_hidden4 = n_hidden2\n",
"n_hidden5 = n_hidden1\n",
"n_outputs = n_inputs\n",
"\n",
"learning_rate = 0.001\n",
"\n",
"initializer = tf.contrib.layers.variance_scaling_initializer()\n",
"\n",
"X = tf.placeholder(tf.float32, [None, n_inputs])\n",
"\n",
"# Inside the scope every fully connected layer defaults to ELU and the initializer\n",
"# above; layers that must stay linear override `activation_fn` explicitly.\n",
"with tf.contrib.framework.arg_scope([fully_connected],\n",
"                                    activation_fn=tf.nn.elu,\n",
"                                    weights_initializer=initializer):\n",
"    hidden1 = fully_connected(X, n_hidden1)\n",
"    hidden2 = fully_connected(hidden1, n_hidden2)\n",
"    hidden3_mean = fully_connected(hidden2, n_hidden3, activation_fn=None)\n",
"    hidden3_gamma = fully_connected(hidden2, n_hidden3, activation_fn=None)\n",
"    # Sample the codings as mean + sigma * noise, with gamma acting as log(sigma^2).\n",
"    noise = tf.random_normal(tf.shape(hidden3_gamma), dtype=tf.float32)\n",
"    hidden3_sigma = tf.exp(0.5 * hidden3_gamma)\n",
"    hidden3 = hidden3_mean + hidden3_sigma * noise\n",
"    hidden4 = fully_connected(hidden3, n_hidden4)\n",
"    hidden5 = fully_connected(hidden4, n_hidden5)\n",
"    logits = fully_connected(hidden5, n_outputs, activation_fn=None)\n",
"\n",
"outputs = tf.sigmoid(logits)\n",
"\n",
"# Element-wise cross-entropy reconstruction error, summed over the whole batch.\n",
"xentropy = tf.nn.sigmoid_cross_entropy_with_logits(logits, X)\n",
"reconstruction_loss = tf.reduce_sum(xentropy)\n",
"# KL divergence between N(mean, exp(gamma)) and the standard normal distribution.\n",
"latent_terms = tf.exp(hidden3_gamma) + tf.square(hidden3_mean) - 1 - hidden3_gamma\n",
"latent_loss = 0.5 * tf.reduce_sum(latent_terms)\n",
"cost = reconstruction_loss + latent_loss\n",
"\n",
"optimizer = tf.train.AdamOptimizer(learning_rate=learning_rate)\n",
"training_op = optimizer.minimize(cost)\n",
"\n",
"init = tf.initialize_all_variables()\n",
"saver = tf.train.Saver()"
]
},
{
"cell_type": "code",
"execution_count": 43,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"n_epochs = 50\n",
"batch_size = 150\n",
"\n",
"with tf.Session() as sess:\n",
"    init.run()\n",
"    for epoch in range(n_epochs):\n",
"        n_batches = mnist.train.num_examples // batch_size\n",
"        for iteration in range(n_batches):\n",
"            # Rewrite the same console line with the progress through this epoch.\n",
"            progress = 100 * iteration // n_batches\n",
"            print(\"\\r{}%\".format(progress), end=\"\")\n",
"            sys.stdout.flush()\n",
"            X_batch, y_batch = mnist.train.next_batch(batch_size)\n",
"            training_op.run(feed_dict={X: X_batch})\n",
"        # Report the losses measured on the last mini-batch of the epoch, then checkpoint.\n",
"        last_batch_feed = {X: X_batch}\n",
"        cost_val, reconstruction_loss_val, latent_loss_val = sess.run(\n",
"            [cost, reconstruction_loss, latent_loss], feed_dict=last_batch_feed)\n",
"        print(\"\\r{}\".format(epoch), \"Train cost:\", cost_val,\n",
"              \"\\tReconstruction loss:\", reconstruction_loss_val,\n",
"              \"\\tLatent loss:\", latent_loss_val)\n",
"        saver.save(sess, \"my_model_variational.ckpt\")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Encode:"
]
},
{
"cell_type": "code",
"execution_count": 44,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"n_digits = 3\n",
"X_test, y_test = mnist.test.next_batch(batch_size)\n",
"# The codings are the sampled output of the third hidden layer.\n",
"codings = hidden3\n",
"\n",
"# Restore the trained variational autoencoder and encode the test batch.\n",
"with tf.Session() as sess:\n",
"    saver.restore(sess, \"my_model_variational.ckpt\")\n",
"    codings_val = sess.run(codings, feed_dict={X: X_test})"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Decode:"
]
},
{
"cell_type": "code",
"execution_count": 45,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"# Feed the stored codings straight into the `codings` tensor and evaluate the\n",
"# outputs; `X` is not fed here.\n",
"with tf.Session() as sess:\n",
"    saver.restore(sess, \"my_model_variational.ckpt\")\n",
"    outputs_val = sess.run(outputs, feed_dict={codings: codings_val})"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Let's plot the reconstructions:"
]
},
{
"cell_type": "code",
"execution_count": 46,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"# One row per digit: the original image on the left, its reconstruction on the right.\n",
"fig = plt.figure(figsize=(8, 2.5 * n_digits))\n",
"for iteration in range(n_digits):\n",
"    left_cell = 2 * iteration + 1\n",
"    for offset, images in enumerate((X_test, outputs_val)):\n",
"        plt.subplot(n_digits, 2, left_cell + offset)\n",
"        plot_image(images[iteration])"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Generate digits"
]
},
{
"cell_type": "code",
"execution_count": 47,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"n_rows = 6\n",
"n_cols = 10\n",
"n_digits = n_rows * n_cols\n",
"# Draw one standard-normal coding vector per digit to generate.\n",
"codings_rnd = np.random.normal(size=(n_digits, n_hidden3))\n",
"\n",
"# Decode the random codings by feeding them in place of the `codings` tensor.\n",
"with tf.Session() as sess:\n",
"    saver.restore(sess, \"my_model_variational.ckpt\")\n",
"    outputs_val = sess.run(outputs, feed_dict={codings: codings_rnd})"
]
},
{
"cell_type": "code",
"execution_count": 48,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"# Turn every flat output row back into a 28x28 image before plotting the grid.\n",
"generated_images = outputs_val.reshape(-1, 28, 28)\n",
"plot_multiple_images(generated_images, n_rows, n_cols)\n",
"save_fig(\"generated_digits_plot\")\n",
"plt.show()"
]
},
{
"cell_type": "code",
"execution_count": 49,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"# NOTE(review): this cell repeats the random sampling and decoding step of the\n",
"# \"Generate digits\" cell; the next code cell reassigns `n_digits`, `codings_rnd`\n",
"# and `outputs_val` before anything reads the values produced here.\n",
"n_rows = 6\n",
"n_cols = 10\n",
"n_digits = n_rows * n_cols\n",
"codings_shape = [n_digits, n_hidden3]\n",
"codings_rnd = np.random.normal(size=codings_shape)\n",
"\n",
"with tf.Session() as sess:\n",
"    saver.restore(sess, \"my_model_variational.ckpt\")\n",
"    decoder_feed = {codings: codings_rnd}\n",
"    outputs_val = outputs.eval(feed_dict=decoder_feed)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Interpolate digits"
]
},
{
"cell_type": "code",
"execution_count": 50,
"metadata": {
"collapsed": false,
"scrolled": true
},
"outputs": [],
"source": [
"n_iterations = 3\n",
"n_digits = 6\n",
"codings_rnd = np.random.normal(size=(n_digits, n_hidden3))\n",
"# Rolling the rows by one pairs every random coding with its successor\n",
"# (the last one wraps around to the first) as its interpolation target.\n",
"target_codings = np.roll(codings_rnd, -1, axis=0)\n",
"\n",
"with tf.Session() as sess:\n",
"    saver.restore(sess, \"my_model_variational.ckpt\")\n",
"    codings_delta = target_codings - codings_rnd\n",
"    for iteration in range(n_iterations + 1):\n",
"        # iteration 0 decodes the start codings, iteration n_iterations the targets.\n",
"        codings_interpolate = codings_rnd + codings_delta * iteration / n_iterations\n",
"        outputs_val = sess.run(outputs, feed_dict={codings: codings_interpolate})\n",
"        plt.figure(figsize=(11, 1.5*n_iterations))\n",
"        for subplot_index in range(1, n_digits + 1):\n",
"            plt.subplot(1, n_digits, subplot_index)\n",
"            plot_image(outputs_val[subplot_index - 1])\n",
"        plt.show()"
]
},
{
"cell_type": "markdown",
"metadata": {
"collapsed": true
},
"source": [
"# Exercise solutions"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Coming soon..."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": []
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.5.1"
},
"nav_menu": {
"height": "381px",
"width": "453px"
},
"toc": {
"navigate_menu": true,
"number_sections": true,
"sideBar": true,
"threshold": 6,
"toc_cell": false,
"toc_section_display": "block",
"toc_window_display": false
}
},
"nbformat": 4,
"nbformat_minor": 1
}