Thanks for an awesome set of tutorials! I was tinkering with this a bit, trying to merge the chapter on convolutional autoencoders with tied weights (09) and the variational autoencoder (11). The adapted (messy!) code goes below. Everything works fine, except for the part where example reconstructions are made from latent representations: there, tensorflow now complains that I do not provide a value for x
in the line: recon = sess.run(ae['y'], feed_dict={ae['z']: z})
, which results in:
tensorflow.python.framework.errors.InvalidArgumentError: You must feed a value for placeholder tensor 'x' with dtype float
[[Node: x = Placeholder[dtype=DT_FLOAT, shape=[], _device="/job:localhost/replica:0/task:0/cpu:0"]()]]
etc.
So I started wondering: is it possible at all to use the architecture for this kind of reconstruction purpose (because of the weight tying?), or am I missing an obvious workaround?
Thanks in advance for any help!
import tensorflow as tf
import math
from libs.activations import lrelu
import numpy as np
from libs.utils import weight_variable, bias_variable, montage_batch
# %%
def VAE(input_shape=[None, 28, 28, 1],
n_filters=[1, 10, 10, 10],
filter_sizes=[3, 3, 3, 3],
n_hidden=2,
activation = lrelu):
# %%
# input to the network
x = tf.placeholder(shape=input_shape, dtype=tf.float32, name='x')
x_tensor = x
current_input = x_tensor
encoder, shapes = [], []
for layer_i, n_output in enumerate(n_filters[1:]):
n_input = current_input.get_shape().as_list()[3]
shapes.append(current_input.get_shape().as_list())
W = tf.Variable(
tf.random_uniform([
filter_sizes[layer_i],
filter_sizes[layer_i],
n_input, n_output],
-1.0 / math.sqrt(n_input),
1.0 / math.sqrt(n_input)))
b = tf.Variable(tf.zeros([n_output]))
encoder.append(W)
output = lrelu(
tf.add(tf.nn.conv2d(
current_input, W, strides=[1, 2, 2, 1], padding='SAME'), b))
current_input = output
dims = current_input.get_shape().as_list()
nb_flat = dims[1] * dims[2] * dims[3]
flattened = tf.reshape(current_input, [-1, nb_flat])
###############################################
W_mu = weight_variable([nb_flat, n_hidden])
b_mu = bias_variable([n_hidden])
W_log_sigma = weight_variable([nb_flat, n_hidden])
b_log_sigma = bias_variable([n_hidden])
z_mu = tf.matmul(flattened, W_mu) + b_mu
z_log_sigma = 0.5 * (tf.matmul(flattened, W_log_sigma) + b_log_sigma)
###############################################
###############################################
# %%
# Sample from noise distribution p(eps) ~ N(0, 1)
#epsilon = tf.random_normal(
# tf.pack([tf.shape(x)[0], n_hidden]))
epsilon = tf.random_normal(
[1, n_hidden])
# Sample from posterior
z = z_mu + tf.exp(z_log_sigma) * epsilon
print(z.get_shape(), '+++')
###############################################
W_ = tf.transpose(W_mu)
b_ = tf.Variable(tf.zeros([nb_flat]))
dense = tf.nn.tanh(tf.matmul(z, W_) + b_)
current_input = tf.reshape(dense, [-1, dims[1], dims[2], dims[3]])
print(dims)
###############################################
encoder.reverse()
shapes.reverse()
# Build the decoder using the same weights
for layer_i, shape in enumerate(shapes):
W = encoder[layer_i]
b = tf.Variable(tf.zeros([W.get_shape().as_list()[2]]))
output = lrelu(tf.add(
tf.nn.conv2d_transpose(
current_input, W,
tf.pack([tf.shape(x)[0], shape[1], shape[2], shape[3]]),
strides=[1, 2, 2, 1], padding='SAME'), b))
current_input = output
# now have the reconstruction through the network
y = current_input
actual_cost = tf.reduce_sum(tf.square(y - x_tensor))
kl_div = -0.5 * tf.reduce_sum(
1.0 + 2.0 * z_log_sigma - tf.square(z_mu) - tf.exp(2.0 * z_log_sigma),
1)
loss = tf.reduce_mean(actual_cost + kl_div)
###############################################
return {'cost': loss, 'x': x, 'z': z, 'y': y}
# %%
def test_mnist():
"""Summary
Returns
-------
name : TYPE
Description
"""
# %%
import tensorflow as tf
import tensorflow.examples.tutorials.mnist.input_data as input_data
import matplotlib.pyplot as plt
# %%
# load MNIST as before
mnist = input_data.read_data_sets('MNIST_data', one_hot=True)
ae = VAE()
# %%
learning_rate = 0.001
optimizer = tf.train.AdamOptimizer(learning_rate).minimize(ae['cost'])
# %%
# We create a session to use the graph
sess = tf.Session()
sess.run(tf.initialize_all_variables())
# %%
# Fit all training data
t_i = 0
batch_size = 100
n_epochs = 50
n_examples = 10
test_xs, _ = mnist.test.next_batch(n_examples)
xs, ys = mnist.test.images, mnist.test.labels
test_xs = test_xs.reshape((n_examples, 28, 28, 1))
xs = xs.reshape((xs.shape[0], 28, 28, 1))
fig_manifold, ax_manifold = plt.subplots(1, 1)
fig_reconstruction, axs_reconstruction = plt.subplots(2, n_examples, figsize=(10, 2))
fig_image_manifold, ax_image_manifold = plt.subplots(1, 1)
for epoch_i in range(n_epochs):
print('--- Epoch', epoch_i)
train_cost = 0
for batch_i in range(mnist.train.num_examples // batch_size):
batch_xs, _ = mnist.train.next_batch(batch_size)
batch_xs = batch_xs.reshape((batch_size, 28, 28, 1))
train_cost += sess.run([ae['cost'], optimizer],
feed_dict={ae['x']: batch_xs,})[0]
if batch_i % 20 == 0:
# Plot example reconstructions from latent layer
imgs = []
for img_i in np.linspace(-3, 3, n_examples):
for img_j in np.linspace(-3, 3, n_examples):
z = np.array([[img_i, img_j]], dtype=np.float32)
print(z)
print(z.dtype)
recon = sess.run(ae['y'], feed_dict={ae['z']: z})
imgs.append(np.reshape(recon, (1, 28, 28, 1)))
imgs_cat = np.concatenate(imgs)
ax_manifold.imshow(montage_batch(imgs_cat))
fig_manifold.savefig('vizes/manifold_%08d.png' % t_i)
# Plot example reconstructions
recon = sess.run(ae['y'], feed_dict={ae['x']: test_xs})
for example_i in range(n_examples):
axs_reconstruction[0][example_i].imshow(
np.reshape(test_xs[example_i, :], (28, 28)))
axs_reconstruction[1][example_i].imshow(
np.reshape(
np.reshape(recon[example_i, ...], (784,)),
(28, 28)))
axs_reconstruction[0][example_i].axis('off')
axs_reconstruction[1][example_i].axis('off')
fig_reconstruction.savefig('vizes/reconstruction_%08d.png' % t_i)
# %%
# Plot manifold of latent layer
zs = sess.run(ae['z'], feed_dict={ae['x']: xs})
ax_image_manifold.clear()
ax_image_manifold.scatter(zs[:, 0], zs[:, 1],
c=np.argmax(ys, 1), alpha=0.2)
ax_image_manifold.set_xlim([-6, 6])
ax_image_manifold.set_ylim([-6, 6])
ax_image_manifold.axis('off')
fig_image_manifold.savefig('vizes/image_manifold_%08d.png' % t_i)
t_i += 1
print('Train cost:', train_cost /
(mnist.train.num_examples // batch_size))
valid_cost = 0
for batch_i in range(mnist.validation.num_examples // batch_size):
batch_xs, _ = mnist.validation.next_batch(batch_size)
batch_xs = batch_xs.reshape((batch_size, 28, 28, 1))
valid_cost += sess.run([ae['cost']],
feed_dict={ae['x']: batch_xs})[0]
print('Validation cost:', valid_cost /
(mnist.validation.num_examples // batch_size))
if __name__ == '__main__':
test_mnist()