# -*- coding: utf-8 -*-
"""
Model module - pure NumPy implementation.

Two models are provided:
1. LogisticRegression - a linear softmax classifier
2. MLP - a two-layer fully connected network (one hidden ReLU layer)

Design notes:
- Both models expose the same interface so they can be compared directly.
- Back-propagation is written out by hand so the maths stays visible.
"""
# Reconstructed from patch 4560f4f ("Upload model_numpy.py"), new file model_numpy.py.

import numpy as np

# Progress is printed every _LOG_EVERY epochs when ``verbose`` is on.
_LOG_EVERY = 20
# Lower bound applied to probabilities before taking the log in the loss.
_LOG_EPS = 1e-10


def _softmax(x):
    """Row-wise softmax of a 2-D array, shifted by the row max for stability."""
    shifted = x - np.max(x, axis=1, keepdims=True)
    exp_x = np.exp(shifted)
    return exp_x / np.sum(exp_x, axis=1, keepdims=True)


def _output_gradient(probs, y, class_weight=None):
    """Gradient of the (optionally class-weighted) cross-entropy w.r.t. logits.

    For sample ``i`` with label ``y_i`` and weight ``w = class_weight[y_i]``
    the loss is ``-w * log(p[y_i])``, so the gradient of the whole row is
    ``w * (p - onehot(y_i))``.  Subtracting ``w`` from ``p[y_i]`` alone is not
    equivalent: it leaves the other classes unscaled and the row no longer
    sums to zero.

    Args:
        probs: softmax output, shape (batch, num_classes). Not modified.
        y: integer class labels, length ``batch``.
        class_weight: ``None`` or anything indexable by class label
            (dict, list, array).

    Returns:
        A new array with the same shape as ``probs``; it is not yet divided
        by the batch size.
    """
    y = np.asarray(y)
    grad = probs.copy()
    grad[np.arange(len(y)), y] -= 1.0
    if class_weight is not None:
        # tolist() yields plain Python ints, which work as dict keys and as
        # list/array indices alike.
        per_sample = np.array([class_weight[label] for label in y.tolist()],
                              dtype=float)
        grad *= per_sample[:, None]
    return grad


class BaseModel:
    """Common interface of all models (method stubs only)."""
    def fit(self, X, y, X_val=None, y_val=None, epochs=100, batch_size=32, verbose=True): pass
    def predict(self, X): pass
    def predict_proba(self, X): pass
    def accuracy(self, X, y): pass


class _SoftmaxClassifier(BaseModel):
    """Training loop and inference helpers shared by the concrete models.

    Subclasses provide ``forward(X)`` (returns class probabilities) and
    ``backward(X, y)`` (performs one gradient-descent update).
    """

    # True only while mini-batches are being processed inside ``fit``;
    # train-only behaviour such as dropout must check this flag.
    training = False

    def softmax(self, x):
        """Row-wise softmax."""
        return _softmax(x)

    def _train_step(self, X_batch, y_batch):
        """Run one forward/backward pass; return the pre-update probabilities."""
        probs = self.forward(X_batch)
        self.backward(X_batch, y_batch)
        return probs

    def fit(self, X, y, X_val=None, y_val=None, epochs=100, batch_size=32, verbose=True):
        """Train with mini-batch gradient descent.

        Args:
            X: training inputs, shape (num_samples, input_size).
            y: integer class labels, length num_samples.
            X_val, y_val: optional held-out set; its accuracy is appended to
                the progress line when ``X_val`` is given.
            epochs: number of passes over the data.
            batch_size: samples per update (the last batch may be smaller).
            verbose: print a progress line every 20 epochs.

        Returns:
            ``self``.

        Raises:
            ValueError: if ``batch_size`` is smaller than 1.

        The reported loss is the plain (unweighted) cross-entropy averaged
        over the batches of the epoch, measured before each update.
        Shuffling draws from NumPy's global random state.
        """
        if batch_size < 1:
            raise ValueError(f"batch_size must be a positive integer, got {batch_size}")

        X = np.asarray(X)
        y = np.asarray(y)
        num_samples = len(X)
        num_batches = (num_samples + batch_size - 1) // batch_size

        for epoch in range(epochs):
            # Fresh shuffle every epoch.
            order = np.random.permutation(num_samples)
            X_shuffled = X[order]
            y_shuffled = y[order]

            epoch_loss = 0.0
            self.training = True
            try:
                for start in range(0, num_samples, batch_size):
                    X_batch = X_shuffled[start:start + batch_size]
                    y_batch = y_shuffled[start:start + batch_size]

                    probs = self._train_step(X_batch, y_batch)

                    picked = probs[np.arange(len(y_batch)), y_batch]
                    epoch_loss += -np.mean(np.log(np.clip(picked, _LOG_EPS, 1)))
            finally:
                # Always leave training mode, even if a batch raised, so that
                # later predictions are deterministic.
                self.training = False

            if verbose and (epoch + 1) % _LOG_EVERY == 0:
                train_acc = self.accuracy(X, y)
                # max(..., 1) avoids dividing by zero on an empty training set.
                mean_loss = epoch_loss / max(num_batches, 1)
                msg = f"Epoch {epoch+1:3d}/{epochs} | Loss: {mean_loss:.4f} | 训练准确率: {train_acc:.4f}"
                if X_val is not None:
                    val_acc = self.accuracy(X_val, y_val)
                    msg += f" | 测试准确率: {val_acc:.4f}"
                print(msg)

        return self

    def predict(self, X):
        """Return the most probable class index for every row of ``X``."""
        return np.argmax(self.forward(X), axis=1)

    def predict_proba(self, X):
        """Return class probabilities, shape (num_rows, num_classes)."""
        return self.forward(X)

    def accuracy(self, X, y):
        """Return the fraction of rows of ``X`` whose prediction equals ``y``."""
        return np.mean(self.predict(X) == y)


class LogisticRegression(_SoftmaxClassifier):
    """
    Logistic regression (linear classifier).

    Structure: input -> linear map -> softmax -> output

    - Linear map: z = X @ W + b
    - Softmax turns the linear output into a probability distribution

    Parameter count: input_size * num_classes + num_classes
    """

    def __init__(self, input_size, num_classes=2, learning_rate=0.1,
                 class_weight=None, seed=42):
        """Create the model and print a one-line summary.

        Args:
            input_size: number of input features.
            num_classes: number of output classes.
            learning_rate: gradient-descent step size.
            class_weight: ``None`` or per-class loss weights, indexable by
                class label.
            seed: passed to ``np.random.seed``; note that this reseeds
                NumPy's *global* random state.
        """
        np.random.seed(seed)

        # Weights are scaled by sqrt(2 / fan_in) (He-style scaling).
        self.W = np.random.randn(input_size, num_classes) * np.sqrt(2.0 / input_size)
        self.b = np.zeros(num_classes)

        self.lr = learning_rate
        self.input_size = input_size
        self.num_classes = num_classes
        self.class_weight = class_weight

        total_params = input_size * num_classes + num_classes
        print(f"LogisticRegression: {input_size} -> {num_classes}, 参数量: {total_params}")

    def forward(self, X):
        """Return class probabilities ``softmax(X @ W + b)``."""
        z = X @ self.W + self.b
        return self.softmax(z)

    def backward(self, X, y, probs=None):
        """Perform one gradient-descent update on ``W`` and ``b``.

        Args:
            X: batch inputs, shape (batch, input_size).
            y: integer labels, length ``batch``.
            probs: optional output of ``forward(X)`` computed with the current
                parameters; when omitted it is recomputed here.
        """
        X = np.asarray(X)
        batch_size = X.shape[0]
        if probs is None:
            probs = self.forward(X)

        # Softmax + cross-entropy gradient, scaled per sample by class weight.
        d_z = _output_gradient(probs, y, self.class_weight)

        d_W = X.T @ d_z
        d_b = np.sum(d_z, axis=0)

        self.W -= self.lr * d_W / batch_size
        self.b -= self.lr * d_b / batch_size

    def _train_step(self, X_batch, y_batch):
        """One update that reuses the forward pass instead of repeating it."""
        probs = self.forward(X_batch)
        self.backward(X_batch, y_batch, probs)
        return probs

    def save(self, filepath):
        """Write the weights to ``<filepath>_W.npy`` and ``<filepath>_b.npy``."""
        np.save(f"{filepath}_W.npy", self.W)
        np.save(f"{filepath}_b.npy", self.b)
        print(f"模型已保存: {filepath}")

    @staticmethod
    def load(filepath, input_size, num_classes=2, learning_rate=0.1):
        """Build a model and load weights written by ``save``.

        ``class_weight`` is not stored on disk; the returned model has none.
        """
        model = LogisticRegression(input_size, num_classes, learning_rate)
        model.W = np.load(f"{filepath}_W.npy")
        model.b = np.load(f"{filepath}_b.npy")
        print(f"模型已加载: {filepath}")
        return model


class MLP(_SoftmaxClassifier):
    """
    Multi-layer perceptron (neural network).

    Structure: input -> linear -> ReLU -> linear -> softmax -> output

    Compared with LogisticRegression:
    - one extra hidden layer with a non-linear activation
    - can learn non-linear relationships
    - more parameters

    Parameters:
    - W1: input_size * hidden_size
    - b1: hidden_size
    - W2: hidden_size * num_classes
    - b2: num_classes
    """

    def __init__(self, input_size, hidden_size=64, num_classes=2,
                 learning_rate=0.1, keep_prob=1.0, class_weight=None, seed=42):
        """Create the network and print a one-line summary.

        Args:
            input_size: number of input features.
            hidden_size: width of the hidden layer.
            num_classes: number of output classes.
            learning_rate: gradient-descent step size.
            keep_prob: probability of keeping a hidden unit during training;
                values >= 1.0 disable dropout.
            class_weight: ``None`` or per-class loss weights, indexable by
                class label.
            seed: passed to ``np.random.seed``; note that this reseeds
                NumPy's *global* random state.

        Raises:
            ValueError: if ``keep_prob`` is not positive (it is a divisor).
        """
        if keep_prob <= 0:
            raise ValueError(f"keep_prob must be positive, got {keep_prob}")

        np.random.seed(seed)

        # Both layers are scaled by sqrt(2 / fan_in) (He-style scaling).
        self.W1 = np.random.randn(input_size, hidden_size) * np.sqrt(2.0 / input_size)
        self.b1 = np.zeros(hidden_size)

        self.W2 = np.random.randn(hidden_size, num_classes) * np.sqrt(2.0 / hidden_size)
        self.b2 = np.zeros(num_classes)

        self.lr = learning_rate
        self.keep_prob = keep_prob
        self.hidden_size = hidden_size
        self.input_size = input_size
        self.num_classes = num_classes
        self.class_weight = class_weight

        total_params = (input_size * hidden_size + hidden_size +
                        hidden_size * num_classes + num_classes)
        print(f"MLP: {input_size} -> {hidden_size} -> {num_classes}, 参数量: {total_params}")

    def relu(self, x):
        """ReLU activation."""
        return np.maximum(0, x)

    def relu_derivative(self, x):
        """Derivative of ReLU: 1.0 where ``x > 0``, else 0.0."""
        return (x > 0).astype(float)

    def forward(self, X):
        """Return class probabilities and cache the intermediates for backward.

        Dropout is applied to the hidden activations only while
        ``self.training`` is true, i.e. inside ``fit``'s batch loop;
        evaluation and prediction are deterministic.
        """
        self.z1 = X @ self.W1 + self.b1
        self.a1 = self.relu(self.z1)

        # Inverted dropout: dividing by keep_prob keeps the expected
        # activation unchanged, so nothing needs rescaling at inference.
        self.d1 = None
        if self.training and self.keep_prob < 1.0:
            self.d1 = (np.random.rand(*self.a1.shape) < self.keep_prob).astype(float)
            self.a1 = self.a1 * self.d1 / self.keep_prob

        self.z2 = self.a1 @ self.W2 + self.b2
        self.probs = self.softmax(self.z2)

        return self.probs

    def backward(self, X, y):
        """Perform one gradient-descent update.

        Must be called right after ``forward(X)`` on the same batch, because
        it uses the activations cached there.
        """
        X = np.asarray(X)
        batch_size = X.shape[0]

        # Output layer: softmax + cross-entropy, scaled by class weight.
        d_z2 = _output_gradient(self.probs, y, self.class_weight)

        d_W2 = self.a1.T @ d_z2
        d_b2 = np.sum(d_z2, axis=0)

        # Back through dropout (only if the last forward pass applied it),
        # then through ReLU.
        d_a1 = d_z2 @ self.W2.T
        mask = getattr(self, 'd1', None)
        if mask is not None:
            d_a1 = d_a1 * mask / self.keep_prob
        d_z1 = d_a1 * self.relu_derivative(self.z1)

        d_W1 = X.T @ d_z1
        d_b1 = np.sum(d_z1, axis=0)

        self.W1 -= self.lr * d_W1 / batch_size
        self.b1 -= self.lr * d_b1 / batch_size
        self.W2 -= self.lr * d_W2 / batch_size
        self.b2 -= self.lr * d_b2 / batch_size

    def save(self, filepath):
        """Write the weights to ``<filepath>_{W1,b1,W2,b2}.npy``."""
        np.save(f"{filepath}_W1.npy", self.W1)
        np.save(f"{filepath}_b1.npy", self.b1)
        np.save(f"{filepath}_W2.npy", self.W2)
        np.save(f"{filepath}_b2.npy", self.b2)
        print(f"模型已保存: {filepath}")

    @staticmethod
    def load(filepath, input_size, hidden_size=64, num_classes=2, learning_rate=0.1, keep_prob=1.0):
        """Build a network and load weights written by ``save``.

        ``class_weight`` is not stored on disk; the returned model has none.
        """
        model = MLP(input_size, hidden_size, num_classes, learning_rate, keep_prob)
        model.W1 = np.load(f"{filepath}_W1.npy")
        model.b1 = np.load(f"{filepath}_b1.npy")
        model.W2 = np.load(f"{filepath}_W2.npy")
        model.b2 = np.load(f"{filepath}_b2.npy")
        print(f"模型已加载: {filepath}")
        return model


def create_model(model_type, input_size, hidden_size=64, num_classes=2,
                 learning_rate=0.1, keep_prob=1.0, class_weight=None):
    """Factory: build a model by short name.

    Args:
        model_type: ``'lr'`` for LogisticRegression or ``'mlp'`` for MLP.
        input_size, num_classes, learning_rate, class_weight: forwarded to
            the chosen model.
        hidden_size, keep_prob: used by ``'mlp'`` only.

    Raises:
        ValueError: if ``model_type`` is neither ``'lr'`` nor ``'mlp'``.
    """
    if model_type == 'lr':
        return LogisticRegression(input_size, num_classes, learning_rate, class_weight)
    elif model_type == 'mlp':
        return MLP(input_size, hidden_size, num_classes, learning_rate, keep_prob, class_weight)
    else:
        raise ValueError(f"未知模型类型: {model_type}")