Strive236

Strive236

生命的起伏要认可
github
zhihu

ML2022春 HW3

はじめに#

今回の HW は画像分類タスクで、食べ物データセット food11 を分類します。具体的なタスクの要件は以下の通りです:

image

達成すべき目標とヒントはそれぞれ以下の通りです:

image

シンプル#

元のコードを実行するだけで、結果は以下の通りです:

image

image

中程度#

トレーニング拡張を行い、トレーニング時間を延長します(より大きな n_epoch を使用)。

トレーニング拡張の具体的な操作は以下の通りです:

train_tfm = transforms.Compose([
    # 画像を固定の形状にリサイズ(高さ = 幅 = 128)
    transforms.Resize((128, 128)),
    # ここにいくつかの変換を追加できます。
    transforms.RandomChoice([
        transforms.RandomRotation((-30,30)),
        transforms.RandomHorizontalFlip(p=0.5),
        transforms.RandomVerticalFlip(p=0.5),
        transforms.ColorJitter(brightness=(0.5,1.5), contrast=(0.5, 1.5), saturation=(0.5,1.5), hue=(-0.25, 0.25)),
        transforms.RandomInvert(p=0.5),
        transforms.RandomAffine(degrees=(-30,30), translate=(0.1, 0.1), scale=(0.8, 1.2), shear=(-30, 30)),
        transforms.Grayscale(num_output_channels=3),
    ]),
    # ToTensor()は変換の最後にするべきです。
    transforms.ToTensor(),
])

具体的な説明は以下の通りです:

  • RandomRotation ((-30,30)) 画像をランダムに回転します。(-30, 30):回転角度の範囲(-30 度から + 30 度の間でランダムに選択)。
  • RandomHorizontalFlip (p=0.5) 50% の確率で画像を水平に反転します。p=0.5:実行確率(0.5 は 50% を示します)。
  • RandomVerticalFlip (p=0.5) 50% の確率で画像を垂直に反転します。p の意味は同上。
  • ColorJitter (brightness=(0.5,1.5), contrast=(0.5, 1.5), saturation=(0.5,1.5), hue=(-0.25, 0.25)) 色の属性(明るさ、コントラスト、彩度、色相)をランダムに調整します。brightness=(0.5, 1.5):明るさのスケーリング範囲(0.5 倍〜1.5 倍)。contrast=(0.5, 1.5):コントラストの調整範囲(0.5 倍〜1.5 倍)。saturation=(0.5, 1.5):彩度の調整範囲(0.5 倍〜1.5 倍)。hue=(-0.25, 0.25):色相のオフセット範囲(-0.25〜+0.25、色相環の - 90 度〜+90 度に対応)。
  • RandomInvert (p=0.5) 50% の確率で色を反転します(色を反転させる、例えば黒が白に、赤が青に)。p の意味は同上。
  • RandomAffine (degrees=(-30,30), translate=(0.1, 0.1), scale=(0.8, 1.2), shear=(-30, 30)) ランダムなアフィン変換(回転、平行移動、スケーリング、せん断)。degrees=(-30, 30):回転角度の範囲。translate=(0.1, 0.1):水平方向と垂直方向の最大平行移動比率(画像サイズの 10%)。scale=(0.8, 1.2):スケーリング範囲(0.8 倍〜1.2 倍)。shear=(-30, 30):せん断角度の範囲(-30 度〜+30 度)。せん断(Shear)は線形幾何変換で、画像の一部を傾けることで「傾斜変形」効果をシミュレートします。
  • Grayscale (num_output_channels=3) 画像をグレースケールに変換しますが、3 チャンネルを保持します(RGB 形式、各チャンネルの値は同じ)。num_output_channels=3:出力チャンネル数(3 は 3 チャンネルのグレースケール画像を生成し、モデルの入力に互換性があります)。カラー画像(RGB)をグレースケール画像に変換することは、本質的には 3 つのチャンネルの輝度情報を加重平均で合成し、単一チャンネルの画像を生成することです。

トレーニング時間を 90 エポックに延長し、結果は以下の通りです:

image

1747311545845

注意すべき点は、GPU の利用率が非常に低いことがわかり、問題はデータを読み込む際のトランスフォームの強化とディスク IO の時間が長すぎることにあるため、トレーニング時間が非常に遅くなっています。DataLoader の並行性を調整する必要があります。

ここでは 12 の並行性を使用しました(画像強化の複雑さによります)、persistent_workers=True を設定してプロセスの再生成 / 破棄を避け、オーバーヘッドを減らします。テストでは 8 の並行性を使用しました。改善後、1 エポックのトレーニング時間は大幅に短縮され、ここで使用しているのはローカルコンピュータで、デバイスは RTX4070 ラップトップです。

同時にバッチサイズを 128 に変更しました。

強い#

まず、モデルの構造を変更し、resnet18 と resnet34 を学び、resnet34 を基に少し変更しました:

class BasicBlock(nn.Module):
    def __init__(self, in_channels, out_channels, stride=1):
        super().__init__()
        self.conv1 = nn.Conv2d(in_channels, out_channels, 3, stride, 1, bias=False)
        self.bn1 = nn.BatchNorm2d(out_channels)

        self.conv2 = nn.Conv2d(out_channels, out_channels, 3, 1, 1, bias=False)
        self.bn2 = nn.BatchNorm2d(out_channels)

        self.relu = nn.ReLU(inplace=True)

        self.downsample = None
        if stride != 1 or in_channels != out_channels:
            self.downsample = nn.Sequential(
                nn.Conv2d(in_channels, out_channels, 1, stride, 0),
                nn.BatchNorm2d(out_channels)
            )

    def forward(self, x):
        identity = x

        out = self.conv1(x)
        out = self.bn1(out)
        out = self.relu(out)

        out = self.conv2(out)
        out = self.bn2(out)

        if self.downsample is not None:
            identity = self.downsample(identity)

        out += identity
        out = self.relu(out)
        
        return out

class Classifier(nn.Module):
    def __init__(self):
        super(Classifier, self).__init__()
        # torch.nn.Conv2d(in_channels, out_channels, kernel_size, stride, padding)
        # torch.nn.MaxPool2d(kernel_size, stride, padding)
        # 入力の次元 [3, 128, 128]
        # 初期の畳み込み層
        self.conv1 = nn.Conv2d(3, 64, kernel_size=5, stride=2, padding=2, bias=False)
        self.bn1 = nn.BatchNorm2d(64)
        self.relu = nn.ReLU(inplace=True)

        self.block1 = BasicBlock(64, 64, 1) # [64, 64, 64]

        self.block2 = BasicBlock(64, 64) # [64, 64, 64]

        self.block3 = BasicBlock(64, 64) # [64, 64, 64]

        self.block4 = BasicBlock(64, 128, 2) # [128, 32, 32]

        self.block5 = BasicBlock(128, 128) # [128, 32, 32]

        self.block6 = BasicBlock(128, 128) # [128, 32, 32]

        self.block7 = BasicBlock(128, 128) # [128, 32, 32]

        self.block8 = BasicBlock(128, 256, 2) # [256, 16, 16]

        self.block9 = BasicBlock(256, 256) # [256, 16, 16]

        self.block10 = BasicBlock(256, 256) # [256, 16, 16]

        self.block11 = BasicBlock(256, 256) # [256, 16, 16]

        self.block12 = BasicBlock(256, 256) # [256, 16, 16]

        self.block13 = BasicBlock(256, 256) # [256, 16, 16]

        self.block14 = BasicBlock(256, 512, 2) # [512, 8, 8]

        self.block15 = BasicBlock(512, 512) # [512, 8, 8]

        self.block16 = BasicBlock(512, 512) # [512, 8, 8]

        self.avgpool = nn.AdaptiveAvgPool2d((1, 1))

        self.fc = nn.Linear(512, 11)

    def forward(self, x):
        out = self.conv1(x)
        out = self.bn1(out)
        out = self.relu(out)

        out = self.block1(out)
        out = self.block2(out)
        out = self.block3(out)
        out = self.block4(out)
        out = self.block5(out)
        out = self.block6(out)
        out = self.block7(out)
        out = self.block8(out)
        out = self.block9(out)
        out = self.block10(out)
        out = self.block11(out)
        out = self.block12(out)
        out = self.block13(out)
        out = self.block14(out)
        out = self.block15(out)
        out = self.block16(out)

        out = self.avgpool(out)

        out = out.view(out.size()[0], -1)
        return self.fc(out)

次にクロスバリデーションとアンサンブルを行いました:

クロスバリデーションは 5 分割交差検証を採用し、元のトレーニングセットと検証セットを統合してから 5 分割交差検証を行いました:

# "cuda"はGPUが利用可能な場合のみ。
device = "cuda" if torch.cuda.is_available() else "cpu"

# トレーニングエポック数と忍耐。
n_epochs = 200
patience = 50 # 'patience'エポックで改善がない場合、早期停止

import numpy as np
from sklearn.model_selection import KFold

from torch.utils.tensorboard import SummaryWriter
import datetime

# 5分割交差検証を初期化
n_folds = 5
kf = KFold(n_splits=n_folds, shuffle=True, random_state=42)

# 完全なトレーニングセットを読み込む(交差検証用)
train_set = FoodDataset(os.path.join(_dataset_dir, "training"), tfm=train_tfm)
valid_set = FoodDataset(os.path.join(_dataset_dir, "validation"), tfm=train_tfm)

# データセットを統合
combined_files = train_set.files + valid_set.files
full_dataset = FoodDataset(path="", tfm=train_tfm, files=combined_files)

oof_preds = np.zeros(len(full_dataset))  # OOF予測結果を保存
oof_labels = np.zeros(len(full_dataset)) # 実際のラベルを保存

# すべてのベースモデルを保存(後のアンサンブル用)
base_models = [] 

timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
log_dir = f"runs/food_classification_{timestamp}"
writer = SummaryWriter()

for fold, (train_idx, val_idx) in enumerate(kf.split(train_set)):
    print(f"\n====== Fold {fold+1}/{n_folds} ======")
    
    # トレーニングセットと検証サブセットを分割
    train_subset = Subset(train_set, train_idx)
    val_subset = Subset(train_set, val_idx)
    
    # DataLoader
    train_loader = DataLoader(
        train_subset, 
        batch_size=batch_size, 
        shuffle=True, 
        num_workers=12,
        pin_memory=True,
        persistent_workers=True
    )
    val_loader = DataLoader(
        val_subset,
        batch_size=batch_size,
        shuffle=False,
        num_workers=8,
        pin_memory=True,
        persistent_workers=True
    )

    # 各折で独立にモデルとオプティマイザを初期化
    model = Classifier().to(device)
    optimizer = torch.optim.Adam(model.parameters(), lr=0.0003, weight_decay=1e-5)
    criterion = nn.CrossEntropyLoss()
    
    # 早期停止関連の変数(各折で独立)
    fold_best_acc = 0
    stale = 0

    # トレーニングループ(元のロジックを保持)
    for epoch in range(n_epochs):
        # ---------- トレーニング ----------
        model.train()
        train_loss, train_accs = [], []
        
        for batch in tqdm(train_loader, desc=f"Epoch {epoch+1}"):
            imgs, labels = batch
            imgs, labels = imgs.to(device), labels.to(device)
            
            logits = model(imgs)
            loss = criterion(logits, labels)
            
            optimizer.zero_grad()
            loss.backward()
            torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=10)
            optimizer.step()
            
            acc = (logits.argmax(dim=-1) == labels).float().mean()
            train_loss.append(loss.item())
            train_accs.append(acc.item())
        
        # トレーニング情報を印刷
        avg_loss = np.mean(train_loss)
        avg_acc = np.mean(train_accs)

        # TensorBoardに書き込み
        writer.add_scalar(f'Fold_{fold}/Train/Loss', avg_loss, epoch)
        writer.add_scalar(f'Fold_{fold}/Train/Accuracy', avg_acc, epoch)

        print(f"[ Train | {epoch+1:03d}/{n_epochs:03d} ] loss = {avg_loss:.5f}, acc = {avg_acc:.5f}")

        # ---------- 検証 ----------
        model.eval()
        val_loss, val_accs, val_preds = [], [], []
        val_labels = []  # すべての検証バッチのラベルを蓄積

        for batch in tqdm(val_loader, desc="Validating"):
            imgs, labels = batch
            imgs = imgs.to(device)
            labels_np = labels.numpy()
            val_labels.extend(labels_np)  # ラベルを蓄積
            
            with torch.no_grad():
                logits = model(imgs)
                preds = logits.argmax(dim=-1).cpu().numpy()
            
            loss = criterion(logits, labels.to(device))
            val_loss.append(loss.item())
            val_accs.append((preds == labels_np).mean())
            val_preds.extend(preds)

        # OOF予測とラベルを記録
        oof_preds[val_idx] = np.array(val_preds)
        oof_labels[val_idx] = np.array(val_labels) 

        # 検証情報を印刷
        avg_val_loss = np.mean(val_loss)
        avg_val_acc = np.mean(val_accs)

        # TensorBoardに書き込み
        writer.add_scalar(f'Fold_{fold}/Val/Loss', avg_val_loss, epoch)
        writer.add_scalar(f'Fold_{fold}/Val/Accuracy', avg_val_acc, epoch)

        print(f"[ Valid | {epoch+1:03d}/{n_epochs:03d} ] loss = {avg_val_loss:.5f}, acc = {avg_val_acc:.5f}")

        # 早期停止ロジック(各折で独立)
        if avg_val_acc > fold_best_acc:
            print(f"Fold {fold} best model at epoch {epoch}")
            torch.save(model.state_dict(), f"fold{fold}_best.ckpt")
            fold_best_acc = avg_val_acc
            stale = 0
        else:
            stale += 1
            if stale > patience:
                print(f"Early stopping at epoch {epoch}")
                break
    
    # 現在の折のモデルを保存
    base_models.append(model)

# TensorBoardのwriterを閉じる
writer.close()

# ---------- 後処理 ----------
# OOF精度を計算
oof_acc = (oof_preds == oof_labels).mean()
print(f"\n[OOF Accuracy] {oof_acc:.4f}")

5 つのベースモデルを保存した後、テスト部分でアンサンブルを使用します:

# アンサンブル予測(ソフトボーティング法)
all_preds = []
for model in base_models:
    model.eval()
    fold_preds = []
    for data, _ in test_loader:  # 元のtest_loaderと一致させる
        with torch.no_grad():
            logits = model(data.to(device))
            # 各モデルの元のlogits(確率)を保存し、直接argmaxを取らない
            fold_preds.append(logits.cpu().numpy())
    # 現在のモデルのすべてのバッチ予測結果を統合
    fold_preds = np.concatenate(fold_preds, axis=0)
    all_preds.append(fold_preds)

# ソフトボーティング:すべてのモデルのlogitsを平均してargmaxを取る
all_preds = np.stack(all_preds)  # shape: (n_models, n_samples, n_classes)
prediction = all_preds.mean(axis=0).argmax(axis=1)  # shape: (n_samples,)

最終結果は以下の通りです:

1747886535400

すでにボスに非常に近づいています。

時間の都合でボスは作成しませんでしたが、後で時間があれば戻って補完します。

2 つの report_problem はデータ拡張と残差ネットワークの設計であり、中程度と強いのプロセスの中で言及されているため、追加で説明はありません。

読み込み中...
文章は、創作者によって署名され、ブロックチェーンに安全に保存されています。