Upload model_numpy.py
This commit is contained in:
342
model_numpy.py
Normal file
342
model_numpy.py
Normal file
@@ -0,0 +1,342 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
"""
|
||||
模型模块 - 纯NumPy实现
|
||||
|
||||
支持两种模型:
|
||||
1. Logistic Regression(逻辑回归)- 线性模型
|
||||
2. MLP(多层感知机)- 两层全连接网络
|
||||
|
||||
设计思路:
|
||||
- 两种模型都共享相同的接口,方便对比
|
||||
- 代码简洁,每行都有详细注释
|
||||
- 手动实现反向传播,原理透明
|
||||
"""
|
||||
|
||||
import numpy as np
|
||||
|
||||
|
||||
class BaseModel:
|
||||
"""模型基类"""
|
||||
def fit(self, X, y, X_val=None, y_val=None, epochs=100, batch_size=32, verbose=True): pass
|
||||
def predict(self, X): pass
|
||||
def predict_proba(self, X): pass
|
||||
def accuracy(self, X, y): pass
|
||||
|
||||
|
||||
class LogisticRegression(BaseModel):
|
||||
"""
|
||||
逻辑回归(线性分类器)
|
||||
|
||||
结构:输入 → 线性变换 → Softmax → 输出
|
||||
|
||||
原理:
|
||||
- 线性变换: z = X @ W + b
|
||||
- Softmax: 将线性输出转为概率分布
|
||||
|
||||
参数量:input_size × num_classes + num_classes
|
||||
"""
|
||||
|
||||
def __init__(self, input_size, num_classes=2, learning_rate=0.1,
|
||||
class_weight=None, seed=42):
|
||||
np.random.seed(seed)
|
||||
|
||||
# 权重初始化(Xavier)
|
||||
self.W = np.random.randn(input_size, num_classes) * np.sqrt(2.0 / input_size)
|
||||
self.b = np.zeros(num_classes)
|
||||
|
||||
self.lr = learning_rate
|
||||
self.input_size = input_size
|
||||
self.num_classes = num_classes
|
||||
self.class_weight = class_weight # 类别权重
|
||||
|
||||
total_params = input_size * num_classes + num_classes
|
||||
print(f"LogisticRegression: {input_size} -> {num_classes}, 参数量: {total_params}")
|
||||
|
||||
def softmax(self, x):
|
||||
"""Softmax函数"""
|
||||
x_shifted = x - np.max(x, axis=1, keepdims=True)
|
||||
exp_x = np.exp(x_shifted)
|
||||
return exp_x / np.sum(exp_x, axis=1, keepdims=True)
|
||||
|
||||
def forward(self, X):
|
||||
"""前向传播"""
|
||||
# 线性变换
|
||||
z = X @ self.W + self.b
|
||||
# Softmax输出概率
|
||||
return self.softmax(z)
|
||||
|
||||
def backward(self, X, y):
|
||||
"""反向传播(梯度下降)"""
|
||||
batch_size = X.shape[0]
|
||||
probs = self.forward(X)
|
||||
|
||||
# Softmax + 交叉熵梯度
|
||||
d_z = probs.copy()
|
||||
|
||||
# 应用类别权重:减去权重值而不是1
|
||||
# 公式: dL/dz_y = w_y * (p_y - 1) = w_y*p_y - w_y
|
||||
if self.class_weight is not None:
|
||||
for i in range(batch_size):
|
||||
d_z[i, y[i]] -= self.class_weight[y[i]]
|
||||
else:
|
||||
d_z[np.arange(batch_size), y] -= 1
|
||||
|
||||
# 梯度
|
||||
d_W = X.T @ d_z
|
||||
d_b = np.sum(d_z, axis=0)
|
||||
|
||||
# 更新
|
||||
self.W -= self.lr * d_W / batch_size
|
||||
self.b -= self.lr * d_b / batch_size
|
||||
|
||||
def fit(self, X, y, X_val=None, y_val=None, epochs=100, batch_size=32, verbose=True):
|
||||
"""训练"""
|
||||
num_samples = len(X)
|
||||
num_batches = (num_samples + batch_size - 1) // batch_size
|
||||
|
||||
for epoch in range(epochs):
|
||||
# 打乱
|
||||
indices = np.random.permutation(num_samples)
|
||||
X_shuffled = X[indices]
|
||||
y_shuffled = y[indices]
|
||||
|
||||
epoch_loss = 0
|
||||
for batch_idx in range(num_batches):
|
||||
start = batch_idx * batch_size
|
||||
end = min(start + batch_size, num_samples)
|
||||
X_batch = X_shuffled[start:end]
|
||||
y_batch = y_shuffled[start:end]
|
||||
|
||||
# 前向 + 反向
|
||||
probs = self.forward(X_batch)
|
||||
self.backward(X_batch, y_batch)
|
||||
|
||||
# 损失
|
||||
loss = -np.mean(np.log(np.clip(probs[np.arange(len(y_batch)), y_batch], 1e-10, 1)))
|
||||
epoch_loss += loss
|
||||
|
||||
# 评估
|
||||
if verbose and (epoch + 1) % 20 == 0:
|
||||
train_acc = self.accuracy(X, y)
|
||||
msg = f"Epoch {epoch+1:3d}/{epochs} | Loss: {epoch_loss/num_batches:.4f} | 训练准确率: {train_acc:.4f}"
|
||||
if X_val is not None:
|
||||
val_acc = self.accuracy(X_val, y_val)
|
||||
msg += f" | 测试准确率: {val_acc:.4f}"
|
||||
print(msg)
|
||||
|
||||
return self
|
||||
|
||||
def predict(self, X):
|
||||
return np.argmax(self.forward(X), axis=1)
|
||||
|
||||
def predict_proba(self, X):
|
||||
return self.forward(X)
|
||||
|
||||
def accuracy(self, X, y):
|
||||
return np.mean(self.predict(X) == y)
|
||||
|
||||
def save(self, filepath):
|
||||
"""保存模型权重"""
|
||||
np.save(filepath + '_W.npy', self.W)
|
||||
np.save(filepath + '_b.npy', self.b)
|
||||
print(f"模型已保存: {filepath}")
|
||||
|
||||
@staticmethod
|
||||
def load(filepath, input_size, num_classes=2, learning_rate=0.1):
|
||||
"""加载模型权重"""
|
||||
model = LogisticRegression(input_size, num_classes, learning_rate)
|
||||
model.W = np.load(filepath + '_W.npy')
|
||||
model.b = np.load(filepath + '_b.npy')
|
||||
print(f"模型已加载: {filepath}")
|
||||
return model
|
||||
|
||||
|
||||
class MLP(BaseModel):
|
||||
"""
|
||||
多层感知机(神经网络)
|
||||
|
||||
结构:输入 → 线性变换 → ReLU → 线性变换 → Softmax → 输出
|
||||
|
||||
和LogisticRegression的区别:
|
||||
- 多了一层隐藏层 + 非线性激活
|
||||
- 可以学习非线性关系
|
||||
- 参数量更大
|
||||
|
||||
参数量:
|
||||
- W1: input_size × hidden_size
|
||||
- b1: hidden_size
|
||||
- W2: hidden_size × num_classes
|
||||
- b2: num_classes
|
||||
"""
|
||||
|
||||
def __init__(self, input_size, hidden_size=64, num_classes=2,
|
||||
learning_rate=0.1, keep_prob=1.0, class_weight=None, seed=42):
|
||||
np.random.seed(seed)
|
||||
|
||||
# 第一层权重
|
||||
self.W1 = np.random.randn(input_size, hidden_size) * np.sqrt(2.0 / input_size)
|
||||
self.b1 = np.zeros(hidden_size)
|
||||
|
||||
# 第二层权重
|
||||
self.W2 = np.random.randn(hidden_size, num_classes) * np.sqrt(2.0 / hidden_size)
|
||||
self.b2 = np.zeros(num_classes)
|
||||
|
||||
self.lr = learning_rate
|
||||
self.keep_prob = keep_prob
|
||||
self.hidden_size = hidden_size
|
||||
self.input_size = input_size
|
||||
self.num_classes = num_classes
|
||||
self.class_weight = class_weight # 类别权重
|
||||
|
||||
total_params = (input_size * hidden_size + hidden_size +
|
||||
hidden_size * num_classes + num_classes)
|
||||
print(f"MLP: {input_size} -> {hidden_size} -> {num_classes}, 参数量: {total_params}")
|
||||
|
||||
def relu(self, x):
|
||||
"""ReLU激活"""
|
||||
return np.maximum(0, x)
|
||||
|
||||
def relu_derivative(self, x):
|
||||
"""ReLU导数"""
|
||||
return (x > 0).astype(float)
|
||||
|
||||
def softmax(self, x):
|
||||
"""Softmax函数"""
|
||||
x_shifted = x - np.max(x, axis=1, keepdims=True)
|
||||
exp_x = np.exp(x_shifted)
|
||||
return exp_x / np.sum(exp_x, axis=1, keepdims=True)
|
||||
|
||||
def forward(self, X):
|
||||
"""前向传播"""
|
||||
# 第一层
|
||||
self.z1 = X @ self.W1 + self.b1
|
||||
self.a1 = self.relu(self.z1)
|
||||
|
||||
# Dropout(训练时)
|
||||
if self.keep_prob < 1.0 and hasattr(self, 'training'):
|
||||
self.d1 = (np.random.rand(*self.a1.shape) < self.keep_prob).astype(float)
|
||||
self.a1 *= self.d1
|
||||
self.a1 /= self.keep_prob
|
||||
|
||||
# 第二层
|
||||
self.z2 = self.a1 @ self.W2 + self.b2
|
||||
self.probs = self.softmax(self.z2)
|
||||
|
||||
return self.probs
|
||||
|
||||
def backward(self, X, y):
|
||||
"""反向传播"""
|
||||
batch_size = X.shape[0]
|
||||
|
||||
# 输出层梯度
|
||||
d_z2 = self.probs.copy()
|
||||
|
||||
# 应用类别权重
|
||||
if self.class_weight is not None:
|
||||
for i in range(batch_size):
|
||||
d_z2[i, y[i]] -= self.class_weight[y[i]]
|
||||
else:
|
||||
d_z2[np.arange(batch_size), y] -= 1
|
||||
|
||||
# 第二层梯度
|
||||
d_W2 = self.a1.T @ d_z2
|
||||
d_b2 = np.sum(d_z2, axis=0)
|
||||
|
||||
# 隐藏层梯度
|
||||
d_a1 = d_z2 @ self.W2.T
|
||||
d_z1 = d_a1 * self.relu_derivative(self.z1)
|
||||
|
||||
# Dropout梯度
|
||||
if self.keep_prob < 1.0 and hasattr(self, 'd1'):
|
||||
d_z1 *= self.d1
|
||||
d_z1 /= self.keep_prob
|
||||
|
||||
# 第一层梯度
|
||||
d_W1 = X.T @ d_z1
|
||||
d_b1 = np.sum(d_z1, axis=0)
|
||||
|
||||
# 更新
|
||||
self.W1 -= self.lr * d_W1 / batch_size
|
||||
self.b1 -= self.lr * d_b1 / batch_size
|
||||
self.W2 -= self.lr * d_W2 / batch_size
|
||||
self.b2 -= self.lr * d_b2 / batch_size
|
||||
|
||||
def fit(self, X, y, X_val=None, y_val=None, epochs=100, batch_size=32, verbose=True):
|
||||
"""训练"""
|
||||
num_samples = len(X)
|
||||
num_batches = (num_samples + batch_size - 1) // batch_size
|
||||
|
||||
for epoch in range(epochs):
|
||||
# 打乱
|
||||
indices = np.random.permutation(num_samples)
|
||||
X_shuffled = X[indices]
|
||||
y_shuffled = y[indices]
|
||||
|
||||
epoch_loss = 0
|
||||
self.training = True # 开启Dropout
|
||||
|
||||
for batch_idx in range(num_batches):
|
||||
start = batch_idx * batch_size
|
||||
end = min(start + batch_size, num_samples)
|
||||
X_batch = X_shuffled[start:end]
|
||||
y_batch = y_shuffled[start:end]
|
||||
|
||||
# 前向 + 反向
|
||||
probs = self.forward(X_batch)
|
||||
self.backward(X_batch, y_batch)
|
||||
|
||||
# 损失
|
||||
loss = -np.mean(np.log(np.clip(probs[np.arange(len(y_batch)), y_batch], 1e-10, 1)))
|
||||
epoch_loss += loss
|
||||
|
||||
self.training = False # 关闭Dropout
|
||||
|
||||
# 评估
|
||||
if verbose and (epoch + 1) % 20 == 0:
|
||||
train_acc = self.accuracy(X, y)
|
||||
msg = f"Epoch {epoch+1:3d}/{epochs} | Loss: {epoch_loss/num_batches:.4f} | 训练准确率: {train_acc:.4f}"
|
||||
if X_val is not None:
|
||||
val_acc = self.accuracy(X_val, y_val)
|
||||
msg += f" | 测试准确率: {val_acc:.4f}"
|
||||
print(msg)
|
||||
|
||||
return self
|
||||
|
||||
def predict(self, X):
|
||||
return np.argmax(self.forward(X), axis=1)
|
||||
|
||||
def predict_proba(self, X):
|
||||
return self.forward(X)
|
||||
|
||||
def accuracy(self, X, y):
|
||||
return np.mean(self.predict(X) == y)
|
||||
|
||||
def save(self, filepath):
|
||||
"""保存模型权重"""
|
||||
np.save(filepath + '_W1.npy', self.W1)
|
||||
np.save(filepath + '_b1.npy', self.b1)
|
||||
np.save(filepath + '_W2.npy', self.W2)
|
||||
np.save(filepath + '_b2.npy', self.b2)
|
||||
print(f"模型已保存: {filepath}")
|
||||
|
||||
@staticmethod
|
||||
def load(filepath, input_size, hidden_size=64, num_classes=2, learning_rate=0.1, keep_prob=1.0):
|
||||
"""加载模型权重"""
|
||||
model = MLP(input_size, hidden_size, num_classes, learning_rate, keep_prob)
|
||||
model.W1 = np.load(filepath + '_W1.npy')
|
||||
model.b1 = np.load(filepath + '_b1.npy')
|
||||
model.W2 = np.load(filepath + '_W2.npy')
|
||||
model.b2 = np.load(filepath + '_b2.npy')
|
||||
print(f"模型已加载: {filepath}")
|
||||
return model
|
||||
|
||||
|
||||
def create_model(model_type, input_size, hidden_size=64, num_classes=2,
|
||||
learning_rate=0.1, keep_prob=1.0, class_weight=None):
|
||||
"""工厂函数:创建模型"""
|
||||
if model_type == 'lr':
|
||||
return LogisticRegression(input_size, num_classes, learning_rate, class_weight)
|
||||
elif model_type == 'mlp':
|
||||
return MLP(input_size, hidden_size, num_classes, learning_rate, keep_prob, class_weight)
|
||||
else:
|
||||
raise ValueError(f"未知模型类型: {model_type}")
|
||||
Reference in New Issue
Block a user