From 77b3d2fb8f41ee652a191b742fc9d60c92ee3dda Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=BD=97=E8=89=BA=E6=AC=A3?= <2509165020@student.example.com> Date: Tue, 19 May 2026 11:29:43 +0800 Subject: [PATCH] =?UTF-8?q?=E4=B8=8A=E4=BC=A0=E6=96=87=E4=BB=B6=E8=87=B3?= =?UTF-8?q?=20/?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- README.md | 680 +++++++++---------------------------------------- config.py | 59 +++-- dataset.py | 413 +++++++++++------------------- main.py | 207 +++++++++++++-- model_numpy.py | 469 ++++++++++++++++------------------ 5 files changed, 705 insertions(+), 1123 deletions(-) diff --git a/README.md b/README.md index 227745e..1f48f09 100644 --- a/README.md +++ b/README.md @@ -1,602 +1,172 @@ -# 文本分类实战 - 课堂讲义 +# 手写数字识别 - 纯NumPy MLP实现 -> 本项目用**纯NumPy**实现文本分类,帮助学生理解文本向量化和神经网络的基本原理。 -> -> 类比:MNIST(图像)→ 全连接网络 → 数字分类,本项目是文本版。 +## 项目简介 ---- +使用纯NumPy实现的两层全连接神经网络(MLP),在MNIST数据集上进行手写数字识别。 -## 目录 +**零深度学习框架依赖**,只需 `numpy`。 -1. [实验概述](#1-实验概述) -2. [数据预处理:如何让计算机"读懂"文本](#2-数据预处理如何让计算机读懂文本) -3. [向量化方法:BoW 与 TF-IDF](#3-向量化方法bow-与-tf-idf) -4. [模型一:逻辑回归(Logistic Regression)](#4-模型一逻辑回归logistic-regression) -5. [模型二:多层感知机(MLP)](#5-模型二多层感知机mlp) -6. [训练过程:梯度下降与反向传播](#6-训练过程梯度下降与反向传播) -7. [数据不平衡问题与解决](#7-数据不平衡问题与解决) -8. [实验操作指南](#8-实验操作指南) -9. [预测新文本](#9-预测新文本) - ---- - -## 1. 实验概述 - -### 1.1 任务 - -对中文酒店评论进行**情感分类**: -- 正面评论(好评) -- 负面评论(差评) - -### 1.2 数据集 - -**ChnSentiCorp**(中文酒店评论数据集) -- 总评论数:7765条 -- 正面评论:5322条(68.5%) -- 负面评论:2443条(31.5%) - -数据集已内置,程序会自动下载。 - -### 1.3 整体流程 +## 网络结构 ``` -原始文本 → 分词 → 向量化 → 模型训练 → 预测 - "酒店很好" → ["酒店", "很好"] → [0.3, 0.8, ...] → 正面 +输入层(784) → 隐藏层(128) + ReLU → 输出层(10) + Softmax ``` -### 1.4 代码文件 +- **输入**: 28×28=784 像素值,归一化到 [0, 1] +- **隐藏层**: 128 神经元,ReLU激活函数 +- **输出层**: 10 神经元(数字0-9),Softmax输出概率 -| 文件 | 作用 | -|-----|------| -| `config.py` | 所有超参数配置(改这里来调整实验) | -| `dataset.py` | 数据加载、分词、向量化 | -| `model_numpy.py` | 逻辑回归和MLP模型实现 | -| `train.py` | 训练和对比实验 | -| `predict.py` | 加载模型预测新文本 | - ---- - -## 2. 数据预处理:如何让计算机"读懂"文本 - -### 2.1 为什么文本不能直接用于计算? - -计算机只能处理数字,不能直接处理文字。我们需要把文本转换成数字向量。 - -### 2.2 分词 - -**原理**:把连续的中文文本切成离散的词。 - -```python -# 示例 -文本: "酒店服务很好" -分词: ["酒店", "服务", "很好"] -``` - -本项目使用 `jieba` 库进行分词: - -```python -import jieba - -text = "酒店服务很好" -words = jieba.lcut(text) -print(words) # ['酒店', '服务', '很好'] -``` - -**注意**:过滤掉单字(如"的"、"了"),因为信息量太少。 - -```python -words = [w for w in words if len(w) > 1] # 过滤单字 -``` - -### 2.3 构建词表 - -**原理**:把所有评论中的词收集起来,编上序号。 - -```python -# 词表示例 -{ - "酒店": 0, - "服务": 1, - "很好": 2, - "房间": 3, - ... -} -``` - -词表大小由 `MAX_FEATURES` 控制(本项目设为3000),只保留出现频率最高的3000个词。 - ---- - -## 3. 向量化方法:BoW 与 TF-IDF - -把分词后的文本转换成数字向量。 - -### 3.1 BoW(词袋模型) - -**原理**:统计每个词出现的次数。 +## 文件结构 ``` -文本: "酒店 服务 很好 服务" -分词: ["酒店", "服务", "很好", "服务"] -词表: {"酒店":0, "服务":1, "很好":2, "不错":3, ...} - -向量: [1, 2, 1, 0, ...] # 酒店出现1次,服务出现2次,很好出现1次 +digit_mlp_class/ +├── main.py # 主程序(训练/评估/对比实验) +├── model_numpy.py # MLP模型(纯NumPy实现) +├── dataset.py # MNIST数据集加载 +├── config.py # 超参数配置 +├── data/ # MNIST数据文件 +│ ├── train-images-idx3-ubyte.gz +│ ├── train-labels-idx1-ubyte.gz +│ ├── t10k-images-idx3-ubyte.gz +│ └── t10k-labels-idx1-ubyte.gz +└── README.md ``` -**代码位置**:`dataset.py` 中的 `BoWVectorizer` 类 - -```python -class BoWVectorizer: - def transform(self, text): - words = tokenize(text) - vec = [0] * MAX_SEQ_LEN - for i, word in enumerate(words[:MAX_SEQ_LEN]): - if word in self.vocab: - vec[i] = 1 # 也可以用词频 tf[word] - return vec -``` - -**问题**:所有词权重相同,导致常见词(如"的"、"是")主导。 - -### 3.2 TF-IDF(词频-逆文档频率) - -**原理**:给每个词赋予重要程度权重。 +## 依赖 ``` -TF(词频) = 词在本文中出现的次数 -IDF(逆文档频率) = log(总文档数 / 包含该词的文档数) - -TF-IDF = TF × IDF +numpy ``` -**直观理解**: -- 一个词在本文中出现越多 → TF越高 → 越重要 -- 一个词在所有文档中越常见 → IDF越低 → 越不重要 +## 使用方法 -``` -例子: -- "酒店":在100篇评论中出现80篇 → IDF = log(100/80) ≈ 0.22 -- "惊喜":在100篇评论中出现5篇 → IDF = log(100/5) ≈ 3.0 +### 1. 下载MNIST数据集 -"惊喜"虽然少见,但信息量大,IDF更高 -``` - -**代码位置**:`dataset.py` 中的 `TFIDFVectorizer` 类 - -```python -class TFIDFVectorizer: - def transform(self, text): - words = tokenize(text) - tf = Counter(words) # 词频 - tf_sum = len(words) - - vec = [0.0] * MAX_SEQ_LEN - for i, word in enumerate(words[:MAX_SEQ_LEN]): - if word in self.vocab: - # TF × IDF - vec[i] = (tf[word] / tf_sum) * self.idf.get(word, 0) - return vec -``` - -### 3.3 两种方法对比 - -| 特性 | BoW | TF-IDF | -|-----|-----|--------| -| 公式 | 词频 | TF × IDF | -| 常见词权重 | 相同(偏高) | 降低 | -| 罕见词权重 | 相同(偏低) | 提升 | -| 计算复杂度 | 低 | 稍高 | -| 效果 | 一般 | 通常更好 | - ---- - -## 4. 模型一:逻辑回归(Logistic Regression) - -### 4.1 模型结构 - -最简单的线性分类器: - -``` -输入 [batch, features] - │ - ▼ -线性变换: Z = X @ W + b - │ - ▼ -Softmax → 概率 - │ - ▼ -输出 [batch, 2] # [负面概率, 正面概率] -``` - -### 4.2 线性变换 - -```python -Z = X @ W + b - -# 例子: -# X: [1, 3000] (一个样本,3000维特征) -# W: [3000, 2] (权重矩阵) -# b: [2] (偏置) -# Z: [1, 2] (输出 logits) -``` - -### 4.3 Softmax - -把 logits 转换成概率(和为1): - -```python -def softmax(x): - exp_x = np.exp(x - np.max(x)) # 减最大值防溢出 - return exp_x / np.sum(exp_x, axis=1, keepdims=True) - -# 示例 -logits = [2.0, 1.0] -probs = softmax(logits) -# probs = [0.731, 0.269] -# 解释:正面概率73.1%,负面概率26.9% -``` - -### 4.4 代码实现 - -```python -class LogisticRegression: - def __init__(self, input_size, num_classes=2): - self.W = np.random.randn(input_size, num_classes) * 0.01 - self.b = np.zeros(num_classes) - - def forward(self, X): - z = X @ self.W + self.b - return softmax(z) - - def backward(self, X, y): - # 梯度计算和参数更新 - ... -``` - -### 4.5 参数量 - -``` -W: input_size × num_classes = 3000 × 2 = 6000 -b: num_classes = 2 -总计: 6002 -``` - ---- - -## 5. 模型二:多层感知机(MLP) - -### 5.1 模型结构 - -比逻辑回归多了一层隐藏层和非线性激活: - -``` -输入 [batch, features] - │ - ▼ -线性变换: Z1 = X @ W1 + b1 - │ - ▼ -ReLU激活: A1 = max(0, Z1) - │ - ▼ -线性变换: Z2 = A1 @ W2 + b2 - │ - ▼ -Softmax → 概率 - │ - ▼ -输出 [batch, 2] -``` - -### 5.2 ReLU激活函数 - -```python -def relu(x): - return np.maximum(0, x) - -# 示例 -relu([1, -2, 3, -1]) = [1, 0, 3, 0] -``` - -**作用**:引入非线性,让模型能学习复杂模式。 - -### 5.3 参数量 - -``` -W1: input_size × hidden = 3000 × 64 = 192000 -b1: hidden = 64 -W2: hidden × num_classes = 64 × 2 = 128 -b2: num_classes = 2 -总计: 192194 -``` - -### 5.4 与视觉CNN的类比 - -| 视觉(全连接) | 文本 | -|--------------|------| -| 输入: 784维像素 | 输入: 3000维词向量 | -| 隐藏层: 128神经元 | 隐藏层: 64神经元 | -| 输出: 10类数字 | 输出: 2类情感 | -| ReLU + Softmax | ReLU + Softmax | - ---- - -## 6. 训练过程:梯度下降与反向传播 - -### 6.1 训练流程 - -``` -for epoch in 轮数: - for batch in 数据: - 1. 前向传播: 计算输出概率 - 2. 计算损失: CrossEntropy(probs, labels) - 3. 反向传播: 计算梯度 - 4. 更新参数: W = W - lr × 梯度 -``` - -### 6.2 损失函数:交叉熵 - -```python -def cross_entropy_loss(probs, y): - # probs: 预测概率 - # y: 真实标签 - loss = -np.log(probs[y]) # 正确类的概率越大,损失越小 - return loss -``` - -### 6.3 梯度下降 - -```python -# 简单示例:单参数 -loss = f(w) # 损失是参数的函数 -gradient = (loss(w + epsilon) - loss(w)) / epsilon # 数值梯度 - -# 解析梯度 -w = w - learning_rate * gradient -``` - -### 6.4 反向传播(BP) - -链式法则,从后往前计算梯度: - -``` -损失 → Softmax → 线性变换 → ReLU → 线性变换 → 输入 - ↓ -链式求导 - ↓ -各层梯度 = 损失对各层参数的偏导 -``` - -### 6.5 训练日志解读 - -``` -Epoch 20/100 | Loss: 0.5844 | 训练准确率: 0.6851 | 测试准确率: 0.6864 - │ │ │ │ - │ │ │ └─ 测试集上的表现 - │ │ └─ 训练集上的表现 - │ └─ 损失值(越小越好) - └─ 当前轮数/总轮数 -``` - ---- - -## 7. 数据不平衡问题与解决 - -### 7.1 问题 - -本数据集正负比例约 7:3,模型可能"偷懒": - -| 策略 | 结果 | 准确率 | -|-----|------|--------| -| 不使用技巧,总是预测正面 | 简单但无效 | 68.5%(假高分) | -| 使用类别权重,认真学习 | 难但有效 | 46%(真学习) | - -### 7.2 类别权重 - -**原理**:给少数类更高的权重,让模型更"怕"漏判少数类。 - -```python -# 计算权重 -n_samples = 7765 # 总样本数 -n_pos = 5322 # 正面样本数 -n_neg = 2443 # 负面样本数 - -weight_pos = n_samples / (2 * n_pos) = 0.73 # 正面权重(样本多,权重小) -weight_neg = n_samples / (2 * n_neg) = 1.59 # 负面权重(样本少,权重大) - -# 梯度更新时 -d_z[y] -= class_weight[y] # 负面样本的梯度更大 -``` - -### 7.3 开关配置 - -在 `config.py` 中: - -```python -USE_CLASS_WEIGHT = True # 开启类别权重 -USE_CLASS_WEIGHT = False # 关闭(总是预测正面) -``` - -### 7.4 实验对比 - -| 配置 | 测试准确率 | 预测分布 | 说明 | -|-----|----------|---------|------| -| 关闭权重 | 68.6% | 全预测正面 | 模型偷懒 | -| 开启权重 | 46.4% | 有正有负 | 模型在学习 | - -**结论**:68%准确率是"假"高分,46%是"真"学习。数据不平衡问题没有银弹。 - ---- - -## 8. 实验操作指南 - -### 8.1 安装依赖 +如果 `data/` 目录下没有数据文件,运行: ```bash -pip install numpy jieba +python dataset.py ``` -### 8.2 训练模型 +或手动下载: + +```bash +cd data/ +curl -LO https://storage.googleapis.com/tensorflow/tf-keras-datasets/train-images-idx3-ubyte.gz +curl -LO https://storage.googleapis.com/tensorflow/tf-keras-datasets/train-labels-idx1-ubyte.gz +curl -LO https://storage.googleapis.com/tensorflow/tf-keras-datasets/t10k-images-idx3-ubyte.gz +curl -LO https://storage.googleapis.com/tensorflow/tf-keras-datasets/t10k-labels-idx1-ubyte.gz +``` + +### 2. 训练模型 ```bash python main.py ``` -### 8.3 修改配置 - -编辑 `config.py`: - -```python -# 选择模型 -MODEL_TYPE = 'mlp' # 'lr' 或 'mlp' -VECTORIZER_TYPE = 'tfidf' # 'bow' 或 'tfidf' - -# 开关类别权重 -USE_CLASS_WEIGHT = True # 或 False - -# 调整超参数 -NUM_EPOCHS = 100 # 训练轮数 -LEARNING_RATE = 0.05 # 学习率 -HIDDEN_SIZE = 64 # MLP隐藏层大小 -``` - -### 8.4 运行对比实验 - -```python -RUN_COMPARISON = True # 开启 -``` - -会自动进行: -1. BoW vs TF-IDF 对比 -2. LR vs MLP 对比 -3. 学习率对比 -4. 隐藏层大小对比 - -### 8.5 训练输出示例 - -``` -============================================================ -训练配置: - 模型: MLP - 向量: TF-IDF - 学习率: 0.05 - 隐藏层大小: 64 - 训练轮数: 100 -============================================================ - 类别权重: 正面=0.73, 负面=1.59 -MLP: 100 -> 64 -> 2, 参数量: 6594 -Epoch 20/100 | Loss: 0.6694 | 训练准确率: 0.4598 | 测试准确率: 0.4662 -... -最终结果: - 训练准确率: 0.4596 - 测试准确率: 0.4668 - 训练时间: 2.95秒 -模型已保存: model_mlp_tfidf_weighted_0427_212802_*.npy -``` - ---- - -## 9. 预测新文本 - -### 9.1 使用方法 +### 3. 运行对比实验 ```bash -python predict.py +python main.py --compare ``` -### 9.2 操作流程 +## 代码设计 -``` -1. 程序列出已保存的模型 -2. 输入编号选择模型 -3. 输入评论文本 -4. 查看预测结果 +### model_numpy.py - MLP模型 + +核心实现: +- **前向传播**: 矩阵乘法 + ReLU + Softmax +- **反向传播**: 手动梯度计算 + 梯度下降 +- **权重初始化**: Xavier初始化(适合ReLU) + +```python +class MLP: + def __init__(self, input_size=784, hidden_size=128, num_classes=10) + def forward(self, X): # 前向传播 + def backward(self, X, y): # 反向传播 + def fit(self, X, y): # 训练 + def predict(self, X): # 预测 ``` -### 9.3 示例 +### dataset.py - 数据加载 + +- 自动检测 `data/` 目录下的MNIST文件 +- 解析IDX格式(MNIST标准格式) +- 归一化像素值到 [0, 1] +- 支持One-Hot编码标签 + +### main.py - 主程序 + +两种运行模式: +1. **默认模式**: 训练一个模型并评估 +2. **对比模式** (`--compare`): 对比不同超参数的效果 + +## 数学原理 + +### 前向传播 ``` -请选择模型编号 (1-1): 1 +z1 = X @ W1 + b1 # 第一层线性变换 +a1 = ReLU(z1) # 第一层激活 -请输入评论文本: 酒店服务很好,环境也不错 -预测结果: 正面 -置信度: 99.7% -详细: 正面概率=99.7%, 负面概率=0.3% - -请输入评论文本: 房间太小,卫生很差 -预测结果: 负面 -置信度: 85.2% -详细: 正面概率=14.8%, 负面概率=85.2% +z2 = a1 @ W2 + b2 # 第二层线性变换 +probs = softmax(z2) # 输出概率 ``` -### 9.4 权重文件命名 - -每次训练生成唯一的文件名: +### 反向传播 ``` -model_mlp_tfidf_weighted_0427_212802_W1.npy -model_mlp_tfidf_weighted_0427_212802_b1.npy -model_mlp_tfidf_weighted_0427_212802_W2.npy -model_mlp_tfidf_weighted_0427_212802_b2.npy +d_z2 = probs - y # 输出层梯度 +d_W2 = a1.T @ d_z2 # 第二层权重梯度 +d_z1 = d_z2 @ W2.T * relu_derivative(z1) # 隐藏层梯度 +d_W1 = X.T @ d_z1 # 第一层权重梯度 + +W1 -= lr * d_W1 / batch_size # 梯度下降更新 +W2 -= lr * d_W2 / batch_size ``` -文件名包含:模型类型、向量类型、权重开关、时间戳 - ---- - -## 10. 思考题 - -1. **向量化**:为什么TF-IDF通常比BoW效果好? -2. **模型复杂度**:MLP比LR多了一层,带来的优势是什么? -3. **数据不平衡**:68%准确率一定好吗?有什么陷阱? -4. **类别权重**:开启后准确率反而下降,这说明什么? -5. **调参实践**:学习率过大会怎样?隐藏层太小会怎样? - ---- - -## 附录:完整代码流程图 +### 激活函数 +**ReLU**: ``` - ┌─────────────┐ - │ config.py │ - │ (超参数) │ - └──────┬──────┘ - │ - ▼ -┌─────────────────────────────────────────┐ -│ dataset.py │ -│ ┌───────────┐ ┌──────────────────┐ │ -│ │ 下载数据 │───▶│ TF-IDF/BoW向量化 │ │ -│ └───────────┘ └────────┬─────────┘ │ -│ │ │ -└────────────────────────────┼────────────┘ - ▼ - ┌────────────────┐ - │ 特征向量 X │ - │ 标签 y │ - └────────┬───────┘ - │ - ▼ -┌─────────────────────────────────────────┐ -│ model_numpy.py │ -│ ┌───────────────────────────────────┐ │ -│ │ LogisticRegression / MLP │ │ -│ │ - forward(): 前向传播 │ │ -│ │ - backward(): 反向传播 │ │ -│ │ - fit(): 训练循环 │ │ -│ └───────────────────────────────────┘ │ -└────────────────────────────┬────────────┘ - │ - ▼ - ┌────────────────┐ - │ 保存权重 │ - │ model_*.npy │ - └────────┬───────┘ - │ - ▼ - ┌────────────────┐ - │ predict.py │ - │ (加载预测) │ - └───────────────┘ +ReLU(x) = max(0, x) +ReLU'(x) = 1 if x > 0 else 0 ``` + +**Softmax**: +``` +softmax(x_i) = exp(x_i) / sum(exp(x_j)) +``` + +## 超参数 + +| 参数 | 默认值 | 说明 | +|------|--------|------| +| hidden_size | 128 | 隐藏层神经元数量 | +| learning_rate | 0.1 | 学习率 | +| epochs | 50 | 训练轮数 | +| batch_size | 64 | 批大小 | +| seed | 42 | 随机种子 | + +## 预期结果 + +- 训练准确率: ~98% +- 测试准确率: ~95-97% + +训练时间: 约 5-10 分钟(取决于硬件) + +## 扩展实验 + +1. **改变隐藏层大小**: 32 / 64 / 128 / 256 +2. **改变学习率**: 0.01 / 0.1 / 0.5 +3. **添加Dropout**: 防止过拟合 +4. **增加隐藏层数**: 784 → 256 → 128 → 10 + +## 教学用途 + +本项目适合用于讲解: +- 神经网络基本结构 +- 前向传播与反向传播原理 +- 梯度下降优化 +- NumPy矩阵操作 +- MNIST数据集处理 \ No newline at end of file diff --git a/config.py b/config.py index 9292e49..9c59379 100644 --- a/config.py +++ b/config.py @@ -1,40 +1,39 @@ # -*- coding: utf-8 -*- """ -配置文件 - 所有超参数集中管理 +手写数字识别 - 超参数配置 -设计思路: -将超参数分门别类,学生可以单独修改某一类而不会影响其他 +纯NumPy实现的两层全连接神经网络 """ -# ==================== 数据相关 ==================== -DATA_DIR = 'data/ChnSentiCorp' # 数据集路径 -MAX_FEATURES = 3000 # 词表最大容量 -MAX_SEQ_LEN = 100 # 句子最大长度(词数) -VECTORIZER_TYPE = 'tfidf' # 'tfidf' 或 'bow'(向量化方式) +# ===== 数据参数 ===== +ONE_HOT = True # 标签是否使用One-Hot编码 -# ==================== 模型相关 ==================== -MODEL_TYPE = 'lr' # 'mlp' 或 'lr'(模型类型) -HIDDEN_SIZE = 64 # MLP隐藏层大小(LR忽略) -NUM_CLASSES = 2 # 类别数(正面/负面二分类) -KEEP_PROB = 1.0 # Dropout保留概——0.06率(LR忽略,设为1即可) +# ===== 模型结构 ===== +INPUT_SIZE = 784 # 28x28 = 784 像素 +HIDDEN_SIZE = 128 # 隐藏层神经元数量 +NUM_CLASSES = 10 # 0-9 十个数字 +KEEP_PROB = 1.0 # Dropout保留比例(1.0=不使用Dropout) -# ==================== 训练相关 ==================== -LEARNING_RATE = 0.08 # 学习率 -NUM_EPOCHS = 100 # 训练轮数 -BATCH_SIZE = 64 # 批次大小 +# ===== 训练参数 ===== +LEARNING_RATE = 0.1 # 学习率 +NUM_EPOCHS = 50 # 训练轮数 +BATCH_SIZE = 64 # 批大小 -# ==================== 类别权重(解决数据不平衡问题)==================== -USE_CLASS_WEIGHT = True # True=启用类别权重, False=不启用(对比用) -# 权重计算公式: n_samples / (n_classes * n_class_i) -# 正面评论多所以权重小,负面评论少所以权重大 -CLASS_WEIGHT_POS = 0.73 # 正面类权重(自动计算) -CLASS_WEIGHT_NEG = 1.58 # 负面类权重(自动计算) +# ===== 随机种子(保证可复现) ===== +SEED = 42 -# ==================== 实验相关 ==================== -RUN_COMPARISON = False # True=运行对比实验, False=运行单个模型 -COMPARE_MODELS = ['lr', 'mlp'] # 要对比的模型列表 -COMPARE_VECTORS = ['bow', 'tfidf'] # 要对比的向量化方式 +# ===== 实验配置 ===== +RUN_COMPARISON = False # 是否运行对比实验 -# ==================== 其他 ==================== -RANDOM_SEED = 42 # 随机种子(保证可复现) -VERBOSE = True # 打印详细日志 +# ===== 依赖说明 ===== +# 本项目需要以下库: +# numpy - 数值计算 +# scikit-learn - 加载MNIST数据集(会自动下载) +# pandas - sklearn的依赖 +# +# 安装命令: +# pip install numpy scikit-learn pandas +# +# 数据说明: +# 首次运行时会自动从OpenML下载MNIST数据集(约12MB) +# 下载后会自动缓存,后续运行直接使用缓存数据 \ No newline at end of file diff --git a/dataset.py b/dataset.py index e554362..2d0867e 100644 --- a/dataset.py +++ b/dataset.py @@ -1,286 +1,179 @@ # -*- coding: utf-8 -*- """ -数据加载与向量化模块 +数据集模块 - MNIST手写数字数据集加载 -支持两种向量化方法: -1. BoW (Bag of Words) - 词频向量 -2. TF-IDF - 词频-逆文档频率向量 - -TF-IDF 的优势: -- 降低常见词(如"的"、"是")的权重 -- 提升罕见词的信息量 -- 通常效果优于简单BoW +优先从本地data/目录加载,如果文件不存在则从sklearn下载 +支持两种格式:.gz(官方格式)和 .zip(某些下载源) """ import os -import re -import csv -import math -import jieba +import struct +import gzip +import zipfile import numpy as np -from collections import Counter - -try: - import urllib.request - import ssl - DOWNLOAD_AVAILABLE = True -except ImportError: - DOWNLOAD_AVAILABLE = False +from config import * -DATASET_URL = "https://raw.githubusercontent.com/SophonPlus/ChineseNlpCorpus/master/datasets/ChnSentiCorp_htl_all/ChnSentiCorp_htl_all.csv" - - -def download_dataset(data_dir): - """下载数据集(如果不存在)""" - csv_path = os.path.join(data_dir, 'ChnSentiCorp_htl_all.csv') +def local_files_exist(): + """检查本地数据文件是否存在且完整""" + data_dir = os.path.join(os.path.dirname(__file__), 'data') - if os.path.exists(csv_path): - print(f"数据已存在: {csv_path}") - return True + # 支持 .gz 和 .zip 格式(MNIST官方用.gz,但有些下载是zip) + files = { + 'train-images-idx3-ubyte': {'gz': 9912422, 'zip': 9187390}, + 'train-labels-idx1-ubyte': {'gz': 28881, 'zip': 28405}, + 't10k-images-idx3-ubyte': {'gz': 1648877, 'zip': 1534055}, + 't10k-labels-idx1-ubyte': {'gz': 5148, 'zip': 4563}, + } - if not DOWNLOAD_AVAILABLE: - return False + found_files = {} + missing = [] - print("正在下载数据集...") - ssl_context = ssl.create_default_context() - ssl_context.check_hostname = False - ssl_context.verify_mode = ssl.CERT_NONE - - try: - request = urllib.request.Request(DATASET_URL, headers={'User-Agent': 'Mozilla/5.0'}) - response = urllib.request.urlopen(request, timeout=120, context=ssl_context) - os.makedirs(data_dir, exist_ok=True) - with open(csv_path, 'wb') as f: - f.write(response.read()) - print(f"下载完成: {csv_path}") - return True - except Exception as e: - print(f"下载失败: {e}") - return False - - -def load_raw_data(data_dir): - """加载原始数据""" - csv_path = os.path.join(data_dir, 'ChnSentiCorp_htl_all.csv') - texts, labels = [], [] - - with open(csv_path, 'r', encoding='utf-8') as f: - reader = csv.reader(f) - for row in reader: - if len(row) < 2: - continue - try: - label = int(row[0]) - review = row[1].strip() - if review: - texts.append(review) - labels.append(label) - except (ValueError, IndexError): - continue - - return texts, np.array(labels) - - -def tokenize(text): - """中文分词""" - text = re.sub(r'[^\u4e00-\u9fa5a-zA-Z]', ' ', text) - words = jieba.lcut(text) - return [w for w in words if len(w) > 1] - - -# ==================== 向量化器 ==================== - -class BaseVectorizer: - """向量化器基类""" - def fit(self, texts): pass - def transform(self, texts): pass - def fit_transform(self, texts): pass - - -class BoWVectorizer(BaseVectorizer): - """ - 词袋模型 (Bag of Words) - - 原理:统计每个词在文本中出现的次数 - 向量维度 = 词表大小 - 每个维度 = 该词在本文本中出现的次数 - """ - - def __init__(self, max_features, max_seq_len): - self.max_features = max_features - self.max_seq_len = max_seq_len - self.vocab = {} - self.doc_freq = {} # 文档频率 - self.num_docs = 0 - - def fit(self, texts): - """构建词表(基于词频)""" - counter = Counter() - doc_counter = Counter() # 统计包含该词的文档数 + for base_name, sizes in files.items(): + gz_path = os.path.join(data_dir, base_name + '.gz') + zip_path = os.path.join(data_dir, base_name + '.zip') - for text in texts: - words = tokenize(text) - unique_words = set(words) - counter.update(words) - for w in unique_words: - doc_counter[w] += 1 - - self.num_docs = len(texts) - - # 取最高频的词 - most_common = counter.most_common(self.max_features) - self.vocab = {word: idx for idx, (word, _) in enumerate(most_common)} - - # 记录文档频率(用于TF-IDF) - self.doc_freq = {w: doc_counter[w] for w in self.vocab} - - print(f" BoW词表大小: {len(self.vocab)}") - return self + if os.path.exists(gz_path): + found_files[base_name] = (gz_path, sizes['gz'], 'gz') + elif os.path.exists(zip_path): + found_files[base_name] = (zip_path, sizes['zip'], 'zip') + else: + missing.append(base_name) - def transform(self, texts): - """将文本转换为词频向量""" - vectors = [] - for text in texts: - words = tokenize(text) - freq = [0] * self.max_seq_len - for i, word in enumerate(words[:self.max_seq_len]): - if word in self.vocab: - freq[i] = 1 # 二值(出现=1,不出现=0) - vectors.append(freq) - return np.array(vectors, dtype=np.float32) + if missing: + return False, f"文件不存在: {', '.join(missing)}" - def fit_transform(self, texts): - self.fit(texts) - return self.transform(texts) + # 检查大小是否正确 + for base_name, (filepath, expected_size, fmt) in found_files.items(): + actual_size = os.path.getsize(filepath) + if actual_size != expected_size: + return False, f"文件大小错误: {base_name} (期望{expected_size}, 实际{actual_size})" + + return True, "所有文件完整" -class TFIDFVectorizer(BaseVectorizer): - """ - TF-IDF 向量器 - - 原理: - - TF(词频) = 词在本文本中出现的次数 - - IDF(逆文档频率) = log(总文档数 / 包含该词的文档数) - - TF-IDF = TF × IDF - - 优势: - - 降低常见无意义词的权重(如"的"、"是") - - 提升罕见但有信息量的词 - """ - - def __init__(self, max_features, max_seq_len): - self.max_features = max_features - self.max_seq_len = max_seq_len - self.vocab = {} - self.idf = {} # 存储每个词的IDF值 - self.num_docs = 0 - - def fit(self, texts): - """构建词表并计算IDF""" - counter = Counter() - doc_counter = Counter() - - for text in texts: - words = tokenize(text) - unique_words = set(words) - counter.update(words) - for w in unique_words: - doc_counter[w] += 1 - - self.num_docs = len(texts) - - # 计算每个词的IDF - # IDF = log(总文档数 / 包含该词的文档数) - idf_values = {} - for word, df in doc_counter.items(): - idf_values[word] = math.log(self.num_docs / (df + 1)) + 1 # 加1防零 - - # 取IDF值最高的词(信息量最大的词) - sorted_words = sorted(idf_values.items(), key=lambda x: x[1], reverse=True) - self.vocab = {word: idx for idx, (word, _) in enumerate(sorted_words[:self.max_features])} - - # 保存IDF值 - self.idf = {word: idf_values[word] for word in self.vocab} - - print(f" TF-IDF词表大小: {len(self.vocab)}") - print(f" 平均IDF: {np.mean(list(self.idf.values())):.3f}") - return self - - def transform(self, texts): - """将文本转换为TF-IDF向量""" - vectors = [] - for text in texts: - words = tokenize(text) - - # 计算TF - tf = Counter(words) - tf_sum = len(words) if words else 1 - - # 生成向量 - vec = [0.0] * self.max_seq_len - for i, word in enumerate(words[:self.max_seq_len]): - if word in self.vocab: - # TF × IDF - vec[i] = (tf[word] / tf_sum) * self.idf.get(word, 0) - vectors.append(vec) - - return np.array(vectors, dtype=np.float32) - - def fit_transform(self, texts): - self.fit(texts) - return self.transform(texts) - - -def load_data(data_dir, max_features, max_seq_len, vectorizer_type='tfidf'): - """ - 加载并向量化数据 - - 参数: - - vectorizer_type: 'tfidf' 或 'bow' - """ - if not download_dataset(data_dir): - raise RuntimeError("数据加载失败,请检查网络或手动下载数据集") - - print("正在加载数据...") - texts, labels = load_raw_data(data_dir) - print(f"总评论数: {len(texts)}, 正面: {sum(labels)}, 负面: {len(labels) - sum(labels)}") - - # 选择向量化器 - if vectorizer_type == 'tfidf': - vectorizer = TFIDFVectorizer(max_features, max_seq_len) - vec_name = "TF-IDF" +def parse_idx_images(filepath): + """解析IDX格式图像(支持.gz和.zip)""" + if filepath.endswith('.zip'): + with zipfile.ZipFile(filepath, 'r') as zf: + # zip内的文件名没有.gz后缀 + inner_name = zf.namelist()[0] + with zf.open(inner_name) as f: + magic, num, rows, cols = struct.unpack('>IIII', f.read(16)) + images = np.frombuffer(f.read(), dtype=np.uint8) + images = images.reshape(num, rows * cols) + return images else: - vectorizer = BoWVectorizer(max_features, max_seq_len) - vec_name = "BoW" + with gzip.open(filepath, 'rb') as f: + magic, num, rows, cols = struct.unpack('>IIII', f.read(16)) + images = np.frombuffer(f.read(), dtype=np.uint8) + images = images.reshape(num, rows * cols) + return images + + +def parse_idx_labels(filepath): + """解析IDX格式标签(支持.gz和.zip)""" + if filepath.endswith('.zip'): + with zipfile.ZipFile(filepath, 'r') as zf: + # zip内的文件名没有.gz后缀 + inner_name = zf.namelist()[0] + with zf.open(inner_name) as f: + magic, num = struct.unpack('>II', f.read(8)) + labels = np.frombuffer(f.read(), dtype=np.uint8) + return labels + else: + with gzip.open(filepath, 'rb') as f: + magic, num = struct.unpack('>II', f.read(8)) + labels = np.frombuffer(f.read(), dtype=np.uint8) + return labels + + +def load_data_from_local(): + """从本地文件加载MNIST(自动检测.gz或.zip格式)""" + data_dir = os.path.join(os.path.dirname(__file__), 'data') - print(f"正在使用{vec_name}向量化...") - X = vectorizer.fit_transform(texts) - y = labels + def find_file(base_name): + """自动找文件,支持.gz和.zip""" + gz_path = os.path.join(data_dir, base_name + '.gz') + zip_path = os.path.join(data_dir, base_name + '.zip') + if os.path.exists(gz_path): + return gz_path + elif os.path.exists(zip_path): + return zip_path + else: + raise FileNotFoundError(f"找不到 {base_name} 的 .gz 或 .zip 文件") - # 打乱并划分 - np.random.seed(42) - indices = np.random.permutation(len(X)) - X = X[indices] - y = y[indices] + X_train = parse_idx_images(find_file('train-images-idx3-ubyte')) + y_train = parse_idx_labels(find_file('train-labels-idx1-ubyte')) + X_test = parse_idx_images(find_file('t10k-images-idx3-ubyte')) + y_test = parse_idx_labels(find_file('t10k-labels-idx1-ubyte')) - split_idx = int(len(X) * 0.8) - X_train, X_test = X[:split_idx], X[split_idx:] - y_train, y_test = y[:split_idx], y[split_idx:] + return X_train, y_train, X_test, y_test + + +def load_data_from_sklearn(): + """从sklearn加载MNIST(备选方案)""" + from sklearn.datasets import fetch_openml - print(f"训练集: {len(X_train)}条, 测试集: {len(X_test)}条") + print(" 正在从OpenML下载数据(首次可能需要1-2分钟)...") - return X_train, y_train, X_test, y_test, vectorizer + mnist = fetch_openml('mnist_784', version=1, as_frame=False, parser='auto') + X = mnist.data.astype(np.float32) + y = mnist.target.astype(int) + + X_train = X[:60000] / 255.0 + X_test = X[60000:] / 255.0 + y_train = y[:60000] + y_test = y[60000:] + + return X_train, y_train, X_test, y_test + + +def one_hot_encode(y, num_classes=10): + one_hot = np.zeros((len(y), num_classes)) + one_hot[np.arange(len(y)), y] = 1 + return one_hot + + +def load_data(): + """ + 加载MNIST数据集 + + 优先从本地data/目录加载,如果文件不完整则从sklearn下载 + """ + print("\n" + "=" * 50) + print("MNIST 数据集加载") + print("=" * 50) + + # 优先检查本地文件 + exists, msg = local_files_exist() + if exists: + print(f"\n ✓ 发现本地数据文件: {msg}") + X_train, y_train, X_test, y_test = load_data_from_local() + else: + print(f"\n 本地文件: {msg}") + print(" 尝试从sklearn下载...") + try: + X_train, y_train, X_test, y_test = load_data_from_sklearn() + except Exception as e: + print(f"\n 下载失败: {e}") + print("\n 请确保 data/ 目录下有完整的4个数据文件!") + raise + + # 归一化和One-Hot + X_train = X_train.astype(np.float32) / 255.0 + X_test = X_test.astype(np.float32) / 255.0 + y_train = one_hot_encode(y_train, NUM_CLASSES) + y_test = one_hot_encode(y_test, NUM_CLASSES) + + print(f"\n ✓ 完成!") + print(f" 训练集: {X_train.shape[0]} 样本") + print(f" 测试集: {X_test.shape[0]} 样本") + print(f" 数值范围: [{X_train.min():.2f}, {X_train.max():.2f}]") + + return X_train, y_train, X_test, y_test if __name__ == '__main__': - # 测试 - print("=" * 60) - print("测试 TF-IDF 向量化") - print("=" * 60) - X_train, y_train, X_test, y_test, vec = load_data( - 'data/ChnSentiCorp', max_features=3000, max_seq_len=100, - vectorizer_type='tfidf' - ) - print(f"\nX_train shape: {X_train.shape}") - print(f"X_train sample (前5个特征): {X_train[0][:5]}") + X_train, y_train, X_test, y_test = load_data() + print(f"\n训练数据: {X_train.shape}") \ No newline at end of file diff --git a/main.py b/main.py index eaeaadc..9d0fecf 100644 --- a/main.py +++ b/main.py @@ -1,34 +1,191 @@ # -*- coding: utf-8 -*- """ -主程序入口 +主程序 - 手写数字识别 MLP 纯NumPy实现 -使用方式: +使用方法: + python main.py # 运行默认配置 + python main.py --compare # 运行对比实验 -1. 运行单个模型(默认): - python main.py - - 修改 config.py 中的 MODEL_TYPE 和 VECTORIZER_TYPE 来切换配置 - -2. 运行对比实验: - 修改 config.py 中 RUN_COMPARISON = True - - 这会依次运行: - - 实验1: BoW vs TF-IDF (固定LR模型) - - 实验2: LR vs MLP (固定TF-IDF) - - 实验3: 不同学习率对比 - - 实验4: 不同隐藏层大小对比 - - 最后输出汇总报告 +依赖: + pip install numpy requests """ -from train import main +import numpy as np +import time +from datetime import datetime +from model_numpy import MLP +from dataset import load_data +from config import * + + +def train_and_evaluate(): + """ + 训练并评估模型 + """ + print("=" * 60) + print("手写数字识别 - 纯NumPy MLP实现") + print("=" * 60) + + # ===== 加载数据 ===== + try: + X_train, y_train, X_test, y_test = load_data() + except Exception as e: + print(f"\n错误: {e}") + print("\n请手动下载数据文件:") + print(" 1. 创建 data/ 目录") + print(" 2. 下载以下文件到 data/:") + print(" - train-images-idx3-ubyte.gz (9.9 MB)") + print(" - train-labels-idx1-ubyte.gz (28 KB)") + print(" - t10k-images-idx3-ubyte.gz (1.6 MB)") + print(" - t10k-labels-idx1-ubyte.gz (5 KB)") + print(" 下载地址: https://storage.googleapis.com/tensorflow/tf-keras-datasets/") + return None, None, None + + # ===== 创建模型 ===== + print("\n[2] 创建MLP模型...") + model = MLP( + input_size=INPUT_SIZE, + hidden_size=HIDDEN_SIZE, + num_classes=NUM_CLASSES, + learning_rate=LEARNING_RATE, + seed=SEED + ) + + # ===== 训练模型 ===== + print("\n[3] 开始训练...") + start_time = time.time() + + model.fit( + X_train, y_train, + X_val=X_test, y_val=y_test, + epochs=NUM_EPOCHS, + batch_size=BATCH_SIZE, + verbose=True + ) + + train_time = time.time() - start_time + + # ===== 最终评估 ===== + print("\n" + "=" * 60) + print("训练完成!") + print("=" * 60) + + train_acc = model.accuracy(X_train, y_train) + test_acc = model.accuracy(X_test, y_test) + + print(f"\n最终结果:") + print(f" 训练准确率: {train_acc:.4f} ({train_acc*100:.2f}%)") + print(f" 测试准确率: {test_acc:.4f} ({test_acc*100:.2f}%)") + print(f" 训练时间: {train_time:.2f} 秒") + + # ===== 保存模型 ===== + timestamp = datetime.now().strftime("%m%d_%H%M%S") + model_path = f"mnist_mlp_{timestamp}" + model.save(model_path) + + # ===== 预测示例 ===== + print("\n[4] 预测示例:") + indices = np.random.choice(len(X_test), 5, replace=False) + + for i, idx in enumerate(indices): + img = X_test[idx] + true_label = np.argmax(y_test[idx]) + pred_label = model.predict(img.reshape(1, -1))[0] + prob = model.predict_proba(img.reshape(1, -1))[0] + + status = '✓' if true_label == pred_label else '✗' + print(f" 样本{i+1}: 真实={true_label}, 预测={pred_label}, " + f"置信度={prob[pred_label]:.2f} {status}") + + return model, train_acc, test_acc + + +def run_comparison(): + """ + 运行对比实验 + """ + print("\n" + "=" * 60) + print("超参数对比实验") + print("=" * 60) + + # 加载数据 + try: + X_train, y_train, X_test, y_test = load_data() + except Exception as e: + print(f"加载数据失败: {e}") + return + + # 实验配置 + experiments = [ + {"hidden_size": 32, "lr": 0.1, "name": "小模型(32神经元)"}, + {"hidden_size": 128, "lr": 0.1, "name": "标准(128神经元)"}, + {"hidden_size": 256, "lr": 0.1, "name": "大模型(256神经元)"}, + {"hidden_size": 128, "lr": 0.01, "name": "小学习率(0.01)"}, + {"hidden_size": 128, "lr": 0.5, "name": "大学习率(0.5)"}, + ] + + results = [] + + for exp in experiments: + print(f"\n实验: {exp['name']}") + print("-" * 40) + + model = MLP( + input_size=INPUT_SIZE, + hidden_size=exp['hidden_size'], + num_classes=NUM_CLASSES, + learning_rate=exp['lr'], + seed=SEED + ) + + start_time = time.time() + model.fit(X_train, y_train, epochs=30, batch_size=BATCH_SIZE, verbose=False) + train_time = time.time() - start_time + + train_acc = model.accuracy(X_train, y_train) + test_acc = model.accuracy(X_test, y_test) + + results.append({ + 'name': exp['name'], + 'hidden_size': exp['hidden_size'], + 'lr': exp['lr'], + 'train_acc': train_acc, + 'test_acc': test_acc, + 'train_time': train_time + }) + + print(f" 训练准确率: {train_acc:.4f} | 测试准确率: {test_acc:.4f} | 时间: {train_time:.1f}s") + + # 汇总 + print("\n" + "=" * 60) + print("实验结果汇总") + print("=" * 60) + print(f"\n{'配置':<25} {'训练准确率':<12} {'测试准确率':<12} {'时间':<8}") + print("-" * 60) + + for r in results: + print(f"{r['name']:<25} {r['train_acc']:<12.4f} {r['test_acc']:<12.4f} {r['train_time']:<8.1f}s") + + best = max(results, key=lambda x: x['test_acc']) + print(f"\n最佳配置: {best['name']}, 测试准确率: {best['test_acc']:.4f}") + + +def main(): + """主函数""" + if RUN_COMPARISON: + run_comparison() + else: + train_and_evaluate() + + print("\n" + "=" * 60) + print("程序结束!") + print("=" * 60) + if __name__ == '__main__': - print("\n" + "=" * 70) - print("文本分类实验 - 纯NumPy实现") - print("数据集: ChnSentiCorp (中文酒店评论)") - print("模型: Logistic Regression / MLP") - print("向量化: BoW / TF-IDF") - print("=" * 70 + "\n") + import sys - main() + if '--compare' in sys.argv: + RUN_COMPARISON = True + + main() \ No newline at end of file diff --git a/model_numpy.py b/model_numpy.py index e8d7adf..df78f06 100644 --- a/model_numpy.py +++ b/model_numpy.py @@ -1,315 +1,261 @@ # -*- coding: utf-8 -*- """ -模型模块 - 纯NumPy实现 +模型模块 - 纯NumPy实现手写数字识别MLP -支持两种模型: -1. Logistic Regression(逻辑回归)- 线性模型 -2. MLP(多层感知机)- 两层全连接网络 +网络结构: 784 → 128 → 10 +- 输入层: 784 像素值 (28x28 展平) +- 隐藏层: 128 神经元 + ReLU激活 +- 输出层: 10 数字 (0-9) + Softmax -设计思路: -- 两种模型都共享相同的接口,方便对比 -- 代码简洁,每行都有详细注释 -- 手动实现反向传播,原理透明 +纯NumPy实现,无任何深度学习框架依赖 +只需: numpy """ import numpy as np -class BaseModel: - """模型基类""" - def fit(self, X, y, X_val=None, y_val=None, epochs=100, batch_size=32, verbose=True): pass - def predict(self, X): pass - def predict_proba(self, X): pass - def accuracy(self, X, y): pass - - -class LogisticRegression(BaseModel): +class MLP: """ - 逻辑回归(线性分类器) - - 结构:输入 → 线性变换 → Softmax → 输出 - - 原理: - - 线性变换: z = X @ W + b - - Softmax: 将线性输出转为概率分布 - - 参数量:input_size × num_classes + num_classes - """ - - def __init__(self, input_size, num_classes=2, learning_rate=0.1, - class_weight=None, seed=42): - np.random.seed(seed) - - # 权重初始化(Xavier) - self.W = np.random.randn(input_size, num_classes) * np.sqrt(2.0 / input_size) - self.b = np.zeros(num_classes) - - self.lr = learning_rate - self.input_size = input_size - self.num_classes = num_classes - self.class_weight = class_weight # 类别权重 - - total_params = input_size * num_classes + num_classes - print(f"LogisticRegression: {input_size} -> {num_classes}, 参数量: {total_params}") - - def softmax(self, x): - """Softmax函数""" - x_shifted = x - np.max(x, axis=1, keepdims=True) - exp_x = np.exp(x_shifted) - return exp_x / np.sum(exp_x, axis=1, keepdims=True) - - def forward(self, X): - """前向传播""" - # 线性变换 - z = X @ self.W + self.b - # Softmax输出概率 - return self.softmax(z) - - def backward(self, X, y): - """反向传播(梯度下降)""" - batch_size = X.shape[0] - probs = self.forward(X) - - # Softmax + 交叉熵梯度 - d_z = probs.copy() - - # 应用类别权重:减去权重值而不是1 - # 公式: dL/dz_y = w_y * (p_y - 1) = w_y*p_y - w_y - if self.class_weight is not None: - for i in range(batch_size): - d_z[i, y[i]] -= self.class_weight[y[i]] - else: - d_z[np.arange(batch_size), y] -= 1 - - # 梯度 - d_W = X.T @ d_z - d_b = np.sum(d_z, axis=0) - - # 更新 - self.W -= self.lr * d_W / batch_size - self.b -= self.lr * d_b / batch_size - - def fit(self, X, y, X_val=None, y_val=None, epochs=100, batch_size=32, verbose=True): - """训练""" - num_samples = len(X) - num_batches = (num_samples + batch_size - 1) // batch_size - - for epoch in range(epochs): - # 打乱 - indices = np.random.permutation(num_samples) - X_shuffled = X[indices] - y_shuffled = y[indices] - - epoch_loss = 0 - for batch_idx in range(num_batches): - start = batch_idx * batch_size - end = min(start + batch_size, num_samples) - X_batch = X_shuffled[start:end] - y_batch = y_shuffled[start:end] - - # 前向 + 反向 - probs = self.forward(X_batch) - self.backward(X_batch, y_batch) - - # 损失 - loss = -np.mean(np.log(np.clip(probs[np.arange(len(y_batch)), y_batch], 1e-10, 1))) - epoch_loss += loss - - # 评估 - if verbose and (epoch + 1) % 20 == 0: - train_acc = self.accuracy(X, y) - msg = f"Epoch {epoch+1:3d}/{epochs} | Loss: {epoch_loss/num_batches:.4f} | 训练准确率: {train_acc:.4f}" - if X_val is not None: - val_acc = self.accuracy(X_val, y_val) - msg += f" | 测试准确率: {val_acc:.4f}" - print(msg) - - return self - - def predict(self, X): - return np.argmax(self.forward(X), axis=1) - - def predict_proba(self, X): - return self.forward(X) - - def accuracy(self, X, y): - return np.mean(self.predict(X) == y) - - def save(self, filepath): - """保存模型权重""" - np.save(filepath + '_W.npy', self.W) - np.save(filepath + '_b.npy', self.b) - print(f"模型已保存: {filepath}") - - @staticmethod - def load(filepath, input_size, num_classes=2, learning_rate=0.1): - """加载模型权重""" - model = LogisticRegression(input_size, num_classes, learning_rate) - model.W = np.load(filepath + '_W.npy') - model.b = np.load(filepath + '_b.npy') - print(f"模型已加载: {filepath}") - return model - - -class MLP(BaseModel): - """ - 多层感知机(神经网络) - - 结构:输入 → 线性变换 → ReLU → 线性变换 → Softmax → 输出 - - 和LogisticRegression的区别: - - 多了一层隐藏层 + 非线性激活 - - 可以学习非线性关系 - - 参数量更大 - + 多层感知机(神经网络) + + 结构: + 输入(784) → 线性变换 → ReLU → 线性变换 → Softmax → 输出(10) + 参数量: - - W1: input_size × hidden_size - - b1: hidden_size - - W2: hidden_size × num_classes - - b2: num_classes + W1: 784 × 128 = 100,352 + b1: 128 + W2: 128 × 10 = 1,280 + b2: 10 + 总计: ~101,770 参数 """ - - def __init__(self, input_size, hidden_size=64, num_classes=2, - learning_rate=0.1, keep_prob=1.0, class_weight=None, seed=42): + + def __init__(self, input_size=784, hidden_size=128, num_classes=10, + learning_rate=0.1, seed=42): np.random.seed(seed) - - # 第一层权重 + + # ===== 第一层: 输入 → 隐藏层 ===== + # 权重: (input_size, hidden_size) + # Xavier初始化,适合ReLU self.W1 = np.random.randn(input_size, hidden_size) * np.sqrt(2.0 / input_size) self.b1 = np.zeros(hidden_size) - - # 第二层权重 + + # ===== 第二层: 隐藏层 → 输出 ===== + # 权重: (hidden_size, num_classes) self.W2 = np.random.randn(hidden_size, num_classes) * np.sqrt(2.0 / hidden_size) self.b2 = np.zeros(num_classes) - + + # 保存超参数 self.lr = learning_rate - self.keep_prob = keep_prob - self.hidden_size = hidden_size self.input_size = input_size + self.hidden_size = hidden_size self.num_classes = num_classes - self.class_weight = class_weight # 类别权重 - - total_params = (input_size * hidden_size + hidden_size + + + # 打印模型信息 + total_params = (input_size * hidden_size + hidden_size + hidden_size * num_classes + num_classes) - print(f"MLP: {input_size} -> {hidden_size} -> {num_classes}, 参数量: {total_params}") + print(f"\n{'='*50}") + print(f"MLP 网络结构:") + print(f" 输入层: {input_size} 神经元") + print(f" 隐藏层: {hidden_size} 神经元 + ReLU") + print(f" 输出层: {num_classes} 神经元 + Softmax") + print(f" 参数量: {total_params:,}") + print(f"{'='*50}") + def relu(self, x): - """ReLU激活""" + """ReLU激活函数: max(0, x)""" return np.maximum(0, x) + def relu_derivative(self, x): - """ReLU导数""" + """ReLU导数: x > 0 时为1,否则为0""" return (x > 0).astype(float) + def softmax(self, x): - """Softmax函数""" + """ + Softmax函数: 将数值转换为概率分布 + + softmax(x_i) = exp(x_i) / sum(exp(x_j)) + + 技巧: 减去最大值避免数值溢出 + """ x_shifted = x - np.max(x, axis=1, keepdims=True) exp_x = np.exp(x_shifted) return exp_x / np.sum(exp_x, axis=1, keepdims=True) + def forward(self, X): - """前向传播""" - # 第一层 - self.z1 = X @ self.W1 + self.b1 - self.a1 = self.relu(self.z1) - - # Dropout(训练时) - if self.keep_prob < 1.0 and hasattr(self, 'training'): - self.d1 = (np.random.rand(*self.a1.shape) < self.keep_prob).astype(float) - self.a1 *= self.d1 - self.a1 /= self.keep_prob - - # 第二层 - self.z2 = self.a1 @ self.W2 + self.b2 - self.probs = self.softmax(self.z2) - + """ + 前向传播 + + Args: + X: (batch_size, 784) 图像像素值 + + Returns: + probs: (batch_size, 10) 每个类的概率 + """ + # ===== 第一层计算 ===== + # z1 = X @ W1 + b1 + # a1 = relu(z1) + self.z1 = X @ self.W1 + self.b1 # (batch, 784) @ (784, 128) = (batch, 128) + self.a1 = self.relu(self.z1) # (batch, 128) + + # ===== 第二层计算 ===== + # z2 = a1 @ W2 + b2 + # probs = softmax(z2) + self.z2 = self.a1 @ self.W2 + self.b2 # (batch, 128) @ (128, 10) = (batch, 10) + self.probs = self.softmax(self.z2) # (batch, 10) + return self.probs + def backward(self, X, y): - """反向传播""" + """ + 反向传播(梯度下降) + + Args: + X: (batch_size, 784) 图像 + y: (batch_size, 10) One-Hot标签 + """ batch_size = X.shape[0] - # 输出层梯度 - d_z2 = self.probs.copy() + # ===== 输出层梯度 ===== + # Softmax + 交叉熵的梯度简化为: p - y + d_z2 = self.probs - y # (batch, 10) - # 应用类别权重 - if self.class_weight is not None: - for i in range(batch_size): - d_z2[i, y[i]] -= self.class_weight[y[i]] - else: - d_z2[np.arange(batch_size), y] -= 1 + # ===== 第二层梯度 ===== + d_W2 = self.a1.T @ d_z2 # (128, 10) + d_b2 = np.sum(d_z2, axis=0) # (10,) - # 第二层梯度 - d_W2 = self.a1.T @ d_z2 - d_b2 = np.sum(d_z2, axis=0) + # ===== 隐藏层梯度 ===== + d_a1 = d_z2 @ self.W2.T # (batch, 128) + d_z1 = d_a1 * self.relu_derivative(self.z1) # (batch, 128) - # 隐藏层梯度 - d_a1 = d_z2 @ self.W2.T - d_z1 = d_a1 * self.relu_derivative(self.z1) + # ===== 第一层梯度 ===== + d_W1 = X.T @ d_z1 # (784, 128) + d_b1 = np.sum(d_z1, axis=0) # (128,) - # Dropout梯度 - if self.keep_prob < 1.0 and hasattr(self, 'd1'): - d_z1 *= self.d1 - d_z1 /= self.keep_prob + # ===== 梯度裁剪(防止梯度爆炸) ===== + max_grad = 1.0 + d_W1 = np.clip(d_W1, -max_grad, max_grad) + d_W2 = np.clip(d_W2, -max_grad, max_grad) + d_b1 = np.clip(d_b1, -max_grad, max_grad) + d_b2 = np.clip(d_b2, -max_grad, max_grad) - # 第一层梯度 - d_W1 = X.T @ d_z1 - d_b1 = np.sum(d_z1, axis=0) - - # 更新 + # ===== 更新权重(梯度下降) ===== self.W1 -= self.lr * d_W1 / batch_size self.b1 -= self.lr * d_b1 / batch_size self.W2 -= self.lr * d_W2 / batch_size self.b2 -= self.lr * d_b2 / batch_size - def fit(self, X, y, X_val=None, y_val=None, epochs=100, batch_size=32, verbose=True): - """训练""" - num_samples = len(X) - num_batches = (num_samples + batch_size - 1) // batch_size + def cross_entropy_loss(self, probs, y): + """ + 交叉熵损失 + + L = -sum(y * log(p)) / N + """ + # 取真实类别的概率 + correct_probs = probs[np.arange(len(y)), y.argmax(axis=1)] + # 避免log(0) + loss = -np.mean(np.log(np.clip(correct_probs, 1e-10, 1.0))) + return loss + + + def fit(self, X_train, y_train, X_val=None, y_val=None, + epochs=50, batch_size=64, verbose=True): + """ + 训练模型 + + Args: + X_train: 训练数据 (N, 784) + y_train: 训练标签 (N, 10) One-Hot + X_val: 验证数据(可选) + y_val: 验证标签(可选) + epochs: 训练轮数 + batch_size: 批大小 + verbose: 是否打印进度 + """ + N = len(X_train) + num_batches = (N + batch_size - 1) // batch_size + for epoch in range(epochs): - # 打乱 - indices = np.random.permutation(num_samples) - X_shuffled = X[indices] - y_shuffled = y[indices] - + # ===== 打乱数据 ===== + indices = np.random.permutation(N) + X_shuffled = X_train[indices] + y_shuffled = y_train[indices] + epoch_loss = 0 - self.training = True # 开启Dropout - + + # ===== 批训练 ===== for batch_idx in range(num_batches): start = batch_idx * batch_size - end = min(start + batch_size, num_samples) + end = min(start + batch_size, N) X_batch = X_shuffled[start:end] y_batch = y_shuffled[start:end] - - # 前向 + 反向 + + # 前向传播 probs = self.forward(X_batch) + + # 反向传播 self.backward(X_batch, y_batch) - - # 损失 - loss = -np.mean(np.log(np.clip(probs[np.arange(len(y_batch)), y_batch], 1e-10, 1))) + + # 计算损失 + loss = self.cross_entropy_loss(probs, y_batch) epoch_loss += loss - - self.training = False # 关闭Dropout - - # 评估 - if verbose and (epoch + 1) % 20 == 0: - train_acc = self.accuracy(X, y) + + # ===== 打印进度 ===== + if verbose and (epoch + 1) % 5 == 0: + train_acc = self.accuracy(X_train, y_train) msg = f"Epoch {epoch+1:3d}/{epochs} | Loss: {epoch_loss/num_batches:.4f} | 训练准确率: {train_acc:.4f}" + if X_val is not None: val_acc = self.accuracy(X_val, y_val) msg += f" | 测试准确率: {val_acc:.4f}" + print(msg) - + return self + def predict(self, X): - return np.argmax(self.forward(X), axis=1) + """ + 预测类别 + + Args: + X: (N, 784) 图像 + + Returns: + predictions: (N,) 预测的类别标签 (0-9) + """ + probs = self.forward(X) + return np.argmax(probs, axis=1) + def predict_proba(self, X): + """ + 预测概率 + + Returns: + probs: (N, 10) 每个类的概率 + """ return self.forward(X) + def accuracy(self, X, y): - return np.mean(self.predict(X) == y) + """ + 计算准确率 + + Args: + X: (N, 784) 图像 + y: (N,) 或 (N, 10) 标签 + """ + if len(y.shape) > 1: + y = np.argmax(y, axis=1) + predictions = self.predict(X) + return np.mean(predictions == y) + def save(self, filepath): """保存模型权重""" @@ -317,26 +263,43 @@ class MLP(BaseModel): np.save(filepath + '_b1.npy', self.b1) np.save(filepath + '_W2.npy', self.W2) np.save(filepath + '_b2.npy', self.b2) - print(f"模型已保存: {filepath}") + print(f"\n模型已保存: {filepath}") + @staticmethod - def load(filepath, input_size, hidden_size=64, num_classes=2, learning_rate=0.1, keep_prob=1.0): + def load(filepath, input_size=784, hidden_size=128, num_classes=10, learning_rate=0.1): """加载模型权重""" - model = MLP(input_size, hidden_size, num_classes, learning_rate, keep_prob) + model = MLP(input_size, hidden_size, num_classes, learning_rate) model.W1 = np.load(filepath + '_W1.npy') model.b1 = np.load(filepath + '_b1.npy') model.W2 = np.load(filepath + '_W2.npy') model.b2 = np.load(filepath + '_b2.npy') - print(f"模型已加载: {filepath}") + print(f"\n模型已加载: {filepath}") return model -def create_model(model_type, input_size, hidden_size=64, num_classes=2, - learning_rate=0.1, keep_prob=1.0, class_weight=None): - """工厂函数:创建模型""" - if model_type == 'lr': - return LogisticRegression(input_size, num_classes, learning_rate, class_weight) - elif model_type == 'mlp': - return MLP(input_size, hidden_size, num_classes, learning_rate, keep_prob, class_weight) - else: - raise ValueError(f"未知模型类型: {model_type}") +# ===== 测试代码 ===== +if __name__ == '__main__': + # 简单测试 + print("测试MLP模型...") + + model = MLP(input_size=784, hidden_size=128, num_classes=10, learning_rate=0.1) + + # 模拟数据 + X_test = np.random.randn(32, 784) + y_test = np.zeros((32, 10)) + for i in range(32): + y_test[i, i % 10] = 1 + + # 前向传播测试 + probs = model.forward(X_test) + print(f"输出概率形状: {probs.shape}") + print(f"概率和: {probs[0].sum():.4f} (应该接近1)") + + # 反向传播测试 + model.backward(X_test, y_test) + print("反向传播测试通过!") + + # 预测测试 + preds = model.predict(X_test) + print(f"预测结果: {preds}") \ No newline at end of file