# -*- coding: utf-8 -*-
"""
Model module - pure NumPy implementation.

Two interchangeable models:
1. LogisticRegression - a linear softmax classifier.
2. MLP - a two-layer fully connected network.

Design notes:
- Both models share the same interface (fit / predict / predict_proba /
  accuracy) so they can be compared directly.
- Back-propagation is written out by hand so the math stays transparent.
"""
import numpy as np


class BaseModel:
    """Common interface shared by all models."""

    def fit(self, X, y, X_val=None, y_val=None, epochs=100, batch_size=32,
            verbose=True):
        pass

    def predict(self, X):
        pass

    def predict_proba(self, X):
        pass

    def accuracy(self, X, y):
        pass


class LogisticRegression(BaseModel):
    """
    Logistic regression (linear classifier).

    Structure: input -> linear transform -> softmax -> output.

    Math:
    - linear transform: z = X @ W + b
    - softmax turns the linear scores into a probability distribution

    Parameter count: input_size * num_classes + num_classes.
    """

    def __init__(self, input_size, num_classes=2, learning_rate=0.1,
                 class_weight=None, seed=42):
        np.random.seed(seed)
        # sqrt(2 / fan_in) initialization scale (kept from the original).
        self.W = np.random.randn(input_size, num_classes) * np.sqrt(2.0 / input_size)
        self.b = np.zeros(num_classes)
        self.lr = learning_rate
        self.input_size = input_size
        self.num_classes = num_classes
        self.class_weight = class_weight  # optional per-class loss weights
        total_params = input_size * num_classes + num_classes
        print(f"LogisticRegression: {input_size} -> {num_classes}, 参数量: {total_params}")

    def softmax(self, x):
        """Numerically stable row-wise softmax."""
        x_shifted = x - np.max(x, axis=1, keepdims=True)
        exp_x = np.exp(x_shifted)
        return exp_x / np.sum(exp_x, axis=1, keepdims=True)

    def forward(self, X):
        """Forward pass: class probabilities for each row of X."""
        z = X @ self.W + self.b
        return self.softmax(z)

    def backward(self, X, y):
        """One gradient-descent step using the softmax + cross-entropy gradient."""
        batch_size = X.shape[0]
        probs = self.forward(X)
        # dL/dz = p - onehot(y); with class weights the true-class entry is
        # reduced by w_y instead of 1:  dL/dz_y = w_y*p_y - w_y
        d_z = probs.copy()
        rows = np.arange(batch_size)
        if self.class_weight is not None:
            # The comprehension keeps dict-style class_weight containers
            # working (the original indexed class_weight[y[i]] in a loop).
            d_z[rows, y] -= np.array([self.class_weight[c] for c in y])
        else:
            d_z[rows, y] -= 1
        # Parameter gradients.
        d_W = X.T @ d_z
        d_b = np.sum(d_z, axis=0)
        # Plain SGD update, averaged over the batch.
        self.W -= self.lr * d_W / batch_size
        self.b -= self.lr * d_b / batch_size

    def fit(self, X, y, X_val=None, y_val=None, epochs=100, batch_size=32,
            verbose=True):
        """Mini-batch SGD training loop. Returns self."""
        num_samples = len(X)
        num_batches = (num_samples + batch_size - 1) // batch_size
        for epoch in range(epochs):
            # Reshuffle once per epoch.
            indices = np.random.permutation(num_samples)
            X_shuffled = X[indices]
            y_shuffled = y[indices]
            epoch_loss = 0
            for batch_idx in range(num_batches):
                start = batch_idx * batch_size
                end = min(start + batch_size, num_samples)
                X_batch = X_shuffled[start:end]
                y_batch = y_shuffled[start:end]
                # Forward + backward.
                probs = self.forward(X_batch)
                self.backward(X_batch, y_batch)
                # Cross-entropy loss; clip avoids log(0).
                loss = -np.mean(np.log(np.clip(
                    probs[np.arange(len(y_batch)), y_batch], 1e-10, 1)))
                epoch_loss += loss
            # Periodic evaluation.
            if verbose and (epoch + 1) % 20 == 0:
                train_acc = self.accuracy(X, y)
                msg = f"Epoch {epoch+1:3d}/{epochs} | Loss: {epoch_loss/num_batches:.4f} | 训练准确率: {train_acc:.4f}"
                if X_val is not None:
                    val_acc = self.accuracy(X_val, y_val)
                    msg += f" | 测试准确率: {val_acc:.4f}"
                print(msg)
        return self

    def predict(self, X):
        """Hard class labels (argmax of probabilities)."""
        return np.argmax(self.forward(X), axis=1)

    def predict_proba(self, X):
        """Class probabilities, shape (n_samples, num_classes)."""
        return self.forward(X)

    def accuracy(self, X, y):
        """Fraction of samples predicted correctly."""
        return np.mean(self.predict(X) == y)

    def save(self, filepath):
        """Save weights to <filepath>_W.npy and <filepath>_b.npy."""
        np.save(filepath + '_W.npy', self.W)
        np.save(filepath + '_b.npy', self.b)
        print(f"模型已保存: {filepath}")

    @staticmethod
    def load(filepath, input_size, num_classes=2, learning_rate=0.1):
        """Rebuild a model and load weights written by save()."""
        model = LogisticRegression(input_size, num_classes, learning_rate)
        model.W = np.load(filepath + '_W.npy')
        model.b = np.load(filepath + '_b.npy')
        print(f"模型已加载: {filepath}")
        return model


class MLP(BaseModel):
    """
    Multi-layer perceptron (one hidden layer).

    Structure: input -> linear -> ReLU -> linear -> softmax -> output.

    Differences from LogisticRegression:
    - extra hidden layer + non-linear activation, so it can learn
      non-linear decision boundaries;
    - more parameters.

    Parameter count:
    - W1: input_size x hidden_size
    - b1: hidden_size
    - W2: hidden_size x num_classes
    - b2: num_classes
    """

    def __init__(self, input_size, hidden_size=64, num_classes=2,
                 learning_rate=0.1, keep_prob=1.0, class_weight=None, seed=42):
        np.random.seed(seed)
        # Layer 1 weights (sqrt(2 / fan_in) scale, as in the original).
        self.W1 = np.random.randn(input_size, hidden_size) * np.sqrt(2.0 / input_size)
        self.b1 = np.zeros(hidden_size)
        # Layer 2 weights.
        self.W2 = np.random.randn(hidden_size, num_classes) * np.sqrt(2.0 / hidden_size)
        self.b2 = np.zeros(num_classes)
        self.lr = learning_rate
        self.keep_prob = keep_prob  # dropout keep probability (1.0 = off)
        self.hidden_size = hidden_size
        self.input_size = input_size
        self.num_classes = num_classes
        self.class_weight = class_weight  # optional per-class loss weights
        self.training = False  # toggled by fit(); dropout only when True
        self.d1 = None         # dropout mask from the most recent forward
        total_params = (input_size * hidden_size + hidden_size +
                        hidden_size * num_classes + num_classes)
        print(f"MLP: {input_size} -> {hidden_size} -> {num_classes}, 参数量: {total_params}")

    def relu(self, x):
        """ReLU activation."""
        return np.maximum(0, x)

    def relu_derivative(self, x):
        """Derivative of ReLU (1 where x > 0, else 0)."""
        return (x > 0).astype(float)

    def softmax(self, x):
        """Numerically stable row-wise softmax."""
        x_shifted = x - np.max(x, axis=1, keepdims=True)
        exp_x = np.exp(x_shifted)
        return exp_x / np.sum(exp_x, axis=1, keepdims=True)

    def forward(self, X):
        """Forward pass; caches intermediates for backward()."""
        # Layer 1.
        self.z1 = X @ self.W1 + self.b1
        self.a1 = self.relu(self.z1)
        # Inverted dropout, applied only while training.
        # BUGFIX: the original tested hasattr(self, 'training'), which stays
        # True even after fit() sets training = False, so dropout leaked into
        # evaluation and inference. Test the flag's *value* instead.
        if self.keep_prob < 1.0 and getattr(self, 'training', False):
            self.d1 = (np.random.rand(*self.a1.shape) < self.keep_prob).astype(float)
            self.a1 *= self.d1
            self.a1 /= self.keep_prob
        else:
            self.d1 = None  # no mask -> backward skips the dropout term
        # Layer 2.
        self.z2 = self.a1 @ self.W2 + self.b2
        self.probs = self.softmax(self.z2)
        return self.probs

    def backward(self, X, y):
        """One gradient-descent step; consumes the caches from forward()."""
        batch_size = X.shape[0]
        # Output-layer gradient: dL/dz2 = p - onehot(y); the weighted variant
        # subtracts w_y instead of 1 for the true class.
        d_z2 = self.probs.copy()
        rows = np.arange(batch_size)
        if self.class_weight is not None:
            d_z2[rows, y] -= np.array([self.class_weight[c] for c in y])
        else:
            d_z2[rows, y] -= 1
        # Layer 2 gradients.
        d_W2 = self.a1.T @ d_z2
        d_b2 = np.sum(d_z2, axis=0)
        # Hidden-layer gradient through ReLU.
        d_a1 = d_z2 @ self.W2.T
        d_z1 = d_a1 * self.relu_derivative(self.z1)
        # Dropout gradient: re-apply exactly the mask/scale the forward used.
        if self.keep_prob < 1.0 and getattr(self, 'd1', None) is not None:
            d_z1 *= self.d1
            d_z1 /= self.keep_prob
        # Layer 1 gradients.
        d_W1 = X.T @ d_z1
        d_b1 = np.sum(d_z1, axis=0)
        # Plain SGD update, averaged over the batch.
        self.W1 -= self.lr * d_W1 / batch_size
        self.b1 -= self.lr * d_b1 / batch_size
        self.W2 -= self.lr * d_W2 / batch_size
        self.b2 -= self.lr * d_b2 / batch_size

    def fit(self, X, y, X_val=None, y_val=None, epochs=100, batch_size=32,
            verbose=True):
        """Mini-batch SGD training loop (dropout active). Returns self."""
        num_samples = len(X)
        num_batches = (num_samples + batch_size - 1) // batch_size
        for epoch in range(epochs):
            # Reshuffle once per epoch.
            indices = np.random.permutation(num_samples)
            X_shuffled = X[indices]
            y_shuffled = y[indices]
            epoch_loss = 0
            self.training = True  # enable dropout for the update passes
            for batch_idx in range(num_batches):
                start = batch_idx * batch_size
                end = min(start + batch_size, num_samples)
                X_batch = X_shuffled[start:end]
                y_batch = y_shuffled[start:end]
                # Forward + backward.
                probs = self.forward(X_batch)
                self.backward(X_batch, y_batch)
                # Cross-entropy loss; clip avoids log(0).
                loss = -np.mean(np.log(np.clip(
                    probs[np.arange(len(y_batch)), y_batch], 1e-10, 1)))
                epoch_loss += loss
            self.training = False  # disable dropout for evaluation/inference
            # Periodic evaluation.
            if verbose and (epoch + 1) % 20 == 0:
                train_acc = self.accuracy(X, y)
                msg = f"Epoch {epoch+1:3d}/{epochs} | Loss: {epoch_loss/num_batches:.4f} | 训练准确率: {train_acc:.4f}"
                if X_val is not None:
                    val_acc = self.accuracy(X_val, y_val)
                    msg += f" | 测试准确率: {val_acc:.4f}"
                print(msg)
        return self

    def predict(self, X):
        """Hard class labels (argmax of probabilities)."""
        return np.argmax(self.forward(X), axis=1)

    def predict_proba(self, X):
        """Class probabilities, shape (n_samples, num_classes)."""
        return self.forward(X)

    def accuracy(self, X, y):
        """Fraction of samples predicted correctly."""
        return np.mean(self.predict(X) == y)

    def save(self, filepath):
        """Save weights to <filepath>_{W1,b1,W2,b2}.npy."""
        np.save(filepath + '_W1.npy', self.W1)
        np.save(filepath + '_b1.npy', self.b1)
        np.save(filepath + '_W2.npy', self.W2)
        np.save(filepath + '_b2.npy', self.b2)
        print(f"模型已保存: {filepath}")

    @staticmethod
    def load(filepath, input_size, hidden_size=64, num_classes=2,
             learning_rate=0.1, keep_prob=1.0):
        """Rebuild a model and load weights written by save()."""
        model = MLP(input_size, hidden_size, num_classes, learning_rate, keep_prob)
        model.W1 = np.load(filepath + '_W1.npy')
        model.b1 = np.load(filepath + '_b1.npy')
        model.W2 = np.load(filepath + '_W2.npy')
        model.b2 = np.load(filepath + '_b2.npy')
        print(f"模型已加载: {filepath}")
        return model


def create_model(model_type, input_size, hidden_size=64, num_classes=2,
                 learning_rate=0.1, keep_prob=1.0, class_weight=None):
    """Factory: build a model by type ('lr' or 'mlp').

    Raises ValueError for unknown model types.
    """
    if model_type == 'lr':
        return LogisticRegression(input_size, num_classes, learning_rate,
                                  class_weight)
    elif model_type == 'mlp':
        return MLP(input_size, hidden_size, num_classes, learning_rate,
                   keep_prob, class_weight)
    else:
        raise ValueError(f"未知模型类型: {model_type}")