Files
task-3-3-2-MLP/digit_mlp_class/model_numpy.py
2026-05-21 15:08:03 +08:00

305 lines
9.2 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# -*- coding: utf-8 -*-
"""
模型模块 - 纯NumPy实现手写数字识别MLP
网络结构: 784 → 128 → 10
- 输入层: 784 像素值 (28x28 展平)
- 隐藏层: 128 神经元 + ReLU激活
- 输出层: 10 数字 (0-9) + Softmax
纯NumPy实现无任何深度学习框架依赖
只需: numpy
"""
import numpy as np
class MLP:
    """Two-layer perceptron for digit classification, written with NumPy only.

    Architecture (default sizes)::

        input (784) -> linear -> ReLU -> linear -> softmax -> output (10)

    Parameter count with the default sizes:
        W1: 784 x 128 = 100,352
        b1: 128
        W2: 128 x 10 = 1,280
        b2: 10
        total: 101,770
    """

    def __init__(self, input_size=784, hidden_size=128, num_classes=10,
                 learning_rate=0.1, seed=42):
        """Create the network with randomly initialised weights.

        Args:
            input_size: number of input features.
            hidden_size: number of hidden units.
            num_classes: number of output classes.
            learning_rate: step size used by ``backward``.
            seed: value passed to ``np.random.seed``.
        """
        # This seeds NumPy's *global* generator. ``fit`` shuffles with
        # ``np.random.permutation``, so the batch order is affected as well.
        np.random.seed(seed)

        # ===== Layer 1: input -> hidden =====
        # He initialisation (std = sqrt(2 / fan_in)), the variant suited to ReLU.
        self.W1 = np.random.randn(input_size, hidden_size) * np.sqrt(2.0 / input_size)
        self.b1 = np.zeros(hidden_size)

        # ===== Layer 2: hidden -> output =====
        self.W2 = np.random.randn(hidden_size, num_classes) * np.sqrt(2.0 / hidden_size)
        self.b2 = np.zeros(num_classes)

        # Hyper-parameters
        self.lr = learning_rate
        self.input_size = input_size
        self.hidden_size = hidden_size
        self.num_classes = num_classes

        # Print a short model summary
        total_params = (input_size * hidden_size + hidden_size +
                        hidden_size * num_classes + num_classes)
        print(f"\n{'='*50}")
        print(f"MLP 网络结构:")
        print(f" 输入层: {input_size} 神经元")
        print(f" 隐藏层: {hidden_size} 神经元 + ReLU")
        print(f" 输出层: {num_classes} 神经元 + Softmax")
        print(f" 参数量: {total_params:,}")
        print(f"{'='*50}")

    def relu(self, x):
        """ReLU activation: element-wise ``max(0, x)``."""
        return np.maximum(0, x)

    def relu_derivative(self, x):
        """ReLU derivative: 1.0 where ``x > 0``, otherwise 0.0."""
        return (x > 0).astype(float)

    def softmax(self, x):
        """Turn scores into a probability distribution along the last axis.

        softmax(x_i) = exp(x_i) / sum_j exp(x_j)

        The maximum is subtracted first so that ``exp`` cannot overflow.
        Works for a single score vector as well as for a (batch, classes)
        matrix.
        """
        x_shifted = x - np.max(x, axis=-1, keepdims=True)
        exp_x = np.exp(x_shifted)
        return exp_x / np.sum(exp_x, axis=-1, keepdims=True)

    def forward(self, X):
        """Run the forward pass and cache the intermediates for ``backward``.

        Args:
            X: (batch_size, input_size) input features.

        Returns:
            probs: (batch_size, num_classes) class probabilities.
        """
        # Layer 1: z1 = X @ W1 + b1, a1 = relu(z1)
        self.z1 = X @ self.W1 + self.b1
        self.a1 = self.relu(self.z1)
        # Layer 2: z2 = a1 @ W2 + b2, probs = softmax(z2)
        self.z2 = self.a1 @ self.W2 + self.b2
        self.probs = self.softmax(self.z2)
        return self.probs

    def backward(self, X, y, max_grad=1.0):
        """Back-propagate the mean cross-entropy loss and take one SGD step.

        Uses the activations cached by the most recent ``forward`` call, so
        ``forward(X)`` must have been called with this same batch.

        Args:
            X: (batch_size, input_size) inputs of the batch.
            y: (batch_size, num_classes) one-hot labels.
            max_grad: element-wise clipping bound applied to the *averaged*
                gradients; ``None`` disables clipping.

        Raises:
            ValueError: if the cached forward pass does not match the batch.
        """
        batch_size = X.shape[0]
        if batch_size == 0:
            return  # nothing to learn from; also avoids dividing by zero

        probs = getattr(self, "probs", None)
        if probs is None or probs.shape[0] != batch_size:
            raise ValueError("backward() requires a preceding forward() on the same batch")

        # Softmax + cross-entropy gradient is (p - y). Dividing by the batch
        # size here makes every gradient below the gradient of the MEAN loss,
        # so the clipping threshold does not depend on the batch size.
        d_z2 = (probs - np.asarray(y)) / batch_size   # (batch, classes)

        # ===== Layer 2 gradients =====
        d_W2 = self.a1.T @ d_z2                       # (hidden, classes)
        d_b2 = np.sum(d_z2, axis=0)                   # (classes,)

        # ===== Hidden layer gradients =====
        d_a1 = d_z2 @ self.W2.T                       # (batch, hidden)
        d_z1 = d_a1 * self.relu_derivative(self.z1)   # (batch, hidden)

        # ===== Layer 1 gradients =====
        d_W1 = X.T @ d_z1                             # (input, hidden)
        d_b1 = np.sum(d_z1, axis=0)                   # (hidden,)

        # ===== Gradient clipping (guards against exploding gradients) =====
        if max_grad is not None:
            d_W1 = np.clip(d_W1, -max_grad, max_grad)
            d_W2 = np.clip(d_W2, -max_grad, max_grad)
            d_b1 = np.clip(d_b1, -max_grad, max_grad)
            d_b2 = np.clip(d_b2, -max_grad, max_grad)

        # ===== Gradient-descent update =====
        self.W1 -= self.lr * d_W1
        self.b1 -= self.lr * d_b1
        self.W2 -= self.lr * d_W2
        self.b2 -= self.lr * d_b2

    def cross_entropy_loss(self, probs, y):
        """Mean cross-entropy: ``L = -sum(y * log(p)) / N``.

        Args:
            probs: (N, num_classes) predicted probabilities.
            y: (N, num_classes) one-hot labels or (N,) class indices.

        Returns:
            The mean negative log-probability of the true classes.
        """
        labels = np.asarray(y)
        if labels.ndim > 1:
            labels = labels.argmax(axis=1)
        else:
            labels = labels.astype(np.intp)
        # Probability assigned to the true class of each sample
        correct_probs = probs[np.arange(len(labels)), labels]
        # Clip away exact zeros so that log() stays finite
        return -np.mean(np.log(np.clip(correct_probs, 1e-10, 1.0)))

    def fit(self, X_train, y_train, X_val=None, y_val=None,
            epochs=50, batch_size=64, verbose=True):
        """Train with mini-batch gradient descent.

        Args:
            X_train: (N, input_size) training inputs.
            y_train: (N, num_classes) one-hot training labels.
            X_val: optional validation inputs.
            y_val: optional validation labels.
            epochs: number of passes over the training set.
            batch_size: number of samples per mini-batch.
            verbose: print progress every 5 epochs.

        Returns:
            self, so calls can be chained.

        Raises:
            ValueError: on an empty training set, mismatched lengths or a
                non-positive batch size.
        """
        N = len(X_train)
        if N == 0:
            raise ValueError("X_train must contain at least one sample")
        if len(y_train) != N:
            raise ValueError("X_train and y_train must have the same length")
        if batch_size <= 0:
            raise ValueError("batch_size must be positive")

        # Validation accuracy is reported only when both arrays are supplied.
        has_val = X_val is not None and y_val is not None
        num_batches = (N + batch_size - 1) // batch_size

        for epoch in range(epochs):
            # ===== Shuffle =====
            indices = np.random.permutation(N)
            X_shuffled = X_train[indices]
            y_shuffled = y_train[indices]

            epoch_loss = 0.0
            # ===== Mini-batches (slicing truncates the final short batch) =====
            for start in range(0, N, batch_size):
                X_batch = X_shuffled[start:start + batch_size]
                y_batch = y_shuffled[start:start + batch_size]

                probs = self.forward(X_batch)
                self.backward(X_batch, y_batch)
                # Loss of the pre-update predictions for this batch
                epoch_loss += self.cross_entropy_loss(probs, y_batch)

            # ===== Progress report =====
            if verbose and (epoch + 1) % 5 == 0:
                mean_loss = epoch_loss / num_batches
                train_acc = self.accuracy(X_train, y_train)
                msg = f"Epoch {epoch+1:3d}/{epochs} | Loss: {mean_loss:.4f} | 训练准确率: {train_acc:.4f}"
                if has_val:
                    val_acc = self.accuracy(X_val, y_val)
                    msg += f" | 测试准确率: {val_acc:.4f}"
                print(msg)
        return self

    def predict(self, X):
        """Predict class labels.

        Args:
            X: (N, input_size) inputs.

        Returns:
            predictions: (N,) index of the most probable class per sample.
        """
        probs = self.forward(X)
        return np.argmax(probs, axis=1)

    def predict_proba(self, X):
        """Predict class probabilities.

        Returns:
            probs: (N, num_classes) probabilities per sample.
        """
        return self.forward(X)

    def accuracy(self, X, y):
        """Fraction of samples whose predicted class equals the label.

        Args:
            X: (N, input_size) inputs.
            y: (N,) class indices or (N, num_classes) one-hot labels.
        """
        y = np.asarray(y)
        if y.ndim > 1:
            y = np.argmax(y, axis=1)
        predictions = self.predict(X)
        return np.mean(predictions == y)

    def save(self, filepath):
        """Save the weights to ``<filepath>_W1.npy``, ``_b1``, ``_W2``, ``_b2``.

        Args:
            filepath: path prefix (``str`` or path-like) for the four files.
        """
        # f-strings instead of ``+`` so that pathlib.Path prefixes work too.
        np.save(f"{filepath}_W1.npy", self.W1)
        np.save(f"{filepath}_b1.npy", self.b1)
        np.save(f"{filepath}_W2.npy", self.W2)
        np.save(f"{filepath}_b2.npy", self.b2)
        print(f"\n模型已保存: {filepath}")

    @staticmethod
    def load(filepath, input_size=784, hidden_size=128, num_classes=10, learning_rate=0.1):
        """Load weights written by ``save`` and return a ready-to-use model.

        The layer sizes are taken from the stored arrays. ``input_size``,
        ``hidden_size`` and ``num_classes`` are kept only for backward
        compatibility and are overridden by the shapes found on disk, so the
        model's size attributes always agree with its weights.

        Args:
            filepath: the path prefix that was passed to ``save``.
            learning_rate: step size for any further training.

        Raises:
            ValueError: if the stored arrays have inconsistent shapes.
        """
        W1 = np.load(f"{filepath}_W1.npy")
        b1 = np.load(f"{filepath}_b1.npy")
        W2 = np.load(f"{filepath}_W2.npy")
        b2 = np.load(f"{filepath}_b2.npy")

        shapes_ok = (
            W1.ndim == 2 and W2.ndim == 2
            and b1.shape == (W1.shape[1],)
            and W2.shape[0] == W1.shape[1]
            and b2.shape == (W2.shape[1],)
        )
        if not shapes_ok:
            raise ValueError(
                f"inconsistent weight shapes: W1{W1.shape}, b1{b1.shape}, "
                f"W2{W2.shape}, b2{b2.shape}"
            )

        model = MLP(W1.shape[0], W1.shape[1], W2.shape[1], learning_rate)
        model.W1 = W1
        model.b1 = b1
        model.W2 = W2
        model.b2 = b2
        print(f"\n模型已加载: {filepath}")
        return model
# ===== Smoke test =====
if __name__ == '__main__':
    # Quick end-to-end check on random data; no real dataset is needed.
    print("测试MLP模型...")
    n_samples, n_features, n_classes = 32, 784, 10
    model = MLP(input_size=n_features, hidden_size=128,
                num_classes=n_classes, learning_rate=0.1)

    # Synthetic inputs; labels cycle through 0..9 and are one-hot encoded.
    X_test = np.random.randn(n_samples, n_features)
    row_ids = np.arange(n_samples)
    y_test = np.zeros((n_samples, n_classes))
    y_test[row_ids, row_ids % n_classes] = 1

    # Forward pass: check the output shape and that a row sums to 1.
    probs = model.forward(X_test)
    print(f"输出概率形状: {probs.shape}")
    first_row_total = probs[0].sum()
    print(f"概率和: {first_row_total:.4f} (应该接近1)")

    # Backward pass: one update step on the same batch must run cleanly.
    model.backward(X_test, y_test)
    print("反向传播测试通过!")

    # Prediction: class index per sample.
    preds = model.predict(X_test)
    print(f"预测结果: {preds}")