实现 LSTM 模型
我们将扩展我们的 RNN 模型,以便通过在此秘籍中引入 LSTM 单元来使用更长的序列。
做好准备
长短期记忆(LSTM)是传统 RNN 的变体。 LSTM 是一种解决可变长度 RNN 所具有的消失/爆炸梯度问题的方法。为了解决这个问题,LSTM 单元引入了一个内部遗忘门,它可以修改从一个单元到下一个单元的信息流。为了概念化它的工作原理,我们将逐步介绍一个无偏置的 LSTM 方程式。第一步与常规 RNN 相同:
为了确定我们想要忘记或通过的值,我们将如下评估候选值。这些值通常称为存储单元:
现在我们用一个遗忘矩阵修改候选存储单元,其计算方法如下:
我们现在将遗忘存储器与先前的存储器步骤相结合,并将其添加到候选存储器单元以获得新的存储器值:
现在我们将所有内容组合起来以获取单元格的输出:
然后,对于下一次迭代,我们更新 h 如下:
LSTM 的想法是通过基于输入到细胞的信息可以忘记或修改的细胞具有自我调节的信息流。
在这里使用 TensorFlow 的一个好处是我们不必跟踪这些操作及其相应的反向传播属性。 TensorFlow 将跟踪这些并根据我们的损失函数,优化器和学习率指定的梯度自动更新模型变量。
对于这个秘籍,我们将使用具有 LSTM 细胞的序列 RNN 来尝试预测接下来的单词,对莎士比亚的作品进行训练。为了测试我们的工作方式,我们将提供模型候选短语,例如thou art more
,并查看模型是否可以找出短语后面应该包含的单词。
操作步骤
- 首先,我们为脚本加载必要的库:
import os
import re
import string
import requests
import numpy as np
import collections
import random
import pickle
import matplotlib.pyplot as plt
import tensorflow as tf
- 接下来,我们启动图会话并设置 RNN 参数:
sess = tf.Session()
# Set RNN Parameters
min_word_freq = 5
rnn_size = 128
epochs = 10
batch_size = 100
learning_rate = 0.001
training_seq_len = 50
embedding_size = rnn_size
save_every = 500
eval_every = 50
prime_texts = ['thou art more', 'to be or not to', 'wherefore art thou']
- 我们设置数据和模型文件夹和文件名,同时声明要删除的标点符号。我们希望保留连字符和撇号,因为莎士比亚经常使用它们来组合单词和音节:
data_dir = 'temp'
data_file = 'shakespeare.txt'
model_path = 'shakespeare_model'
full_model_dir = os.path.join(data_dir, model_path)
# Declare punctuation to remove, everything except hyphens and apostrophe's
punctuation = string.punctuation
punctuation = ''.join([x for x in punctuation if x not in ['-', "'"]])
- 接下来,我们获取数据。如果数据文件不存在,我们下载并保存莎士比亚文本。如果确实存在,我们加载数据:
if not os.path.exists(full_model_dir):
os.makedirs(full_model_dir)
# Make data directory
if not os.path.exists(data_dir):
os.makedirs(data_dir)
print('Loading Shakespeare Data')
# Check if file is downloaded.
if not os.path.isfile(os.path.join(data_dir, data_file)):
print('Not found, downloading Shakespeare texts from www.gutenberg.org')
shakespeare_url = 'http://www.gutenberg.org/cache/epub/100/pg100.txt'
# Get Shakespeare text
response = requests.get(shakespeare_url)
shakespeare_file = response.content
# Decode binary into string
s_text = shakespeare_file.decode('utf-8')
# Drop first few descriptive paragraphs.
s_text = s_text[7675:]
# Remove newlines
s_text = s_text.replace('\r\n', '')
s_text = s_text.replace('\n', '')
# Write to file
with open(os.path.join(data_dir, data_file), 'w') as out_conn:
out_conn.write(s_text)
else:
# If file has been saved, load from that file
with open(os.path.join(data_dir, data_file), 'r') as file_conn:
s_text = file_conn.read().replace('\n', '')
- 我们通过删除标点符号和额外的空格来清理莎士比亚的文本:
s_text = re.sub(r'[{}]'.format(punctuation), ' ', s_text)
s_text = re.sub('s+', ' ', s_text ).strip().lower()
- 我们现在处理创建要使用的莎士比亚词汇。我们创建一个函数,它将返回两个字典(单词到索引和索引到单词),其中的单词出现的频率超过指定的频率:
def build_vocab(text, min_word_freq):
word_counts = collections.Counter(text.split(' '))
# limit word counts to those more frequent than cutoff
word_counts = {key:val for key, val in word_counts.items() if val>min_word_freq}
# Create vocab --> index mapping
words = word_counts.keys()
vocab_to_ix_dict = {key:(ix+1) for ix, key in enumerate(words)}
# Add unknown key --> 0 index
vocab_to_ix_dict['unknown']=0
# Create index --> vocab mapping
ix_to_vocab_dict = {val:key for key,val in vocab_to_ix_dict.items()}
return ix_to_vocab_dict, vocab_to_ix_dict
ix2vocab, vocab2ix = build_vocab(s_text, min_word_freq)
vocab_size = len(ix2vocab) + 1
请注意,在处理文本时,我们必须小心索引值为零的单词。我们应该保存填充的零值,也可能保存未知单词。
- 现在我们有了词汇量,我们将莎士比亚的文本变成了一系列索引:
s_text_words = s_text.split(' ')
s_text_ix = []
for ix, x in enumerate(s_text_words):
try:
s_text_ix.append(vocab2ix[x])
except:
s_text_ix.append(0)
s_text_ix = np.array(s_text_ix)
- 在本文中,我们将展示如何在类对象中创建模型。这对我们很有帮助,因为我们希望使用相同的模型(具有相同的权重)来批量训练并从示例文本生成文本。如果没有采用内部抽样方法的课程,这将很难做到。理想情况下,此类代码应位于单独的 Python 文件中,我们可以在此脚本的开头导入该文件:
class LSTM_Model():
def __init__(self, rnn_size, batch_size, learning_rate,
training_seq_len, vocab_size, infer =False):
self.rnn_size = rnn_size
self.vocab_size = vocab_size
self.infer = infer
self.learning_rate = learning_rate
if infer:
self.batch_size = 1
self.training_seq_len = 1
else:
self.batch_size = batch_size
self.training_seq_len = training_seq_len
self.lstm_cell = tf.nn.rnn_cell.BasicLSTMCell(rnn_size)
self.initial_state = self.lstm_cell.zero_state(self.batch_size, tf.float32)
self.x_data = tf.placeholder(tf.int32, [self.batch_size, self.training_seq_len])
self.y_output = tf.placeholder(tf.int32, [self.batch_size, self.training_seq_len])
with tf.variable_scope('lstm_vars'):
# Softmax Output Weights
W = tf.get_variable('W', [self.rnn_size, self.vocab_size], tf.float32, tf.random_normal_initializer())
b = tf.get_variable('b', [self.vocab_size], tf.float32, tf.constant_initializer(0.0))
# Define Embedding
embedding_mat = tf.get_variable('embedding_mat', [self.vocab_size, self.rnn_size], tf.float32, tf.random_normal_initializer())
embedding_output = tf.nn.embedding_lookup(embedding_mat, self.x_data)
rnn_inputs = tf.split(embedding_output, num_or_size_splits=self.training_seq_len, axis=1)
rnn_inputs_trimmed = [tf.squeeze(x, [1]) for x in rnn_inputs]
# If we are inferring (generating text), we add a 'loop' function
# Define how to get the i+1 th input from the i th output
def inferred_loop(prev, count):
prev_transformed = tf.matmul(prev, W) + b
prev_symbol = tf.stop_gradient(tf.argmax(prev_transformed, 1))
output = tf.nn.embedding_lookup(embedding_mat, prev_symbol)
return output
decoder = tf.nn.seq2seq.rnn_decoder
outputs, last_state = decoder(rnn_inputs_trimmed,
self.initial_state,
self.lstm_cell,
loop_function=inferred_loop if infer else None)
# Non inferred outputs
output = tf.reshape(tf.concat(1, outputs), [-1, self.rnn_size])
# Logits and output
self.logit_output = tf.matmul(output, W) + b
self.model_output = tf.nn.softmax(self.logit_output)
loss_fun = tf.contrib.legacy_seq2seq.sequence_loss_by_example
loss = loss_fun([self.logit_output],[tf.reshape(self.y_output, [-1])],
[tf.ones([self.batch_size * self.training_seq_len])],
self.vocab_size)
self.cost = tf.reduce_sum(loss) / (self.batch_size * self.training_seq_len)
self.final_state = last_state
gradients, _ = tf.clip_by_global_norm(tf.gradients(self.cost, tf.trainable_variables()), 4.5)
optimizer = tf.train.AdamOptimizer(self.learning_rate)
self.train_op = optimizer.apply_gradients(zip(gradients, tf.trainable_variables()))
def sample(self, sess, words=ix2vocab, vocab=vocab2ix, num=10, prime_text='thou art'):
state = sess.run(self.lstm_cell.zero_state(1, tf.float32))
word_list = prime_text.split()
for word in word_list[:-1]:
x = np.zeros((1, 1))
x[0, 0] = vocab[word]
feed_dict = {self.x_data: x, self.initial_state:state}
[state] = sess.run([self.final_state], feed_dict=feed_dict)
out_sentence = prime_text
word = word_list[-1]
for n in range(num):
x = np.zeros((1, 1))
x[0, 0] = vocab[word]
feed_dict = {self.x_data: x, self.initial_state:state}
[model_output, state] = sess.run([self.model_output, self.final_state], feed_dict=feed_dict)
sample = np.argmax(model_output[0])
if sample == 0:
break
word = words[sample]
out_sentence = out_sentence + ' ' + word
return out_sentence
- 现在我们将声明 LSTM 模型以及测试模型。我们将在变量范围内执行此操作,并告诉范围我们将重用测试 LSTM 模型的变量:
with tf.variable_scope('lstm_model', reuse=tf.AUTO_REUSE) as scope:
# Define LSTM Model
lstm_model = LSTM_Model(rnn_size, batch_size, learning_rate,
training_seq_len, vocab_size)
scope.reuse_variables()
test_lstm_model = LSTM_Model(rnn_size, batch_size, learning_rate,
training_seq_len, vocab_size, infer=True)
- 我们创建一个保存操作,并将输入文本拆分为相等的批量大小的块。然后我们初始化模型的变量:
saver = tf.train.Saver()
# Create batches for each epoch
num_batches = int(len(s_text_ix)/(batch_size * training_seq_len)) + 1
# Split up text indices into subarrays, of equal size
batches = np.array_split(s_text_ix, num_batches)
# Reshape each split into [batch_size, training_seq_len]
batches = [np.resize(x, [batch_size, training_seq_len]) for x in batches]
# Initialize all variables
init = tf.global_variables_initializer()
sess.run(init)
- 我们现在可以遍历我们的周期,在每个周期开始之前对数据进行混洗。我们数据的目标只是相同的数据,但是移动了 1(使用
numpy.roll()
函数):
train_loss = []
iteration_count = 1
for epoch in range(epochs):
# Shuffle word indices
random.shuffle(batches)
# Create targets from shuffled batches
targets = [np.roll(x, -1, axis=1) for x in batches]
# Run a through one epoch
print('Starting Epoch #{} of {}.'.format(epoch+1, epochs))
# Reset initial LSTM state every epoch
state = sess.run(lstm_model.initial_state)
for ix, batch in enumerate(batches):
training_dict = {lstm_model.x_data: batch, lstm_model.y_output: targets[ix]}
c, h = lstm_model.initial_state
training_dict[c] = state.c
training_dict[h] = state.h
temp_loss, state, _ = sess.run([lstm_model.cost, lstm_model.final_state, lstm_model.train_op], feed_dict=training_dict)
train_loss.append(temp_loss)
# Print status every 10 gens
if iteration_count % 10 == 0:
summary_nums = (iteration_count, epoch+1, ix+1, num_batches+1, temp_loss)
print('Iteration: {}, Epoch: {}, Batch: {} out of {}, Loss: {:.2f}'.format(*summary_nums))
# Save the model and the vocab
if iteration_count % save_every == 0:
# Save model
model_file_name = os.path.join(full_model_dir, 'model')
saver.save(sess, model_file_name, global_step = iteration_count)
print('Model Saved To: {}'.format(model_file_name))
# Save vocabulary
dictionary_file = os.path.join(full_model_dir, 'vocab.pkl')
with open(dictionary_file, 'wb') as dict_file_conn:
pickle.dump([vocab2ix, ix2vocab], dict_file_conn)
if iteration_count % eval_every == 0:
for sample in prime_texts:
print(test_lstm_model.sample(sess, ix2vocab, vocab2ix, num=10, prime_text=sample))
iteration_count += 1
- 这导致以下输出:
Loading Shakespeare Data
Cleaning Text
Building Shakespeare Vocab
Vocabulary Length = 8009
Starting Epoch #1 of 10\.
Iteration: 10, Epoch: 1, Batch: 10 out of 182, Loss: 10.37
Iteration: 20, Epoch: 1, Batch: 20 out of 182, Loss: 9.54
...
Iteration: 1790, Epoch: 10, Batch: 161 out of 182, Loss: 5.68
Iteration: 1800, Epoch: 10, Batch: 171 out of 182, Loss: 6.05
thou art more than i am a
to be or not to the man i have
wherefore art thou art of the long
Iteration: 1810, Epoch: 10, Batch: 181 out of 182, Loss: 5.99
- 最后,以下是我们如何绘制历史上的训练损失:
plt.plot(train_loss, 'k-')
plt.title('Sequence to Sequence Loss')
plt.xlabel('Generation')
plt.ylabel('Loss')
plt.show()
This results in the following plot of our loss values:
图 4:模型所有代的序列到序列损失
工作原理
在这个例子中,我们基于莎士比亚词汇构建了一个带有 LSTM 单元的 RNN 模型来预测下一个单词。可以采取一些措施来改进模型,可能会增加序列大小,具有衰减的学习率,或者训练模型以获得更多的周期。
更多
为了抽样,我们实现了一个贪婪的采样器。贪婪的采样器可能会一遍又一遍地重复相同的短语;例如,他们可能会卡住for the for the
for the....
为了防止这种情况,我们还可以实现一种更随机的采样方式,可能是根据输出的对数或概率分布制作加权采样器。