PyTorch Documentation (Detailed Edition)

Category: Algorithms & Model Training

PyTorch Deep Learning Basics

Introduction

PyTorch is a Python-based deep learning framework developed and open-sourced by Facebook. Known for its dynamic computation graph and clean, easy-to-use API, it is widely used in both academic research and industrial deployment.
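As a quick taste of the dynamic computation graph, autograd records operations as they execute and can differentiate through them immediately. A minimal sketch:

import torch

# requires_grad=True tells autograd to track operations on this tensor
x = torch.tensor([2.0, 3.0], requires_grad=True)
y = (x ** 2).sum()  # y = x1^2 + x2^2; the graph is built on the fly

# Backpropagate: dy/dx = 2x
y.backward()
print(x.grad)  # tensor([4., 6.])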

Preparing the Data

Importing Libraries

import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
import torchvision
import torchvision.transforms as transforms

Loading Data

Method 1: Load a built-in dataset

# Load the MNIST handwritten digit dataset
train_dataset = torchvision.datasets.MNIST(
    root='./data',
    train=True,
    transform=transforms.ToTensor(),
    download=True
)

test_dataset = torchvision.datasets.MNIST(
    root='./data',
    train=False,
    transform=transforms.ToTensor(),
    download=True
)

Method 2: Load data with a DataLoader

# batch_size: number of samples drawn per training iteration
# shuffle: whether to shuffle the data order each epoch
# num_workers: number of subprocesses used for data loading
train_loader = DataLoader(
    dataset=train_dataset,
    batch_size=64,
    shuffle=True,
    num_workers=2
)

test_loader = DataLoader(
    dataset=test_dataset,
    batch_size=64,
    shuffle=False,
    num_workers=2
)
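To confirm a loader yields what you expect, pull one batch and check its shapes; a small sanity-check sketch:

# Fetch a single batch: MNIST images come out as [batch, channels, height, width]
images, labels = next(iter(train_loader))
print(images.shape)  # torch.Size([64, 1, 28, 28])
print(labels.shape)  # torch.Size([64])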

Method 3: Load an external CSV dataset

import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split

df = pd.read_csv("data.csv")

X = df.drop("label", axis=1).values.astype(np.float32)
y = df["label"].values.astype(np.int64)

# Split into training and test sets
X_train, X_test, y_train, y_test = train_test_split(
    X, y,
    test_size=0.2,
    random_state=42,
    shuffle=True
)

# Convert to PyTorch tensors
X_train_tensor = torch.from_numpy(X_train)
y_train_tensor = torch.from_numpy(y_train)
X_test_tensor = torch.from_numpy(X_test)
y_test_tensor = torch.from_numpy(y_test)

# Create datasets
train_dataset = TensorDataset(X_train_tensor, y_train_tensor)
test_dataset = TensorDataset(X_test_tensor, y_test_tensor)

train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=64, shuffle=False)

Data Preprocessing

Normalization: scaling data into a fixed range makes training more stable and faster.

Method 1: Standardization

# Shift and scale by a fixed mean and std; with mean=0.5 and std=0.5,
# inputs in [0, 1] are mapped to [-1, 1] (true standardization would use
# the dataset's actual mean and std)
mean = 0.5
std = 0.5
transform = transforms.Normalize(mean=[mean], std=[std])

Method 2: Scale to the [0, 1] range

# Manually scale raw 8-bit pixel values (0-255) to [0, 1]
X_train_tensor = torch.from_numpy(X_train.astype(np.float32)) / 255.0

Method 3: Normalize image data in a transform pipeline

transform = transforms.Compose([
    transforms.ToTensor(),  # convert to a tensor and scale pixels to [0, 1]
    transforms.Normalize(mean=[0.5], std=[0.5])  # then shift/scale to [-1, 1]
])
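Rather than the 0.5 placeholders, you can compute the actual statistics of the training set and pass those to Normalize. A sketch using the MNIST dataset from Method 1 (torchvision's MNIST exposes the raw uint8 images via .data; the commonly quoted values are mean ≈ 0.1307 and std ≈ 0.3081):

# Compute the dataset's true mean and std for Normalize
data = train_dataset.data.float() / 255.0  # raw uint8 pixels scaled to [0, 1]
print(data.mean().item(), data.std().item())  # ≈ 0.1307, ≈ 0.3081

transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize(mean=[data.mean().item()], std=[data.std().item()])
])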

Building a Model

Method 1: nn.Sequential (a sequential container)

model = nn.Sequential(
    nn.Flatten(),         # flatten the 28x28 image into a 784-dim vector
    nn.Linear(784, 128),  # fully connected layer
    nn.ReLU(),            # activation
    nn.Linear(128, 10)    # output layer (raw logits)
)
# Note: no trailing nn.Softmax here. nn.CrossEntropyLoss, used below, expects
# raw logits and applies log-softmax internally, so adding nn.Softmax(dim=1)
# before it would hurt training.

Method 2: Subclass nn.Module (recommended)

class MyModel(nn.Module):
    def __init__(self):
        super(MyModel, self).__init__()
        self.flatten = nn.Flatten()
        self.fc1 = nn.Linear(784, 128)
        self.relu = nn.ReLU()
        self.fc2 = nn.Linear(128, 10)

    def forward(self, x):
        x = self.flatten(x)
        x = self.fc1(x)
        x = self.relu(x)
        x = self.fc2(x)
        return x

model = MyModel()

Method 3: A more complex CNN model

class CNNModel(nn.Module):
    def __init__(self):
        super(CNNModel, self).__init__()
        # Convolutional layers
        self.conv1 = nn.Conv2d(in_channels=1, out_channels=32, kernel_size=3, padding=1)
        self.conv2 = nn.Conv2d(in_channels=32, out_channels=64, kernel_size=3, padding=1)
        # Pooling layer
        self.pool = nn.MaxPool2d(kernel_size=2, stride=2)
        # Fully connected layers
        self.fc1 = nn.Linear(64 * 7 * 7, 128)
        self.fc2 = nn.Linear(128, 10)
        # Dropout to reduce overfitting
        self.dropout = nn.Dropout(0.5)

    def forward(self, x):
        # First conv + ReLU + pooling: 28x28 -> 14x14
        x = self.pool(torch.relu(self.conv1(x)))
        # Second conv + ReLU + pooling: 14x14 -> 7x7
        x = self.pool(torch.relu(self.conv2(x)))
        # Flatten for the fully connected layers
        x = x.view(-1, 64 * 7 * 7)
        # Fully connected layers with dropout
        x = self.dropout(torch.relu(self.fc1(x)))
        x = self.fc2(x)
        return x

model = CNNModel()

Common Layer Types

Layer | Purpose | Common parameters
nn.Flatten | Flattens the input tensor |
nn.Linear | Fully connected layer | in_features, out_features
nn.Conv2d | 2D convolution | in_channels, out_channels, kernel_size
nn.MaxPool2d | 2D max pooling | kernel_size, stride
nn.AvgPool2d | 2D average pooling | kernel_size, stride
nn.Dropout | Randomly zeroes elements | p (drop probability)
nn.BatchNorm2d | Batch normalization | num_features
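nn.BatchNorm2d and nn.AvgPool2d do not appear in the models above; here is a minimal sketch of a convolutional block that uses them (shapes assume MNIST-sized input):

# A small conv block: Conv -> BatchNorm -> ReLU -> AvgPool
block = nn.Sequential(
    nn.Conv2d(in_channels=1, out_channels=16, kernel_size=3, padding=1),
    nn.BatchNorm2d(num_features=16),  # normalizes each channel over the batch
    nn.ReLU(),
    nn.AvgPool2d(kernel_size=2, stride=2),
)
x = torch.randn(8, 1, 28, 28)
print(block(x).shape)  # torch.Size([8, 16, 14, 14])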

Activation Functions

Activation | Description | Typical use
nn.ReLU | Rectified linear unit, fast to compute | Default for hidden layers
nn.Sigmoid | S-shaped curve, outputs in (0, 1) | Binary classification output
nn.Softmax | Normalized exponential | Multi-class classification output
nn.Tanh | Hyperbolic tangent, outputs in (-1, 1) | Hidden layers
nn.LeakyReLU | ReLU with a small negative slope | Mitigates the "dying ReLU" problem
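To see how they differ in practice, apply a few of them to the same tensor; an illustrative sketch:

x = torch.tensor([-2.0, -0.5, 0.0, 1.0])
print(nn.ReLU()(x))          # tensor([0., 0., 0., 1.]) - negatives zeroed
print(nn.LeakyReLU(0.1)(x))  # negatives scaled by 0.1 instead of zeroed
print(nn.Sigmoid()(x))       # squashed into (0, 1)
print(nn.Tanh()(x))          # squashed into (-1, 1)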

Training the Model

Defining a Loss Function

Classification

# Multi-class classification (integer class labels)
criterion = nn.CrossEntropyLoss()

# Binary classification (raw logits plus float 0/1 labels)
criterion = nn.BCEWithLogitsLoss()

# Multi-class classification (one-hot / probability labels; supported by
# nn.CrossEntropyLoss since PyTorch 1.10)
criterion = nn.CrossEntropyLoss()
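The two losses expect different inputs: nn.CrossEntropyLoss takes raw logits of shape [batch, num_classes] plus integer class indices, while nn.BCEWithLogitsLoss takes one logit per sample plus float 0/1 labels. A sketch with dummy tensors:

logits = torch.randn(4, 10)           # raw scores: 4 samples, 10 classes
targets = torch.tensor([3, 7, 0, 9])  # integer class indices
print(nn.CrossEntropyLoss()(logits, targets))

bin_logits = torch.randn(4)                       # one logit per sample
bin_targets = torch.tensor([1.0, 0.0, 0.0, 1.0])  # float 0/1 labels
print(nn.BCEWithLogitsLoss()(bin_logits, bin_targets))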

Regression

Loss function | Description
nn.MSELoss | Mean squared error, the common default
nn.L1Loss | Mean absolute error, robust to outliers
nn.SmoothL1Loss | Smooth L1 (Huber-style) loss

criterion = nn.MSELoss()
criterion = nn.L1Loss()
criterion = nn.SmoothL1Loss()
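A quick comparison of the three on the same dummy predictions:

pred = torch.tensor([2.5, 0.0, 2.0])
target = torch.tensor([3.0, -0.5, 2.0])
print(nn.MSELoss()(pred, target))       # mean of squared errors
print(nn.L1Loss()(pred, target))        # mean of absolute errors
print(nn.SmoothL1Loss()(pred, target))  # quadratic near zero, linear for large errors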

Defining an Optimizer

# SGD (stochastic gradient descent)
optimizer = optim.SGD(model.parameters(), lr=0.01, momentum=0.9)

# Adam (the most common choice)
optimizer = optim.Adam(model.parameters(), lr=0.001, betas=(0.9, 0.999))

# RMSprop
optimizer = optim.RMSprop(model.parameters(), lr=0.01, alpha=0.99)

# AdamW (Adam with decoupled weight decay)
optimizer = optim.AdamW(model.parameters(), lr=0.001, weight_decay=0.01)

Optimizer | Description | Common parameters
SGD | Stochastic gradient descent, optional momentum | lr, momentum, nesterov
Adam | Adaptive moment estimation, converges quickly | lr, betas, eps, weight_decay
RMSprop | Adaptive learning rate | lr, alpha, momentum
AdamW | Adam variant with better weight-decay regularization | lr, betas, weight_decay
Adagrad | Suited to sparse data | lr, eps
Adadelta | An improvement on Adagrad | rho, eps
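Optimizers also accept parameter groups, so different parts of a model can use different hyperparameters. A sketch, assuming a model with fc1 and fc2 layers like MyModel above:

# Give the output layer a smaller learning rate than the first layer
optimizer = optim.Adam([
    {'params': model.fc1.parameters(), 'lr': 1e-3},
    {'params': model.fc2.parameters(), 'lr': 1e-4},
])

This is also how fine-tuning recipes slow down or effectively freeze pretrained layers.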

Training Loop

# Select the device (GPU if available, otherwise CPU)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = model.to(device)

# Number of training epochs
num_epochs = 10

for epoch in range(num_epochs):
    # Training phase
    model.train()
    running_loss = 0.0
    correct = 0
    total = 0

    for batch_idx, (data, target) in enumerate(train_loader):
        # Move the batch to the device
        data, target = data.to(device), target.to(device)

        # Zero the gradients
        optimizer.zero_grad()

        # Forward pass
        output = model(data)

        # Compute the loss
        loss = criterion(output, target)

        # Backward pass
        loss.backward()

        # Update the parameters
        optimizer.step()

        # Accumulate the loss
        running_loss += loss.item()

        # Track accuracy (tensor.data is legacy; use the output directly)
        _, predicted = torch.max(output, 1)
        total += target.size(0)
        correct += (predicted == target).sum().item()

        # Print every 100 batches
        if (batch_idx + 1) % 100 == 0:
            print(f'Epoch [{epoch+1}/{num_epochs}], Step [{batch_idx+1}/{len(train_loader)}], Loss: {loss.item():.4f}')

    # Print this epoch's results
    print(f'Epoch [{epoch+1}/{num_epochs}], Loss: {running_loss/len(train_loader):.4f}, Accuracy: {100*correct/total:.2f}%')

Learning Rate Scheduling

# Step decay
scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=5, gamma=0.1)

# Cosine annealing
scheduler = optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=10)

# Exponential decay
scheduler = optim.lr_scheduler.ExponentialLR(optimizer, gamma=0.9)

# Step the scheduler inside the training loop
for epoch in range(num_epochs):
    # ... training code ...
    scheduler.step()
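Two common additions: printing the current learning rate, and ReduceLROnPlateau, which lowers the rate when a monitored metric stops improving and is therefore stepped with that metric. A sketch:

scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='min', factor=0.1, patience=3)

for epoch in range(num_epochs):
    # ... training code ...
    val_loss = 0.0  # placeholder: compute the validation loss here
    scheduler.step(val_loss)  # step with the metric, once per epoch
    print(optimizer.param_groups[0]['lr'])  # the current learning rate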

Evaluating the Model

Basic usage

model.eval()  # switch to evaluation mode
with torch.no_grad():  # disable gradient tracking
    correct = 0
    total = 0
    test_loss = 0

    for data, target in test_loader:
        data, target = data.to(device), target.to(device)

        output = model(data)
        test_loss += criterion(output, target).item()

        # Get the predicted class
        _, predicted = torch.max(output, 1)
        total += target.size(0)
        correct += (predicted == target).sum().item()

print(f'Test loss: {test_loss/len(test_loader):.4f}')
print(f'Test accuracy: {100*correct/total:.2f}%')

Evaluation Metrics with sklearn

from sklearn.metrics import classification_report, confusion_matrix
import numpy as np

model.eval()
all_preds = []
all_targets = []

with torch.no_grad():
    for data, target in test_loader:
        data = data.to(device)
        output = model(data)
        _, predicted = torch.max(output, 1)
        all_preds.extend(predicted.cpu().numpy())
        all_targets.extend(target.numpy())

print(classification_report(all_targets, all_preds))
print(confusion_matrix(all_targets, all_preds))

Making Predictions with the Model

model.eval()
with torch.no_grad():
    # Predict a single sample (random input shaped like one MNIST image)
    single_data = torch.randn(1, 1, 28, 28).to(device)
    output = model(single_data)
    prediction = torch.argmax(output, dim=1)
    print(f"Predicted class: {prediction.item()}")

    # Predict a batch of samples
    batch_data = torch.randn(64, 1, 28, 28).to(device)
    outputs = model(batch_data)
    predictions = torch.argmax(outputs, dim=1)
    print(f"Predicted classes: {predictions}")

Saving and Loading Models

Saving the entire model

# Save the entire model (architecture + weights)
torch.save(model, "model.pth")

# Load the entire model; on PyTorch >= 2.6 torch.load defaults to
# weights_only=True, so unpickling a full model requires weights_only=False
model = torch.load("model.pth", weights_only=False)

Saving model parameters (recommended)

# Save only the model parameters (the state dict)
torch.save(model.state_dict(), "model_weights.pth")

# Load the parameters
model = MyModel()  # instantiate the model architecture first
model.load_state_dict(torch.load("model_weights.pth"))

Saving and loading checkpoints

# Save a checkpoint (includes the optimizer state, etc.)
checkpoint = {
    'epoch': epoch,
    'model_state_dict': model.state_dict(),
    'optimizer_state_dict': optimizer.state_dict(),
    'loss': loss,
}
torch.save(checkpoint, "checkpoint.pth")

# Load the checkpoint
checkpoint = torch.load("checkpoint.pth")
model.load_state_dict(checkpoint['model_state_dict'])
optimizer.load_state_dict(checkpoint['optimizer_state_dict'])
start_epoch = checkpoint['epoch']
start_loss = checkpoint['loss']
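When a checkpoint saved on one device is loaded on another (e.g. GPU-trained, CPU-only inference), pass map_location to torch.load; a sketch:

# Map all tensors in the checkpoint to the CPU, wherever they were saved
checkpoint = torch.load("checkpoint.pth", map_location=torch.device('cpu'))
model.load_state_dict(checkpoint['model_state_dict'])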

Complete Example

import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader
import torchvision
import torchvision.transforms as transforms

# 1. Prepare the data
transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize((0.5,), (0.5,))
])

train_dataset = torchvision.datasets.MNIST(root='./data', train=True, transform=transform, download=True)
test_dataset = torchvision.datasets.MNIST(root='./data', train=False, transform=transform, download=True)

train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=64, shuffle=False)

# 2. Build the model
class SimpleNet(nn.Module):
    def __init__(self):
        super(SimpleNet, self).__init__()
        self.flatten = nn.Flatten()
        self.fc1 = nn.Linear(784, 128)
        self.relu = nn.ReLU()
        self.fc2 = nn.Linear(128, 10)

    def forward(self, x):
        x = self.flatten(x)
        x = self.relu(self.fc1(x))
        x = self.fc2(x)
        return x

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = SimpleNet().to(device)

# 3. Define the loss function and optimizer
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

# 4. Train the model
num_epochs = 5
for epoch in range(num_epochs):
    model.train()
    for data, target in train_loader:
        data, target = data.to(device), target.to(device)

        optimizer.zero_grad()
        output = model(data)
        loss = criterion(output, target)
        loss.backward()
        optimizer.step()

    # 5. Evaluate the model
    model.eval()
    correct = 0
    total = 0
    with torch.no_grad():
        for data, target in test_loader:
            data, target = data.to(device), target.to(device)
            output = model(data)
            _, predicted = torch.max(output, 1)
            total += target.size(0)
            correct += (predicted == target).sum().item()

    print(f'Epoch {epoch+1}: Accuracy = {100*correct/total:.2f}%')

# 6. Save the model
torch.save(model.state_dict(), "mnist_model.pth")