BERT实战

您所在的位置:网站首页 imdb数据集介绍 BERT实战

BERT实战

2024-01-08 08:55| 来源: 网络整理| 查看: 265

0 简介

之前学习了如何使用BERT实现文本分类,本次想融会贯通一下实现电影评论情感分类任务。完成之后感觉对整个过程有了更深的认识。本次相较于之前,主要工作是,a)对于新的任务,对原始数据进行预处理,以期满足特定任务;b)编写应用程序,对用户在控制台的输入电影评论进行情感判断。最终,由于训练时间太长了,就训练了一个epoch,在测试集上的测试精度为86.64%,在实际应用中可以较好地对电影评论情感进行分类。

1 训练设备

CPU:i7-12700H 2.30 GHz 内存 16GGPU:RTX3050 显存 4Gpython:3.8深度学习框架:pytorch 2.0.1+cu118

2 项目文件结构

image.png其中,由于墙的原因,bert-base-uncased需要提前下载,是在huggingface.co官网下载好的模型。image.png

3 IMDB影评的数据集介绍

这是用于二进制情感分类的数据集,其包含的数据比以前的基准数据集要多得多。 我们提供了25,000套极地电影评论供培训,而25,000套则用于测试。 也有其他未标记的数据可供使用。 提供原始文本和已处理的单词格式袋。Imdb 影评的数据集包含有25000 训练数据集25000 测试数据集数据集地址:http://ai.stanford.edu/~amaas/data/sentiment/点击下载即可image.png下载后解压,会看到有两个文件夹,test和train:我们点进train中,会发现正样本和负样本已经分好类了:neg和pos分别是负样本和正样本,unsup是未标注的样本,可能后续需要采用。其他的可以自行了解。每一条评论是一个文本,注意到,这些文本一般都不短…为了贴合之前的数据集要求,需要进行预处理。

4 数据预处理

处理数据集数据,数据集中正负样本是分开的,而且每个文件就一句话处理目标:归到一个文本文件中,两列,第一列为评论 第二列为标签 正样本1 负样本0deal_data.py

import os import random neg_test_path = r"E:\workspace\【4-python-workspace】\LLM\04_finetune\03_BERT_Pre-training+Fine-tuning(Movie review sentiment classification)\aclImdb\test\neg/" neg_train_path = r"E:\workspace\【4-python-workspace】\LLM\04_finetune\03_BERT_Pre-training+Fine-tuning(Movie review sentiment classification)\aclImdb\train\neg/" pos_test_path = r"E:\workspace\【4-python-workspace】\LLM\04_finetune\03_BERT_Pre-training+Fine-tuning(Movie review sentiment classification)\aclImdb\test\pos/" pos_train_path = r"E:\workspace\【4-python-workspace】\LLM\04_finetune\03_BERT_Pre-training+Fine-tuning(Movie review sentiment classification)\aclImdb\train\pos/" pos_train_listdir = os.listdir(pos_train_path) pos_test_listdir = os.listdir(pos_test_path) neg_train_listdir = os.listdir(neg_train_path) neg_test_listdir = os.listdir(neg_test_path) # 获取每个评论字符串 pos_content = [] neg_content = [] for txt_name in pos_train_listdir: file_path = os.path.join(pos_train_path, txt_name) with open(file_path, "r", encoding="utf-8") as file: content = file.read() content.replace("\t", "") pos_content.append([content, 1]) for txt_name in pos_test_listdir: file_path = os.path.join(pos_test_path, txt_name) with open(file_path, "r", encoding="utf-8") as file: content = file.read() content.replace("\t", "") pos_content.append([content, 1]) for txt_name in neg_train_listdir: file_path = os.path.join(neg_train_path, txt_name) with open(file_path, "r", encoding="utf-8") as file: content = file.read() content.replace("\t", "") neg_content.append([content, 0]) for txt_name in neg_test_listdir: file_path = os.path.join(neg_test_path, txt_name) with open(file_path, "r", encoding="utf-8") as file: content = file.read() content.replace("\t", "") neg_content.append([content, 0]) # 训练集 验证集 测试集比例 8:1:1 train_ratio = 0.8 val_ratio = 0.1 test_ratio = 0.1 pos_len = len(pos_content) neg_len = len(neg_content) train_data = pos_content[:int(pos_len * train_ratio)] + neg_content[:int(neg_len * train_ratio)] val_data = pos_content[int(pos_len * train_ratio):int(pos_len * (train_ratio + val_ratio))] + \ neg_content[int(neg_len * train_ratio):int(neg_len * (train_ratio + val_ratio))] test_data = pos_content[int(pos_len * (1 - test_ratio)):] + neg_content[int(neg_len * (1 - test_ratio)):] # 打乱顺序 random.shuffle(train_data) random.shuffle(val_data) random.shuffle(test_data) # # 格式化数据,并且写入文件 # # 打开文件,使用 'w' 模式表示写入 with open("./data/train.csv", "w", encoding="utf-8") as f: # 遍历字符串列表 for data in train_data: # 将字符串和制表符以及数字写入文件,每个字符串一行 f.write(f"{data[0]}\t\t{data[1]}\n") with open("./data/val.csv", "w", encoding="utf-8") as f: # 遍历字符串列表 for data in val_data: # 将字符串和制表符以及数字写入文件,每个字符串一行 f.write(f"{data[0]}\t\t{data[1]}\n") with open("./data/test.csv", "w", encoding="utf-8") as f: # 遍历字符串列表 for data in test_data: # 将字符串和制表符以及数字写入文件,每个字符串一行 f.write(f"{data[0]}\t\t{data[1]}\n") print("数据处理结束!") print("训练集个数:{}".format(len(train_data))) print("验证集个数:{}".format(len(val_data))) print("测试集个数:{}".format(len(test_data)))

5 预训练模型

5.1 在文本上训练标记器

首先读取数据集data_read.py

import pandas as pd root_data_dir = "./data/" # 分别读取训练集 验证集 测试集 train_data = pd.read_csv(root_data_dir + 'train.csv', sep='\t\t', header=None) val_data = pd.read_csv(root_data_dir + 'val.csv', sep='\t\t', header=None) test_data = pd.read_csv(root_data_dir + 'test.csv', sep='\t\t', header=None) # 电影评论 正负评价(1,0) train_data.columns = ['query', 'label'] val_data.columns = ['query', 'label'] test_data.columns = ['query', 'label'] # 将训练集、验证集和测试集的query列的内容合并到pretraining_data列表中。 pretraining_data = train_data['query'].tolist() + val_data['query'].tolist() + test_data['query'].tolist() # 文件写入 with open('./working/pretraining_data.txt', 'w', encoding="utf-8") as f: for sent in pretraining_data: f.write("%s\n" % sent)

在将数据转换为所需格式后,下一步是对输入数据进行标记器(tokenizer)的训练。当然,也可以使用原始数据集中自带的imdb.vocab。data_read.py

import pandas as pd root_data_dir = "./data/" # 分别读取训练集 验证集 测试集 train_data = pd.read_csv(root_data_dir + 'train.csv', sep='\t\t', header=None) val_data = pd.read_csv(root_data_dir + 'val.csv', sep='\t\t', header=None) test_data = pd.read_csv(root_data_dir + 'test.csv', sep='\t\t', header=None) # 电影评论 正负评价(1,0) train_data.columns = ['query', 'label'] val_data.columns = ['query', 'label'] test_data.columns = ['query', 'label'] # 将训练集、验证集和测试集的query列的内容合并到pretraining_data列表中。 pretraining_data = train_data['query'].tolist() + val_data['query'].tolist() + test_data['query'].tolist() # 文件写入 with open('./working/pretraining_data.txt', 'w', encoding="utf-8") as f: for sent in pretraining_data: f.write("%s\n" % sent)

5.2 训练BERT进行MLM任务

我们将使用与训练tokenizer相同的数据集。由于本次任务参数量更大,有112725648,同样的超参数下较之前需要更大的显存,因此需要更改部分超参数,比如可以减小batch_size。具体代码如下所示:pretrain_bert.py

# 使用预训练数据对BERT模型进行预训练,并保存预训练好的模型和词汇表。 import tokenizers from transformers import Trainer, TrainingArguments from transformers import BertTokenizer, LineByLineTextDataset, BertModel, BertConfig, BertForMaskedLM, \ DataCollatorForLanguageModeling from bert_config_movie import output_vocab_file # 设置词汇表文件的路径 vocab_file_dir = './models/vocab.txt' # 从预训练的词汇表文件加载BERT的分词器(Tokenizer) tokenizer = BertTokenizer.from_pretrained(vocab_file_dir) # 将预训练数据转换成适用于预训练的格式 dataset = LineByLineTextDataset( tokenizer=tokenizer, # 指定使用的分词器 file_path='./working/pretraining_data.txt', # 指定包含预训练数据的文本文件路径 block_size=128 # 设置每个样本的最大长度为128个token ) print('No. of lines: ', len(dataset)) # 定义BERT模型的配置 config = BertConfig( vocab_size=90000, # 词汇表大小 hidden_size=768, # 隐层大小 num_hidden_layers=6, # BERT模型的隐层层数 num_attention_heads=12, # 注意力头数 max_position_embeddings=512 # 最大位置嵌入 ) # 基于上述配置构建BERT的Masked Language Model(MLM)模型 model = BertForMaskedLM(config) print('No of parameters: ', model.num_parameters()) # 创建用于语言建模任务的数据收集器 data_collator = DataCollatorForLanguageModeling( tokenizer=tokenizer, # 指定使用的分词器 mlm=True, # 指定为Masked Language Model任务 mlm_probability=0.15 # 设定mask token的概率 ) # 定义BERT预训练的训练配置 training_args = TrainingArguments( output_dir='./working/', # 设置输出目录路径 overwrite_output_dir=True, # 若输出目录已存在,则覆盖 num_train_epochs=2, # 训练轮数 per_device_train_batch_size=8, # 每个设备的训练批次大小 save_steps=10_000, # 每隔10,000步保存一次模型 save_total_limit=2, # 最多保存2个模型 ) # 创建用于训练的Trainer对象 trainer = Trainer( model=model, # 指定要训练的BERT模型 args=training_args, # 指定训练配置 data_collator=data_collator, # 指定数据收集器 train_dataset=dataset, # 指定预训练数据集 ) # 执行BERT的预训练 trainer.train() # 保存模型 trainer.save_model('./models/') # 将训练好的分词器的词汇表保存到output_vocab_file指定的路径中。 tokenizer.save_vocabulary(output_vocab_file)

到现在为止,我们已经完成了预训练的部分。让我们进入微调部分。

6 微调模型

6.1 数据准备

在_微调阶段,数据的格式必须与我们在预训练部分使用的格式不同_。BERT接受三个输入,即input_ids、attention_mask和token_type_ids。fine_tuning_data_formatting.py

""" 在微调阶段,数据的格式必须与我们在预训练部分使用的格式不同 使用了torch.utils中的Dataset类和BERT的分词器将数据转换为所需的格式 """ import torch from torch.utils.data import Dataset, DataLoader from transformers import BertTokenizer from data_read import train_data, val_data, test_data from bert_config_movie import MAX_LEN, TRAIN_BATCH_SIZE, VALID_BATCH_SIZE # 设置预训练的BERT模型名称 离线下载 model_name = "./bert-base-uncased/" # 使用预训练的BERT模型的名称创建BERT分词器(Tokenizer) tokenizer = BertTokenizer.from_pretrained(model_name, truncation=True, do_lower_case=True) # 定义数据集类(QueryData),继承自PyTorch的Dataset类 class QueryData(Dataset): # 接收数据框(dataframe)、分词器(tokenizer)和最大长度(max_len)作为输入参数 def __init__(self, dataframe, tokenizer, max_len): self.tokenizer = tokenizer # 查询的文本数据 self.text = dataframe['query'] # 标签数据 self.targets = dataframe['label'] self.max_len = max_len def __len__(self): # 返回数据集长度 return len(self.text) # 根据索引获取一个样本的函数 def __getitem__(self, index): text = str(self.text[index]) # 将文本按空格拆分成词语,并使用空格重新连接,以确保所有的词语都以空格分隔 text = " ".join(text.split()) # 使用BERT分词器(tokenizer)将文本数据编码成BERT模型输入的格式 inputs = self.tokenizer.encode_plus( text, None, add_special_tokens=True, max_length=self.max_len, pad_to_max_length=True, return_token_type_ids=True ) ids = inputs['input_ids'] # 获取注意力掩码(attention_mask),用于标识哪些词元是实际文本内容,哪些是填充的。 mask = inputs['attention_mask'] # 获取词元类型标识(token_type_ids),用于区分两个句子或段落 token_type_ids = inputs["token_type_ids"] # 返回一个字典 # BERT接受三个输入,即input_ids、attention_mask和token_type_ids return { # 将词元输入转换为PyTorch张量,并指定数据类型为torch.long(整型) 'ids': torch.tensor(ids, dtype=torch.long), 'mask': torch.tensor(mask, dtype=torch.long), 'token_type_ids': torch.tensor(token_type_ids, dtype=torch.long), # 将目标标签转换为PyTorch张量,并指定数据类型为torch.float(浮点型) 'targets': torch.tensor(self.targets[index], dtype=torch.float) } print("Train Dataset: {}".format(train_data.shape)) print("Validation Dataset: {}".format(val_data.shape)) print("Test Dataset: {}".format(test_data.shape)) training_set = QueryData(train_data, tokenizer, MAX_LEN) val_set = QueryData(val_data, tokenizer, MAX_LEN) # Defining training and testing paramerters train_params = {'batch_size': TRAIN_BATCH_SIZE, 'shuffle': True, 'num_workers': 0 } test_params = {'batch_size': VALID_BATCH_SIZE, 'shuffle': True, 'num_workers': 0 } # Creating dataloader for training and testing purposes training_loader = DataLoader(training_set, **train_params) val_loader = DataLoader(val_set, **test_params)

6.2 模型定义

现在让我们开始进行微调目的的模型构建部分。我将在BERT的顶部添加两个线性层,用于分类目的,并使用dropout = 0.1和ReLU作为激活函数。您也可以尝试不同的配置。我已经定义了一个PyTorch类来构建模型,代码如下所示:fine_tuning.py

import torch from transformers import BertModel # 定义了一个基于BERT的分类模型 继承自PyTorch的torch.nn.Module类 class BertClass(torch.nn.Module): def __init__(self): """ 初始化函数,用于定义模型结构和参数 """ super(BertClass, self).__init__() # 使用预训练的BERT模型加载权重,并作为模型的一部分。 self.l1 = BertModel.from_pretrained("./models/") # 定义一个线性层,用于对BERT模型的输出进行线性变换 self.pre_classifier = torch.nn.Linear(768, 768) # 定义一个Dropout层,用于在训练过程中进行随机失活 self.dropout = torch.nn.Dropout(0.1) # 定义一个线性层,用于将BERT模型的输出映射到2个输出类别上 self.classifier = torch.nn.Linear(768, 2) # 定义一个ReLU激活函数 self.relu = torch.nn.ReLU() def forward(self, input_ids, attention_mask, token_type_ids): """ 定义前向传播函数,用于执行模型的前向计算 """ """ 在BERT的顶部添加两个线性层 """ # 通过预训练的BERT模型进行前向计算,得到BERT模型的输出 output_1 = self.l1(input_ids=input_ids, attention_mask=attention_mask, token_type_ids=token_type_ids) # 从BERT模型的输出中提取隐藏状态 hidden_state = output_1[0] # 将隐藏状态的第一个词元的输出作为池化器(pooler)的输入 pooler = hidden_state[:, 0] # 通过线性层进行线性变换 pooler = self.pre_classifier(pooler) # 通过ReLU激活函数进行非线性变换 pooler = self.relu(pooler) # 通过Dropout层进行随机失活 pooler = self.dropout(pooler) # 通过线性层进行分类预测 output = self.classifier(pooler) # 返回结果 return output

6.3 训练和验证

training.py

import torch from tqdm import tqdm from fine_tuning import BertClass from bert_config_movie import device, LEARNING_RATE, model_dir from fine_tuning_data_formatting import training_loader, val_loader # 实例化BertClass模型,并将模型移动到指定的计算设备(GPU或CPU)上 model = BertClass() model.to(device) # 定义交叉熵损失函数 loss_function = torch.nn.CrossEntropyLoss() # 定义Adam优化器 optimizer = torch.optim.Adam(params=model.parameters(), lr=LEARNING_RATE) # 计算准确率的函数 def calcuate_accuracy(preds, targets): # 预测和目标样本对比 相等(即正确的)的记为1 并求总个数 n_correct = (preds == targets).sum().item() return n_correct # 模型训练函数 def train(epoch, training_loader): # 初始化累计的训练损失 tr_loss = 0 # 初始化累计的正确预测数量 n_correct = 0 # 初始化累计的训练步数 nb_tr_steps = 0 # 初始化累计的训练样本数量 nb_tr_examples = 0 # 将BERT模型设置为训练模式,启用Dropout和BatchNormalization等训练时特有的操作 model.train() for _, data in tqdm(enumerate(training_loader, 0)): # 将输入数据的ids张量移动到指定的计算设备(GPU或CPU)上 ids = data['ids'].to(device, dtype=torch.long) # 将输入数据的mask张量移动到指定的计算设备上 mask = data['mask'].to(device, dtype=torch.long) # 将输入数据的token_type_ids张量移动到指定的计算设备上 token_type_ids = data['token_type_ids'].to(device, dtype=torch.long) # 将标签数据的targets张量移动到指定的计算设备上 targets = data['targets'].to(device, dtype=torch.long) # 通过BERT模型进行前向传播,获取预测输出 outputs = model(ids, mask, token_type_ids) # 计算loss loss = loss_function(outputs, targets) # 累计loss tr_loss += loss.item() # 获取预测输出中每个样本预测得分最高的类别索引 big_val, big_idx = torch.max(outputs.data, dim=1) # 计算当前训练步中预测正确的样本数量,并累计到n_correct中 n_correct += calcuate_accuracy(big_idx, targets) # 增加训练步数 nb_tr_steps += 1 # 增加训练样本数量 nb_tr_examples += targets.size(0) if _ % 100 == 0: # 计算当前步数中的平均训练损失 loss_step = tr_loss / nb_tr_steps # 计算当前步数中的训练准确率 accu_step = (n_correct * 100) / nb_tr_examples print(f"Training Loss per 500 steps: {loss_step}") print(f"Training Accuracy per 500 steps: {accu_step}") # 清空之前的梯度信息 optimizer.zero_grad() # 计算当前步数中的梯度 loss.backward() # 执行梯度更新,更新模型的参数 optimizer.step() # 打印当前epoch中的总准确率 print(f'The Total Accuracy for Epoch {epoch}: {(n_correct * 100) / nb_tr_examples}') # 计算当前epoch的平均训练损失 epoch_loss = tr_loss / nb_tr_steps # 计算当前epoch的训练准确率 epoch_accu = (n_correct * 100) / nb_tr_examples print(f"Training Loss Epoch: {epoch_loss}") print(f"Training Accuracy Epoch: {epoch_accu}") return EPOCHS = 1 for epoch in range(EPOCHS): train(epoch, training_loader) # 模型验证函数 def valid(model, testing_loader): # 将BERT模型设置为评估模式,禁用Dropout和BatchNormalization等训练时特有的操作 model.eval() # 初始化累计的正确预测数量 n_correct = 0 # 初始化累计的错误预测数量 n_wrong = 0 # 初始化样本总数 total = 0 # 初始化累计的验证损失 tr_loss = 0 # 初始化累计的验证步数 nb_tr_steps = 0 # 初始化累计的验证样本数量 nb_tr_examples = 0 # 使用torch.no_grad()上下文管理器,禁用梯度计算,节省内存并加快计算速度 with torch.no_grad(): for _, data in tqdm(enumerate(testing_loader, 0)): # 将输入数据的ids张量移动到指定的计算设备(GPU或CPU)上 ids = data['ids'].to(device, dtype=torch.long) mask = data['mask'].to(device, dtype=torch.long) token_type_ids = data['token_type_ids'].to(device, dtype=torch.long) targets = data['targets'].to(device, dtype=torch.long) # 通过BERT模型进行前向传播,获取预测输出 outputs = model(ids, mask, token_type_ids) # 计算预测输出和真实标签之间loss loss = loss_function(outputs, targets) # 累计loss tr_loss += loss.item() # 获取预测输出中每个样本预测得分最高的类别索引 big_val, big_idx = torch.max(outputs.data, dim=1) # 计算当前验证步中预测正确的样本数量,并累计到n_correct中 n_correct += calcuate_accuracy(big_idx, targets) # 增加验证步数 nb_tr_steps += 1 # 增加验证样本数量 nb_tr_examples += targets.size(0) if _ % 5000 == 0: # 计算当前步数中的平均验证损失 loss_step = tr_loss / nb_tr_steps # 计算当前步数中的验证准确率 accu_step = (n_correct * 100) / nb_tr_examples print(f"Validation Loss per 100 steps: {loss_step}") print(f"Validation Accuracy per 100 steps: {accu_step}") # 计算当前epoch的平均验证损失 epoch_loss = tr_loss / nb_tr_steps # 计算当前epoch的验证准确率 epoch_accu = (n_correct * 100) / nb_tr_examples print(f"Validation Loss Epoch: {epoch_loss}") print(f"Validation Accuracy Epoch: {epoch_accu}") # 返回验证准确率 return epoch_accu # 在验证数据集上评估模型的准确率 acc = valid(model, val_loader) print("Accuracy on validation data = %0.2f%%" % acc) # 保存模型 # .pth torch.save(model, model_dir)

训练过程与结果如下图所示。最终验证集上的准确率为86.64%。image.png

6.4 测试集测试

testing.py

import torch from torch.utils.data import Dataset, DataLoader from tqdm import tqdm from data_read import test_data from bert_config_movie import MAX_LEN, device, model_dir from fine_tuning_data_formatting import tokenizer from fine_tuning import BertClass # 定义一个自定义数据集类QueryData,继承自Dataset类 class QueryData(Dataset): def __init__(self, dataframe, tokenizer, max_len): # 初始化数据集的BertTokenizer self.tokenizer = tokenizer self.data = dataframe # 从DataFrame中获取查询文本列 self.text = dataframe['query'] # self.targets = self.data.citation_influence_label # 初始化数据集的最大文本长度 self.max_len = max_len # 定义数据集的长度,即数据集中样本的数量 def __len__(self): return len(self.text) # 定义数据集的获取方法,根据索引获取具体样本 def __getitem__(self, index): text = str(self.text[index]) # 对查询文本进行分词,去除多余的空格 text = " ".join(text.split()) # 使用self.tokenizer.encode_plus()方法对文本进行编码,并生成模型的输入张量 # 包含以下信息 # input_ids:编码后的文本,表示为数字序列。 # attention_mask:注意力掩码,用于指示哪些令牌在模型的注意力机制中被考虑,哪些令牌被掩盖。 # token_type_ids:用于区分输入中不同句子或段落的令牌类型。在这个任务中,token_type_ids全为0,因为只有一个文本序列。 inputs = self.tokenizer.encode_plus( text, # 要编码的文本 None, # 在此情况下,表示不存在关联的另一个文本。在这个任务中,文本之间没有关联,因此使用None add_special_tokens=True, # 表示在编码时添加特殊标记,例如[CLS]和[SEP]。 max_length=self.max_len, # 表示编码后的序列的最大长度。 pad_to_max_length=True, # 如果编码后的序列长度小于max_length,则在序列末尾填充0,使其长度与max_length相同。 return_token_type_ids=True # 用于区分输入中的不同句子或段落。在这个任务中,由于输入没有关联的另一个文本,token_type_ids全为0。 ) ids = inputs['input_ids'] mask = inputs['attention_mask'] token_type_ids = inputs["token_type_ids"] # 返回字典 return { 'ids': torch.tensor(ids, dtype=torch.long), 'mask': torch.tensor(mask, dtype=torch.long), 'token_type_ids': torch.tensor(token_type_ids, dtype=torch.long) # 'targets': torch.tensor(self.targets[index], dtype=torch.float) } # 测试函数,用于在测试集上进行模型预测 def test(model, testing_loader): res = [] # 将BERT模型设置为评估模式 model.eval() n_correct = 0 n_wrong = 0 total = 0 tr_loss = 0 nb_tr_steps = 0 nb_tr_examples = 0 # 使用torch.no_grad()上下文管理器,禁用梯度计算 with torch.no_grad(): for _, data in tqdm(enumerate(testing_loader, 0)): # 将输入数据的ids张量移动到指定的计算设备 ids = data['ids'].to(device, dtype=torch.long) mask = data['mask'].to(device, dtype=torch.long) token_type_ids = data['token_type_ids'].to(device, dtype=torch.long) # targets = data['targets'].to(device, dtype = torch.long) # 通过BERT模型进行前向传播,得到预测输出。 outputs = model(ids, mask, token_type_ids) # 获取预测输出中每个样本预测得分最高的类别索引。 big_val, big_idx = torch.max(outputs, dim=1) # 将预测结果添加到列表res中。 res.extend(big_idx.tolist()) return res # 打印测试集的前两行数据 print(test_data.head(2)) # 创建测试数据集 data_to_test = QueryData(test_data[['query']], tokenizer, MAX_LEN) # 测试参数设置 test_params = {'batch_size': 32, # 批次大小 'shuffle': False, # 是否打乱顺序 'num_workers': 0 # 运行的线程数量 } # 用于测试的数据加载器。 testing_loader_f = DataLoader(data_to_test, **test_params) # 加载训练好的模型 model = torch.load(model_dir, map_location="cpu") # model = BertClass() # model.load_state_dict(torch.load(model_dir, map_location="cpu"), strict=False) # 对测试集进行预测,得到预测结果 res = test(model, testing_loader_f) # 根据预测结果和真实标签计算预测准确性 # 通过将模型预测的结果res与测试集中的真实标签test_data['label']逐个对应比较,并生成一个新的列表correct。 correct = [1 if pred == lab else 0 for pred, lab in zip(res, test_data['label'].tolist())] # 统计列表correct中值为1的元素的个数(即预测正确的样本数),除以测试集的总样本数,即可得到模型在测试集上的准确率 print('accuracy on test set is - ', sum(correct) / len(test_data['label'].tolist()) * 100, '%')

测试结果如下图所示。image.png

7 控制台交互应用

编写app.py,实现用户在控制台输入电影评论,程序返回分类结果,包含情感类别和概率值。具体代码如下所示。app.py

import torch from torch.utils.data import Dataset, DataLoader from tqdm import tqdm from bert_config_movie import MAX_LEN, device, model_dir from fine_tuning_data_formatting import tokenizer import pandas as pd from fine_tuning import BertClass import numpy as np import warnings warnings.filterwarnings("ignore") # 定义一个自定义数据集类QueryData,继承自Dataset类 class QueryData(Dataset): def __init__(self, dataframe, tokenizer, max_len): # 初始化数据集的BertTokenizer self.tokenizer = tokenizer self.data = dataframe # 从DataFrame中获取查询文本列 self.text = dataframe['query'] # self.targets = self.data.citation_influence_label # 初始化数据集的最大文本长度 self.max_len = max_len # 定义数据集的长度,即数据集中样本的数量 def __len__(self): return len(self.text) # 定义数据集的获取方法,根据索引获取具体样本 def __getitem__(self, index): text = str(self.text[index]) # 对查询文本进行分词,去除多余的空格 text = " ".join(text.split()) # 使用self.tokenizer.encode_plus()方法对文本进行编码,并生成模型的输入张量 # 包含以下信息 # input_ids:编码后的文本,表示为数字序列。 # attention_mask:注意力掩码,用于指示哪些令牌在模型的注意力机制中被考虑,哪些令牌被掩盖。 # token_type_ids:用于区分输入中不同句子或段落的令牌类型。在这个任务中,token_type_ids全为0,因为只有一个文本序列。 inputs = self.tokenizer.encode_plus( text, # 要编码的文本 None, # 在此情况下,表示不存在关联的另一个文本。在这个任务中,文本之间没有关联,因此使用None add_special_tokens=True, # 表示在编码时添加特殊标记,例如[CLS]和[SEP]。 max_length=self.max_len, # 表示编码后的序列的最大长度。 pad_to_max_length=True, # 如果编码后的序列长度小于max_length,则在序列末尾填充0,使其长度与max_length相同。 return_token_type_ids=True # 用于区分输入中的不同句子或段落。在这个任务中,由于输入没有关联的另一个文本,token_type_ids全为0。 ) ids = inputs['input_ids'] mask = inputs['attention_mask'] token_type_ids = inputs["token_type_ids"] # 返回字典 return { 'ids': torch.tensor(ids, dtype=torch.long), 'mask': torch.tensor(mask, dtype=torch.long), 'token_type_ids': torch.tensor(token_type_ids, dtype=torch.long) # 'targets': torch.tensor(self.targets[index], dtype=torch.float) } # 测试函数,用于在测试集上进行模型预测 def test(model, testing_loader): res = [] # 将BERT模型设置为评估模式 model.eval() # 使用torch.no_grad()上下文管理器,禁用梯度计算 with torch.no_grad(): for _, data in tqdm(enumerate(testing_loader, 0)): # 将输入数据的ids张量移动到指定的计算设备 ids = data['ids'].to(device, dtype=torch.long) mask = data['mask'].to(device, dtype=torch.long) token_type_ids = data['token_type_ids'].to(device, dtype=torch.long) # targets = data['targets'].to(device, dtype = torch.long) # 通过BERT模型进行前向传播,得到预测输出。 outputs = model(ids, mask, token_type_ids) my_outputs = outputs.tolist() score = np.exp(my_outputs[0]) / np.sum(np.exp(my_outputs[0]), axis=0) idx = np.argmax(score) # 获取预测输出中每个样本预测得分最高的类别索引。 big_val, big_idx = torch.max(outputs, dim=1) # 将预测结果添加到列表res中。 res.extend(big_idx.tolist()) res.append(round(score[idx] * 100, 2)) print(res) return res if __name__ == '__main__': # 加载训练好的模型 model = torch.load(model_dir, map_location="cpu") # model = BertClass() # model.load_state_dict(torch.load(model_dir, map_location="cpu"), strict=False) while True: print("=" * 50) test_movie_review = input("Please enter your movie review > ") if test_movie_review == "#" or test_movie_review == "q": print("Exit the application !") exit(0) if test_movie_review == "": print("Please input something ~") continue test_data = pd.DataFrame([test_movie_review], columns=["query"]) # 创建测试数据集 data_to_test = QueryData(test_data[['query']], tokenizer, MAX_LEN) # 测试参数设置 test_params = {'batch_size': 1, # 批次大小 'shuffle': False, # 是否打乱顺序 'num_workers': 0 # 运行的线程数量 } # 用于测试的数据加载器。 testing_loader_f = DataLoader(data_to_test, **test_params) # 对测试集进行预测,得到预测结果 res = test(model, testing_loader_f) if res[0] == 1: print("result > {} This is a positive movie review ~".format(res)) else: print("result > {} This is a negative movie review !".format(res)) print("=" * 50)

运行的结果如下。实际应用中有一定的效果。image.png

8 总结

本次主要是对之前文本分类任务的融会贯通,对于特定的电影评论情感分类任务,制作数据集,并训练。对最终训练的模型在控制台模拟进行了应用,有一定的效果。不过,本次较之前并没有对微调模型进行改变。

完整的项目代码



【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3