Introduction: What Is Transfer Learning and Why It Matters
Transfer learning is a transformative technique in machine learning that lets us apply knowledge learned on one task to a related but different task. For a beginner, it is like learning to ride a motorcycle after you already know how to ride a bicycle: the foundational skill of balancing carries over, so the new task becomes easier.
In deep learning, the importance of transfer learning shows up in several ways:
- Data efficiency: high-performing models without massive datasets
- Shorter training: reusing a pretrained model can cut training time by 50-90%
- Better performance: on small datasets it can match or even beat training from scratch
- Lower compute cost: less GPU/TPU time, cheaper experiments
Part 1: Core Concepts (A Beginner's Introduction)
1.1 The Core Principle of Transfer Learning
Transfer learning rests on one core assumption: different tasks share common feature representations. For example, low-level image features such as edges, textures, and shapes are useful across many different vision tasks.
Analogies:
- Learning English vocabulary: master common words first (general knowledge), then learn domain terminology (task-specific knowledge)
- Learning an instrument: master music theory first (general knowledge), then learn specific pieces (task-specific knowledge)
1.2 Key Terminology
Pre-trained model:
- A model trained in advance on a large-scale dataset (e.g., ImageNet)
- Contains general-purpose feature extraction capability
- Examples: ResNet, VGG, BERT, GPT
Feature extractor:
- The early layers of the model, responsible for extracting generic features
- Usually frozen (not updated) during transfer learning
Classifier head:
- The final layers of the model, responsible for the task-specific output
- Usually retrained during transfer learning
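To make these terms concrete, here is a minimal sketch (assuming torchvision is installed) that splits a pretrained ResNet18 into its feature extractor and classifier head:
import torch
from torchvision import models

model = models.resnet18(pretrained=True)
print(model.fc)  # the classifier head: Linear(512, 1000), trained for ImageNet's 1000 classes

# Everything before fc acts as the feature extractor
backbone = torch.nn.Sequential(*list(model.children())[:-1])
with torch.no_grad():
    features = backbone(torch.randn(1, 3, 224, 224))
print(features.shape)  # torch.Size([1, 512, 1, 1]): 512-dimensional generic features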
1.3 The Three Main Modes of Transfer Learning
Feature extraction:
- Freeze all layers of the pretrained model
- Train only the newly added classification layers
- Suitable when data is scarce
Fine-tuning:
- Unfreeze some or all of the pretrained layers
- Train with a lower learning rate
- Suitable for medium-sized datasets
End-to-end training:
- All layers participate in training
- Usually requires a lot of data and compute
- Suitable when data is abundant
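As a minimal sketch (assuming a torchvision ResNet18), the three modes differ only in which parameters are left trainable:
from torchvision import models
import torch.nn as nn

def build(mode, num_classes=2):
    model = models.resnet18(pretrained=True)
    if mode == 'feature_extraction':  # freeze everything, train only the new head
        for p in model.parameters():
            p.requires_grad = False
    elif mode == 'fine_tuning':  # freeze everything, then unfreeze the last block
        for p in model.parameters():
            p.requires_grad = False
        for p in model.layer4.parameters():
            p.requires_grad = True
    # mode == 'end_to_end': leave all parameters trainable
    model.fc = nn.Linear(model.fc.in_features, num_classes)  # the new head is always trainable
    return model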
Part 2: Environment Setup and Tooling
2.1 Recommended Development Environment
Python environment:
- Python 3.8+
- Anaconda or Miniconda is recommended for environment management
Deep learning framework:
- PyTorch: flexible and easy to use, well suited to research and experimentation
- TensorFlow/Keras: friendly for industrial deployment
- This guide uses PyTorch throughout
Hardware requirements:
- Minimum: CPU only (training will be slow)
- Recommended: an NVIDIA GPU (RTX 3060 12GB or better)
- Cloud platforms: Google Colab (free tier), AWS, Azure, etc.
2.2 Installing the Required Libraries
# Create a conda environment
conda create -n transfer_learning python=3.9
conda activate transfer_learning
# Install PyTorch (pick the build matching your CUDA version)
# If you have an NVIDIA GPU
pip install torch torchvision torchaudio --index-url https://download.pytorch.org/whl/cu118
# If you do not have a GPU
pip install torch torchvision torchaudio
# Install the other required libraries
pip install matplotlib seaborn pandas numpy scikit-learn
pip install tqdm  # progress bars
2.3 Verifying the Installation
import torch
import torchvision
import matplotlib.pyplot as plt

# Check whether a GPU is available
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")

# Check the PyTorch version
print(f"PyTorch version: {torch.__version__}")
Part 3: Your First Transfer Learning Project (Image Classification)
3.1 Project Overview
We will build a cat-vs-dog classifier in PyTorch on top of a pretrained ResNet18. This classic starter project walks you through the complete transfer learning workflow.
3.2 Data Preparation
Dataset layout:
dataset/
├── train/
│   ├── cat/
│   │   ├── cat_001.jpg
│   │   ├── cat_002.jpg
│   │   └── ...
│   └── dog/
│       ├── dog_001.jpg
│       ├── dog_002.jpg
│       └── ...
└── val/
    ├── cat/
    └── dog/
Downloading the data: if you do not have your own dataset, you can use Kaggle's Dogs vs. Cats dataset:
# Install the Kaggle command-line tool
pip install kaggle
# Download the dataset (requires a Kaggle account)
kaggle competitions download -c dogs-vs-cats
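The downloaded archive is not in the folder layout shown above. Here is a minimal sketch for reorganizing it, assuming the extracted files follow Kaggle's cat.<n>.jpg / dog.<n>.jpg naming and sit in a raw/ directory (adjust the paths to your setup):
import random
import shutil
from pathlib import Path

random.seed(42)
for img in Path('raw').glob('*.jpg'):
    cls = img.name.split('.')[0]  # 'cat' or 'dog'
    split = 'train' if random.random() < 0.8 else 'val'  # 80/20 split
    dest = Path('dataset') / split / cls
    dest.mkdir(parents=True, exist_ok=True)
    shutil.copy(img, dest / img.name)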
3.3 Complete Implementation
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader
from torchvision import datasets, transforms, models
from tqdm import tqdm
from PIL import Image
import matplotlib.pyplot as plt
import os
from pathlib import Path

# 1. Data preprocessing and augmentation
def get_transforms():
    """Define the train and validation transforms."""
    train_transform = transforms.Compose([
        transforms.Resize((224, 224)),                          # ResNet input size
        transforms.RandomHorizontalFlip(p=0.5),                 # random horizontal flip
        transforms.RandomRotation(15),                          # random rotation up to 15 degrees
        transforms.ColorJitter(brightness=0.2, contrast=0.2),   # color jitter
        transforms.ToTensor(),                                  # convert to tensor
        transforms.Normalize(mean=[0.485, 0.456, 0.406],        # ImageNet statistics
                             std=[0.229, 0.224, 0.225])
    ])
    val_transform = transforms.Compose([
        transforms.Resize((224, 224)),
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.485, 0.456, 0.406],
                             std=[0.229, 0.224, 0.225])
    ])
    return train_transform, val_transform
# 2. A custom dataset class (if you need to handle a non-standard layout)
class CustomCatDogDataset(torch.utils.data.Dataset):
    """Custom cat/dog dataset."""
    def __init__(self, data_dir, transform=None):
        self.data_dir = Path(data_dir)
        self.transform = transform
        self.classes = ['cat', 'dog']
        self.class_to_idx = {'cat': 0, 'dog': 1}
        # Collect all image paths
        self.image_paths = []
        self.labels = []
        for class_name in self.classes:
            class_dir = self.data_dir / class_name
            if class_dir.exists():
                for img_path in class_dir.glob('*.jpg'):
                    self.image_paths.append(img_path)
                    self.labels.append(self.class_to_idx[class_name])

    def __len__(self):
        return len(self.image_paths)
    def __getitem__(self, idx):
        img_path = self.image_paths[idx]
        label = self.labels[idx]
        # Load the image
        image = Image.open(img_path).convert('RGB')
        if self.transform:
            image = self.transform(image)
        return image, label
# 3. Build the transfer learning model
def create_model(num_classes=2, use_pretrained=True):
    """Create a transfer learning model based on ResNet18."""
    # Load pretrained ResNet18 (newer torchvision prefers the weights= argument)
    model = models.resnet18(pretrained=use_pretrained)
    # Freeze all convolutional-layer parameters
    for param in model.parameters():
        param.requires_grad = False
    # Replace the final fully connected layer (the classifier head);
    # ResNet18's fc layer takes 512 input features
    model.fc = nn.Sequential(
        nn.Dropout(0.5),  # guard against overfitting
        nn.Linear(512, 256),
        nn.ReLU(),
        nn.Dropout(0.3),
        nn.Linear(256, num_classes)
    )
    return model
# 4. Training function
def train_model(model, train_loader, val_loader, criterion, optimizer,
                device, num_epochs=10, patience=3):
    """Train the model and keep the best checkpoint."""
    best_val_acc = 0.0
    patience_counter = 0
    history = {'train_loss': [], 'train_acc': [], 'val_loss': [], 'val_acc': []}
    for epoch in range(num_epochs):
        # Training phase
        model.train()
        train_loss = 0.0
        train_correct = 0
        train_total = 0
        train_pbar = tqdm(train_loader, desc=f'Epoch {epoch+1}/{num_epochs} [Train]')
        for inputs, labels in train_pbar:
            inputs, labels = inputs.to(device), labels.to(device)
            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()
            train_loss += loss.item() * inputs.size(0)
            _, predicted = torch.max(outputs, 1)
            train_total += labels.size(0)
            train_correct += (predicted == labels).sum().item()
            train_pbar.set_postfix({
                'loss': f'{loss.item():.4f}',
                'acc': f'{100 * train_correct / train_total:.2f}%'
            })
        # Validation phase
        model.eval()
        val_loss = 0.0
        val_correct = 0
        val_total = 0
        with torch.no_grad():
            val_pbar = tqdm(val_loader, desc=f'Epoch {epoch+1}/{num_epochs} [Val]')
            for inputs, labels in val_pbar:
                inputs, labels = inputs.to(device), labels.to(device)
                outputs = model(inputs)
                loss = criterion(outputs, labels)
                val_loss += loss.item() * inputs.size(0)
                _, predicted = torch.max(outputs, 1)
                val_total += labels.size(0)
                val_correct += (predicted == labels).sum().item()
                val_pbar.set_postfix({
                    'loss': f'{loss.item():.4f}',
                    'acc': f'{100 * val_correct / val_total:.2f}%'
                })
        # Average loss and accuracy for the epoch
        train_loss = train_loss / len(train_loader.dataset)
        train_acc = 100 * train_correct / train_total
        val_loss = val_loss / len(val_loader.dataset)
        val_acc = 100 * val_correct / val_total
        # Record history
        history['train_loss'].append(train_loss)
        history['train_acc'].append(train_acc)
        history['val_loss'].append(val_loss)
        history['val_acc'].append(val_acc)
        print(f'Epoch {epoch+1} Summary: '
              f'Train Loss: {train_loss:.4f}, Train Acc: {train_acc:.2f}% | '
              f'Val Loss: {val_loss:.4f}, Val Acc: {val_acc:.2f}%')
        # Early stopping
        if val_acc > best_val_acc:
            best_val_acc = val_acc
            patience_counter = 0
            # Save the best model
            torch.save({
                'epoch': epoch,
                'model_state_dict': model.state_dict(),
                'val_acc': val_acc,
            }, 'best_model.pth')
            print(f"✨ Saved new best model, val accuracy: {val_acc:.2f}%")
        else:
            patience_counter += 1
            if patience_counter >= patience:
                print(f"Early stopping: no val accuracy improvement for {patience} epochs")
                break
    return model, history
# 5. Main entry point
def main():
    # Hyperparameters
    data_dir = 'dataset'  # change to your dataset path
    batch_size = 32
    num_epochs = 15
    learning_rate = 0.001
    # Check the data directory
    if not os.path.exists(data_dir):
        print(f"Error: data directory {data_dir} does not exist")
        print("Please create the following layout:")
        print("dataset/")
        print("├── train/")
        print("│   ├── cat/")
        print("│   └── dog/")
        print("└── val/")
        print("    ├── cat/")
        print("    └── dog/")
        return
    # Select the device
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    print(f"Using device: {device}")
    # Data transforms
    train_transform, val_transform = get_transforms()
    # Load the datasets
    try:
        # Option 1: ImageFolder (recommended)
        train_dataset = datasets.ImageFolder(
            os.path.join(data_dir, 'train'),
            transform=train_transform
        )
        val_dataset = datasets.ImageFolder(
            os.path.join(data_dir, 'val'),
            transform=val_transform
        )
    except (FileNotFoundError, RuntimeError):
        # Option 2: the custom dataset class
        train_dataset = CustomCatDogDataset(
            os.path.join(data_dir, 'train'),
            transform=train_transform
        )
        val_dataset = CustomCatDogDataset(
            os.path.join(data_dir, 'val'),
            transform=val_transform
        )
    # Data loaders
    train_loader = DataLoader(
        train_dataset,
        batch_size=batch_size,
        shuffle=True,
        num_workers=4,
        pin_memory=torch.cuda.is_available()
    )
    val_loader = DataLoader(
        val_dataset,
        batch_size=batch_size,
        shuffle=False,
        num_workers=4,
        pin_memory=torch.cuda.is_available()
    )
    print(f"Training set size: {len(train_dataset)}")
    print(f"Validation set size: {len(val_dataset)}")
    print(f"Classes: {train_dataset.classes}")
    # Build the model
    model = create_model(num_classes=len(train_dataset.classes))
    model = model.to(device)
    # Loss and optimizer
    criterion = nn.CrossEntropyLoss()
    # Only optimize the classifier head (the convolutional layers are frozen)
    optimizer = optim.Adam(
        filter(lambda p: p.requires_grad, model.parameters()),
        lr=learning_rate
    )
    # Learning rate scheduler (stepped per epoch in a fuller loop;
    # see the template in the appendix for scheduler.step(val_acc) usage)
    scheduler = optim.lr_scheduler.ReduceLROnPlateau(
        optimizer, mode='max', factor=0.5, patience=2, verbose=True
    )
    # Train
    print("\nStarting training...")
    model, history = train_model(
        model, train_loader, val_loader,
        criterion, optimizer, device,
        num_epochs=num_epochs
    )
    # Load the best checkpoint
    checkpoint = torch.load('best_model.pth')
    model.load_state_dict(checkpoint['model_state_dict'])
    print(f"\nTraining finished! Best val accuracy: {checkpoint['val_acc']:.2f}%")
    # Plot the training curves
    plot_training_history(history)
def plot_training_history(history):
    """Plot the training curves."""
    fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(12, 4))
    # Loss curves
    ax1.plot(history['train_loss'], label='Train Loss', marker='o')
    ax1.plot(history['val_loss'], label='Val Loss', marker='s')
    ax1.set_title('Loss over Epochs')
    ax1.set_xlabel('Epoch')
    ax1.set_ylabel('Loss')
    ax1.legend()
    ax1.grid(True, alpha=0.3)
    # Accuracy curves
    ax2.plot(history['train_acc'], label='Train Acc', marker='o')
    ax2.plot(history['val_acc'], label='Val Acc', marker='s')
    ax2.set_title('Accuracy over Epochs')
    ax2.set_xlabel('Epoch')
    ax2.set_ylabel('Accuracy (%)')
    ax2.legend()
    ax2.grid(True, alpha=0.3)
    plt.tight_layout()
    plt.savefig('training_history.png', dpi=300, bbox_inches='tight')
    plt.show()

if __name__ == '__main__':
    main()
3.4 Walking Through the Code
3.4.1 Data Preprocessing (Transforms)
train_transform = transforms.Compose([
    transforms.Resize((224, 224)),                          # ResNet input size
    transforms.RandomHorizontalFlip(p=0.5),                 # random horizontal flip
    transforms.RandomRotation(15),                          # random rotation up to 15 degrees
    transforms.ColorJitter(brightness=0.2, contrast=0.2),   # color jitter
    transforms.ToTensor(),                                  # convert to tensor
    transforms.Normalize(mean=[0.485, 0.456, 0.406],        # ImageNet statistics
                         std=[0.229, 0.224, 0.225])
])
Why this design?
- Resize((224, 224)): ResNet18's standard input size
- RandomHorizontalFlip: simulates different shooting angles, increasing data diversity
- RandomRotation: simulates tilted subjects
- ColorJitter: simulates varying lighting conditions
- Normalize: uses the ImageNet statistics so inputs match the pretrained weights
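A quick way to confirm the augmentations look sensible is to undo Normalize and display a sample. This is a small sketch assuming the train_dataset defined above:
import matplotlib.pyplot as plt
import torch

def show_augmented(dataset, idx=0):
    image, label = dataset[idx]  # already a normalized tensor
    mean = torch.tensor([0.485, 0.456, 0.406]).view(3, 1, 1)
    std = torch.tensor([0.229, 0.224, 0.225]).view(3, 1, 1)
    image = (image * std + mean).clamp(0, 1)  # de-normalize for display
    plt.imshow(image.permute(1, 2, 0))        # CHW -> HWC
    plt.title(f'label = {label}')
    plt.show()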
3.4.2 Model Construction
def create_model(num_classes=2, use_pretrained=True):
    model = models.resnet18(pretrained=use_pretrained)
    # Freeze the convolutional layers
    for param in model.parameters():
        param.requires_grad = False
    # Replace the classifier head
    model.fc = nn.Sequential(
        nn.Dropout(0.5),
        nn.Linear(512, 256),
        nn.ReLU(),
        nn.Dropout(0.3),
        nn.Linear(256, num_classes)
    )
    return model
Key points:
- Freezing parameters: param.requires_grad = False prevents the convolutional layers from being updated
- Custom classifier head: ResNet's original fc layer is Linear(512, 1000); we swap in a structure suited to binary classification
- Dropout: guards against overfitting, especially on small datasets
3.4.3 The Training Loop
# Training phase
model.train()  # enable dropout and batch-norm updates
for inputs, labels in train_loader:
    inputs, labels = inputs.to(device), labels.to(device)
    optimizer.zero_grad()              # clear gradients
    outputs = model(inputs)            # forward pass
    loss = criterion(outputs, labels)  # compute loss
    loss.backward()                    # backward pass
    optimizer.step()                   # update parameters

# Validation phase
model.eval()  # disable dropout, use running batch-norm statistics
with torch.no_grad():  # no gradients: saves memory
    for inputs, labels in val_loader:
        # ... same forward pass, but no parameter updates
Why do we need model.train() and model.eval()?
- BatchNorm: uses batch statistics during training, running averages during evaluation
- Dropout: randomly drops units during training, uses all units during evaluation
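A tiny experiment (a standalone sketch, nothing project-specific) makes the difference visible: the same input gives different outputs under Dropout in train mode and identical outputs in eval mode:
import torch
import torch.nn as nn

layer = nn.Dropout(p=0.5)
x = torch.ones(1, 4)
layer.train()
print(layer(x), layer(x))  # two different random maskings
layer.eval()
print(layer(x), layer(x))  # identical: dropout is disabled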
Part 4: Advanced Techniques and Best Practices
4.1 Learning Rate Strategies
# 1. Learning rate warmup
class WarmupScheduler:
    def __init__(self, optimizer, warmup_epochs, base_lr):
        self.optimizer = optimizer
        self.warmup_epochs = warmup_epochs
        self.base_lr = base_lr
        self.current_epoch = 0

    def step(self):
        self.current_epoch += 1
        if self.current_epoch <= self.warmup_epochs:
            lr = self.base_lr * (self.current_epoch / self.warmup_epochs)
            for param_group in self.optimizer.param_groups:
                param_group['lr'] = lr

# 2. Cosine annealing
scheduler = optim.lr_scheduler.CosineAnnealingLR(
    optimizer, T_max=50, eta_min=1e-6
)

# 3. A learning rate finder (uses the torch-lr-finder package: pip install torch-lr-finder)
from torch_lr_finder import LRFinder

def find_lr(model, optimizer, train_loader, criterion, device, start_lr=1e-7, end_lr=10, num_iter=100):
    """Smith-style learning rate range test."""
    lr_finder = LRFinder(model, optimizer, criterion, device=device)
    lr_finder.range_test(train_loader, start_lr=start_lr, end_lr=end_lr, num_iter=num_iter)
    lr_finder.plot()   # loss vs. learning rate curve
    lr_finder.reset()
4.2 Advanced Data Augmentation
# Using the Albumentations library (more powerful augmentation)
import albumentations as A
from albumentations.pytorch import ToTensorV2

def get_advanced_transforms():
    train_transform = A.Compose([
        A.Resize(224, 224),
        A.HorizontalFlip(p=0.5),
        A.ShiftScaleRotate(shift_limit=0.1, scale_limit=0.1, rotate_limit=15, p=0.5),
        A.RandomBrightnessContrast(p=0.5),
        A.GaussNoise(var_limit=(10.0, 50.0), p=0.3),
        # Cutout is deprecated in newer Albumentations releases; A.CoarseDropout is the replacement
        A.Cutout(num_holes=8, max_h_size=16, max_w_size=16, fill_value=0, p=0.5),
        A.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
        ToTensorV2(),
    ])
    val_transform = A.Compose([
        A.Resize(224, 224),
        A.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
        ToTensorV2(),
    ])
    return train_transform, val_transform
4.3 Fine-tuning Strategies
# 1. Discriminative learning rates
def get_param_groups(model, base_lr=1e-3, head_lr=1e-2):
    """Assign different learning rates to different layers."""
    # Backbone (everything except the head) uses a lower learning rate;
    # the head parameters must be excluded here to avoid duplicate groups
    backbone_params = [p for name, p in model.named_parameters()
                       if not name.startswith('fc')]
    params = [
        {'params': backbone_params, 'lr': base_lr, 'weight_decay': 1e-4},
        # The classifier head uses a higher learning rate
        {'params': model.fc.parameters(), 'lr': head_lr, 'weight_decay': 1e-4},
    ]
    return params

# 2. Gradual unfreezing
def gradual_unfreeze(model, epoch, unfreeze_epochs=5):
    """Unfreeze layers step by step as training progresses."""
    if epoch >= unfreeze_epochs:
        # Unfreeze all layers
        for param in model.parameters():
            param.requires_grad = True
        print(f"Epoch {epoch}: all layers unfrozen")
    elif epoch >= unfreeze_epochs - 2:
        # Unfreeze the last block
        for param in model.layer4.parameters():
            param.requires_grad = True
        print(f"Epoch {epoch}: layer4 unfrozen")
4.4 Loss Functions and Evaluation Metrics
# 1. Handling class imbalance
class WeightedLoss(nn.Module):
    def __init__(self, class_weights):
        super().__init__()
        # register_buffer moves the weights with the module when you call .to(device)
        self.register_buffer('class_weights', torch.tensor(class_weights, dtype=torch.float))

    def forward(self, outputs, labels):
        return nn.functional.cross_entropy(outputs, labels, weight=self.class_weights)

# 2. Label smoothing
class LabelSmoothingLoss(nn.Module):
    def __init__(self, num_classes, smoothing=0.1):
        super().__init__()
        self.num_classes = num_classes
        self.smoothing = smoothing

    def forward(self, pred, target):
        confidence = 1.0 - self.smoothing
        log_probs = torch.log_softmax(pred, dim=-1)
        with torch.no_grad():
            true_dist = torch.zeros_like(pred)
            true_dist.fill_(self.smoothing / (self.num_classes - 1))
            true_dist.scatter_(1, target.data.unsqueeze(1), confidence)
        return torch.mean(torch.sum(-true_dist * log_probs, dim=-1))

# 3. Custom evaluation metrics
def calculate_metrics(outputs, labels):
    """Compute accuracy, precision, recall, and F1."""
    from sklearn.metrics import precision_recall_fscore_support
    _, predicted = torch.max(outputs, 1)
    # Accuracy
    acc = (predicted == labels).float().mean().item()
    # Precision, recall, F1 (average='macro' for multi-class)
    pred_cpu = predicted.cpu().numpy()
    label_cpu = labels.cpu().numpy()
    precision, recall, f1, _ = precision_recall_fscore_support(
        label_cpu, pred_cpu, average='macro', zero_division=0
    )
    return {
        'accuracy': acc,
        'precision': precision,
        'recall': recall,
        'f1': f1
    }
4.5 Mixed Precision Training
from torch.cuda.amp import autocast, GradScaler

def train_mixed_precision(model, train_loader, criterion, optimizer, device, num_epochs=10):
    """Mixed precision training: saves memory and speeds things up."""
    scaler = GradScaler()  # gradient scaler
    for epoch in range(num_epochs):
        model.train()
        for inputs, labels in train_loader:
            inputs, labels = inputs.to(device), labels.to(device)
            optimizer.zero_grad()
            # Automatic mixed precision context
            with autocast():
                outputs = model(inputs)
                loss = criterion(outputs, labels)
            # Scale the loss, backpropagate, then step via the scaler
            scaler.scale(loss).backward()
            scaler.step(optimizer)
            scaler.update()
4.6 Model Ensembles
# 1. Simple averaging ensemble
class EnsembleModel(nn.Module):
    def __init__(self, models):
        super().__init__()
        self.models = nn.ModuleList(models)

    def forward(self, x):
        outputs = [model(x) for model in self.models]
        avg_output = torch.stack(outputs).mean(0)
        return avg_output

# 2. Weighted ensemble (weights chosen from validation performance)
def create_weighted_ensemble(model_paths, weights):
    """Create a weighted ensemble from saved full-model files."""
    models = []
    for path in model_paths:
        model = torch.load(path)
        models.append(model)

    class WeightedEnsemble(nn.Module):
        def __init__(self, models, weights):
            super().__init__()
            self.models = nn.ModuleList(models)
            self.weights = torch.tensor(weights)

        def forward(self, x):
            outputs = [model(x) for model in self.models]
            weighted_sum = sum(w * out for w, out in zip(self.weights, outputs))
            return weighted_sum / self.weights.sum()

    return WeightedEnsemble(models, weights)
Part 5: Common Problems and Solutions
5.1 Overfitting
Symptom: training accuracy is high (>95%) while validation accuracy is low (<70%)
Solutions:
# 1. Stronger regularization
model.fc = nn.Sequential(
    nn.Dropout(0.6),      # higher dropout rate
    nn.Linear(512, 256),
    nn.BatchNorm1d(256),  # add BatchNorm
    nn.ReLU(),
    nn.Dropout(0.4),
    nn.Linear(256, num_classes)
)
# 2. Weight decay (L2 regularization)
optimizer = optim.AdamW(model.parameters(), lr=1e-3, weight_decay=1e-4)
# 3. Early stopping (already implemented in the code above)
# 4. Data augmentation (see Section 4.2)
5.2 Underfitting
Symptom: both training and validation accuracy are low
Solutions:
# 1. Increase model capacity (unfreeze more layers)
for param in model.layer3.parameters():  # unfreeze layer3
    param.requires_grad = True
# 2. Train for more epochs
num_epochs = 30
# 3. Adjust the learning rate
optimizer = optim.Adam(model.parameters(), lr=1e-2)  # higher learning rate
# 4. Less regularization
model.fc = nn.Sequential(
    nn.Dropout(0.2),  # lower dropout
    nn.Linear(512, num_classes)
)
5.3 Vanishing/Exploding Gradients
Symptom: the loss does not decrease, or becomes NaN
Solutions:
# 1. Gradient clipping
torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
# 2. Better initialization
def init_weights(m):
    if isinstance(m, nn.Linear):
        nn.init.xavier_uniform_(m.weight)
        nn.init.constant_(m.bias, 0)
model.fc.apply(init_weights)
# 3. BatchNorm
model.fc = nn.Sequential(
    nn.BatchNorm1d(512),
    nn.ReLU(),
    nn.Linear(512, num_classes)
)
# 4. A more stable optimizer
optimizer = optim.AdamW(model.parameters(), lr=1e-3, betas=(0.9, 0.999))
5.4 Class Imbalance
Symptom: high accuracy on the majority class, low accuracy on minority classes
Solutions:
# 1. Compute class weights
import numpy as np
from sklearn.utils.class_weight import compute_class_weight
class_weights = compute_class_weight(
    'balanced',
    classes=np.unique(train_dataset.targets),
    y=train_dataset.targets
)
class_weights = torch.tensor(class_weights, dtype=torch.float).to(device)
# 2. Weighted loss
criterion = nn.CrossEntropyLoss(weight=class_weights)
# 3. Resampling
from torch.utils.data import WeightedRandomSampler
# Compute a weight per sample from the labels (avoids loading every image)
sample_weights = [class_weights[t].item() for t in train_dataset.targets]
sampler = WeightedRandomSampler(sample_weights, len(sample_weights))
train_loader = DataLoader(train_dataset, batch_size=batch_size, sampler=sampler)
5.5 Out of Memory (OOM)
Symptom: CUDA out of memory
Solutions:
# 1. Smaller batch size
batch_size = 8  # down from 32
# 2. Gradient accumulation
accumulation_steps = 4  # accumulate gradients over 4 batches
for i, (inputs, labels) in enumerate(train_loader):
    outputs = model(inputs)
    loss = criterion(outputs, labels) / accumulation_steps
    loss.backward()
    if (i + 1) % accumulation_steps == 0:
        optimizer.step()
        optimizer.zero_grad()
# 3. Mixed precision (see Section 4.5)
# 4. A smaller model
model = models.resnet18(pretrained=True)  # instead of resnet50
# 5. Free cached memory when needed
torch.cuda.empty_cache()
Part 6: Extending the Project
6.1 Multi-label Classification
# Change the model output and the loss function
class MultiLabelModel(nn.Module):
    def __init__(self, num_classes=5):
        super().__init__()
        self.backbone = models.resnet18(pretrained=True)
        for param in self.backbone.parameters():
            param.requires_grad = False
        # Replace the final layer for multi-label output
        self.backbone.fc = nn.Sequential(
            nn.Dropout(0.5),
            nn.Linear(512, num_classes)
        )

    def forward(self, x):
        return torch.sigmoid(self.backbone(x))  # sigmoid instead of softmax

# Loss function: binary cross-entropy per label
# (nn.BCEWithLogitsLoss on raw logits is the more numerically stable alternative)
criterion = nn.BCELoss()
6.2 Getting Started with Object Detection
# Using a pretrained Faster R-CNN
from torchvision.models.detection import fasterrcnn_resnet50_fpn
from torchvision.models.detection.faster_rcnn import FastRCNNPredictor

def create_detection_model(num_classes=2):
    # Load the pretrained model
    model = fasterrcnn_resnet50_fpn(pretrained=True)
    # Replace the box classifier
    in_features = model.roi_heads.box_predictor.cls_score.in_features
    model.roi_heads.box_predictor = FastRCNNPredictor(in_features, num_classes)
    return model

# Training targets must be supplied as (boxes, labels) per image
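Concretely, torchvision's detection models take a list of image tensors plus one target dict per image. This small sketch shows the expected shapes (the box coordinates are made up for illustration):
import torch

images = [torch.rand(3, 300, 400)]  # list of CHW image tensors
targets = [{
    'boxes': torch.tensor([[50.0, 60.0, 200.0, 220.0]]),  # N x 4, (x1, y1, x2, y2)
    'labels': torch.tensor([1], dtype=torch.int64),       # N labels; 0 is background
}]
model = create_detection_model(num_classes=2)
model.train()
loss_dict = model(images, targets)  # in train mode the model returns a dict of losses
print(loss_dict.keys())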
6.3 Transfer Learning for NLP
# Using Hugging Face Transformers
from transformers import BertTokenizer, BertForSequenceClassification
from torch.utils.data import Dataset

class TextDataset(Dataset):
    def __init__(self, texts, labels, tokenizer, max_length=128):
        self.texts = texts
        self.labels = labels
        self.tokenizer = tokenizer
        self.max_length = max_length

    def __len__(self):
        return len(self.texts)

    def __getitem__(self, idx):
        text = str(self.texts[idx])
        label = self.labels[idx]
        encoding = self.tokenizer(
            text,
            truncation=True,
            padding='max_length',
            max_length=self.max_length,
            return_tensors='pt'
        )
        return {
            'input_ids': encoding['input_ids'].flatten(),
            'attention_mask': encoding['attention_mask'].flatten(),
            'labels': torch.tensor(label, dtype=torch.long)
        }

# Text classification with BERT
def create_bert_model(num_labels=2):
    model = BertForSequenceClassification.from_pretrained(
        'bert-base-chinese',  # or 'bert-base-uncased'
        num_labels=num_labels
    )
    return model

# The training loop mirrors image classification; only the data handling differs (see the sketch below)
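As a minimal sketch of that loop, using the classes above (Hugging Face models return a loss directly when labels are passed; the tiny two-sentence dataset here is purely illustrative):
import torch
from torch.utils.data import DataLoader
from transformers import BertTokenizer

tokenizer = BertTokenizer.from_pretrained('bert-base-chinese')
model = create_bert_model(num_labels=2)
optimizer = torch.optim.AdamW(model.parameters(), lr=2e-5)

dataset = TextDataset(["This movie was great", "Terrible, avoid it"], [1, 0], tokenizer)
loader = DataLoader(dataset, batch_size=2)

model.train()
for batch in loader:
    outputs = model(input_ids=batch['input_ids'],
                    attention_mask=batch['attention_mask'],
                    labels=batch['labels'])
    outputs.loss.backward()  # loss is computed internally when labels are given
    optimizer.step()
    optimizer.zero_grad()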
Part 7: Deployment and Inference
7.1 Exporting the Model
# 1. Save the weights only
torch.save(model.state_dict(), 'model_weights.pth')
# 2. Save the whole model (including the architecture)
torch.save(model, 'full_model.pth')
# 3. Export to ONNX (cross-platform)
dummy_input = torch.randn(1, 3, 224, 224).to(device)
torch.onnx.export(
    model,
    dummy_input,
    'model.onnx',
    input_names=['input'],
    output_names=['output'],
    dynamic_axes={'input': {0: 'batch_size'}, 'output': {0: 'batch_size'}}
)
# 4. TorchScript (PyTorch-native)
model_scripted = torch.jit.script(model)
model_scripted.save('model_scripted.pt')
7.2 Inference Code
class Predictor:
    def __init__(self, model_path, device='cpu'):
        self.device = torch.device(device)
        self.model = self.load_model(model_path)
        self.transform = transforms.Compose([
            transforms.Resize((224, 224)),
            transforms.ToTensor(),
            transforms.Normalize(mean=[0.485, 0.456, 0.406],
                                 std=[0.229, 0.224, 0.225])
        ])

    def load_model(self, model_path):
        # Rebuild the model architecture
        model = create_model(num_classes=2)
        # Load the weights
        checkpoint = torch.load(model_path, map_location=self.device)
        if 'model_state_dict' in checkpoint:
            model.load_state_dict(checkpoint['model_state_dict'])
        else:
            model.load_state_dict(checkpoint)
        model = model.to(self.device)
        model.eval()
        return model

    def predict(self, image_path, top_k=2):
        """Predict a single image (top_k must not exceed the number of classes)."""
        from PIL import Image
        # Load and preprocess the image
        image = Image.open(image_path).convert('RGB')
        input_tensor = self.transform(image).unsqueeze(0).to(self.device)
        # Inference
        with torch.no_grad():
            outputs = self.model(input_tensor)
            probabilities = torch.softmax(outputs, dim=1)
            top_probs, top_indices = torch.topk(probabilities, k=top_k, dim=1)
        # Format the results
        results = []
        for i in range(top_k):
            results.append({
                'class_idx': top_indices[0][i].item(),
                'probability': top_probs[0][i].item(),
                'class_name': ['cat', 'dog'][top_indices[0][i].item()]
            })
        return results

# Usage example
predictor = Predictor('best_model.pth', device='cuda')
result = predictor.predict('test_cat.jpg')
print(result)
# Output: [{'class_idx': 0, 'probability': 0.98, 'class_name': 'cat'}, ...]
7.3 Deploying a Web API
# A Flask API
from flask import Flask, request, jsonify
from PIL import Image
import io

app = Flask(__name__)
predictor = Predictor('best_model.pth')

@app.route('/predict', methods=['POST'])
def predict():
    if 'file' not in request.files:
        return jsonify({'error': 'No file uploaded'}), 400
    file = request.files['file']
    if file.filename == '':
        return jsonify({'error': 'No file selected'}), 400
    try:
        # Read the image
        image = Image.open(io.BytesIO(file.read()))
        # Save a temporary file (or process the image in memory)
        image.save('temp.jpg')
        # Predict
        results = predictor.predict('temp.jpg')
        return jsonify({
            'success': True,
            'results': results
        })
    except Exception as e:
        return jsonify({'error': str(e)}), 500

if __name__ == '__main__':
    app.run(host='0.0.0.0', port=5000, debug=False)
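To exercise the endpoint, here is a minimal client sketch using the requests library (pip install requests); the file name is just a placeholder:
import requests

with open('test_cat.jpg', 'rb') as f:
    resp = requests.post('http://localhost:5000/predict', files={'file': f})
print(resp.json())  # e.g. {'success': True, 'results': [...]}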
Part 8: Performance Optimization and Debugging
8.1 Profiling
# 1. Find bottlenecks with the PyTorch Profiler
from torch.profiler import profile, record_function, ProfilerActivity

def profile_training(model, inputs):
    with profile(activities=[ProfilerActivity.CPU, ProfilerActivity.CUDA],
                 record_shapes=True) as prof:
        with record_function("model_inference"):
            model(inputs)
    prof.export_chrome_trace("trace.json")  # inspect in chrome://tracing

# 2. Monitor GPU usage (the nvidia-ml-py3 package installs the pynvml module)
import pynvml as nvml
nvml.nvmlInit()
handle = nvml.nvmlDeviceGetHandleByIndex(0)
info = nvml.nvmlDeviceGetMemoryInfo(handle)
print(f"GPU memory used: {info.used / 1024**3:.2f} GB / {info.total / 1024**3:.2f} GB")
8.2 Debugging Techniques
# 1. Inspect the data loader
def debug_data_loader(dataloader, num_batches=3):
    """Check that data loading looks correct."""
    for i, (inputs, labels) in enumerate(dataloader):
        if i >= num_batches:
            break
        print(f"Batch {i}:")
        print(f"  Input shape: {inputs.shape}")
        print(f"  Input range: [{inputs.min():.3f}, {inputs.max():.3f}]")
        print(f"  Labels: {labels}")
        print(f"  Labels distribution: {torch.bincount(labels)}")

# 2. Inspect the model output
def debug_model_output(model, dataloader, device):
    model.eval()
    with torch.no_grad():
        inputs, labels = next(iter(dataloader))
        inputs = inputs.to(device)
        outputs = model(inputs)
        print(f"Output shape: {outputs.shape}")
        print(f"Output range: [{outputs.min():.3f}, {outputs.max():.3f}]")
        print(f"Softmax probabilities: {torch.softmax(outputs, dim=1)}")
        print(f"Predicted classes: {torch.argmax(outputs, dim=1)}")
        print(f"True classes: {labels}")

# 3. Inspect gradients
def debug_gradients(model):
    """Check that gradients exist and look sane."""
    has_grad = False
    for name, param in model.named_parameters():
        if param.grad is not None:
            grad_norm = param.grad.norm().item()
            print(f"{name}: grad norm = {grad_norm:.6f}")
            has_grad = True
        else:
            print(f"{name}: No gradient")
    if not has_grad:
        print("Warning: the model has no gradients at all!")
8.3 Logging and Monitoring
import logging
from torch.utils.tensorboard import SummaryWriter

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('training.log'),
        logging.StreamHandler()
    ]
)

# TensorBoard monitoring
writer = SummaryWriter(log_dir='runs/experiment_1')

# Inside the training loop
for epoch in range(num_epochs):
    # ... training code ...
    # Log the metrics
    writer.add_scalar('Loss/Train', train_loss, epoch)
    writer.add_scalar('Loss/Val', val_loss, epoch)
    writer.add_scalar('Accuracy/Train', train_acc, epoch)
    writer.add_scalar('Accuracy/Val', val_acc, epoch)
    writer.add_scalar('Learning_Rate', optimizer.param_groups[0]['lr'], epoch)
    # Log the model graph
    if epoch == 0:
        writer.add_graph(model, inputs.to(device))
    # Log weight distributions
    for name, param in model.named_parameters():
        if param.grad is not None:
            writer.add_histogram(f'Gradients/{name}', param.grad, epoch)
            writer.add_histogram(f'Weights/{name}', param, epoch)
writer.close()
Part 9: Advanced Topics and Recent Techniques
9.1 Self-Supervised Learning
# SimCLR-style contrastive learning
class ContrastiveLearningViewGenerator:
    """Generate two augmented views of the same image for contrastive learning."""
    def __init__(self, base_transform, n_views=2):
        self.base_transform = base_transform
        self.n_views = n_views

    def __call__(self, x):
        return [self.base_transform(x) for _ in range(self.n_views)]

# Contrastive (InfoNCE-style) loss
class ContrastiveLoss(nn.Module):
    def __init__(self, temperature=0.5):
        super().__init__()
        self.temperature = temperature

    def forward(self, features):
        # features: [2N, D], where rows i and i+N are two views of the same image
        n = features.size(0) // 2
        features = nn.functional.normalize(features, dim=1)
        # Similarity matrix
        sim_matrix = torch.matmul(features, features.T) / self.temperature
        # Mask the diagonal (self-similarity)
        eye = torch.eye(sim_matrix.size(0), device=sim_matrix.device)
        sim_matrix = sim_matrix - eye * 1e9
        # Each row's positive target is the other view of the same image
        labels = (torch.arange(2 * n, device=features.device) + n) % (2 * n)
        return nn.functional.cross_entropy(sim_matrix, labels)
9.2 Domain Adaptation
# DANN (Domain-Adversarial Neural Network)
class DANN(nn.Module):
    def __init__(self, num_classes):
        super().__init__()
        # Feature extractor
        self.feature_extractor = models.resnet18(pretrained=True)
        self.feature_extractor.fc = nn.Identity()  # remove the original classifier head
        # Task-specific classifier
        self.classifier = nn.Sequential(
            nn.Linear(512, 256),
            nn.ReLU(),
            nn.Linear(256, num_classes)
        )
        # Domain discriminator (adversarial training)
        self.domain_discriminator = nn.Sequential(
            nn.Linear(512, 256),
            nn.ReLU(),
            nn.Linear(256, 1),
            nn.Sigmoid()
        )

    def forward(self, x, alpha=1.0):
        features = self.feature_extractor(x)
        class_output = self.classifier(features)
        # Gradient reversal layer (GRL): applied only on the domain branch,
        # so gradients flowing back from the discriminator are flipped and
        # push the extractor toward domain-invariant features
        reversed_features = GradientReversalLayer.apply(features, alpha)
        domain_output = self.domain_discriminator(reversed_features)
        return class_output, domain_output

# GRL implementation
class GradientReversalLayer(torch.autograd.Function):
    @staticmethod
    def forward(ctx, x, alpha):
        ctx.alpha = alpha
        return x.clone()

    @staticmethod
    def backward(ctx, grad_output):
        return -ctx.alpha * grad_output, None
9.3 Meta-Learning
# A simplified MAML (Model-Agnostic Meta-Learning)
import copy

class MAML:
    def __init__(self, model, inner_lr=0.01, meta_lr=0.001):
        self.model = model
        self.inner_lr = inner_lr
        self.meta_lr = meta_lr
        self.meta_optimizer = optim.Adam(model.parameters(), lr=meta_lr)

    def inner_loop(self, support_set):
        """Adapt quickly on the support set."""
        # Work on a copy of the model
        fast_model = copy.deepcopy(self.model)
        optimizer = optim.SGD(fast_model.parameters(), lr=self.inner_lr)
        # Fast adaptation
        for inputs, labels in support_set:
            outputs = fast_model(inputs)
            loss = nn.CrossEntropyLoss()(outputs, labels)
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
        return fast_model

    def outer_loop(self, task):
        """Meta-training step. Note: deepcopy breaks the gradient path back to
        self.model, so this is illustrative only; a faithful MAML needs
        differentiable inner updates (e.g., the `higher` library)."""
        support_set, query_set = task
        # Inner loop
        fast_model = self.inner_loop(support_set)
        # Evaluate on the query set
        total_loss = 0
        for inputs, labels in query_set:
            outputs = fast_model(inputs)
            loss = nn.CrossEntropyLoss()(outputs, labels)
            total_loss += loss
        # Meta update
        self.meta_optimizer.zero_grad()
        total_loss.backward()
        self.meta_optimizer.step()
        return total_loss.item()
Part 10: Summary and Learning Path
10.1 Key Takeaways
- The core idea of transfer learning: reuse a pretrained model's knowledge to speed up learning on a new task
- Three main modes: feature extraction, fine-tuning, end-to-end training
- Key techniques:
  - Freeze/unfreeze layers deliberately
  - Use an appropriate learning rate schedule
  - Augment the data thoroughly
  - Use early stopping and regularization to prevent overfitting
- Performance optimization: mixed precision, gradient accumulation, model quantization (sketched below)
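Quantization is mentioned above but not shown elsewhere in this guide, so here is a minimal post-training dynamic quantization sketch (it converts Linear layers to int8; the result runs on CPU and accuracy should be re-validated):
import torch
import torch.nn as nn
from torchvision import models

model = models.resnet18(pretrained=True)
model.eval()
quantized = torch.quantization.quantize_dynamic(model, {nn.Linear}, dtype=torch.qint8)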
10.2 A Learning Path for Beginners
Stage 1: Foundations (1-2 weeks)
- Learn the basics of Python and PyTorch
- Understand the core concepts of transfer learning
- Complete your first image classification project
Stage 2: Intermediate (2-3 weeks)
- Learn data augmentation and regularization techniques
- Master learning rate scheduling
- Practice multi-class and multi-label classification
Stage 3: Advanced (3-4 weeks)
- Study object detection and semantic segmentation
- Explore transfer learning for NLP
- Practice model deployment
Stage 4: Expert (ongoing)
- Self-supervised learning
- Domain adaptation
- Meta-learning
- Follow recent papers and open-source projects
10.3 Recommended Resources
Books:
- Deep Learning with Python (François Chollet)
- Hands-On Machine Learning with Scikit-Learn, Keras, and TensorFlow
Online courses:
- Fast.ai Practical Deep Learning for Coders
- Stanford CS231n: Convolutional Neural Networks for Visual Recognition
Open-source projects:
- PyTorch Image Models (timm)
- Hugging Face Transformers
- Detectron2
Papers:
- "A Survey on Transfer Learning" (Pan and Yang, 2010)
- "ImageNet Classification with Deep Convolutional Neural Networks" (AlexNet, 2012)
- "BERT: Pre-training of Deep Bidirectional Transformers for Language Understanding" (2018)
10.4 常见误区避免
❌ 误区1:盲目解冻所有层 ✅ 正确做法:根据数据量和任务复杂度逐步解冻
❌ 误区2:使用预训练模型的原始学习率 ✅ 正确做法:分类头使用更高学习率(10-100倍)
❌ 误区3:忽略数据预处理 ✅ 正确做法:必须与预训练时的预处理一致
❌ 误区4:训练轮数越多越好 ✅ 正确做法:使用早停机制,监控验证集性能
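For pitfall 3, newer torchvision releases (0.13+) bundle the exact preprocessing with the weights, which removes the risk of mismatched Normalize statistics; a minimal sketch:
from torchvision import models

weights = models.ResNet18_Weights.DEFAULT
model = models.resnet18(weights=weights)
preprocess = weights.transforms()  # canonical resize/crop/normalize for these weights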
Appendix: A Complete Project Template
"""
Transfer learning project template
For image classification tasks
"""
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader
from torchvision import datasets, transforms, models
from torch.utils.tensorboard import SummaryWriter
from tqdm import tqdm
import os
import logging
from datetime import datetime
class TransferLearningTrainer:
    def __init__(self, config):
        self.config = config
        self.setup_logging()
        self.setup_device()
        self.setup_data()
        self.setup_model()
        self.setup_optimizer()
        self.setup_writer()

    def setup_logging(self):
        """Configure logging."""
        os.makedirs('logs', exist_ok=True)  # FileHandler fails if the directory is missing
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(levelname)s - %(message)s',
            handlers=[
                logging.FileHandler(f"logs/training_{datetime.now().strftime('%Y%m%d_%H%M%S')}.log"),
                logging.StreamHandler()
            ]
        )
        self.logger = logging.getLogger(__name__)

    def setup_device(self):
        """Configure the device."""
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        self.logger.info(f"Using device: {self.device}")
    def setup_data(self):
        """Configure the data loaders."""
        train_transform = transforms.Compose([
            transforms.Resize((224, 224)),
            transforms.RandomHorizontalFlip(p=0.5),
            transforms.RandomRotation(15),
            transforms.ColorJitter(brightness=0.2, contrast=0.2),
            transforms.ToTensor(),
            transforms.Normalize(mean=[0.485, 0.456, 0.406],
                                 std=[0.229, 0.224, 0.225])
        ])
        val_transform = transforms.Compose([
            transforms.Resize((224, 224)),
            transforms.ToTensor(),
            transforms.Normalize(mean=[0.485, 0.456, 0.406],
                                 std=[0.229, 0.224, 0.225])
        ])
        train_dataset = datasets.ImageFolder(
            os.path.join(self.config['data_dir'], 'train'),
            transform=train_transform
        )
        val_dataset = datasets.ImageFolder(
            os.path.join(self.config['data_dir'], 'val'),
            transform=val_transform
        )
        self.train_loader = DataLoader(
            train_dataset,
            batch_size=self.config['batch_size'],
            shuffle=True,
            num_workers=4,
            pin_memory=True
        )
        self.val_loader = DataLoader(
            val_dataset,
            batch_size=self.config['batch_size'],
            shuffle=False,
            num_workers=4,
            pin_memory=True
        )
        self.num_classes = len(train_dataset.classes)
        self.logger.info(f"Number of classes: {self.num_classes}")
        self.logger.info(f"Training set: {len(train_dataset)} samples")
        self.logger.info(f"Validation set: {len(val_dataset)} samples")
    def setup_model(self):
        """Build the model."""
        model = models.resnet18(pretrained=True)
        # Freeze the convolutional layers
        for param in model.parameters():
            param.requires_grad = False
        # Replace the classifier head
        model.fc = nn.Sequential(
            nn.Dropout(self.config['dropout']),
            nn.Linear(512, 256),
            nn.ReLU(),
            nn.Dropout(self.config['dropout'] * 0.5),
            nn.Linear(256, self.num_classes)
        )
        self.model = model.to(self.device)
        self.logger.info("Model built")

    def setup_optimizer(self):
        """Configure the optimizer."""
        self.criterion = nn.CrossEntropyLoss()
        # Only optimize the classifier head parameters
        trainable_params = filter(lambda p: p.requires_grad, self.model.parameters())
        if self.config['optimizer'] == 'Adam':
            self.optimizer = optim.Adam(
                trainable_params,
                lr=self.config['lr'],
                weight_decay=self.config['weight_decay']
            )
        elif self.config['optimizer'] == 'AdamW':
            self.optimizer = optim.AdamW(
                trainable_params,
                lr=self.config['lr'],
                weight_decay=self.config['weight_decay']
            )
        self.scheduler = optim.lr_scheduler.ReduceLROnPlateau(
            self.optimizer, mode='max', factor=0.5, patience=3, verbose=True
        )

    def setup_writer(self):
        """Configure TensorBoard."""
        log_dir = f"runs/exp_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
        self.writer = SummaryWriter(log_dir=log_dir)
        self.logger.info(f"TensorBoard log directory: {log_dir}")
    def train_epoch(self, epoch):
        """Train for one epoch."""
        self.model.train()
        running_loss = 0.0
        correct = 0
        total = 0
        pbar = tqdm(self.train_loader, desc=f'Epoch {epoch}')
        for inputs, labels in pbar:
            inputs, labels = inputs.to(self.device), labels.to(self.device)
            self.optimizer.zero_grad()
            outputs = self.model(inputs)
            loss = self.criterion(outputs, labels)
            loss.backward()
            # Gradient clipping
            torch.nn.utils.clip_grad_norm_(self.model.parameters(), 1.0)
            self.optimizer.step()
            running_loss += loss.item() * inputs.size(0)
            _, predicted = torch.max(outputs, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()
            pbar.set_postfix({
                'loss': f'{loss.item():.4f}',
                'acc': f'{100 * correct / total:.2f}%'
            })
        epoch_loss = running_loss / len(self.train_loader.dataset)
        epoch_acc = 100 * correct / total
        return epoch_loss, epoch_acc

    def validate(self):
        """Run validation."""
        self.model.eval()
        running_loss = 0.0
        correct = 0
        total = 0
        with torch.no_grad():
            for inputs, labels in self.val_loader:
                inputs, labels = inputs.to(self.device), labels.to(self.device)
                outputs = self.model(inputs)
                loss = self.criterion(outputs, labels)
                running_loss += loss.item() * inputs.size(0)
                _, predicted = torch.max(outputs, 1)
                total += labels.size(0)
                correct += (predicted == labels).sum().item()
        val_loss = running_loss / len(self.val_loader.dataset)
        val_acc = 100 * correct / total
        return val_loss, val_acc
    def run(self):
        """Main training loop."""
        best_val_acc = 0.0
        patience_counter = 0
        for epoch in range(1, self.config['epochs'] + 1):
            self.logger.info(f"\n{'='*50}")
            self.logger.info(f"Epoch {epoch}/{self.config['epochs']}")
            # Train
            train_loss, train_acc = self.train_epoch(epoch)
            # Validate
            val_loss, val_acc = self.validate()
            # Log to TensorBoard
            self.writer.add_scalar('Loss/Train', train_loss, epoch)
            self.writer.add_scalar('Loss/Val', val_loss, epoch)
            self.writer.add_scalar('Accuracy/Train', train_acc, epoch)
            self.writer.add_scalar('Accuracy/Val', val_acc, epoch)
            self.writer.add_scalar('Learning_Rate', self.optimizer.param_groups[0]['lr'], epoch)
            # Log line
            self.logger.info(
                f"Train Loss: {train_loss:.4f}, Train Acc: {train_acc:.2f}% | "
                f"Val Loss: {val_loss:.4f}, Val Acc: {val_acc:.2f}%"
            )
            # Learning rate scheduling
            self.scheduler.step(val_acc)
            # Save the best model
            if val_acc > best_val_acc:
                best_val_acc = val_acc
                patience_counter = 0
                torch.save({
                    'epoch': epoch,
                    'model_state_dict': self.model.state_dict(),
                    'optimizer_state_dict': self.optimizer.state_dict(),
                    'val_acc': val_acc,
                    'config': self.config
                }, 'best_model.pth')
                self.logger.info(f"✨ Saved new best model: {val_acc:.2f}%")
            else:
                patience_counter += 1
                if patience_counter >= self.config['patience']:
                    self.logger.info(f"Early stopping; best val accuracy: {best_val_acc:.2f}%")
                    break
        self.writer.close()
        self.logger.info(f"\nTraining finished! Best val accuracy: {best_val_acc:.2f}%")
        return best_val_acc

# Usage example
if __name__ == '__main__':
    config = {
        'data_dir': 'dataset',
        'batch_size': 32,
        'epochs': 20,
        'lr': 0.001,
        'dropout': 0.5,
        'weight_decay': 1e-4,
        'optimizer': 'AdamW',
        'patience': 5
    }
    trainer = TransferLearningTrainer(config)
    best_acc = trainer.run()
Closing Thoughts
Transfer learning is one of the most important techniques in deep learning, and it dramatically lowers the barrier to applying deep learning at all. Working through this guide, you have gone from zero to covering:
- ✅ The core concepts and principles of transfer learning
- ✅ Environment setup and tooling
- ✅ A complete image classification project
- ✅ Advanced techniques (data augmentation, learning rate strategies, regularization)
- ✅ Solutions to common problems
- ✅ Model deployment and inference
- ✅ Advanced topics (self-supervision, domain adaptation, meta-learning)
Remember: theory only sticks with plenty of practice. Suggested next steps:
- Start with the simple cat-vs-dog classifier
- Try other datasets (CIFAR-10, Flowers, etc.)
- Experiment with different hyperparameter combinations
- Enter Kaggle competitions
- Read recent papers and reproduce their results
May your transfer learning journey take you far, all the way to deep learning expertise! 🚀
