BERTモデルを使った分類問題をやってみた(コンペティションでの試行錯誤の備忘録)
SIGNATEのコンペティションで、文章分類にチャレンジしました。結果はあまり良くありませんでしたが、間違っていること、よくなかったことも含めて、残しておきたいと思います。
基本的な処理
GPUの利用
以下の一文で、deviceを指定する。
import torch
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
利用時は、モデルやデータをデバイスに送る。モデルとデータが同じデバイス上にないとエラーになる。
#モデルをデバイスに送る。
model = AutoModel.from_pretrained(model_name)
model.to(device)
#データをデバイスに送る
for batch in test_dataloader:
input_ids = batch['input_ids'].to(device)
attention_mask = batch['attention_mask'].to(device)
labels = batch['labels'].to(device)
データの読み込み
テーブル形式
テキストデータと、ラベルデータがcsvファイルにある場合には、pandasのpd.read_csvを使う。
import pandas as pd
# 学習データをCSVファイルから読み込む
train = pd.read_csv('train.csv', sep=',', index_col=0)
ファイル形式
学習用データがtextファイルとして個別に格納されており、train_master.csvにはfile_nameとlabelとが入っている場合には、以下のように処理する。
# マスタデータの読み込み
train = pd.read_csv('train_master.csv', sep=',', index_col=0)
train.head()
--
file_name label
train_0000.txt 0
train_0001.txt 0
train_0002.txt 1
train_0003.txt 1
train_0004.txt 0
指定したフォルダにある”train_xxxx.text”ファイルをソートして順次読み込み、trainデータフレームに追加します。
import glob
# trainデータの作成
text_train = []
for file_name in np.sort(glob.glob('<train_dataの入っているフォルダ名>/train_*.txt')):
with open(file_name) as f:
text = f.read()
text_train.append(text)
train['text'] = text_train
データの整形
Pandas上で、データの整形を行う。
#label=1の比率を確認。 極端に偏った比率の場合には、別途処理が必要。
border = len(train[train["label"] == 1]) / len(train["label"])
#NaN値を空白文字列で置換
train['text'].fillna('', inplace=True)
#小文字化
train['text'] = train['text'].str.lower()
#文字列の置換
train['text'] = train['text'].replace('model','MODEL')
#不要な列の削除
train = train.drop('dummy', axis=1)
#列の結合
## 2つの列をを結合して新しい特徴量を作成
train['text'] = train['subject']+ ':' + train['body']
#マッピング labelの yesを1に noを0 に変換
mapping = {"yes": 1, "no": 0}
train['label'] = train['label'].map(mapping)
データの分割
データフレームからデータを取り出す。
# テキストデータとラベルを取り出す。
texts = train['text'].tolist()
labels = train['label'].tolist()
train_test_split
取り出したtextsとlabelsを trainデータとtestデータに分割する。test_size=0.2でtrain:testを8:2の割合で分割する。
labelの比率が極端な場合には、stratify=labelsオプションをつけて、labelの値が均等になるよう分割する
# データの分割
from sklearn.model_selection import train_test_split
X_train, X_test, y_train, y_test = train_test_split(texts, labels, stratify=labels, test_size=0.2, random_state=42)
K-fold交差検証
trainデータをn分割し、交差検証を行う場合。
from sklearn.model_selection import KFold
kf = KFold(n_splits=5, shuffle=True, random_state=42) # 5分割の例
# K-Fold交差検証の実施
for train_index, test_index in kf.split(texts, labels):
labelの値に大きな偏りがある場合には、StratifiedKFoldを用いることで、可能な限り同じ分布になるように分割される。
from sklearn.model_selection import StratifiedKFold
#StratifiedKFold
skf = StratifiedKFold(n_splits=5, shuffle=True, random_state=42)
モデル
modelのロード
from transformers import AutoModel, AutoTokenizer, AutoConfig
# モデル名とトークナイザー名を定義
model_name = "<model>"
# 事前訓練済みのモデルとトークナイザーをロード
tokenizer = AutoTokenizer.from_pretrained(model_name)
model = AutoModel.from_pretrained(model_name)
modelの保存
学習したモデルを保存する場合。
# modelを学習させた後saveする
model_name = "<保存するモデル名>"
model.save_pretrained(model_name)
学習
PyTorch
基本形
PyTorchを使ったトレーニングの基本形は以下のとおりです。
import torch
from torch.utils.data import DataLoader, Dataset
from transformers import BertTokenizer, BertForSequenceClassification, AdamW
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score
from tqdm import tqdm
# データの分割
train_texts, val_texts, train_labels, val_labels = train_test_split(texts, labels, test_size=0.2)
# BERT トークナイザーの準備
tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')
# データセットの定義
class CustomDataset(Dataset):
def __init__(self, texts, labels):
self.encodings = tokenizer(texts, padding=True, truncation=True, return_tensors='pt')
self.labels = torch.tensor(labels)
def __getitem__(self, idx):
item = {key: val[idx].to(device) for key, val in self.encodings.items()}
item['labels'] = self.labels[idx].to(device)
return item
def __len__(self):
return len(self.labels)
# 訓練データと検証データの作成
train_dataset = CustomDataset(train_texts, train_labels)
val_dataset = CustomDataset(val_texts, val_labels)
# モデルのロード
model = BertForSequenceClassification.from_pretrained('bert-base-uncased', num_labels=2).to(device)
# トレーニングの設定
optimizer = AdamW(model.parameters(), lr=1e-5)
num_epochs = 3
batch_size = 2
# データローダーの作成
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)
# トレーニングループ
for epoch in range(num_epochs):
model.train()
total_loss = 0
for batch in tqdm(train_loader, desc=f'Epoch {epoch + 1}/{num_epochs}', leave=False):
optimizer.zero_grad()
inputs = {key: batch[key] for key in batch if key != 'labels'}
outputs = model(**inputs, labels=batch['labels'])
loss = outputs.loss
total_loss += loss.item()
loss.backward()
optimizer.step()
average_loss = total_loss / len(train_loader)
print(f'Training Loss: {average_loss}')
# バリデーション
model.eval()
all_preds, all_labels = [], []
with torch.no_grad():
for batch in tqdm(val_loader, desc=f'Validation Epoch {epoch + 1}/{num_epochs}', leave=False):
inputs = {key: batch[key] for key in batch if key != 'labels'}
outputs = model(**inputs)
logits = outputs.logits
preds = torch.argmax(logits, dim=1)
all_preds.extend(preds.cpu().numpy())
all_labels.extend(batch['labels'].cpu().numpy())
accuracy = accuracy_score(all_labels, all_preds)
print(f'Validation Accuracy: {accuracy}')
KFold交差検証を行う場合
# BERT トークナイザーの準備
# データセットの定義
# 訓練データと検証データの作成
# モデルのロード
# KFoldでデータを分割
num_splits = 5 # Flod数=5とする
kf = KFold(n_splits=num_splits, shuffle=True, random_state=42)
for fold, (train_indices, val_indices) in enumerate(kf.split(texts)):
print(f"Training Fold {fold + 1}")
# データセットの作成
train_texts_fold = [texts[i] for i in train_indices]
train_labels_fold = [labels[i] for i in train_indices]
val_texts_fold = [texts[i] for i in val_indices]
val_labels_fold = [labels[i] for i in val_indices]
train_dataset = CustomDataset(train_texts_fold, train_labels_fold)
val_dataset = CustomDataset(val_texts_fold, val_labels_fold)
# トレーニングの設定
# データローダーの作成
# トレーニングループ
for epoch in range(num_epochs):
スケジューラを使う場合
学習率を変化させながら学習させるにはスケジューラを設定する。
# モデルの学習率関連のハイパーパラメータ
max_lr = 0.0001 # 最大学習率
total_training_steps = int(len(train_dataloader)/5) # エポックごとのトータルステップ数
num_warmup_steps = int(total_training_steps/10) # ウォームアップステップ数
# オプティマイザの設定
optimizer = AdamW(model.parameters(), lr=max_lr)
# 学習率スケジューラの設定
scheduler = get_linear_schedule_with_warmup(optimizer, num_warmup_steps=num_warmup_steps, num_training_steps=total_training_steps)
# トレーニングループ
for epoch in range(num_epochs):
model.train()
total_loss = 0
for batch in tqdm(train_loader, desc=f'Epoch {epoch + 1}/{num_epochs}', leave=False):
optimizer.zero_grad()
inputs = {key: batch[key] for key in batch if key != 'labels'}
outputs = model(**inputs, labels=batch['labels'])
loss = outputs.loss
total_loss += loss.item()
loss.backward()
optimizer.step()
scheduler.step() # 学習率のスケジューリング
Trainer
Trainerは、HuggingFace が提供するライブラリtransformersのクラスです。
コードが見やすいので、コンペティションの終盤ではこちらを利用しました。
基本形
from transformers import BertTokenizer, BertForSequenceClassification, AdamW, Trainer, TrainingArguments
from sklearn.model_selection import train_test_split
import torch
# データセットの分割
train_texts, val_texts, train_labels, val_labels = train_test_split(texts, labels, test_size=0.2)
# BERT トークナイザーの準備
tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')
# データセットの定義
class CustomDataset(torch.utils.data.Dataset):
def __init__(self, texts, labels):
self.encodings = tokenizer(texts, padding=True, truncation=True, return_tensors='pt')
self.labels = torch.tensor(labels)
def __getitem__(self, idx):
item = {key: val[idx] for key, val in self.encodings.items()}
item['labels'] = self.labels[idx]
return item
def __len__(self):
return len(self.labels)
# 訓練データと検証データの作成
train_dataset = CustomDataset(train_texts, train_labels)
val_dataset = CustomDataset(val_texts, val_labels)
# モデルのロード
model = BertForSequenceClassification.from_pretrained('bert-base-uncased', num_labels=2).to(device)
# トレーニングの設定
training_args = TrainingArguments(
output_dir='./results',
num_train_epochs=3,
per_device_train_batch_size=2,
per_device_eval_batch_size=2,
save_steps=100,
save_total_limit=2,
)
# Trainerの初期化
trainer = Trainer(
model=model,
args=training_args,
train_dataset=train_dataset,
eval_dataset=val_dataset,
)
# モデルの訓練(進捗バーが表示されます)
trainer.train()
実施例
以下は、コンペティション時の例です。Fベータの値を評価値としています。
# ファインチューニングの設定
training_args = TrainingArguments(
per_device_train_batch_size=16,
output_dir='./output',
num_train_epochs=3,
learning_rate=1e-5,
load_best_model_at_end=True,
metric_for_best_model="f_beta", #
weight_decay=5e-5,
)
# Trainerのcompute_metricsメソッドを定義する
def compute_metrics(pred):
labels = pred.label_ids
preds = pred.predictions.argmax(-1)
f_beta = fbeta_score(labels, preds, beta=7) # betaは適宜調整する
return {"f_beta": f_beta}
# トレーナーを作成してモデルをファインチューニング
trainer = Trainer(
model=custom_model,
args=training_args,
data_collator=None,
train_dataset=train_dataset,
eval_dataset=val_dataset,
compute_metrics=compute_metrics, #
)
trainer.train()
# モデルの評価
results = trainer.evaluate()
print(results)
推論
学習結果のモデルを使って、test_textsの分類を実行する例。以下の例では、softmaxで確率を求めて、閾値でlabelの判定をしています。
# モデルとトークナイザのロード
# 推論用データの読込み
# 学習時と同じデータの整形
# test_textsにデータを格納
test_encodings = tokenizer(test_texts, truncation=True, padding=True, return_tensors='pt')
# PyTorchのデータセットに変換
class CustomDataset(torch.utils.data.Dataset):
def __init__(self, encodings):
self.encodings = encodings
def __getitem__(self, idx):
item = {key: torch.tensor(val[idx]) for key, val in self.encodings.items()}
return item
def __len__(self):
return len(self.encodings.input_ids)
test_dataset = CustomDataset(test_encodings)
# 推論
model.eval()
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
test_dataloader = torch.utils.data.DataLoader(test_dataset, batch_size=16)
predictions = []
prob =[]
model.to(device)
# 確率の閾値(学習時のLabelに偏りがあった場合、2値分類の閾値を変更する)
threshold = border
for batch in test_dataloader:
input_ids = batch['input_ids'].to(device)
attention_mask = batch['attention_mask'].to(device)
with torch.no_grad():
outputs = model(input_ids, attention_mask=attention_mask)
logits = outputs.logits
probabilities = torch.softmax(logits, dim=1)
predicted = (probabilities[:, 1] > threshold).int() # 確率が閾値を超える場合に1とする
predictions.extend(predicted.tolist())
prob.extend(probabilities[:, 1] .tolist())
# 予測結果をDataFrameに保存
test_data['predictions'] = predictions
test_data['probabilities'] = prob
# 結果を1と0で出力
test_data['predictions'] = test_data['predictions'].astype(int)
# 結果をCSVファイルに出力
test_data.to_csv('predict.csv', sep=',', columns=['predictions'], header=False, index=True)