基于pytorch的LSTM进行字符级文本生成实战

您所在的位置:网站首页 pytorch英文单词 基于pytorch的LSTM进行字符级文本生成实战

基于pytorch的LSTM进行字符级文本生成实战

2023-12-28 08:45| 来源: 网络整理| 查看: 265

基于pytorch的LSTM进行字符级文本生成实战

文章目录 基于pytorch的LSTM进行字符级文本生成实战前言一、数据集二、代码实现1.导入库及LSTM模型构建2.数据预处理函数3.训练函数4.预测函数5.文本生成函数6.主函数 总结完整代码后续

前言

随着人工智能的不断发展,机器学习这门技术也越来越重要,很多人都开启了学习机器学习,本文就介绍了机器学习中深度学习的内容使用pytorch构建LSTM模型进行字符级文本生成任务

一、数据集

我已经上传到我的资源里了,0积分放心下载 https://download.csdn.net/download/qq_52785473/78428834

二、代码实现 1.导入库及LSTM模型构建

代码如下:

# coding: utf-8 import torch import torch.nn as nn import numpy as np import pandas as pd import matplotlib.pyplot as plt from sklearn.preprocessing import OneHotEncoder import torch.nn.functional as F class lstm_model(nn.Module): def __init__(self, vocab, hidden_size, num_layers, dropout=0.5): super(lstm_model, self).__init__() self.vocab = vocab # 字符数据集 # 索引,字符 self.int_char = {i: char for i, char in enumerate(vocab)} self.char_int = {char: i for i, char in self.int_char.items()} # 对字符进行one-hot encoding self.encoder = OneHotEncoder(sparse=True).fit(vocab.reshape(-1, 1)) self.hidden_size = hidden_size self.num_layers = num_layers # lstm层 self.lstm = nn.LSTM(len(vocab), hidden_size, num_layers, batch_first=True, dropout=dropout) # 全连接层 self.linear = nn.Linear(hidden_size, len(vocab)) def forward(self, sequence, hs=None): out, hs = self.lstm(sequence, hs) # lstm的输出格式(batch_size, sequence_length, hidden_size) out = out.reshape(-1, self.hidden_size) # 这里需要将out转换为linear的输入格式,即(batch_size * sequence_length, hidden_size) output = self.linear(out) # linear的输出格式,(batch_size * sequence_length, vocab_size) return output, hs def onehot_encode(self, data): # 对数据进行编码 return self.encoder.transform(data) def onehot_decode(self, data): # 对数据进行解码 return self.encoder.inverse_transform(data) def label_encode(self, data): # 对标签进行编码 return np.array([self.char_int[ch] for ch in data]) def label_decode(self, data): # 对标签进行解码 return np.array([self.int_char[ch] for ch in data]) 2.数据预处理函数

代码如下:

def get_batches(data, batch_size, seq_len): ''' :param data: 源数据,输入格式(num_samples, num_features) :param batch_size: batch的大小 :param seq_len: 序列的长度(精度) :return: (batch_size, seq_len, num_features) ''' num_features = data.shape[1] num_chars = batch_size * seq_len # 一个batch_size的长度 num_batches = int(np.floor(data.shape[0] / num_chars)) # 计算出有多少个batches need_chars = num_batches * num_chars # 计算出需要的总字符量 targets = np.vstack((data[1:].A, data[0].A)) # 可能版本问题,取成numpy比较好reshape inputs = data[:need_chars].A.astype("int") # 从原始数据data中截取所需的字符数量need_words targets = targets[:need_chars] targets = targets.reshape(batch_size, -1, num_features) inputs = inputs.reshape(batch_size, -1, num_features) for i in range(0, inputs.shape[1], seq_len): x = inputs[:, i: i+seq_len] y = targets[:, i: i+seq_len] yield x, y # 节省内存 3.训练函数 def train(model, data, batch_size, seq_len, epochs, lr=0.01, valid=None): device = 'cuda' if torch.cuda.is_available() else 'cpu' model = model.to(device) optimizer = torch.optim.Adam(model.parameters(), lr=lr) criterion = nn.CrossEntropyLoss() if valid is not None: data = model.onehot_encode(data.reshape(-1, 1)) valid = model.onehot_encode(valid.reshape(-1, 1)) else: data = model.onehot_encode(data.reshape(-1, 1)) train_loss = [] val_loss = [] for epoch in range(epochs): model.train() hs = None # hs等于hidden_size隐藏层节点 train_ls = 0.0 val_ls = 0.0 for x, y in get_batches(data, batch_size, seq_len): optimizer.zero_grad() x = torch.tensor(x).float().to(device) out, hs = model(x, hs) hs = ([h.data for h in hs]) y = y.reshape(-1, len(model.vocab)) y = model.onehot_decode(y) y = model.label_encode(y.squeeze()) y = torch.from_numpy(y).long().to(device) loss = criterion(out, y.squeeze()) loss.backward() optimizer.step() train_ls += loss.item() if valid is not None: model.eval() hs = None with torch.no_grad(): for x, y in get_batches(valid, batch_size, seq_len): x = torch.tensor(x).float().to(device) # x为一组测试数据,包含batch_size * seq_len个字 out, hs = model(x, hs) # out.shape输出为tensor[batch_size * seq_len, vocab_size] hs = ([h.data for h in hs]) # 更新参数 y = y.reshape(-1, len(model.vocab)) # y.shape为(128,100,43),因此需要转成两维,每行就代表一个字了,43为字典大小 y = model.onehot_decode(y) # y标签即为测试数据各个字的下一个字,进行one_hot解码,即变为字符 # 但是此时y 是[[..],[..]]形式 y = model.label_encode(y.squeeze()) # 因此需要去掉一维才能成功解码 # 此时y为[12...]成为一维的数组,每个代表自己字典里对应字符的字典序 y = torch.from_numpy(y).long().to(device) # 这里y和y.squeeze()出来的东西一样,可能这里没啥用,不太懂 loss = criterion(out, y.squeeze()) # 计算损失值 val_ls += loss.item() val_loss.append(np.mean(val_ls)) train_loss.append(np.mean(train_ls)) print("train_loss:", train_ls) plt.plot(train_loss, label="train_loss") plt.plot(val_loss, label="val loss") plt.title("loop vs epoch") plt.legend() plt.show() model_name = "lstm_model.net" with open(model_name, 'wb') as f: # 训练完了保存模型 torch.save(model.state_dict(), f) 4.预测函数 def predict(model, char, top_k=None, hidden_size=None): device = 'cuda' if torch.cuda.is_available() else 'cpu' model.to(device) model.eval() # 固定参数 with torch.no_grad(): char = np.array([char]) # 输入一个字符,预测下一个字是什么,先转成numpy char = char.reshape(-1, 1) # 变成二维才符合编码规范 char_encoding = model.onehot_encode(char).A # 对char进行编码,取成numpy比较方便reshape char_encoding = char_encoding.reshape(1, 1, -1) # char_encoding.shape为(1, 1, 43)变成三维才符合模型输入格式 char_tensor = torch.tensor(char_encoding, dtype=torch.float32) # 转成tensor char_tensor = char_tensor.to(device) out, hidden_size = model(char_tensor, hidden_size) # 放入模型进行预测,out为结果 probs = F.softmax(out, dim=1).squeeze() # 计算预测值,即所有字符的概率 if top_k is None: # 选择概率最大的top_k个 indices = np.arange(vocab_size) else: probs, indices = probs.topk(top_k) indices = indices.cpu().numpy() probs = probs.cpu().numpy() char_index = np.random.choice(indices, p=probs/probs.sum()) # 随机选择一个字符索引作为预测值 char = model.int_char[char_index] # 通过索引找出预测字符 return char, hidden_size 5.文本生成函数 def sample(model, length, top_k=None, sentence="c"): hidden_size = None new_sentence = [char for char in sentence] for i in range(length): next_char, hidden_size = predict(model, new_sentence[-1], top_k=top_k, hidden_size=hidden_size) new_sentence.append(next_char) return "".join(new_sentence) 6.主函数 def main(): hidden_size = 512 num_layers = 2 batch_size = 128 seq_len = 100 epochs = 2 lr = 0.01 f = pd.read_csv("../datasets/dev.tsv", sep="\t", header=None) f = f[0] text = list(f) text = ".".join(text) vocab = np.array(sorted(set(text))) # 建立字典 vocab_size = len(vocab) val_len = int(np.floor(0.2 * len(text))) # 划分训练测试集 trainset = np.array(list(text[:-val_len])) validset = np.array(list(text[-val_len:])) model = lstm_model(vocab, hidden_size, num_layers) # 模型实例化 train(model, trainset, batch_size, seq_len, epochs, lr=lr, valid=validset) # 训练模型 model.load_state_dict(torch.load("lstm_model.net")) # 调用保存的模型 new_text = sample(model, 100, top_k=5) # 预测模型,生成100个字符,预测时选择概率最大的前5个 print(new_text) # 输出预测文本 if __name__ == "__main__": main() 总结

这其实是b站上一个例子,感觉讲的很细致,我是照着他一行一行打下来的,然后因为版本问题,我对一些地方进行了修改,并且留了一些注释(千言万语都在注释里了),个人认为输入输出还是比较容易搞错的,以防以后忘记代码什么意思,这代码是在我的电脑上能跑,感觉流程应该没错吧,只是效果不行,因为我为了赶快理解这套流程,训练次数设的很小,而且训练数据也很小,所以看不到效果的。 这就是训练一次的结果,看得出很糟糕,但是至少出来结果了。 在这里插入图片描述

其实一般应该用面向对象的方法来写,用一些pytorch的一些类来写代码会显得更加简洁,奈何我的python水平限制了我对一些高级操作的理解,就先用简单而繁冗的代码好了

并且还是有很大改进空间的,例如进行词语级的文本生成,以及使用word2vec等引入词向量等,都可以是的模型获得更好的效果。

本文参考:https://www.bilibili.com/video/BV1Dv411W78n/?spm_id_from=pageDriver

完整代码 # coding: utf-8 import torch import torch.nn as nn import numpy as np import pandas as pd import matplotlib.pyplot as plt from sklearn.preprocessing import OneHotEncoder import torch.nn.functional as F class lstm_model(nn.Module): def __init__(self, vocab, hidden_size, num_layers, dropout=0.5): super(lstm_model, self).__init__() self.vocab = vocab # 字符数据集 # 索引,字符 self.int_char = {i: char for i, char in enumerate(vocab)} self.char_int = {char: i for i, char in self.int_char.items()} # 对字符进行one-hot encoding self.encoder = OneHotEncoder(sparse=True).fit(vocab.reshape(-1, 1)) self.hidden_size = hidden_size self.num_layers = num_layers # lstm层 self.lstm = nn.LSTM(len(vocab), hidden_size, num_layers, batch_first=True, dropout=dropout) # 全连接层 self.linear = nn.Linear(hidden_size, len(vocab)) def forward(self, sequence, hs=None): out, hs = self.lstm(sequence, hs) # lstm的输出格式(batch_size, sequence_length, hidden_size) out = out.reshape(-1, self.hidden_size) # 这里需要将out转换为linear的输入格式,即(batch_size * sequence_length, hidden_size) output = self.linear(out) # linear的输出格式,(batch_size * sequence_length, vocab_size) return output, hs def onehot_encode(self, data): return self.encoder.transform(data) def onehot_decode(self, data): return self.encoder.inverse_transform(data) def label_encode(self, data): return np.array([self.char_int[ch] for ch in data]) def label_decode(self, data): return np.array([self.int_char[ch] for ch in data]) def get_batches(data, batch_size, seq_len): ''' :param data: 源数据,输入格式(num_samples, num_features) :param batch_size: batch的大小 :param seq_len: 序列的长度(精度) :return: (batch_size, seq_len, num_features) ''' num_features = data.shape[1] num_chars = batch_size * seq_len # 一个batch_size的长度 num_batches = int(np.floor(data.shape[0] / num_chars)) # 计算出有多少个batches need_chars = num_batches * num_chars # 计算出需要的总字符量 targets = np.vstack((data[1:].A, data[0].A)) # 可能版本问题,取成numpy比较好reshape inputs = data[:need_chars].A.astype("int") # 从原始数据data中截取所需的字符数量need_words targets = targets[:need_chars] targets = targets.reshape(batch_size, -1, num_features) inputs = inputs.reshape(batch_size, -1, num_features) for i in range(0, inputs.shape[1], seq_len): x = inputs[:, i: i+seq_len] y = targets[:, i: i+seq_len] yield x, y # 节省内存 def train(model, data, batch_size, seq_len, epochs, lr=0.01, valid=None): device = 'cuda' if torch.cuda.is_available() else 'cpu' model = model.to(device) optimizer = torch.optim.Adam(model.parameters(), lr=lr) criterion = nn.CrossEntropyLoss() if valid is not None: data = model.onehot_encode(data.reshape(-1, 1)) valid = model.onehot_encode(valid.reshape(-1, 1)) else: data = model.onehot_encode(data.reshape(-1, 1)) train_loss = [] val_loss = [] for epoch in range(epochs): model.train() hs = None # hs等于hidden_size隐藏层节点 train_ls = 0.0 val_ls = 0.0 for x, y in get_batches(data, batch_size, seq_len): optimizer.zero_grad() x = torch.tensor(x).float().to(device) out, hs = model(x, hs) hs = ([h.data for h in hs]) y = y.reshape(-1, len(model.vocab)) y = model.onehot_decode(y) y = model.label_encode(y.squeeze()) y = torch.from_numpy(y).long().to(device) loss = criterion(out, y.squeeze()) loss.backward() optimizer.step() train_ls += loss.item() if valid is not None: model.eval() hs = None with torch.no_grad(): for x, y in get_batches(valid, batch_size, seq_len): x = torch.tensor(x).float().to(device) # x为一组测试数据,包含batch_size * seq_len个字 out, hs = model(x, hs) # out.shape输出为tensor[batch_size * seq_len, vocab_size] hs = ([h.data for h in hs]) # 更新参数 y = y.reshape(-1, len(model.vocab)) # y.shape为(128,100,43),因此需要转成两维,每行就代表一个字了,43为字典大小 y = model.onehot_decode(y) # y标签即为测试数据各个字的下一个字,进行one_hot解码,即变为字符 # 但是此时y 是[[..],[..]]形式 y = model.label_encode(y.squeeze()) # 因此需要去掉一维才能成功解码 # 此时y为[12...]成为一维的数组,每个代表自己字典里对应字符的字典序 y = torch.from_numpy(y).long().to(device) # 这里y和y.squeeze()出来的东西一样,可能这里没啥用,不太懂 loss = criterion(out, y.squeeze()) # 计算损失值 val_ls += loss.item() val_loss.append(np.mean(val_ls)) train_loss.append(np.mean(train_ls)) print("train_loss:", train_ls) plt.plot(train_loss, label="train_loss") plt.plot(val_loss, label="val loss") plt.title("loop vs epoch") plt.legend() plt.show() model_name = "lstm_model.net" with open(model_name, 'wb') as f: # 训练完了保存模型 torch.save(model.state_dict(), f) def predict(model, char, top_k=None, hidden_size=None): device = 'cuda' if torch.cuda.is_available() else 'cpu' model.to(device) model.eval() # 固定参数 with torch.no_grad(): char = np.array([char]) # 输入一个字符,预测下一个字是什么,先转成numpy char = char.reshape(-1, 1) # 变成二维才符合编码规范 char_encoding = model.onehot_encode(char).A # 对char进行编码,取成numpy比较方便reshape char_encoding = char_encoding.reshape(1, 1, -1) # char_encoding.shape为(1, 1, 43)变成三维才符合模型输入格式 char_tensor = torch.tensor(char_encoding, dtype=torch.float32) # 转成tensor char_tensor = char_tensor.to(device) out, hidden_size = model(char_tensor, hidden_size) # 放入模型进行预测,out为结果 probs = F.softmax(out, dim=1).squeeze() # 计算预测值,即所有字符的概率 if top_k is None: # 选择概率最大的top_k个 indices = np.arange(vocab_size) else: probs, indices = probs.topk(top_k) indices = indices.cpu().numpy() probs = probs.cpu().numpy() char_index = np.random.choice(indices, p=probs/probs.sum()) # 随机选择一个字符索引作为预测值 char = model.int_char[char_index] # 通过索引找出预测字符 return char, hidden_size def sample(model, length, top_k=None, sentence="c"): hidden_size = None new_sentence = [char for char in sentence] for i in range(length): next_char, hidden_size = predict(model, new_sentence[-1], top_k=top_k, hidden_size=hidden_size) new_sentence.append(next_char) return "".join(new_sentence) def main(): hidden_size = 512 num_layers = 2 batch_size = 128 seq_len = 100 epochs = 2 lr = 0.01 f = pd.read_csv("../datasets/dev.tsv", sep="\t", header=None) f = f[0] text = list(f) text = ".".join(text) vocab = np.array(sorted(set(text))) # 建立字典 vocab_size = len(vocab) val_len = int(np.floor(0.2 * len(text))) # 划分训练测试集 trainset = np.array(list(text[:-val_len])) validset = np.array(list(text[-val_len:])) model = lstm_model(vocab, hidden_size, num_layers) # 模型实例化 train(model, trainset, batch_size, seq_len, epochs, lr=lr, valid=validset) # 训练模型 model.load_state_dict(torch.load("lstm_model.net")) # 调用保存的模型 new_text = sample(model, 100, top_k=5) # 预测模型,生成100个字符,预测时选择概率最大的前5个 print(new_text) # 输出预测文本 if __name__ == "__main__": main() 后续

由于对文本生成任务不太了解,pytorch及LSTM模型也不太熟悉,我在b站上看了一些其他人的解释以及询问了学长之后,又仔细分析了一下这个案例的代码,发现这个代码其实存在比较大的问题。

以我目前的水平看来,这个案例他数据预处理的时候,一个序列对应一个序列的关系,例如abcd对应的标签为dabc,而不是一个字符,因此可能后面进行了某些操作使得他变成一个字符对应一个字符标签的操作了吧,从而使得预测的时候,只能通过一个字符预测其后面的字符,这就有点失去循环神经网络精髓的味道了,感觉是割裂字符之间的关系,变成一个普通单纯的分类了。

循环神经网络,因为能够处理序列位置的信息,需要设定一个滑动窗口值,或者说时间步长什么的,作用应该就是保留序列特征,例如abcdef为训练数据,设置滑动窗口为3的话,那么按照正常的序列思路可以划分为abc-d、bcd-e、cde-f作为训练数据的形式,即连续的三个字符对应的标签为其后面一个字符,那么我们训练出来的模型也是需要输入三个字符,然后生成一个字符,再用预测出来的字符加上他前面两个字符再预测新的字符,例如预测的初始序列为abc加入abc预测出来d,那么下一次预测就是bcd作为输入,就像一个窗口一步一步滑动过去一样,窗口的大小就为开始设定的3。

因此对于这个案例,他虽然seq_len=100,即滑动窗口为100,但是他的训练数据我感觉就不太对吧,而且模型预测时也是一个字符预测下一个字符,并没有体现滑动窗口的思想,因此我感觉这个案例有点问题。

下面是我对代码部分地方的改动,使得他能够按滑动窗口的思维来进行训练和预测。

数据预处理函数

def get_batches(data, batch_size, seq_len): ''' :param data: 源数据,输入格式(num_samples, num_features) :param batch_size: batch的大小 :param seq_len: 序列的长度(精度) :return: (batch_size, seq_len, num_features) ''' num_features = data.shape[1] num_chars = batch_size * seq_len # 一个batch_size的长度 num_batches = int(np.floor(data.shape[0] / num_chars)) # 计算出有多少个batches need_chars = num_batches * num_chars # 计算出需要的总字符量 targets = np.vstack((data[1:].A, data[0].A)) # 可能版本问题,取成numpy比较好reshape inputs = data[:need_chars].A.astype("int") # 从原始数据data中截取所需的字符数量need_words targets = targets[:need_chars] train_data = np.zeros((inputs.shape[0] - seq_len, seq_len, num_features)) train_label = np.zeros((inputs.shape[0] - seq_len, num_features)) for i in range(0, inputs.shape[0] - seq_len, 1): # inputs就是字符数 * 词向量大小(表示一个字符) # 思路就是abcd中ab-c, bc-d,一共4-3+1个 train_data[i] = inputs[i:i+seq_len] # 每seq_len=100的字符 train_label[i] = inputs[i+seq_len-1] # 训练标签就为他后面那个字符 print(train_data.shape) print(train_label.shape) for i in range(0, inputs.shape[0] - seq_len, batch_size): x = train_data[i:i+batch_size] # 每batch_size=128个一起进行训练更新参数 y = train_label[i:i+batch_size] # 对应的128个标签 print(x.shape) print(y.shape) print("-----------") yield x, y

模型构建部分

class lstm_model(nn.Module): def __init__(self, vocab, hidden_size, num_layers, dropout=0.5, seq_len=100): super(lstm_model, self).__init__() self.seq_len = seq_len self.vocab = vocab # 字符数据集 # 索引,字符 self.int_char = {i: char for i, char in enumerate(vocab)} self.char_int = {char: i for i, char in self.int_char.items()} # 对字符进行one-hot encoding self.encoder = OneHotEncoder(sparse=True).fit(vocab.reshape(-1, 1)) self.hidden_size = hidden_size self.num_layers = num_layers # lstm层 self.lstm = nn.LSTM(len(vocab), hidden_size, num_layers, batch_first=True, dropout=dropout) # 全连接层 self.linear = nn.Linear(hidden_size, len(vocab)) def forward(self, sequence, hs=None): # print("==========") # print("forward:", sequence.shape) out, hs = self.lstm(sequence, hs) # lstm的输出格式(batch_size, sequence_length, hidden_size) print("----", out.shape) # out = out.reshape(-1, self.hidden_size) # 这里需要将out转换为linear的输入格式,即(batch_size * sequence_length, hidden_size) print("========", out[:, -1].shape) output = self.linear(out[:, -1]) # 只取[bacth_size,hidden_size],即找到batch_size里每个元素的标签吧 print("output-----:", output.shape) return output, hs def onehot_encode(self, data): return self.encoder.transform(data) def onehot_decode(self, data): return self.encoder.inverse_transform(data) def label_encode(self, data): return np.array([self.char_int[ch] for ch in data]) def label_decode(self, data): return np.array([self.int_char[ch] for ch in data])

预测部分也得改,懒得改了。

我的想法和修改思路应该对吧,我也不是十分确定。



【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3