# -*- coding: utf-8 -*-
"""
@from:www.linerect.com
@author: duimu
@contact: duimu@qq.com
@Created on: 2019/12/2 16:11
"""
#导入所有必要的模块
import tensorflow as tf
import numpy as np
from tensorflow.examples.tutorials.mnist import input_data
import matplotlib.pyplot as plt
#从 TensorFlow 中获取 MNIST 数据,这里要注意的一点是,标签并没有进行独热编码,因为并没有使用标签来训练网络。
# 自动编码机是通过无监督学习进行训练的:
mnist=input_data.read_data_sets("mnist/")
trX,trY,teX,teY=mnist.train.images,mnist.train.labels,mnist.test.images,mnist.test.labels
#声明 AutoEncoder 类,使用 init 方法初始化自动编码机的权重、偏置和占位符,
# 也可以在 init 方法中构建全部的计算图。还需要定义编码器、解码器,set_session(会话建立)和 fit 方法。
# 此处构建的自动编码机使用简单的均方误差作为损失函数,使用 AdamOptimizer 进行优化:
class AutoEncoder(object):
def __init__(self,m,n,eta=0.01):
"""
:param m: number of neurons in input/output layer
:param n: number of neurons in hidder layer
:param eta:
:return:
"""
self._m=m
self._n=n
self.learning_rate=eta
self._w1=tf.Variable(tf.random_normal(shape=(self._m,self._n)))
self._b1 = tf.Variable(np.zeros(self._n).astype(np.float32))
self._w2=tf.Variable(tf.random_normal(shape=(self._n, self._m)))
self._b2 = tf.Variable(np.zeros(self._m).astype(np.float32))
self._X=tf.placeholder('float',[None,self._m])
self.y=self.encoder(self._X)
self.r=self.decoder(self.y)
error=self._X-self.r
self._loss=tf.reduce_mean(tf.pow(error,2))
self._opt=tf.train.AdamOptimizer(self.learning_rate).minimize(self._loss)
def encoder(self,x):
h = tf.matmul(x, self._w1) + self._b1
return tf.nn.sigmoid(h)
def decoder(self,x):
h=tf.matmul(x,self._w2)+self._b2
return tf.nn.sigmoid(h)
def set_session(self,session):
self.session=session
def reduced_dimension(self,x):
h=self.encoder(x)
return self.session.run(h,feed_dict={self._X:x})
def reconstruct(self,x):
h=self.encoder(x)
r=self.decoder(h)
return self.session.run(r,feed_dict={self._X:x})
def fit(self,X,epochs=1,batch_size=100):
N,D=X.shape
num_batches=N//batch_size
obj=[]
for i in range(epochs):
for j in range(num_batches):
batch=X[j*batch_size:(j*batch_size+batch_size)]
_,ob=self.session.run([self._opt,self._loss],feed_dict={self._X:batch})
if j%100==0 and i%100==0:
print('training epoch {0} batch {2} cost {1}'.format(i,ob,j))
obj.append(ob)
return obj
#训练时将输入数据转换为 float 型,初始化所有变量并运行会话。在计算时,目前只是测试自动编码机的重构能力:
Xtrain=trX.astype(np.float32)
Xtest=teX.astype(np.float32)
_,m=Xtrain.shape
autoEncoder=AutoEncoder(m,256)
init=tf.global_variables_initializer()
with tf.Session() as sess:
sess.run(init)
autoEncoder.set_session(sess)
err=autoEncoder.fit(Xtrain,epochs=10)
out=autoEncoder.reconstruct(Xtest[0:100])
#绘制误差在训练周期中的变化图,验证网络的均方误差在训练时是否得到优化,对于一个好的训练,误差应该随着训练周期的增加而减少
plt.plot(err)
plt.xlabel('epochs')
plt.ylabel('cost')
plt.show()
#观察重构的图像,对比原始图像和自动编码机生成的重构图像
row,col=2,8
idx=np.random.randint(0,100,row*col//2)
f,axarr=plt.subplots(row,col,sharex=True,sharey=True,figsize=(20,4))
for fig,row in zip([Xtest,out],axarr):
for i,ax in zip(idx,row):
ax.imshow(fig[i].reshape((28,28)),cmap='Greys_r')
ax.get_xaxis().set_visible(False)
ax.get_yaxis().set_visible(False)
plt.show()
#结果 --------------------------
training epoch 0 batch 0 cost 0.42432427406311035
training epoch 0 batch 100 cost 0.06590526551008224
training epoch 0 batch 200 cost 0.05194160342216492
training epoch 0 batch 300 cost 0.04785095900297165
training epoch 0 batch 400 cost 0.04127230867743492
training epoch 0 batch 500 cost 0.041603896766901016