はじめに#
今回の HW は画像分類タスクで、食べ物データセット food11 を分類します。具体的なタスクの要件は以下の通りです:
達成すべき目標とヒントはそれぞれ以下の通りです:
シンプル#
元のコードを実行するだけで、結果は以下の通りです:
中程度#
トレーニング拡張を行い、トレーニング時間を延長します(より大きな n_epoch を使用)。
トレーニング拡張の具体的な操作は以下の通りです:
train_tfm = transforms.Compose([
# 画像を固定の形状にリサイズ(高さ = 幅 = 128)
transforms.Resize((128, 128)),
# ここにいくつかの変換を追加できます。
transforms.RandomChoice([
transforms.RandomRotation((-30,30)),
transforms.RandomHorizontalFlip(p=0.5),
transforms.RandomVerticalFlip(p=0.5),
transforms.ColorJitter(brightness=(0.5,1.5), contrast=(0.5, 1.5), saturation=(0.5,1.5), hue=(-0.25, 0.25)),
transforms.RandomInvert(p=0.5),
transforms.RandomAffine(degrees=(-30,30), translate=(0.1, 0.1), scale=(0.8, 1.2), shear=(-30, 30)),
transforms.Grayscale(num_output_channels=3),
]),
# ToTensor()は変換の最後にするべきです。
transforms.ToTensor(),
])
具体的な説明は以下の通りです:
- RandomRotation ((-30,30)) 画像をランダムに回転します。(-30, 30):回転角度の範囲(-30 度から + 30 度の間でランダムに選択)。
- RandomHorizontalFlip (p=0.5) 50% の確率で画像を水平に反転します。p=0.5:実行確率(0.5 は 50% を示します)。
- RandomVerticalFlip (p=0.5) 50% の確率で画像を垂直に反転します。p の意味は同上。
- ColorJitter (brightness=(0.5,1.5), contrast=(0.5, 1.5), saturation=(0.5,1.5), hue=(-0.25, 0.25)) 色の属性(明るさ、コントラスト、彩度、色相)をランダムに調整します。brightness=(0.5, 1.5):明るさのスケーリング範囲(0.5 倍〜1.5 倍)。contrast=(0.5, 1.5):コントラストの調整範囲(0.5 倍〜1.5 倍)。saturation=(0.5, 1.5):彩度の調整範囲(0.5 倍〜1.5 倍)。hue=(-0.25, 0.25):色相のオフセット範囲(-0.25〜+0.25、色相環の - 90 度〜+90 度に対応)。
- RandomInvert (p=0.5) 50% の確率で色を反転します(色を反転させる、例えば黒が白に、赤が青に)。p の意味は同上。
- RandomAffine (degrees=(-30,30), translate=(0.1, 0.1), scale=(0.8, 1.2), shear=(-30, 30)) ランダムなアフィン変換(回転、平行移動、スケーリング、せん断)。degrees=(-30, 30):回転角度の範囲。translate=(0.1, 0.1):水平方向と垂直方向の最大平行移動比率(画像サイズの 10%)。scale=(0.8, 1.2):スケーリング範囲(0.8 倍〜1.2 倍)。shear=(-30, 30):せん断角度の範囲(-30 度〜+30 度)。せん断(Shear)は線形幾何変換で、画像の一部を傾けることで「傾斜変形」効果をシミュレートします。
- Grayscale (num_output_channels=3) 画像をグレースケールに変換しますが、3 チャンネルを保持します(RGB 形式、各チャンネルの値は同じ)。num_output_channels=3:出力チャンネル数(3 は 3 チャンネルのグレースケール画像を生成し、モデルの入力に互換性があります)。カラー画像(RGB)をグレースケール画像に変換することは、本質的には 3 つのチャンネルの輝度情報を加重平均で合成し、単一チャンネルの画像を生成することです。
トレーニング時間を 90 エポックに延長し、結果は以下の通りです:
注意すべき点は、GPU の利用率が非常に低いことがわかり、問題はデータを読み込む際のトランスフォームの強化とディスク IO の時間が長すぎることにあるため、トレーニング時間が非常に遅くなっています。DataLoader の並行性を調整する必要があります。
ここでは 12 の並行性を使用しました(画像強化の複雑さによります)、persistent_workers=True を設定してプロセスの再生成 / 破棄を避け、オーバーヘッドを減らします。テストでは 8 の並行性を使用しました。改善後、1 エポックのトレーニング時間は大幅に短縮され、ここで使用しているのはローカルコンピュータで、デバイスは RTX4070 ラップトップです。
同時にバッチサイズを 128 に変更しました。
強い#
まず、モデルの構造を変更し、resnet18 と resnet34 を学び、resnet34 を基に少し変更しました:
class BasicBlock(nn.Module):
def __init__(self, in_channels, out_channels, stride=1):
super().__init__()
self.conv1 = nn.Conv2d(in_channels, out_channels, 3, stride, 1, bias=False)
self.bn1 = nn.BatchNorm2d(out_channels)
self.conv2 = nn.Conv2d(out_channels, out_channels, 3, 1, 1, bias=False)
self.bn2 = nn.BatchNorm2d(out_channels)
self.relu = nn.ReLU(inplace=True)
self.downsample = None
if stride != 1 or in_channels != out_channels:
self.downsample = nn.Sequential(
nn.Conv2d(in_channels, out_channels, 1, stride, 0),
nn.BatchNorm2d(out_channels)
)
def forward(self, x):
identity = x
out = self.conv1(x)
out = self.bn1(out)
out = self.relu(out)
out = self.conv2(out)
out = self.bn2(out)
if self.downsample is not None:
identity = self.downsample(identity)
out += identity
out = self.relu(out)
return out
class Classifier(nn.Module):
def __init__(self):
super(Classifier, self).__init__()
# torch.nn.Conv2d(in_channels, out_channels, kernel_size, stride, padding)
# torch.nn.MaxPool2d(kernel_size, stride, padding)
# 入力の次元 [3, 128, 128]
# 初期の畳み込み層
self.conv1 = nn.Conv2d(3, 64, kernel_size=5, stride=2, padding=2, bias=False)
self.bn1 = nn.BatchNorm2d(64)
self.relu = nn.ReLU(inplace=True)
self.block1 = BasicBlock(64, 64, 1) # [64, 64, 64]
self.block2 = BasicBlock(64, 64) # [64, 64, 64]
self.block3 = BasicBlock(64, 64) # [64, 64, 64]
self.block4 = BasicBlock(64, 128, 2) # [128, 32, 32]
self.block5 = BasicBlock(128, 128) # [128, 32, 32]
self.block6 = BasicBlock(128, 128) # [128, 32, 32]
self.block7 = BasicBlock(128, 128) # [128, 32, 32]
self.block8 = BasicBlock(128, 256, 2) # [256, 16, 16]
self.block9 = BasicBlock(256, 256) # [256, 16, 16]
self.block10 = BasicBlock(256, 256) # [256, 16, 16]
self.block11 = BasicBlock(256, 256) # [256, 16, 16]
self.block12 = BasicBlock(256, 256) # [256, 16, 16]
self.block13 = BasicBlock(256, 256) # [256, 16, 16]
self.block14 = BasicBlock(256, 512, 2) # [512, 8, 8]
self.block15 = BasicBlock(512, 512) # [512, 8, 8]
self.block16 = BasicBlock(512, 512) # [512, 8, 8]
self.avgpool = nn.AdaptiveAvgPool2d((1, 1))
self.fc = nn.Linear(512, 11)
def forward(self, x):
out = self.conv1(x)
out = self.bn1(out)
out = self.relu(out)
out = self.block1(out)
out = self.block2(out)
out = self.block3(out)
out = self.block4(out)
out = self.block5(out)
out = self.block6(out)
out = self.block7(out)
out = self.block8(out)
out = self.block9(out)
out = self.block10(out)
out = self.block11(out)
out = self.block12(out)
out = self.block13(out)
out = self.block14(out)
out = self.block15(out)
out = self.block16(out)
out = self.avgpool(out)
out = out.view(out.size()[0], -1)
return self.fc(out)
次にクロスバリデーションとアンサンブルを行いました:
クロスバリデーションは 5 分割交差検証を採用し、元のトレーニングセットと検証セットを統合してから 5 分割交差検証を行いました:
# "cuda"はGPUが利用可能な場合のみ。
device = "cuda" if torch.cuda.is_available() else "cpu"
# トレーニングエポック数と忍耐。
n_epochs = 200
patience = 50 # 'patience'エポックで改善がない場合、早期停止
import numpy as np
from sklearn.model_selection import KFold
from torch.utils.tensorboard import SummaryWriter
import datetime
# 5分割交差検証を初期化
n_folds = 5
kf = KFold(n_splits=n_folds, shuffle=True, random_state=42)
# 完全なトレーニングセットを読み込む(交差検証用)
train_set = FoodDataset(os.path.join(_dataset_dir, "training"), tfm=train_tfm)
valid_set = FoodDataset(os.path.join(_dataset_dir, "validation"), tfm=train_tfm)
# データセットを統合
combined_files = train_set.files + valid_set.files
full_dataset = FoodDataset(path="", tfm=train_tfm, files=combined_files)
oof_preds = np.zeros(len(full_dataset)) # OOF予測結果を保存
oof_labels = np.zeros(len(full_dataset)) # 実際のラベルを保存
# すべてのベースモデルを保存(後のアンサンブル用)
base_models = []
timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
log_dir = f"runs/food_classification_{timestamp}"
writer = SummaryWriter()
for fold, (train_idx, val_idx) in enumerate(kf.split(train_set)):
print(f"\n====== Fold {fold+1}/{n_folds} ======")
# トレーニングセットと検証サブセットを分割
train_subset = Subset(train_set, train_idx)
val_subset = Subset(train_set, val_idx)
# DataLoader
train_loader = DataLoader(
train_subset,
batch_size=batch_size,
shuffle=True,
num_workers=12,
pin_memory=True,
persistent_workers=True
)
val_loader = DataLoader(
val_subset,
batch_size=batch_size,
shuffle=False,
num_workers=8,
pin_memory=True,
persistent_workers=True
)
# 各折で独立にモデルとオプティマイザを初期化
model = Classifier().to(device)
optimizer = torch.optim.Adam(model.parameters(), lr=0.0003, weight_decay=1e-5)
criterion = nn.CrossEntropyLoss()
# 早期停止関連の変数(各折で独立)
fold_best_acc = 0
stale = 0
# トレーニングループ(元のロジックを保持)
for epoch in range(n_epochs):
# ---------- トレーニング ----------
model.train()
train_loss, train_accs = [], []
for batch in tqdm(train_loader, desc=f"Epoch {epoch+1}"):
imgs, labels = batch
imgs, labels = imgs.to(device), labels.to(device)
logits = model(imgs)
loss = criterion(logits, labels)
optimizer.zero_grad()
loss.backward()
torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=10)
optimizer.step()
acc = (logits.argmax(dim=-1) == labels).float().mean()
train_loss.append(loss.item())
train_accs.append(acc.item())
# トレーニング情報を印刷
avg_loss = np.mean(train_loss)
avg_acc = np.mean(train_accs)
# TensorBoardに書き込み
writer.add_scalar(f'Fold_{fold}/Train/Loss', avg_loss, epoch)
writer.add_scalar(f'Fold_{fold}/Train/Accuracy', avg_acc, epoch)
print(f"[ Train | {epoch+1:03d}/{n_epochs:03d} ] loss = {avg_loss:.5f}, acc = {avg_acc:.5f}")
# ---------- 検証 ----------
model.eval()
val_loss, val_accs, val_preds = [], [], []
val_labels = [] # すべての検証バッチのラベルを蓄積
for batch in tqdm(val_loader, desc="Validating"):
imgs, labels = batch
imgs = imgs.to(device)
labels_np = labels.numpy()
val_labels.extend(labels_np) # ラベルを蓄積
with torch.no_grad():
logits = model(imgs)
preds = logits.argmax(dim=-1).cpu().numpy()
loss = criterion(logits, labels.to(device))
val_loss.append(loss.item())
val_accs.append((preds == labels_np).mean())
val_preds.extend(preds)
# OOF予測とラベルを記録
oof_preds[val_idx] = np.array(val_preds)
oof_labels[val_idx] = np.array(val_labels)
# 検証情報を印刷
avg_val_loss = np.mean(val_loss)
avg_val_acc = np.mean(val_accs)
# TensorBoardに書き込み
writer.add_scalar(f'Fold_{fold}/Val/Loss', avg_val_loss, epoch)
writer.add_scalar(f'Fold_{fold}/Val/Accuracy', avg_val_acc, epoch)
print(f"[ Valid | {epoch+1:03d}/{n_epochs:03d} ] loss = {avg_val_loss:.5f}, acc = {avg_val_acc:.5f}")
# 早期停止ロジック(各折で独立)
if avg_val_acc > fold_best_acc:
print(f"Fold {fold} best model at epoch {epoch}")
torch.save(model.state_dict(), f"fold{fold}_best.ckpt")
fold_best_acc = avg_val_acc
stale = 0
else:
stale += 1
if stale > patience:
print(f"Early stopping at epoch {epoch}")
break
# 現在の折のモデルを保存
base_models.append(model)
# TensorBoardのwriterを閉じる
writer.close()
# ---------- 後処理 ----------
# OOF精度を計算
oof_acc = (oof_preds == oof_labels).mean()
print(f"\n[OOF Accuracy] {oof_acc:.4f}")
5 つのベースモデルを保存した後、テスト部分でアンサンブルを使用します:
# アンサンブル予測(ソフトボーティング法)
all_preds = []
for model in base_models:
model.eval()
fold_preds = []
for data, _ in test_loader: # 元のtest_loaderと一致させる
with torch.no_grad():
logits = model(data.to(device))
# 各モデルの元のlogits(確率)を保存し、直接argmaxを取らない
fold_preds.append(logits.cpu().numpy())
# 現在のモデルのすべてのバッチ予測結果を統合
fold_preds = np.concatenate(fold_preds, axis=0)
all_preds.append(fold_preds)
# ソフトボーティング:すべてのモデルのlogitsを平均してargmaxを取る
all_preds = np.stack(all_preds) # shape: (n_models, n_samples, n_classes)
prediction = all_preds.mean(axis=0).argmax(axis=1) # shape: (n_samples,)
最終結果は以下の通りです:
すでにボスに非常に近づいています。
時間の都合でボスは作成しませんでしたが、後で時間があれば戻って補完します。
2 つの report_problem はデータ拡張と残差ネットワークの設計であり、中程度と強いのプロセスの中で言及されているため、追加で説明はありません。