BERTモデルを使った分類問題をやってみた(コンペティションでの試行錯誤の備忘録)

SIGNATEのコンペティションで、文章分類にチャレンジしました。結果はあまり良くありませんでしたが、間違っていること、よくなかったことも含めて、残しておきたいと思います。

基本的な処理

GPUの利用

以下の一文で、deviceを指定する。

import torch
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

利用時は、モデルやデータをデバイスに送る。モデルとデータが同じデバイス上にないとエラーになる。

#モデルをデバイスに送る。
model = AutoModel.from_pretrained(model_name)
model.to(device)

#データをデバイスに送る
for batch in test_dataloader:
    input_ids = batch['input_ids'].to(device)
    attention_mask = batch['attention_mask'].to(device)
    labels = batch['labels'].to(device)

データの読み込み

テーブル形式

テキストデータと、ラベルデータがcsvファイルにある場合には、pandasのpd.read_csvを使う。

import pandas as pd

# 学習データをCSVファイルから読み込む
train = pd.read_csv('train.csv', sep=',', index_col=0)

ファイル形式

学習用データがtextファイルとして個別に格納されており、train_master.csvにはfile_nameとlabelとが入っている場合には、以下のように処理する。

# マスタデータの読み込み
train = pd.read_csv('train_master.csv', sep=',', index_col=0)
train.head()

--
file_name	      label
train_0000.txt	0
train_0001.txt	0
train_0002.txt	1
train_0003.txt	1
train_0004.txt	0

指定したフォルダにある”train_xxxx.text”ファイルをソートして順次読み込み、trainデータフレームに追加します。

import glob

# trainデータの作成
text_train = []
for file_name in np.sort(glob.glob('<train_dataの入っているフォルダ名>/train_*.txt')):
  with open(file_name) as f:
    text = f.read()
  text_train.append(text)
train['text'] = text_train

データの整形

Pandas上で、データの整形を行う。

#label=1の比率を確認。 極端に偏った比率の場合には、別途処理が必要。
border = len(train[train["label"] == 1]) / len(train["label"])

#NaN値を空白文字列で置換
train['text'].fillna('', inplace=True)

#小文字化
train['text'] = train['text'].str.lower()

#文字列の置換
train['text'] = train['text'].replace('model','MODEL')

#不要な列の削除
train = train.drop('dummy', axis=1)

#列の結合
## 2つの列をを結合して新しい特徴量を作成
train['text'] = train['subject']+ ':' + train['body']

#マッピング   labelの yesを1に noを0 に変換
mapping = {"yes": 1, "no": 0}
train['label'] = train['label'].map(mapping)

データの分割

データフレームからデータを取り出す。

# テキストデータとラベルを取り出す。
texts = train['text'].tolist()
labels = train['label'].tolist()

train_test_split

取り出したtextsとlabelsを trainデータとtestデータに分割する。test_size=0.2でtrain:testを8:2の割合で分割する。

labelの比率が極端な場合には、stratify=labelsオプションをつけて、labelの値が均等になるよう分割する

# データの分割 
from sklearn.model_selection import train_test_split
X_train, X_test, y_train, y_test = train_test_split(texts, labels, stratify=labels, test_size=0.2, random_state=42)

K-fold交差検証

trainデータをn分割し、交差検証を行う場合。

from sklearn.model_selection import KFold

kf = KFold(n_splits=5, shuffle=True, random_state=42)  # 5分割の例

# K-Fold交差検証の実施
for train_index, test_index in kf.split(texts, labels):

labelの値に大きな偏りがある場合には、StratifiedKFoldを用いることで、可能な限り同じ分布になるように分割される。

from sklearn.model_selection import StratifiedKFold

#StratifiedKFold
skf = StratifiedKFold(n_splits=5, shuffle=True, random_state=42) 

モデル

modelのロード


from transformers import AutoModel, AutoTokenizer, AutoConfig

# モデル名とトークナイザー名を定義
model_name = "<model>"

# 事前訓練済みのモデルとトークナイザーをロード
tokenizer = AutoTokenizer.from_pretrained(model_name)
model = AutoModel.from_pretrained(model_name)

modelの保存

学習したモデルを保存する場合。

# modelを学習させた後saveする

model_name = "<保存するモデル名>"
model.save_pretrained(model_name)

学習

PyTorch

基本形

PyTorchを使ったトレーニングの基本形は以下のとおりです。

import torch
from torch.utils.data import DataLoader, Dataset
from transformers import BertTokenizer, BertForSequenceClassification, AdamW
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score
from tqdm import tqdm

# データの分割
train_texts, val_texts, train_labels, val_labels = train_test_split(texts, labels, test_size=0.2)

# BERT トークナイザーの準備
tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')

# データセットの定義
class CustomDataset(Dataset):
    def __init__(self, texts, labels):
        self.encodings = tokenizer(texts, padding=True, truncation=True, return_tensors='pt')
        self.labels = torch.tensor(labels)

    def __getitem__(self, idx):
        item = {key: val[idx].to(device) for key, val in self.encodings.items()}
        item['labels'] = self.labels[idx].to(device)
        return item

    def __len__(self):
        return len(self.labels)

# 訓練データと検証データの作成
train_dataset = CustomDataset(train_texts, train_labels)
val_dataset = CustomDataset(val_texts, val_labels)

# モデルのロード
model = BertForSequenceClassification.from_pretrained('bert-base-uncased', num_labels=2).to(device)

# トレーニングの設定
optimizer = AdamW(model.parameters(), lr=1e-5)
num_epochs = 3
batch_size = 2

# データローダーの作成   
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)

# トレーニングループ
for epoch in range(num_epochs):
    model.train()
    total_loss = 0

    for batch in tqdm(train_loader, desc=f'Epoch {epoch + 1}/{num_epochs}', leave=False):
        optimizer.zero_grad()
        inputs = {key: batch[key] for key in batch if key != 'labels'}
        outputs = model(**inputs, labels=batch['labels'])
        loss = outputs.loss
        total_loss += loss.item()
        loss.backward()
        optimizer.step()

    average_loss = total_loss / len(train_loader)
    print(f'Training Loss: {average_loss}')

    # バリデーション
    model.eval()
    all_preds, all_labels = [], []

    with torch.no_grad():
        for batch in tqdm(val_loader, desc=f'Validation Epoch {epoch + 1}/{num_epochs}', leave=False):
            inputs = {key: batch[key] for key in batch if key != 'labels'}
            outputs = model(**inputs)
            logits = outputs.logits
            preds = torch.argmax(logits, dim=1)
            all_preds.extend(preds.cpu().numpy())
            all_labels.extend(batch['labels'].cpu().numpy())

    accuracy = accuracy_score(all_labels, all_preds)
    print(f'Validation Accuracy: {accuracy}')

KFold交差検証を行う場合


# BERT トークナイザーの準備

# データセットの定義

# 訓練データと検証データの作成

# モデルのロード

# KFoldでデータを分割
num_splits = 5  # Flod数=5とする
kf = KFold(n_splits=num_splits, shuffle=True, random_state=42)

for fold, (train_indices, val_indices) in enumerate(kf.split(texts)):
    print(f"Training Fold {fold + 1}")

    # データセットの作成
    train_texts_fold = [texts[i] for i in train_indices]
    train_labels_fold = [labels[i] for i in train_indices]
    val_texts_fold = [texts[i] for i in val_indices]
    val_labels_fold = [labels[i] for i in val_indices]

    train_dataset = CustomDataset(train_texts_fold, train_labels_fold)
    val_dataset = CustomDataset(val_texts_fold, val_labels_fold)

        # トレーニングの設定
        # データローダーの作成
        # トレーニングループ
       for epoch in range(num_epochs):
              

スケジューラを使う場合

学習率を変化させながら学習させるにはスケジューラを設定する。

# モデルの学習率関連のハイパーパラメータ
max_lr = 0.0001  # 最大学習率
total_training_steps = int(len(train_dataloader)/5)  # エポックごとのトータルステップ数
num_warmup_steps = int(total_training_steps/10)  # ウォームアップステップ数

# オプティマイザの設定
optimizer = AdamW(model.parameters(), lr=max_lr)
# 学習率スケジューラの設定
scheduler = get_linear_schedule_with_warmup(optimizer, num_warmup_steps=num_warmup_steps, num_training_steps=total_training_steps)


# トレーニングループ
for epoch in range(num_epochs):
    model.train()
    total_loss = 0

    for batch in tqdm(train_loader, desc=f'Epoch {epoch + 1}/{num_epochs}', leave=False):
        optimizer.zero_grad()
        inputs = {key: batch[key] for key in batch if key != 'labels'}
        outputs = model(**inputs, labels=batch['labels'])
        loss = outputs.loss
        total_loss += loss.item()
        loss.backward()
        optimizer.step()
        scheduler.step()  # 学習率のスケジューリング

Trainer

Trainerは、HuggingFace が提供するライブラリtransformersのクラスです。

コードが見やすいので、コンペティションの終盤ではこちらを利用しました。

基本形

from transformers import BertTokenizer, BertForSequenceClassification, AdamW, Trainer, TrainingArguments
from sklearn.model_selection import train_test_split
import torch

# データセットの分割
train_texts, val_texts, train_labels, val_labels = train_test_split(texts, labels, test_size=0.2)

# BERT トークナイザーの準備
tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')

# データセットの定義
class CustomDataset(torch.utils.data.Dataset):
    def __init__(self, texts, labels):
        self.encodings = tokenizer(texts, padding=True, truncation=True, return_tensors='pt')
        self.labels = torch.tensor(labels)

    def __getitem__(self, idx):
        item = {key: val[idx] for key, val in self.encodings.items()}
        item['labels'] = self.labels[idx]
        return item

    def __len__(self):
        return len(self.labels)

# 訓練データと検証データの作成
train_dataset = CustomDataset(train_texts, train_labels)
val_dataset = CustomDataset(val_texts, val_labels)

# モデルのロード
model = BertForSequenceClassification.from_pretrained('bert-base-uncased', num_labels=2).to(device)

# トレーニングの設定
training_args = TrainingArguments(
    output_dir='./results',
    num_train_epochs=3,
    per_device_train_batch_size=2,
    per_device_eval_batch_size=2,
    save_steps=100,
    save_total_limit=2,
)

# Trainerの初期化
trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=train_dataset,
    eval_dataset=val_dataset,
)

# モデルの訓練(進捗バーが表示されます)
trainer.train()

実施例

以下は、コンペティション時の例です。Fベータの値を評価値としています。

# ファインチューニングの設定
training_args = TrainingArguments(
  per_device_train_batch_size=16,
  output_dir='./output',
  num_train_epochs=3,
  learning_rate=1e-5,
  load_best_model_at_end=True,       
  metric_for_best_model="f_beta",    #
  weight_decay=5e-5,
)

# Trainerのcompute_metricsメソッドを定義する
def compute_metrics(pred):
    labels = pred.label_ids
    preds = pred.predictions.argmax(-1)
    f_beta = fbeta_score(labels, preds, beta=7)  # betaは適宜調整する
    return {"f_beta": f_beta}

# トレーナーを作成してモデルをファインチューニング
trainer = Trainer(
  model=custom_model,
  args=training_args,
  data_collator=None,
  train_dataset=train_dataset,
  eval_dataset=val_dataset,
  compute_metrics=compute_metrics,      # 
)

trainer.train()


# モデルの評価
results = trainer.evaluate()
print(results)

推論

学習結果のモデルを使って、test_textsの分類を実行する例。以下の例では、softmaxで確率を求めて、閾値でlabelの判定をしています。


# モデルとトークナイザのロード
# 推論用データの読込み
# 学習時と同じデータの整形

# test_textsにデータを格納

test_encodings = tokenizer(test_texts, truncation=True, padding=True, return_tensors='pt')

# PyTorchのデータセットに変換
class CustomDataset(torch.utils.data.Dataset):
    def __init__(self, encodings):
        self.encodings = encodings

    def __getitem__(self, idx):
        item = {key: torch.tensor(val[idx]) for key, val in self.encodings.items()}
        return item

    def __len__(self):
        return len(self.encodings.input_ids)

test_dataset = CustomDataset(test_encodings)

# 推論
model.eval()
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
test_dataloader = torch.utils.data.DataLoader(test_dataset, batch_size=16)
predictions = []
prob =[]

model.to(device)

# 確率の閾値(学習時のLabelに偏りがあった場合、2値分類の閾値を変更する)
threshold = border

for batch in test_dataloader:
    input_ids = batch['input_ids'].to(device)
    attention_mask = batch['attention_mask'].to(device)
    with torch.no_grad():
        outputs = model(input_ids, attention_mask=attention_mask)
    logits = outputs.logits
    probabilities = torch.softmax(logits, dim=1)

    predicted = (probabilities[:, 1] > threshold).int()  # 確率が閾値を超える場合に1とする
    predictions.extend(predicted.tolist())

    prob.extend(probabilities[:, 1] .tolist())

# 予測結果をDataFrameに保存
test_data['predictions'] = predictions
test_data['probabilities'] = prob

# 結果を1と0で出力
test_data['predictions'] = test_data['predictions'].astype(int)

# 結果をCSVファイルに出力
test_data.to_csv('predict.csv', sep=',', columns=['predictions'], header=False, index=True)