Introduction: What Is Transfer Learning and Why It Matters
Transfer learning is a revolutionary technique in machine learning that lets us apply knowledge learned on one task to a related but different task. For newcomers, understanding its core concepts is a key step on the road to mastering deep learning.
Imagine you already know how to ride a bicycle: learning to ride a motorcycle becomes much easier, because basic skills such as balance and steering can be "transferred". Transfer learning plays a similar role in neural networks: we take a model pre-trained on a large-scale dataset (such as ImageNet) and migrate its knowledge to our own dataset, even when that dataset is small.
Why is transfer learning so important?
- Data efficiency: many real-world problems lack enough labeled data to train a deep network from scratch
- Compute: training large models from scratch carries an enormous computational cost
- Training speed: fine-tuning a pre-trained model is far faster than training from scratch
- Performance: it usually achieves better results than training from scratch
Part 1: Core Concepts and Mathematical Foundations of Transfer Learning
1.1 The Basic Paradigms of Transfer Learning
Transfer learning centers on four key components:
- Pre-training: train a base model on a large-scale dataset
- Feature extraction: use the pre-trained model as a fixed feature extractor
- Fine-tuning: adjust the pre-trained model's weights to fit the new task
- Domain adaptation: handle cases where the source and target domains have different distributions
1.2 Mathematical Foundation: The Optimization Objective
The transfer learning objective can be written as:
\[ \theta^{*} = \mathop{\mathrm{argmin}}\limits_{\theta}\, \mathcal{L}_{\text{target}}(\theta) + \lambda \cdot \mathcal{R}(\theta) \]
where:
- \(\mathcal{L}_{\text{target}}\) is the loss function of the target task
- \(\mathcal{R}\) is a regularization term, often using the source model's weights as a prior
- \(\lambda\) is a trade-off parameter
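To make the regularizer concrete: below is a minimal PyTorch sketch of this objective, assuming \(\mathcal{R}\) penalizes the squared distance to the pre-trained ("source") weights (the L2-SP idea). The names `l2_sp_penalty` and `lam` are illustrative, not from any library.
import torch

def l2_sp_penalty(model, source_state, lam=0.01):
    # R(theta): squared L2 distance between the current weights and the
    # pre-trained source weights, scaled by the trade-off parameter lambda
    penalty = 0.0
    for name, param in model.named_parameters():
        if name in source_state:
            penalty = penalty + ((param - source_state[name].to(param.device)) ** 2).sum()
    return lam * penalty

# Typical use inside a training step:
# loss = target_criterion(outputs, labels) + l2_sp_penalty(model, source_state)
# where source_state is a snapshot of the pre-trained model's state_dict()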
1.3 Three Main Forms of Knowledge Transfer
- Instance-based transfer: reuse part of the source-domain instances
- Feature-based transfer: learn transferable feature representations
- Model-based transfer: reuse part of the source model's structure and parameters
Part 2: Hands-On with PyTorch: Implementing Transfer Learning from Scratch
2.1 Environment Setup
First, install the required libraries:
pip install torch torchvision torchaudio
pip install matplotlib numpy pillow
2.2 A Basic Example: Image Classification with ResNet
Let's walk through a complete example of transfer learning in PyTorch. We will take a pre-trained ResNet-18 model and apply it to a custom flower-classification task.
import torch
import torch.nn as nn
import torch.optim as optim
from torch.optim import lr_scheduler
import torchvision
from torchvision import datasets, models, transforms
import matplotlib.pyplot as plt
import numpy as np
import time
import os
import copy
# Set a random seed for reproducibility
torch.manual_seed(42)
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")
2.3 Data Preparation and Preprocessing
# Define the data transforms
data_transforms = {
    'train': transforms.Compose([
        transforms.RandomResizedCrop(224),
        transforms.RandomHorizontalFlip(),
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
    ]),
    'val': transforms.Compose([
        transforms.Resize(256),
        transforms.CenterCrop(224),
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
    ]),
}
# Load the data (assuming it is organized in the following directory structure)
# data/
#   train/
#     class1/
#     class2/
#   val/
#     class1/
#     class2/
data_dir = 'data'
image_datasets = {x: datasets.ImageFolder(os.path.join(data_dir, x),
                                          data_transforms[x])
                  for x in ['train', 'val']}
dataloaders = {x: torch.utils.data.DataLoader(image_datasets[x], batch_size=4,
                                              shuffle=True, num_workers=4)
               for x in ['train', 'val']}
dataset_sizes = {x: len(image_datasets[x]) for x in ['train', 'val']}
class_names = image_datasets['train'].classes
print(f"Training set size: {dataset_sizes['train']}")
print(f"Validation set size: {dataset_sizes['val']}")
print(f"Classes: {class_names}")
2.4 Building and Initializing the Model
# Load a pre-trained ResNet-18 model
model = models.resnet18(pretrained=True)
# Freeze all convolutional-layer parameters (feature-extraction mode)
for param in model.parameters():
    param.requires_grad = False
# Replace the final fully connected layer to match our number of classes
# (parameters of a newly constructed layer require gradients by default)
num_features = model.fc.in_features
model.fc = nn.Linear(num_features, len(class_names))
# Move the model to the GPU (if available)
model = model.to(device)
print("Model architecture:")
print(model)
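One note on the API: in torchvision 0.13 and later, `pretrained=True` is deprecated in favor of an explicit `weights` argument; the equivalent call would be:
# On torchvision >= 0.13, pretrained=True is deprecated; the equivalent is:
# model = models.resnet18(weights=models.ResNet18_Weights.IMAGENET1K_V1)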
2.5 Implementing the Training Function
def train_model(model, criterion, optimizer, scheduler, num_epochs=25):
    since = time.time()
    best_model_wts = copy.deepcopy(model.state_dict())
    best_acc = 0.0
    for epoch in range(num_epochs):
        print(f'Epoch {epoch}/{num_epochs - 1}')
        print('-' * 10)
        # Each epoch has a training phase and a validation phase
        for phase in ['train', 'val']:
            if phase == 'train':
                model.train()  # set the model to training mode
            else:
                model.eval()  # set the model to evaluation mode
            running_loss = 0.0
            running_corrects = 0
            # Iterate over the data
            for inputs, labels in dataloaders[phase]:
                inputs = inputs.to(device)
                labels = labels.to(device)
                # Zero the parameter gradients
                optimizer.zero_grad()
                # Forward pass
                # Track gradient history only in the training phase
                with torch.set_grad_enabled(phase == 'train'):
                    outputs = model(inputs)
                    _, preds = torch.max(outputs, 1)
                    loss = criterion(outputs, labels)
                    # Backward pass and optimizer step only in the training phase
                    if phase == 'train':
                        loss.backward()
                        optimizer.step()
                # Statistics
                running_loss += loss.item() * inputs.size(0)
                running_corrects += torch.sum(preds == labels.data)
            if phase == 'train' and scheduler:
                scheduler.step()
            epoch_loss = running_loss / dataset_sizes[phase]
            epoch_acc = running_corrects.double() / dataset_sizes[phase]
            print(f'{phase} Loss: {epoch_loss:.4f} Acc: {epoch_acc:.4f}')
            # Deep-copy the best model weights
            if phase == 'val' and epoch_acc > best_acc:
                best_acc = epoch_acc
                best_model_wts = copy.deepcopy(model.state_dict())
        print()
    time_elapsed = time.time() - since
    print(f'Training complete in {time_elapsed // 60:.0f}m {time_elapsed % 60:.0f}s')
    print(f'Best val accuracy: {best_acc:.4f}')
    # Load the best model weights
    model.load_state_dict(best_model_wts)
    return model
2.6 Running the Training
# Define the loss function and optimizer
criterion = nn.CrossEntropyLoss()
# Only optimize the parameters of the final layer (the others are frozen)
optimizer = optim.SGD(model.fc.parameters(), lr=0.001, momentum=0.9)
# Learning-rate scheduler
exp_lr_scheduler = lr_scheduler.StepLR(optimizer, step_size=7, gamma=0.1)
# Start training
model = train_model(model, criterion, optimizer, exp_lr_scheduler, num_epochs=25)
2.7 Visualizing Predictions
def visualize_model(model, num_images=6):
    was_training = model.training
    model.eval()
    images_so_far = 0
    fig = plt.figure()
    with torch.no_grad():
        for i, (inputs, labels) in enumerate(dataloaders['val']):
            inputs = inputs.to(device)
            labels = labels.to(device)
            outputs = model(inputs)
            _, preds = torch.max(outputs, 1)
            for j in range(inputs.size()[0]):
                images_so_far += 1
                ax = plt.subplot(num_images // 2, 2, images_so_far)
                ax.axis('off')
                ax.set_title(f'predicted: {class_names[preds[j]]}')
                # Display the image (undoing the normalization first)
                img = inputs.cpu().data[j].numpy().transpose((1, 2, 0))
                mean = np.array([0.485, 0.456, 0.406])
                std = np.array([0.229, 0.224, 0.225])
                img = std * img + mean
                img = np.clip(img, 0, 1)
                ax.imshow(img)
                if images_so_far == num_images:
                    model.train(mode=was_training)
                    return
        model.train(mode=was_training)
# Call the visualization function
visualize_model(model)
plt.show()
Part 3: Advanced Transfer Learning Techniques
3.1 Full Fine-Tuning vs. Feature Extraction
In practice there are two main strategies:
- Feature extraction: train only the final layer (as in the example above)
- Full fine-tuning: unfreeze all layers and train with a lower learning rate
# Full fine-tuning example
model_ft = models.resnet18(pretrained=True)
num_ftrs = model_ft.fc.in_features
model_ft.fc = nn.Linear(num_ftrs, len(class_names))
model_ft = model_ft.to(device)
# Unfreeze all layers
for param in model_ft.parameters():
    param.requires_grad = True
# Use a smaller learning rate
optimizer_ft = optim.SGD(model_ft.parameters(), lr=0.0001, momentum=0.9)
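Between these two extremes there is a useful middle ground: unfreeze only the last residual stage. A sketch for ResNet-18 (the names `model_partial` and `optimizer_partial` are illustrative):
# Middle ground: freeze everything, then unfreeze only layer4 and the new head
model_partial = models.resnet18(pretrained=True)
for param in model_partial.parameters():
    param.requires_grad = False
for param in model_partial.layer4.parameters():
    param.requires_grad = True
# A newly constructed fc layer requires gradients by default
model_partial.fc = nn.Linear(model_partial.fc.in_features, len(class_names))
optimizer_partial = optim.SGD(
    filter(lambda p: p.requires_grad, model_partial.parameters()),
    lr=0.0001, momentum=0.9)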
3.2 Differential Learning Rates
For full fine-tuning, we can assign different learning rates to different layers:
# Assign a different learning rate to each group of layers
def get_optimizer_with_differential_lr(model, base_lr=0.0001):
    # The final layer gets a higher learning rate;
    # earlier layers get progressively lower rates
    optimizer = optim.SGD([
        {'params': model.fc.parameters(), 'lr': base_lr * 10},
        {'params': model.layer4.parameters(), 'lr': base_lr},
        {'params': model.layer3.parameters(), 'lr': base_lr * 0.1},
        {'params': model.layer2.parameters(), 'lr': base_lr * 0.01},
        {'params': model.layer1.parameters(), 'lr': base_lr * 0.001},
        {'params': model.bn1.parameters(), 'lr': base_lr * 0.001},
        {'params': model.conv1.parameters(), 'lr': base_lr * 0.001},
    ], momentum=0.9)
    return optimizer
3.3 Choosing Among Pre-Trained Models
PyTorch offers many pre-trained models, and choosing the right one matters:
# Compare several models
# Note: non-ResNet architectures expose their final layer as `.classifier`
# rather than `.fc`, so the head-replacement step differs per model family
models_dict = {
    'resnet18': models.resnet18(pretrained=True),
    'resnet50': models.resnet50(pretrained=True),
    'vgg16': models.vgg16(pretrained=True),
    'densenet121': models.densenet121(pretrained=True),
    'mobilenet_v2': models.mobilenet_v2(pretrained=True),
}
# The feature dimension varies across models and needs to be checked
model_names = ['resnet18', 'resnet50', 'resnet101', 'resnet152']
for name in model_names:
    model = getattr(models, name)(pretrained=True)
    print(f"{name}: {model.fc.in_features} features")
3.4 Data Augmentation Strategies
Strong data augmentation can significantly improve transfer learning results:
# Advanced data augmentation
advanced_transforms = {
    'train': transforms.Compose([
        transforms.RandomResizedCrop(224, scale=(0.8, 1.0)),
        transforms.RandomHorizontalFlip(p=0.5),
        transforms.RandomVerticalFlip(p=0.1),
        transforms.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2, hue=0.1),
        transforms.RandomRotation(15),
        transforms.RandomAffine(degrees=0, translate=(0.1, 0.1)),
        transforms.GaussianBlur(kernel_size=3, sigma=(0.1, 2.0)),
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225]),
        # Random erasing (operates on tensors, so it must come after ToTensor)
        transforms.RandomErasing(p=0.1, scale=(0.02, 0.33), ratio=(0.3, 3.3))
    ]),
    'val': transforms.Compose([
        transforms.Resize(256),
        transforms.CenterCrop(224),
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
    ]),
}
Part 4: Common Transfer Learning Problems and Their Solutions
4.1 Overfitting
Problem: when the target dataset is small, the model overfits easily.
Solutions:
- Use stronger regularization
- Early stopping (implemented below)
- More aggressive data augmentation
- Dropout (implemented below)
# Add Dropout to the model
class TransferModelWithDropout(nn.Module):
    def __init__(self, base_model, num_classes, dropout_rate=0.5):
        super().__init__()
        self.base = nn.Sequential(*list(base_model.children())[:-1])  # drop the original fc layer
        self.dropout = nn.Dropout(dropout_rate)
        self.fc = nn.Linear(base_model.fc.in_features, num_classes)
    def forward(self, x):
        x = self.base(x)
        x = torch.flatten(x, 1)
        x = self.dropout(x)
        x = self.fc(x)
        return x
# Early-stopping implementation
class EarlyStopping:
    def __init__(self, patience=7, min_delta=0):
        self.patience = patience
        self.min_delta = min_delta
        self.counter = 0
        self.best_loss = None
        self.early_stop = False
    def __call__(self, val_loss):
        if self.best_loss is None:
            self.best_loss = val_loss
        elif val_loss > self.best_loss - self.min_delta:
            self.counter += 1
            if self.counter >= self.patience:
                self.early_stop = True
        else:
            self.best_loss = val_loss
            self.counter = 0
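A usage sketch for the early-stopping helper, assuming a per-epoch validation loss `val_epoch_loss` is computed as in the Part 2 training loop:
# Usage sketch: stop training once validation loss stops improving
early_stopping = EarlyStopping(patience=7)
for epoch in range(num_epochs):
    # ... train for one epoch, then compute val_epoch_loss on the val set ...
    early_stopping(val_epoch_loss)
    if early_stopping.early_stop:
        print(f'Early stopping triggered at epoch {epoch}')
        break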
4.2 Class Imbalance
Problem: the class distribution of the target dataset is skewed.
Solution: use a weighted loss function (a resampling alternative is sketched after the code)
# Compute class weights
def calculate_class_weights(dataset):
    # Note: this iterates the whole dataset (loading every image); for an
    # ImageFolder dataset you can count dataset.targets instead, which is faster
    class_counts = {}
    for _, label in dataset:
        class_counts[label] = class_counts.get(label, 0) + 1
    total = sum(class_counts.values())
    weights = [total / (len(class_counts) * class_counts[i])
               for i in range(len(class_counts))]
    return torch.FloatTensor(weights)
# Use the weighted loss
class_weights = calculate_class_weights(image_datasets['train']).to(device)
criterion = nn.CrossEntropyLoss(weight=class_weights)
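An alternative to weighting the loss is to rebalance the sampling itself. A sketch using PyTorch's `WeightedRandomSampler` (this relies on the `.targets` attribute that `ImageFolder` datasets provide):
from torch.utils.data import WeightedRandomSampler

# Oversample minority classes instead of weighting the loss
targets = image_datasets['train'].targets  # per-sample integer labels
per_class_weights = calculate_class_weights(image_datasets['train'])
sample_weights = per_class_weights[torch.tensor(targets)]
sampler = WeightedRandomSampler(sample_weights, num_samples=len(targets))
balanced_loader = torch.utils.data.DataLoader(
    image_datasets['train'], batch_size=4, sampler=sampler, num_workers=4)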
4.3 Domain Shift
Problem: the data distributions of the source and target domains differ substantially.
Solution: domain-adaptation techniques
# A simple domain-adaptation loss (a crude MMD approximation via feature means)
class DomainAdaptationLoss:
    def __init__(self, alpha=0.1):
        self.alpha = alpha
    def __call__(self, source_features, target_features):
        # Distance between the mean source and target feature vectors
        source_mean = source_features.mean(0)
        target_mean = target_features.mean(0)
        mmd_loss = torch.norm(source_mean - target_mean, p=2)
        return self.alpha * mmd_loss
# Use it in the training loop
domain_criterion = DomainAdaptationLoss(alpha=0.1)
# Inside the training loop:
# outputs = model(inputs)
# features = model.base(inputs)  # extract the features
# loss = criterion(outputs, labels) + domain_criterion(features, target_features)
Part 5: Advanced Topics in Transfer Learning
5.1 Self-Supervised Pre-Training
Self-supervised learning is a frontier direction for transfer learning:
# A simple self-supervised pre-training example: rotation prediction
class RotationPredictionModel(nn.Module):
    def __init__(self, base_model, feature_dim):
        super().__init__()
        self.base = base_model
        # feature_dim is the size of the base model's output vector
        self.rotation_classifier = nn.Linear(feature_dim, 4)  # 4 rotation angles
    def forward(self, x, rotation_label=None):
        features = self.base(x)
        rotation_logits = self.rotation_classifier(features)
        if rotation_label is not None:
            # Training mode: return the rotation-prediction loss
            return nn.CrossEntropyLoss()(rotation_logits, rotation_label)
        else:
            # Inference mode: return the predicted rotation
            return torch.argmax(rotation_logits, dim=1)
# Data preparation: create rotated copies of each image
def create_rotated_batch(batch):
    rotated_batch = []
    rotation_labels = []
    for img in batch:
        for angle in [0, 90, 180, 270]:
            rotated_img = transforms.functional.rotate(img, angle)
            rotated_batch.append(rotated_img)
            rotation_labels.append(angle // 90)
    return torch.stack(rotated_batch), torch.tensor(rotation_labels)
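A minimal pre-training loop sketch tying the two pieces together; `unlabeled_loader` is an assumed DataLoader over unlabeled images (any labels it yields are ignored), and the feature size of 1000 matches ResNet-18's default output:
# Self-supervised pre-training loop (sketch)
rotation_model = RotationPredictionModel(models.resnet18(pretrained=True), 1000).to(device)
rot_optimizer = optim.Adam(rotation_model.parameters(), lr=1e-4)
for images, _ in unlabeled_loader:  # unlabeled_loader: assumed image loader
    rotated, rot_labels = create_rotated_batch(images)
    rotated, rot_labels = rotated.to(device), rot_labels.to(device)
    rot_optimizer.zero_grad()
    loss = rotation_model(rotated, rot_labels)  # returns CE loss in training mode
    loss.backward()
    rot_optimizer.step()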
5.2 Contrastive Learning
Contrastive learning is currently among the most popular self-supervised methods:
# A simplified SimCLR implementation
class SimCLR(nn.Module):
    def __init__(self, base_encoder, projection_dim=128):
        super().__init__()
        self.encoder = base_encoder
        # Probe the encoder's output dimension with a dummy input
        with torch.no_grad():
            dummy_input = torch.randn(1, 3, 224, 224)
            encoder_dim = self.encoder(dummy_input).shape[1]
        # Projection head
        self.projection = nn.Sequential(
            nn.Linear(encoder_dim, encoder_dim),
            nn.ReLU(),
            nn.Linear(encoder_dim, projection_dim)
        )
    def forward(self, x1, x2):
        # Encode the two views
        h1 = self.encoder(x1)
        h2 = self.encoder(x2)
        # Project
        z1 = self.projection(h1)
        z2 = self.projection(h2)
        return z1, z2
# Contrastive loss (InfoNCE)
def contrastive_loss(z1, z2, temperature=0.5):
    # Normalize
    z1 = nn.functional.normalize(z1, dim=1)
    z2 = nn.functional.normalize(z2, dim=1)
    # Concatenate: rows 0..n-1 are view 1, rows n..2n-1 are view 2
    n = z1.shape[0]
    features = torch.cat([z1, z2], dim=0)
    similarity_matrix = torch.matmul(features, features.T) / temperature
    # Mask out each sample's similarity with itself on the diagonal
    mask = torch.eye(2 * n, device=z1.device).bool()
    similarity_matrix = similarity_matrix.masked_fill(mask, -9e15)
    # The positive for row i is the other view of the same image, i.e. row
    # (i + n) mod 2n, so the labels must cover all 2n rows of the matrix
    labels = (torch.arange(2 * n, device=z1.device) + n) % (2 * n)
    loss = nn.CrossEntropyLoss()(similarity_matrix, labels)
    return loss
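The loss expects two augmented "views" of each image as input. A common way to produce them is to apply the same random augmentation pipeline twice; a sketch (`TwoCropTransform` is an illustrative helper, not a torchvision class):
# Wrap an augmentation pipeline so each sample yields two random views
class TwoCropTransform:
    def __init__(self, base_transform):
        self.base_transform = base_transform
    def __call__(self, x):
        return self.base_transform(x), self.base_transform(x)

# simclr_data = datasets.ImageFolder('data/train', TwoCropTransform(data_transforms['train']))
# for (x1, x2), _ in simclr_loader:
#     z1, z2 = simclr_model(x1.to(device), x2.to(device))
#     loss = contrastive_loss(z1, z2)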
5.3 Multi-Task Learning
Multi-task learning can improve transfer learning results:
class MultiTaskModel(nn.Module):
    def __init__(self, base_model, num_classes_main, num_classes_aux):
        super().__init__()
        # Capture the feature dimension, then strip the original fc layer so
        # that self.base(x) returns features rather than class logits
        in_features = base_model.fc.in_features
        base_model.fc = nn.Identity()
        self.base = base_model
        # Main-task classification head
        self.main_head = nn.Linear(in_features, num_classes_main)
        # Auxiliary-task classification head
        self.aux_head = nn.Linear(in_features, num_classes_aux)
    def forward(self, x):
        features = self.base(x)
        main_out = self.main_head(features)
        aux_out = self.aux_head(features)
        return main_out, aux_out
# Training loop
def multi_task_train(model, dataloader, optimizer, main_criterion, aux_criterion,
                     main_weight=1.0, aux_weight=0.5):
    model.train()
    for inputs, (main_labels, aux_labels) in dataloader:
        inputs = inputs.to(device)
        main_labels = main_labels.to(device)
        aux_labels = aux_labels.to(device)
        optimizer.zero_grad()
        main_out, aux_out = model(inputs)
        main_loss = main_criterion(main_out, main_labels)
        aux_loss = aux_criterion(aux_out, aux_labels)
        total_loss = main_weight * main_loss + aux_weight * aux_loss
        total_loss.backward()
        optimizer.step()
Part 6: Practical Tips and Best Practices
6.1 Model Selection Guide
| Model | Parameters | Inference Speed | Typical Use Case |
|---|---|---|---|
| ResNet18 | 11M | Fast | Mobile, real-time applications |
| ResNet50 | 25M | Medium | General purpose |
| EfficientNet-B0 | 5M | Fast | Resource-constrained environments |
| ViT-B/16 | 86M | Slow | High-accuracy requirements |
6.2 Hyperparameter Tuning Strategies
# Grid-search example (create_model is a placeholder for your model factory)
def hyperparameter_search():
    results = {}
    for lr in [1e-3, 1e-4, 1e-5]:
        for batch_size in [16, 32, 64]:
            for dropout in [0.3, 0.5, 0.7]:
                print(f"Testing: lr={lr}, bs={batch_size}, dropout={dropout}")
                # Train the model
                model = create_model(dropout)
                optimizer = optim.SGD(model.parameters(), lr=lr)
                # ... training code ...
                # Record the result
                results[(lr, batch_size, dropout)] = best_acc
    return results
# Bayesian optimization with Optuna
import optuna
def objective(trial):
    lr = trial.suggest_float('lr', 1e-5, 1e-2, log=True)
    dropout = trial.suggest_float('dropout', 0.2, 0.7)
    weight_decay = trial.suggest_float('weight_decay', 1e-6, 1e-3, log=True)
    model = create_model(dropout)
    optimizer = optim.Adam(model.parameters(), lr=lr, weight_decay=weight_decay)
    # ... train and return the validation accuracy ...
    return val_accuracy
study = optuna.create_study(direction='maximize')
study.optimize(objective, n_trials=50)
6.3 Model Compression and Deployment
# Wrapping a model for quantization
class QuantizedTransferModel(nn.Module):
    def __init__(self, base_model):
        super().__init__()
        self.quant = torch.quantization.QuantStub()
        self.base = base_model
        self.dequant = torch.quantization.DeQuantStub()
    def forward(self, x):
        x = self.quant(x)
        x = self.base(x)
        x = self.dequant(x)
        return x
# Prepare for quantization (pretrained_model is your fine-tuned model)
model = QuantizedTransferModel(pretrained_model)
model.qconfig = torch.quantization.get_default_qconfig('fbgemm')
torch.quantization.prepare(model, inplace=True)
# Calibrate (run a small amount of data through the model)
# ... calibration code ...
# Convert to a quantized model
torch.quantization.convert(model, inplace=True)
# Knowledge distillation
def distillation_loss(student_outputs, teacher_outputs, labels, temperature=3.0, alpha=0.7):
    # Soft-label loss (reduction='batchmean' matches the mathematical KL definition)
    soft_loss = nn.KLDivLoss(reduction='batchmean')(
        nn.functional.log_softmax(student_outputs / temperature, dim=1),
        nn.functional.softmax(teacher_outputs / temperature, dim=1))
    # Hard-label loss
    hard_loss = nn.CrossEntropyLoss()(student_outputs, labels)
    return alpha * (temperature ** 2) * soft_loss + (1 - alpha) * hard_loss
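A distillation step sketch, assuming `teacher` is a large fine-tuned model, `student` the smaller model being trained, and `optimizer` the student's optimizer:
# Teacher stays frozen in eval mode; only the student is updated
teacher.eval()
for inputs, labels in dataloaders['train']:
    inputs, labels = inputs.to(device), labels.to(device)
    with torch.no_grad():
        teacher_outputs = teacher(inputs)
    student_outputs = student(inputs)
    loss = distillation_loss(student_outputs, teacher_outputs, labels)
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()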
Part 7: Case Study: Medical Image Classification
7.1 Project Background
Suppose we want to build a pneumonia-detection system based on chest X-ray images.
7.2 Complete Implementation
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from torchvision import models, transforms
from PIL import Image
import numpy as np
from sklearn.metrics import classification_report, confusion_matrix
import seaborn as sns
import matplotlib.pyplot as plt
import os
# Custom dataset class
class ChestXrayDataset(Dataset):
    def __init__(self, data_dir, transform=None):
        self.data_dir = data_dir
        self.transform = transform
        self.image_paths = []
        self.labels = []
        # Expected layout: data_dir/normal/*.jpg, data_dir/pneumonia/*.jpg
        for label_idx, class_name in enumerate(['normal', 'pneumonia']):
            class_dir = os.path.join(data_dir, class_name)
            if os.path.exists(class_dir):
                for img_name in os.listdir(class_dir):
                    if img_name.endswith(('.jpg', '.png')):
                        self.image_paths.append(os.path.join(class_dir, img_name))
                        self.labels.append(label_idx)
    def __len__(self):
        return len(self.image_paths)
    def __getitem__(self, idx):
        img_path = self.image_paths[idx]
        image = Image.open(img_path).convert('RGB')
        label = self.labels[idx]
        if self.transform:
            image = self.transform(image)
        return image, label
# Data augmentation strategy
def get_xray_transforms():
    return {
        'train': transforms.Compose([
            transforms.Resize((224, 224)),
            transforms.RandomHorizontalFlip(p=0.5),
            transforms.RandomRotation(10),
            transforms.ColorJitter(brightness=0.1, contrast=0.1),
            transforms.ToTensor(),
            transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
        ]),
        'val': transforms.Compose([
            transforms.Resize((224, 224)),
            transforms.ToTensor(),
            transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
        ])
    }
# Build the model
def create_pneumonia_model():
    # Use DenseNet-121 as the base model
    base_model = models.densenet121(pretrained=True)
    # Freeze the earlier layers
    for param in base_model.parameters():
        param.requires_grad = False
    # Unfreeze the last two dense blocks
    for param in base_model.features.denseblock3.parameters():
        param.requires_grad = True
    for param in base_model.features.denseblock4.parameters():
        param.requires_grad = True
    # Replace the classifier
    num_features = base_model.classifier.in_features
    base_model.classifier = nn.Sequential(
        nn.Dropout(0.5),
        nn.Linear(num_features, 256),
        nn.ReLU(),
        nn.Dropout(0.3),
        nn.Linear(256, 2)  # binary classification: normal vs. pneumonia
    )
    return base_model
# Training function (with class-imbalance handling)
def train_pneumonia_model(model, train_loader, val_loader, num_epochs=30):
    device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
    model = model.to(device)
    # Compute class weights to handle imbalance (note: this pass loads every
    # training image once just to count labels)
    class_counts = [0, 0]
    for _, labels in train_loader:
        for label in labels:
            class_counts[label] += 1
    total = sum(class_counts)
    weights = torch.FloatTensor([total / (2 * class_counts[0]),
                                 total / (2 * class_counts[1])]).to(device)
    criterion = nn.CrossEntropyLoss(weight=weights)
    optimizer = optim.Adam(filter(lambda p: p.requires_grad, model.parameters()),
                           lr=0.0001, weight_decay=1e-4)
    scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='max',
                                                     patience=5, factor=0.5)
    best_acc = 0.0
    history = {'train_loss': [], 'train_acc': [], 'val_loss': [], 'val_acc': []}
    for epoch in range(num_epochs):
        # Training phase
        model.train()
        train_loss = 0.0
        train_correct = 0
        train_total = 0
        for inputs, labels in train_loader:
            inputs, labels = inputs.to(device), labels.to(device)
            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()
            train_loss += loss.item() * inputs.size(0)
            _, predicted = torch.max(outputs, 1)
            train_total += labels.size(0)
            train_correct += (predicted == labels).sum().item()
        train_epoch_loss = train_loss / train_total
        train_epoch_acc = train_correct / train_total
        # Validation phase
        model.eval()
        val_loss = 0.0
        val_correct = 0
        val_total = 0
        with torch.no_grad():
            for inputs, labels in val_loader:
                inputs, labels = inputs.to(device), labels.to(device)
                outputs = model(inputs)
                loss = criterion(outputs, labels)
                val_loss += loss.item() * inputs.size(0)
                _, predicted = torch.max(outputs, 1)
                val_total += labels.size(0)
                val_correct += (predicted == labels).sum().item()
        val_epoch_loss = val_loss / val_total
        val_epoch_acc = val_correct / val_total
        # Record history
        history['train_loss'].append(train_epoch_loss)
        history['train_acc'].append(train_epoch_acc)
        history['val_loss'].append(val_epoch_loss)
        history['val_acc'].append(val_epoch_acc)
        # Learning-rate scheduling
        scheduler.step(val_epoch_acc)
        print(f'Epoch {epoch+1}/{num_epochs}')
        print(f'Train Loss: {train_epoch_loss:.4f} Acc: {train_epoch_acc:.4f}')
        print(f'Val Loss: {val_epoch_loss:.4f} Acc: {val_epoch_acc:.4f}')
        # Save the best model
        if val_epoch_acc > best_acc:
            best_acc = val_epoch_acc
            torch.save(model.state_dict(), 'best_pneumonia_model.pth')
            print(f'New best model saved with accuracy: {best_acc:.4f}')
        print('-' * 50)
    # Load the best model
    model.load_state_dict(torch.load('best_pneumonia_model.pth'))
    return model, history
# Evaluation function
def evaluate_model(model, test_loader):
    device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
    model.eval()
    all_preds = []
    all_labels = []
    all_probs = []
    with torch.no_grad():
        for inputs, labels in test_loader:
            inputs, labels = inputs.to(device), labels.to(device)
            outputs = model(inputs)
            probs = torch.softmax(outputs, dim=1)
            _, predicted = torch.max(outputs, 1)
            all_preds.extend(predicted.cpu().numpy())
            all_labels.extend(labels.cpu().numpy())
            all_probs.extend(probs[:, 1].cpu().numpy())  # probability of pneumonia
    # Classification report
    print("Classification Report:")
    print(classification_report(all_labels, all_preds,
                                target_names=['Normal', 'Pneumonia']))
    # Confusion matrix
    cm = confusion_matrix(all_labels, all_preds)
    plt.figure(figsize=(8, 6))
    sns.heatmap(cm, annot=True, fmt='d', cmap='Blues',
                xticklabels=['Normal', 'Pneumonia'],
                yticklabels=['Normal', 'Pneumonia'])
    plt.title('Confusion Matrix')
    plt.ylabel('True Label')
    plt.xlabel('Predicted Label')
    plt.show()
    return all_preds, all_labels, all_probs
# Main function
def main():
    # Data paths
    data_dir = 'chest_xray'
    train_dir = os.path.join(data_dir, 'train')
    val_dir = os.path.join(data_dir, 'val')
    test_dir = os.path.join(data_dir, 'test')
    # Build the datasets
    transforms_dict = get_xray_transforms()
    train_dataset = ChestXrayDataset(train_dir, transform=transforms_dict['train'])
    val_dataset = ChestXrayDataset(val_dir, transform=transforms_dict['val'])
    test_dataset = ChestXrayDataset(test_dir, transform=transforms_dict['val'])
    # Build the data loaders
    train_loader = DataLoader(train_dataset, batch_size=16, shuffle=True, num_workers=4)
    val_loader = DataLoader(val_dataset, batch_size=16, shuffle=False, num_workers=4)
    test_loader = DataLoader(test_dataset, batch_size=16, shuffle=False, num_workers=4)
    # Build the model
    model = create_pneumonia_model()
    # Train
    model, history = train_pneumonia_model(model, train_loader, val_loader, num_epochs=30)
    # Evaluate
    preds, labels, probs = evaluate_model(model, test_loader)
    # Plot the training curves
    plt.figure(figsize=(12, 4))
    plt.subplot(1, 2, 1)
    plt.plot(history['train_loss'], label='Train Loss')
    plt.plot(history['val_loss'], label='Val Loss')
    plt.title('Loss Curves')
    plt.legend()
    plt.subplot(1, 2, 2)
    plt.plot(history['train_acc'], label='Train Acc')
    plt.plot(history['val_acc'], label='Val Acc')
    plt.title('Accuracy Curves')
    plt.legend()
    plt.show()
if __name__ == '__main__':
    main()
Part 8: Summary and a Path for Further Learning
8.1 Key Takeaways
- The core value of transfer learning: data efficiency, compute efficiency, better performance
- Key strategies:
  - Feature extraction vs. full fine-tuning
  - Differential learning rates
  - Strong data augmentation
  - Class-imbalance handling
- Advanced techniques: self-supervised learning, contrastive learning, multi-task learning
8.2 Common Pitfalls and How to Avoid Them
- Data leakage: make sure preprocessing never uses information from the test set
- Overfitting on small datasets: use strong regularization and data augmentation
- Incorrect preprocessing: keep the normalization parameters identical to those of the pre-trained model (see the sketch after this list)
- Ignoring class imbalance: use a weighted loss or resampling
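On the preprocessing pitfall specifically: torchvision 0.13+ bundles the matching preprocessing with each set of weights, which removes the guesswork:
# Each weight set ships its own matching preprocessing (torchvision >= 0.13)
weights = models.ResNet18_Weights.IMAGENET1K_V1
preprocess = weights.transforms()  # correct resize, crop, and normalization
model = models.resnet18(weights=weights)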
8.3 A Path for Further Learning
Theoretical foundations:
- Deepen your understanding of optimization algorithms (AdamW, LAMB)
- Study Transformer architectures (ViT, DeiT)
- Explore domain-adaptation theory
Practical skills:
- Master the PyTorch Lightning or fastai frameworks
- Learn model deployment (ONNX, TensorRT)
- Practice model quantization and pruning
Frontier directions:
- Large-scale self-supervised learning (CLIP, DINO)
- Prompt learning
- Continual learning
8.4 Recommended Resources
- Book: "Deep Learning with Python" (François Chollet)
- Courses: Stanford CS231n, fast.ai Practical Deep Learning
- Papers:
  - "A Survey on Transfer Learning" (Pan & Yang, 2010)
  - "BERT: Pre-training of Deep Bidirectional Transformers for Language Understanding" (Devlin et al., 2019)
  - "A Simple Framework for Contrastive Learning of Visual Representations" (SimCLR; Chen et al., 2020)
- Code: Hugging Face Transformers, timm (PyTorch Image Models)
8.5 Closing Advice
Transfer learning is not a master key, but it is a powerful tool for solving real problems. Remember:
- Start simple: validate feasibility with feature extraction first
- Understand your data: data quality sets your model's ceiling
- Iterate: take small steps and improve continuously
- Monitor training: track experiments with TensorBoard or Weights & Biases
- Stay curious: keep learning new techniques and methods
Having worked through this guide, you now have the core knowledge of transfer learning, from the basics on up. Now it is time to apply it to your own projects. Good luck on your deep learning journey!
