Pytorch详细版文档
分类: 算法&模型训练
PyTorch 深度学习基础
介绍
PyTorch是一个基于Python的深度学习框架,由Facebook开发并开源,以其动态计算图和简洁易用的API著称,广泛应用于学术研究和工业部署。
准备数据
引入库
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
import torchvision
import torchvision.transforms as transforms
加载数据
方法一 加载内置数据集
# Download (if not cached under ./data) the MNIST handwritten-digit dataset
# and expose train/test splits as torchvision datasets with tensor conversion.
_mnist_kwargs = dict(root='./data', transform=transforms.ToTensor(), download=True)
train_dataset = torchvision.datasets.MNIST(train=True, **_mnist_kwargs)
test_dataset = torchvision.datasets.MNIST(train=False, **_mnist_kwargs)
方法二 使用DataLoader加载数据
# Wrap the datasets in DataLoaders.
#   batch_size  - samples drawn per iteration
#   shuffle     - reshuffle each epoch (training split only)
#   num_workers - subprocesses used for data loading
def _make_loader(ds, shuffle):
    # One-line factory so train/test loaders stay configured identically.
    return DataLoader(dataset=ds, batch_size=64, shuffle=shuffle, num_workers=2)

train_loader = _make_loader(train_dataset, True)
test_loader = _make_loader(test_dataset, False)
方法三 加载外部CSV数据集
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split

# Read the CSV; every column except "label" is a feature.
df = pd.read_csv("data.csv")
X = df.drop("label", axis=1).values.astype(np.float32)
y = df["label"].values.astype(np.int64)

# Hold out 20% of the rows as a test split (fixed seed => reproducible split).
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42, shuffle=True
)

# NumPy arrays -> PyTorch tensors (from_numpy shares the underlying memory).
X_train_tensor, y_train_tensor = torch.from_numpy(X_train), torch.from_numpy(y_train)
X_test_tensor, y_test_tensor = torch.from_numpy(X_test), torch.from_numpy(y_test)

# Pair features with targets and batch the training split.
train_dataset = TensorDataset(X_train_tensor, y_train_tensor)
test_dataset = TensorDataset(X_test_tensor, y_test_tensor)
train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True)
数据预处理
归一化:让数据落在特定范围,训练更稳定、更快
方法一 使用标准化(Standardization)
# Shift/scale inputs with fixed statistics: output = (x - 0.5) / 0.5,
# i.e. values in [0, 1] are mapped to [-1, 1].
mean = 0.5
std = 0.5
transform = transforms.Normalize(mean=[mean], std=[std])
方法二 归一化到[0,1]范围
# Manual rescaling into the [0, 1] range by dividing by 255
# (assumes X_train holds 8-bit image values 0-255 — TODO confirm).
X_train_tensor = torch.from_numpy(X_train.astype(np.float32)) / 255.0
方法三 对图像数据进行归一化
# Pipeline: PIL image -> float tensor in [0, 1] -> standardized to [-1, 1].
transform = transforms.Compose(
    [
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.5], std=[0.5]),
    ]
)
构建模型
方法一 使用nn.Sequential(顺序容器)
# Simple fully-connected classifier for 28x28 images.
# NOTE: the output layer returns raw logits. The original version appended
# nn.Softmax(dim=1) here, which is wrong when training with
# nn.CrossEntropyLoss (used later in this document): that loss applies
# log-softmax internally, so stacking a second softmax squashes gradients
# and slows/kills training. Apply torch.softmax to the logits at inference
# time if class probabilities are needed.
model = nn.Sequential(
    nn.Flatten(),          # flatten the 28x28 image into a 784-dim vector
    nn.Linear(784, 128),   # hidden fully-connected layer
    nn.ReLU(),             # non-linearity
    nn.Linear(128, 10),    # output layer: one logit per digit class
)
方法二 继承nn.Module(推荐方式)
class MyModel(nn.Module):
    """Two-layer MLP classifier: 784 -> 128 -> 10, returning raw logits."""

    def __init__(self):
        super(MyModel, self).__init__()
        self.flatten = nn.Flatten()
        self.fc1 = nn.Linear(784, 128)
        self.relu = nn.ReLU()
        self.fc2 = nn.Linear(128, 10)

    def forward(self, x):
        # Flatten -> hidden layer -> ReLU -> output logits.
        hidden = self.relu(self.fc1(self.flatten(x)))
        return self.fc2(hidden)


model = MyModel()
方法三 构建更复杂的CNN模型
class CNNModel(nn.Module):
    """Two-block CNN for 1x28x28 inputs (e.g. MNIST), returning 10 logits.

    Each block is conv -> ReLU -> 2x2 max-pool, halving the spatial size:
    28x28 -> 14x14 -> 7x7, so the classifier head sees 64*7*7 features.
    """

    def __init__(self):
        super(CNNModel, self).__init__()
        # Convolutional feature extractor (padding=1 keeps spatial size).
        self.conv1 = nn.Conv2d(in_channels=1, out_channels=32, kernel_size=3, padding=1)
        self.conv2 = nn.Conv2d(in_channels=32, out_channels=64, kernel_size=3, padding=1)
        # A single 2x2 max-pool instance is reused — it is stateless.
        self.pool = nn.MaxPool2d(kernel_size=2, stride=2)
        # Classifier head.
        self.fc1 = nn.Linear(64 * 7 * 7, 128)
        self.fc2 = nn.Linear(128, 10)
        # Dropout to reduce overfitting (active only in train() mode).
        self.dropout = nn.Dropout(0.5)

    def forward(self, x):
        x = self.pool(torch.relu(self.conv1(x)))
        x = self.pool(torch.relu(self.conv2(x)))
        # Flatten all but the batch dimension. torch.flatten(x, 1) replaces
        # the original x.view(-1, 64 * 7 * 7): view silently reinterprets the
        # batch size when the spatial dimensions are wrong, whereas flatten
        # surfaces a clear shape error at the following Linear layer.
        x = torch.flatten(x, 1)
        x = self.dropout(torch.relu(self.fc1(x)))
        return self.fc2(x)


model = CNNModel()
| 层类型 | 作用 | 常见参数 |
| --- | --- | --- |
| nn.Flatten | 将输入张量展平 | 无 |
| nn.Linear | 全连接层 | in_features, out_features |
| nn.Conv2d | 二维卷积层 | in_channels, out_channels, kernel_size |
| nn.MaxPool2d | 最大池化层 | kernel_size, stride |
| nn.AvgPool2d | 平均池化层 | kernel_size, stride |
| nn.Dropout | 随机失活 | p(失活概率) |
| nn.BatchNorm2d | 批归一化 | num_features |
激活函数
| 激活函数 | 简介 | 适用场景 |
| --- | --- | --- |
| nn.ReLU | 线性整流函数,计算快 | 隐藏层常用 |
| nn.Sigmoid | S型曲线,输出0-1 | 二分类输出层 |
| nn.Softmax | 归一化指数函数 | 多分类输出层 |
| nn.Tanh | 双曲正切,输出-1到1 | 隐藏层 |
| nn.LeakyReLU | 带泄漏的ReLU | 解决ReLU死亡问题 |
训练模型
定义损失函数
分类问题
# Classification losses. Each assignment below overwrites `criterion`;
# keep only the one matching your task.

# Multi-class with integer class-index targets.
criterion = nn.CrossEntropyLoss()
# Binary classification on raw logits (sigmoid is applied internally).
criterion = nn.BCEWithLogitsLoss()
# Multi-class with one-hot / probability targets.
criterion = nn.CrossEntropyLoss()
回归问题
| 损失函数 | 简介 |
| --- | --- |
| nn.MSELoss | 均方误差,常用 |
| nn.L1Loss | 平均绝对误差,鲁棒 |
| nn.SmoothL1Loss | 平滑L1损失 |
# Regression losses; each assignment overwrites `criterion`, so the last
# one (SmoothL1Loss) wins — keep only the line you need.
criterion = nn.MSELoss()       # mean squared error
criterion = nn.L1Loss()        # mean absolute error
criterion = nn.SmoothL1Loss()  # Huber-style smooth L1
定义优化器
# Common optimizers. Each assignment overwrites `optimizer`; keep one.
# SGD with momentum.
optimizer = optim.SGD(model.parameters(), momentum=0.9, lr=0.01)
# Adam — adaptive moment estimation, the usual default choice.
optimizer = optim.Adam(model.parameters(), betas=(0.9, 0.999), lr=0.001)
# RMSprop — adapts the step size via a moving average of squared gradients.
optimizer = optim.RMSprop(model.parameters(), alpha=0.99, lr=0.01)
# AdamW — Adam with decoupled weight decay.
optimizer = optim.AdamW(model.parameters(), weight_decay=0.01, lr=0.001)
| 优化器 | 简介 | 常用参数 |
| --- | --- | --- |
| SGD | 随机梯度下降,可带动量 | lr, momentum, nesterov |
| Adam | 自适应矩估计,收敛快 | lr, betas, eps, weight_decay |
| RMSprop | 自适应学习率 | lr, alpha, momentum |
| AdamW | Adam改进版,正则化 | lr, betas, weight_decay |
| Adagrad | 适合稀疏数据 | lr, eps |
| Adadelta | Adagrad改进版 | rho, eps |
训练循环
# Pick the compute device and move the model's parameters onto it.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = model.to(device)

num_epochs = 10  # passes over the training set

for epoch in range(num_epochs):
    model.train()  # enable training-mode behavior (dropout, batch-norm)
    running_loss, correct, total = 0.0, 0, 0

    for batch_idx, (data, target) in enumerate(train_loader):
        data, target = data.to(device), target.to(device)

        # Standard step: clear grads, forward, loss, backward, update.
        optimizer.zero_grad()
        output = model(data)
        loss = criterion(output, target)
        loss.backward()
        optimizer.step()

        # Accumulate loss and accuracy statistics for this epoch.
        running_loss += loss.item()
        _, predicted = torch.max(output.data, 1)
        total += target.size(0)
        correct += (predicted == target).sum().item()

        # Progress report every 100 batches.
        if (batch_idx + 1) % 100 == 0:
            print(f'Epoch [{epoch+1}/{num_epochs}], Step [{batch_idx+1}/{len(train_loader)}], Loss: {loss.item():.4f}')

    # Epoch summary: mean batch loss and overall training accuracy.
    print(f'Epoch [{epoch+1}/{num_epochs}], Loss: {running_loss/len(train_loader):.4f}, Accuracy: {100*correct/total:.2f}%')
学习率调度
# Learning-rate schedules. Each assignment overwrites `scheduler`; keep one.
# Step decay: multiply the lr by gamma every step_size epochs.
scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=5, gamma=0.1)
# Cosine annealing over T_max epochs.
scheduler = optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=10)
# Exponential decay: lr *= gamma every epoch.
scheduler = optim.lr_scheduler.ExponentialLR(optimizer, gamma=0.9)

# Advance the schedule once per epoch, after the training steps.
for epoch in range(num_epochs):
    # ... training code ...
    scheduler.step()
评估模型
基本语法
model.eval()  # evaluation mode: dropout off, batch-norm uses running stats
with torch.no_grad():  # skip autograd bookkeeping during evaluation
    correct, total, test_loss = 0, 0, 0
    for data, target in test_loader:
        data, target = data.to(device), target.to(device)
        output = model(data)
        test_loss += criterion(output, target).item()
        # Predicted class = index of the largest logit per sample.
        _, predicted = torch.max(output.data, 1)
        total += target.size(0)
        correct += (predicted == target).sum().item()
print(f'测试集损失: {test_loss/len(test_loader):.4f}')
print(f'测试集准确率: {100*correct/total:.2f}%')
使用sklearn评估指标
from sklearn.metrics import classification_report, confusion_matrix
import numpy as np

# Gather per-sample predictions and ground-truth labels over the test set.
model.eval()
all_preds = []
all_targets = []
with torch.no_grad():
    for data, target in test_loader:
        output = model(data.to(device))
        # argmax over the class dimension yields the predicted labels.
        all_preds.extend(output.argmax(dim=1).cpu().numpy())
        all_targets.extend(target.numpy())

# Per-class precision/recall/F1 plus the confusion matrix via sklearn.
print(classification_report(all_targets, all_preds))
print(confusion_matrix(all_targets, all_preds))
使用模型进行预测
model.eval()
with torch.no_grad():
    # Single sample: a batch of one 1x28x28 image (random data as a stand-in).
    single_data = torch.randn(1, 1, 28, 28).to(device)
    output = model(single_data)
    prediction = torch.argmax(output, dim=1)
    print(f"预测类别: {prediction.item()}")

    # A whole batch of 64 samples predicted in one forward pass.
    batch_data = torch.randn(64, 1, 28, 28).to(device)
    outputs = model(batch_data)
    predictions = torch.argmax(outputs, dim=1)
    print(f"预测类别: {predictions}")
保存和加载模型
保存整个模型
# Save the entire model object (architecture + weights) via pickle.
torch.save(model, "model.pth")
# Load it back. weights_only=False is required on PyTorch >= 2.6, where
# torch.load defaults to weights_only=True and refuses to unpickle
# arbitrary objects such as a full nn.Module.
# SECURITY: this executes arbitrary pickled code — only load trusted files.
model = torch.load("model.pth", weights_only=False)
保存模型参数(推荐)
# Save only the parameter tensors — the recommended approach: smaller
# files, portable, and no pickled class definitions.
torch.save(model.state_dict(), "model_weights.pth")
# To restore: rebuild the architecture first, then copy the weights in.
model = MyModel()
model.load_state_dict(torch.load("model_weights.pth"))
保存和加载检查点
# Bundle everything needed to resume training into one checkpoint dict
# (model weights, optimizer state, and bookkeeping values).
checkpoint = dict(
    epoch=epoch,
    model_state_dict=model.state_dict(),
    optimizer_state_dict=optimizer.state_dict(),
    loss=loss,
)
torch.save(checkpoint, "checkpoint.pth")

# Resume: restore both model and optimizer state, plus the counters.
checkpoint = torch.load("checkpoint.pth")
model.load_state_dict(checkpoint['model_state_dict'])
optimizer.load_state_dict(checkpoint['optimizer_state_dict'])
start_epoch = checkpoint['epoch']
start_loss = checkpoint['loss']
完整示例
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader
import torchvision
import torchvision.transforms as transforms
# 1. Data: MNIST, converted to tensors and standardized to [-1, 1].
transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize((0.5,), (0.5,)),
])
train_dataset = torchvision.datasets.MNIST(root='./data', train=True, transform=transform, download=True)
test_dataset = torchvision.datasets.MNIST(root='./data', train=False, transform=transform, download=True)
# Shuffle only the training split; evaluation order does not matter.
train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=64, shuffle=False)
# 2. 构建模型
class SimpleNet(nn.Module):
def __init__(self):
super(SimpleNet, self).__init__()
self.flatten = nn.Flatten()
self.fc1 = nn.Linear(784, 128)
self.relu = nn.ReLU()
self.fc2 = nn.Linear(128, 10)
def forward(self, x):
x = self.flatten(x)
x = self.relu(self.fc1(x))
x = self.fc2(x)
return x
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = SimpleNet().to(device)

# 3. Loss function and optimizer.
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

# 4./5. Train, then report test accuracy after every epoch.
num_epochs = 5
for epoch in range(num_epochs):
    model.train()
    for data, target in train_loader:
        data, target = data.to(device), target.to(device)
        optimizer.zero_grad()
        output = model(data)
        loss = criterion(output, target)
        loss.backward()
        optimizer.step()

    model.eval()
    correct = 0
    total = 0
    with torch.no_grad():
        for data, target in test_loader:
            data, target = data.to(device), target.to(device)
            output = model(data)
            _, predicted = torch.max(output.data, 1)
            total += target.size(0)
            correct += (predicted == target).sum().item()
    print(f'Epoch {epoch+1}: Accuracy = {100*correct/total:.2f}%')

# 6. Persist the trained weights.
torch.save(model.state_dict(), "mnist_model.pth")