上传文件至 /

This commit is contained in:
2026-05-19 11:29:43 +08:00
parent 0271dc3a14
commit 77b3d2fb8f
5 changed files with 705 additions and 1123 deletions

View File

@@ -1,315 +1,261 @@
# -*- coding: utf-8 -*-
"""
模型模块 - 纯NumPy实现
模型模块 - 纯NumPy实现手写数字识别MLP
支持两种模型:
1. Logistic Regression(逻辑回归)- 线性模型
2. MLP(多层感知机)- 两层全连接网络
网络结构: 784 → 128 → 10
- 输入层: 784 像素值 (28x28 展平)
- 隐藏层: 128 神经元 + ReLU激活
- 输出层: 10 数字 (0-9) + Softmax
设计思路:
- 两种模型都共享相同的接口,方便对比
- 代码简洁,每行都有详细注释
- 手动实现反向传播,原理透明
纯NumPy实现无任何深度学习框架依赖
只需: numpy
"""
import numpy as np
class BaseModel:
"""模型基类"""
def fit(self, X, y, X_val=None, y_val=None, epochs=100, batch_size=32, verbose=True): pass
def predict(self, X): pass
def predict_proba(self, X): pass
def accuracy(self, X, y): pass
class LogisticRegression(BaseModel):
class MLP:
"""
逻辑回归(线性分类器)
结构:输入 → 线性变换 → Softmax → 输出
原理:
- 线性变换: z = X @ W + b
- Softmax: 将线性输出转为概率分布
参数量:input_size × num_classes + num_classes
"""
def __init__(self, input_size, num_classes=2, learning_rate=0.1,
class_weight=None, seed=42):
np.random.seed(seed)
# 权重初始化(Xavier)
self.W = np.random.randn(input_size, num_classes) * np.sqrt(2.0 / input_size)
self.b = np.zeros(num_classes)
self.lr = learning_rate
self.input_size = input_size
self.num_classes = num_classes
self.class_weight = class_weight # 类别权重
total_params = input_size * num_classes + num_classes
print(f"LogisticRegression: {input_size} -> {num_classes}, 参数量: {total_params}")
def softmax(self, x):
"""Softmax函数"""
x_shifted = x - np.max(x, axis=1, keepdims=True)
exp_x = np.exp(x_shifted)
return exp_x / np.sum(exp_x, axis=1, keepdims=True)
def forward(self, X):
"""前向传播"""
# 线性变换
z = X @ self.W + self.b
# Softmax输出概率
return self.softmax(z)
def backward(self, X, y):
"""反向传播(梯度下降)"""
batch_size = X.shape[0]
probs = self.forward(X)
# Softmax + 交叉熵梯度
d_z = probs.copy()
# 应用类别权重:减去权重值而不是1
# 公式: dL/dz_y = w_y * (p_y - 1) = w_y*p_y - w_y
if self.class_weight is not None:
for i in range(batch_size):
d_z[i, y[i]] -= self.class_weight[y[i]]
else:
d_z[np.arange(batch_size), y] -= 1
# 梯度
d_W = X.T @ d_z
d_b = np.sum(d_z, axis=0)
# 更新
self.W -= self.lr * d_W / batch_size
self.b -= self.lr * d_b / batch_size
def fit(self, X, y, X_val=None, y_val=None, epochs=100, batch_size=32, verbose=True):
"""训练"""
num_samples = len(X)
num_batches = (num_samples + batch_size - 1) // batch_size
for epoch in range(epochs):
# 打乱
indices = np.random.permutation(num_samples)
X_shuffled = X[indices]
y_shuffled = y[indices]
epoch_loss = 0
for batch_idx in range(num_batches):
start = batch_idx * batch_size
end = min(start + batch_size, num_samples)
X_batch = X_shuffled[start:end]
y_batch = y_shuffled[start:end]
# 前向 + 反向
probs = self.forward(X_batch)
self.backward(X_batch, y_batch)
# 损失
loss = -np.mean(np.log(np.clip(probs[np.arange(len(y_batch)), y_batch], 1e-10, 1)))
epoch_loss += loss
# 评估
if verbose and (epoch + 1) % 20 == 0:
train_acc = self.accuracy(X, y)
msg = f"Epoch {epoch+1:3d}/{epochs} | Loss: {epoch_loss/num_batches:.4f} | 训练准确率: {train_acc:.4f}"
if X_val is not None:
val_acc = self.accuracy(X_val, y_val)
msg += f" | 测试准确率: {val_acc:.4f}"
print(msg)
return self
def predict(self, X):
return np.argmax(self.forward(X), axis=1)
def predict_proba(self, X):
return self.forward(X)
def accuracy(self, X, y):
return np.mean(self.predict(X) == y)
def save(self, filepath):
"""保存模型权重"""
np.save(filepath + '_W.npy', self.W)
np.save(filepath + '_b.npy', self.b)
print(f"模型已保存: {filepath}")
@staticmethod
def load(filepath, input_size, num_classes=2, learning_rate=0.1):
"""加载模型权重"""
model = LogisticRegression(input_size, num_classes, learning_rate)
model.W = np.load(filepath + '_W.npy')
model.b = np.load(filepath + '_b.npy')
print(f"模型已加载: {filepath}")
return model
class MLP(BaseModel):
"""
多层感知机(神经网络)
结构:输入 → 线性变换 → ReLU → 线性变换 → Softmax → 输出
和LogisticRegression的区别:
- 多了一层隐藏层 + 非线性激活
- 可以学习非线性关系
- 参数量更大
多层感知机(神经网络)
结构:
输入(784) → 线性变换 → ReLU → 线性变换 → Softmax → 输出(10)
参数量:
- W1: input_size × hidden_size
- b1: hidden_size
- W2: hidden_size × num_classes
- b2: num_classes
W1: 784 × 128 = 100,352
b1: 128
W2: 128 × 10 = 1,280
b2: 10
总计: ~101,770 参数
"""
def __init__(self, input_size, hidden_size=64, num_classes=2,
learning_rate=0.1, keep_prob=1.0, class_weight=None, seed=42):
def __init__(self, input_size=784, hidden_size=128, num_classes=10,
learning_rate=0.1, seed=42):
np.random.seed(seed)
# 第一层权重
# ===== 第一层: 输入 → 隐藏层 =====
# 权重: (input_size, hidden_size)
# Xavier初始化适合ReLU
self.W1 = np.random.randn(input_size, hidden_size) * np.sqrt(2.0 / input_size)
self.b1 = np.zeros(hidden_size)
# 第二层权重
# ===== 第二层: 隐藏层 → 输出 =====
# 权重: (hidden_size, num_classes)
self.W2 = np.random.randn(hidden_size, num_classes) * np.sqrt(2.0 / hidden_size)
self.b2 = np.zeros(num_classes)
# 保存超参数
self.lr = learning_rate
self.keep_prob = keep_prob
self.hidden_size = hidden_size
self.input_size = input_size
self.hidden_size = hidden_size
self.num_classes = num_classes
self.class_weight = class_weight # 类别权重
total_params = (input_size * hidden_size + hidden_size +
# 打印模型信息
total_params = (input_size * hidden_size + hidden_size +
hidden_size * num_classes + num_classes)
print(f"MLP: {input_size} -> {hidden_size} -> {num_classes}, 参数量: {total_params}")
print(f"\n{'='*50}")
print(f"MLP 网络结构:")
print(f" 输入层: {input_size} 神经元")
print(f" 隐藏层: {hidden_size} 神经元 + ReLU")
print(f" 输出层: {num_classes} 神经元 + Softmax")
print(f" 参数量: {total_params:,}")
print(f"{'='*50}")
def relu(self, x):
"""ReLU激活"""
"""ReLU激活函数: max(0, x)"""
return np.maximum(0, x)
def relu_derivative(self, x):
"""ReLU导数"""
"""ReLU导数: x > 0 时为1否则为0"""
return (x > 0).astype(float)
def softmax(self, x):
"""Softmax函数"""
"""
Softmax函数: 将数值转换为概率分布
softmax(x_i) = exp(x_i) / sum(exp(x_j))
技巧: 减去最大值避免数值溢出
"""
x_shifted = x - np.max(x, axis=1, keepdims=True)
exp_x = np.exp(x_shifted)
return exp_x / np.sum(exp_x, axis=1, keepdims=True)
def forward(self, X):
"""前向传播"""
# 第一层
self.z1 = X @ self.W1 + self.b1
self.a1 = self.relu(self.z1)
# Dropout(训练时)
if self.keep_prob < 1.0 and hasattr(self, 'training'):
self.d1 = (np.random.rand(*self.a1.shape) < self.keep_prob).astype(float)
self.a1 *= self.d1
self.a1 /= self.keep_prob
# 第二层
self.z2 = self.a1 @ self.W2 + self.b2
self.probs = self.softmax(self.z2)
"""
前向传播
Args:
X: (batch_size, 784) 图像像素值
Returns:
probs: (batch_size, 10) 每个类的概率
"""
# ===== 第一层计算 =====
# z1 = X @ W1 + b1
# a1 = relu(z1)
self.z1 = X @ self.W1 + self.b1 # (batch, 784) @ (784, 128) = (batch, 128)
self.a1 = self.relu(self.z1) # (batch, 128)
# ===== 第二层计算 =====
# z2 = a1 @ W2 + b2
# probs = softmax(z2)
self.z2 = self.a1 @ self.W2 + self.b2 # (batch, 128) @ (128, 10) = (batch, 10)
self.probs = self.softmax(self.z2) # (batch, 10)
return self.probs
def backward(self, X, y):
"""反向传播"""
"""
反向传播(梯度下降)
Args:
X: (batch_size, 784) 图像
y: (batch_size, 10) One-Hot标签
"""
batch_size = X.shape[0]
# 输出层梯度
d_z2 = self.probs.copy()
# ===== 输出层梯度 =====
# Softmax + 交叉熵的梯度简化为: p - y
d_z2 = self.probs - y # (batch, 10)
# 应用类别权重
if self.class_weight is not None:
for i in range(batch_size):
d_z2[i, y[i]] -= self.class_weight[y[i]]
else:
d_z2[np.arange(batch_size), y] -= 1
# ===== 第二层梯度 =====
d_W2 = self.a1.T @ d_z2 # (128, 10)
d_b2 = np.sum(d_z2, axis=0) # (10,)
# 第二层梯度
d_W2 = self.a1.T @ d_z2
d_b2 = np.sum(d_z2, axis=0)
# ===== 隐藏层梯度 =====
d_a1 = d_z2 @ self.W2.T # (batch, 128)
d_z1 = d_a1 * self.relu_derivative(self.z1) # (batch, 128)
# 隐藏层梯度
d_a1 = d_z2 @ self.W2.T
d_z1 = d_a1 * self.relu_derivative(self.z1)
# ===== 第一层梯度 =====
d_W1 = X.T @ d_z1 # (784, 128)
d_b1 = np.sum(d_z1, axis=0) # (128,)
# Dropout梯度
if self.keep_prob < 1.0 and hasattr(self, 'd1'):
d_z1 *= self.d1
d_z1 /= self.keep_prob
# ===== 梯度裁剪(防止梯度爆炸) =====
max_grad = 1.0
d_W1 = np.clip(d_W1, -max_grad, max_grad)
d_W2 = np.clip(d_W2, -max_grad, max_grad)
d_b1 = np.clip(d_b1, -max_grad, max_grad)
d_b2 = np.clip(d_b2, -max_grad, max_grad)
# 第一层梯度
d_W1 = X.T @ d_z1
d_b1 = np.sum(d_z1, axis=0)
# 更新
# ===== 更新权重(梯度下降) =====
self.W1 -= self.lr * d_W1 / batch_size
self.b1 -= self.lr * d_b1 / batch_size
self.W2 -= self.lr * d_W2 / batch_size
self.b2 -= self.lr * d_b2 / batch_size
def fit(self, X, y, X_val=None, y_val=None, epochs=100, batch_size=32, verbose=True):
"""训练"""
num_samples = len(X)
num_batches = (num_samples + batch_size - 1) // batch_size
def cross_entropy_loss(self, probs, y):
"""
交叉熵损失
L = -sum(y * log(p)) / N
"""
# 取真实类别的概率
correct_probs = probs[np.arange(len(y)), y.argmax(axis=1)]
# 避免log(0)
loss = -np.mean(np.log(np.clip(correct_probs, 1e-10, 1.0)))
return loss
def fit(self, X_train, y_train, X_val=None, y_val=None,
epochs=50, batch_size=64, verbose=True):
"""
训练模型
Args:
X_train: 训练数据 (N, 784)
y_train: 训练标签 (N, 10) One-Hot
X_val: 验证数据(可选)
y_val: 验证标签(可选)
epochs: 训练轮数
batch_size: 批大小
verbose: 是否打印进度
"""
N = len(X_train)
num_batches = (N + batch_size - 1) // batch_size
for epoch in range(epochs):
# 打乱
indices = np.random.permutation(num_samples)
X_shuffled = X[indices]
y_shuffled = y[indices]
# ===== 打乱数据 =====
indices = np.random.permutation(N)
X_shuffled = X_train[indices]
y_shuffled = y_train[indices]
epoch_loss = 0
self.training = True # 开启Dropout
# ===== 批训练 =====
for batch_idx in range(num_batches):
start = batch_idx * batch_size
end = min(start + batch_size, num_samples)
end = min(start + batch_size, N)
X_batch = X_shuffled[start:end]
y_batch = y_shuffled[start:end]
# 前向 + 反向
# 前向传播
probs = self.forward(X_batch)
# 反向传播
self.backward(X_batch, y_batch)
# 损失
loss = -np.mean(np.log(np.clip(probs[np.arange(len(y_batch)), y_batch], 1e-10, 1)))
# 计算损失
loss = self.cross_entropy_loss(probs, y_batch)
epoch_loss += loss
self.training = False # 关闭Dropout
# 评估
if verbose and (epoch + 1) % 20 == 0:
train_acc = self.accuracy(X, y)
# ===== 打印进度 =====
if verbose and (epoch + 1) % 5 == 0:
train_acc = self.accuracy(X_train, y_train)
msg = f"Epoch {epoch+1:3d}/{epochs} | Loss: {epoch_loss/num_batches:.4f} | 训练准确率: {train_acc:.4f}"
if X_val is not None:
val_acc = self.accuracy(X_val, y_val)
msg += f" | 测试准确率: {val_acc:.4f}"
print(msg)
return self
def predict(self, X):
return np.argmax(self.forward(X), axis=1)
"""
预测类别
Args:
X: (N, 784) 图像
Returns:
predictions: (N,) 预测的类别标签 (0-9)
"""
probs = self.forward(X)
return np.argmax(probs, axis=1)
def predict_proba(self, X):
"""
预测概率
Returns:
probs: (N, 10) 每个类的概率
"""
return self.forward(X)
def accuracy(self, X, y):
return np.mean(self.predict(X) == y)
"""
计算准确率
Args:
X: (N, 784) 图像
y: (N,) 或 (N, 10) 标签
"""
if len(y.shape) > 1:
y = np.argmax(y, axis=1)
predictions = self.predict(X)
return np.mean(predictions == y)
def save(self, filepath):
"""保存模型权重"""
@@ -317,26 +263,43 @@ class MLP(BaseModel):
np.save(filepath + '_b1.npy', self.b1)
np.save(filepath + '_W2.npy', self.W2)
np.save(filepath + '_b2.npy', self.b2)
print(f"模型已保存: {filepath}")
print(f"\n模型已保存: {filepath}")
@staticmethod
def load(filepath, input_size, hidden_size=64, num_classes=2, learning_rate=0.1, keep_prob=1.0):
def load(filepath, input_size=784, hidden_size=128, num_classes=10, learning_rate=0.1):
"""加载模型权重"""
model = MLP(input_size, hidden_size, num_classes, learning_rate, keep_prob)
model = MLP(input_size, hidden_size, num_classes, learning_rate)
model.W1 = np.load(filepath + '_W1.npy')
model.b1 = np.load(filepath + '_b1.npy')
model.W2 = np.load(filepath + '_W2.npy')
model.b2 = np.load(filepath + '_b2.npy')
print(f"模型已加载: {filepath}")
print(f"\n模型已加载: {filepath}")
return model
def create_model(model_type, input_size, hidden_size=64, num_classes=2,
learning_rate=0.1, keep_prob=1.0, class_weight=None):
"""工厂函数:创建模型"""
if model_type == 'lr':
return LogisticRegression(input_size, num_classes, learning_rate, class_weight)
elif model_type == 'mlp':
return MLP(input_size, hidden_size, num_classes, learning_rate, keep_prob, class_weight)
else:
raise ValueError(f"未知模型类型: {model_type}")
# ===== 测试代码 =====
if __name__ == '__main__':
# 简单测试
print("测试MLP模型...")
model = MLP(input_size=784, hidden_size=128, num_classes=10, learning_rate=0.1)
# 模拟数据
X_test = np.random.randn(32, 784)
y_test = np.zeros((32, 10))
for i in range(32):
y_test[i, i % 10] = 1
# 前向传播测试
probs = model.forward(X_test)
print(f"输出概率形状: {probs.shape}")
print(f"概率和: {probs[0].sum():.4f} (应该接近1)")
# 反向传播测试
model.backward(X_test, y_test)
print("反向传播测试通过!")
# 预测测试
preds = model.predict(X_test)
print(f"预测结果: {preds}")