diff --git a/49张兴宇.xls b/49张兴宇.xls new file mode 100644 index 0000000..a74a261 Binary files /dev/null and b/49张兴宇.xls differ diff --git a/config.py b/config.py index 12a552d..ec6ef16 100644 --- a/config.py +++ b/config.py @@ -1,40 +1,40 @@ -# -*- coding: utf-8 -*- -""" -配置文件 - 所有超参数集中管理 - -设计思路: -将超参数分门别类,学生可以单独修改某一类而不会影响其他 -""" - -# ==================== 数据相关 ==================== -DATA_DIR = 'data/ChnSentiCorp' # 数据集路径 -MAX_FEATURES = 3000 # 词表最大容量 -MAX_SEQ_LEN = 100 # 句子最大长度(词数) -VECTORIZER_TYPE = 'tfidf' # 'tfidf' 或 'bow'(向量化方式) - -# ==================== 模型相关 ==================== -MODEL_TYPE = 'mlp' # 'mlp' 或 'lr'(模型类型) -HIDDEN_SIZE = 64 # MLP隐藏层大小(LR忽略) -NUM_CLASSES = 2 # 类别数(正面/负面二分类) -KEEP_PROB = 1.0 # Dropout保留概率(LR忽略,设为1即可) - -# ==================== 训练相关 ==================== -LEARNING_RATE = 0.05 # 学习率 -NUM_EPOCHS = 100 # 训练轮数 -BATCH_SIZE = 64 # 批次大小 - -# ==================== 类别权重(解决数据不平衡问题)==================== -USE_CLASS_WEIGHT = True # True=启用类别权重, False=不启用(对比用) -# 权重计算公式: n_samples / (n_classes * n_class_i) -# 正面评论多所以权重小,负面评论少所以权重大 -CLASS_WEIGHT_POS = 0.73 # 正面类权重(自动计算) -CLASS_WEIGHT_NEG = 1.58 # 负面类权重(自动计算) - -# ==================== 实验相关 ==================== -RUN_COMPARISON = False # True=运行对比实验, False=运行单个模型 -COMPARE_MODELS = ['lr', 'mlp'] # 要对比的模型列表 -COMPARE_VECTORS = ['bow', 'tfidf'] # 要对比的向量化方式 - -# ==================== 其他 ==================== -RANDOM_SEED = 42 # 随机种子(保证可复现) -VERBOSE = True # 打印详细日志 +# -*- coding: utf-8 -*- +""" +配置文件 - 所有超参数集中管理 + +设计思路: +将超参数分门别类,学生可以单独修改某一类而不会影响其他 +""" + +# ==================== 数据相关 ==================== +DATA_DIR = 'data/ChnSentiCorp' # 数据集路径 +MAX_FEATURES = 3000 # 词表最大容量 +MAX_SEQ_LEN = 100 # 句子最大长度(词数) +VECTORIZER_TYPE = 'tfidf' # 'tfidf' 或 'bow'(向量化方式) + +# ==================== 模型相关 ==================== +MODEL_TYPE = 'mlp' # 'mlp' 或 'lr'(模型类型) +HIDDEN_SIZE = 60 # MLP隐藏层大小(LR忽略) +NUM_CLASSES = 2 # 类别数(正面/负面二分类) +KEEP_PROB = 1.0 # Dropout保留概率(LR忽略,设为1即可) + +# ==================== 训练相关 ==================== +LEARNING_RATE = 0.05 # 学习率 +NUM_EPOCHS = 100 # 训练轮数 +BATCH_SIZE = 50 # 批次大小 + +# ==================== 类别权重(解决数据不平衡问题)==================== +USE_CLASS_WEIGHT = True # True=启用类别权重, False=不启用(对比用) +# 权重计算公式: n_samples / (n_classes * n_class_i) +# 正面评论多所以权重小,负面评论少所以权重大 +CLASS_WEIGHT_POS = 1.66 # 正面类权重(自动计算) +CLASS_WEIGHT_NEG = 0.99 # 负面类权重(自动计算) + +# ==================== 实验相关 ==================== +RUN_COMPARISON = False # True=运行对比实验, False=运行单个模型 +COMPARE_MODELS = ['lr', 'mlp'] # 要对比的模型列表 +COMPARE_VECTORS = ['bow', 'tfidf'] # 要对比的向量化方式 + +# ==================== 其他 ==================== +RANDOM_SEED = 42 # 随机种子(保证可复现) +VERBOSE = True # 打印详细日志 diff --git a/dataset.py b/dataset.py index e554362..4f4163e 100644 --- a/dataset.py +++ b/dataset.py @@ -1,286 +1,286 @@ -# -*- coding: utf-8 -*- -""" -数据加载与向量化模块 - -支持两种向量化方法: -1. BoW (Bag of Words) - 词频向量 -2. TF-IDF - 词频-逆文档频率向量 - -TF-IDF 的优势: -- 降低常见词(如"的"、"是")的权重 -- 提升罕见词的信息量 -- 通常效果优于简单BoW -""" - -import os -import re -import csv -import math -import jieba -import numpy as np -from collections import Counter - -try: - import urllib.request - import ssl - DOWNLOAD_AVAILABLE = True -except ImportError: - DOWNLOAD_AVAILABLE = False - - -DATASET_URL = "https://raw.githubusercontent.com/SophonPlus/ChineseNlpCorpus/master/datasets/ChnSentiCorp_htl_all/ChnSentiCorp_htl_all.csv" - - -def download_dataset(data_dir): - """下载数据集(如果不存在)""" - csv_path = os.path.join(data_dir, 'ChnSentiCorp_htl_all.csv') - - if os.path.exists(csv_path): - print(f"数据已存在: {csv_path}") - return True - - if not DOWNLOAD_AVAILABLE: - return False - - print("正在下载数据集...") - ssl_context = ssl.create_default_context() - ssl_context.check_hostname = False - ssl_context.verify_mode = ssl.CERT_NONE - - try: - request = urllib.request.Request(DATASET_URL, headers={'User-Agent': 'Mozilla/5.0'}) - response = urllib.request.urlopen(request, timeout=120, context=ssl_context) - os.makedirs(data_dir, exist_ok=True) - with open(csv_path, 'wb') as f: - f.write(response.read()) - print(f"下载完成: {csv_path}") - return True - except Exception as e: - print(f"下载失败: {e}") - return False - - -def load_raw_data(data_dir): - """加载原始数据""" - csv_path = os.path.join(data_dir, 'ChnSentiCorp_htl_all.csv') - texts, labels = [], [] - - with open(csv_path, 'r', encoding='utf-8') as f: - reader = csv.reader(f) - for row in reader: - if len(row) < 2: - continue - try: - label = int(row[0]) - review = row[1].strip() - if review: - texts.append(review) - labels.append(label) - except (ValueError, IndexError): - continue - - return texts, np.array(labels) - - -def tokenize(text): - """中文分词""" - text = re.sub(r'[^\u4e00-\u9fa5a-zA-Z]', ' ', text) - words = jieba.lcut(text) - return [w for w in words if len(w) > 1] - - -# ==================== 向量化器 ==================== - -class BaseVectorizer: - """向量化器基类""" - def fit(self, texts): pass - def transform(self, texts): pass - def fit_transform(self, texts): pass - - -class BoWVectorizer(BaseVectorizer): - """ - 词袋模型 (Bag of Words) - - 原理:统计每个词在文本中出现的次数 - 向量维度 = 词表大小 - 每个维度 = 该词在本文本中出现的次数 - """ - - def __init__(self, max_features, max_seq_len): - self.max_features = max_features - self.max_seq_len = max_seq_len - self.vocab = {} - self.doc_freq = {} # 文档频率 - self.num_docs = 0 - - def fit(self, texts): - """构建词表(基于词频)""" - counter = Counter() - doc_counter = Counter() # 统计包含该词的文档数 - - for text in texts: - words = tokenize(text) - unique_words = set(words) - counter.update(words) - for w in unique_words: - doc_counter[w] += 1 - - self.num_docs = len(texts) - - # 取最高频的词 - most_common = counter.most_common(self.max_features) - self.vocab = {word: idx for idx, (word, _) in enumerate(most_common)} - - # 记录文档频率(用于TF-IDF) - self.doc_freq = {w: doc_counter[w] for w in self.vocab} - - print(f" BoW词表大小: {len(self.vocab)}") - return self - - def transform(self, texts): - """将文本转换为词频向量""" - vectors = [] - for text in texts: - words = tokenize(text) - freq = [0] * self.max_seq_len - for i, word in enumerate(words[:self.max_seq_len]): - if word in self.vocab: - freq[i] = 1 # 二值(出现=1,不出现=0) - vectors.append(freq) - return np.array(vectors, dtype=np.float32) - - def fit_transform(self, texts): - self.fit(texts) - return self.transform(texts) - - -class TFIDFVectorizer(BaseVectorizer): - """ - TF-IDF 向量器 - - 原理: - - TF(词频) = 词在本文本中出现的次数 - - IDF(逆文档频率) = log(总文档数 / 包含该词的文档数) - - TF-IDF = TF × IDF - - 优势: - - 降低常见无意义词的权重(如"的"、"是") - - 提升罕见但有信息量的词 - """ - - def __init__(self, max_features, max_seq_len): - self.max_features = max_features - self.max_seq_len = max_seq_len - self.vocab = {} - self.idf = {} # 存储每个词的IDF值 - self.num_docs = 0 - - def fit(self, texts): - """构建词表并计算IDF""" - counter = Counter() - doc_counter = Counter() - - for text in texts: - words = tokenize(text) - unique_words = set(words) - counter.update(words) - for w in unique_words: - doc_counter[w] += 1 - - self.num_docs = len(texts) - - # 计算每个词的IDF - # IDF = log(总文档数 / 包含该词的文档数) - idf_values = {} - for word, df in doc_counter.items(): - idf_values[word] = math.log(self.num_docs / (df + 1)) + 1 # 加1防零 - - # 取IDF值最高的词(信息量最大的词) - sorted_words = sorted(idf_values.items(), key=lambda x: x[1], reverse=True) - self.vocab = {word: idx for idx, (word, _) in enumerate(sorted_words[:self.max_features])} - - # 保存IDF值 - self.idf = {word: idf_values[word] for word in self.vocab} - - print(f" TF-IDF词表大小: {len(self.vocab)}") - print(f" 平均IDF: {np.mean(list(self.idf.values())):.3f}") - return self - - def transform(self, texts): - """将文本转换为TF-IDF向量""" - vectors = [] - for text in texts: - words = tokenize(text) - - # 计算TF - tf = Counter(words) - tf_sum = len(words) if words else 1 - - # 生成向量 - vec = [0.0] * self.max_seq_len - for i, word in enumerate(words[:self.max_seq_len]): - if word in self.vocab: - # TF × IDF - vec[i] = (tf[word] / tf_sum) * self.idf.get(word, 0) - vectors.append(vec) - - return np.array(vectors, dtype=np.float32) - - def fit_transform(self, texts): - self.fit(texts) - return self.transform(texts) - - -def load_data(data_dir, max_features, max_seq_len, vectorizer_type='tfidf'): - """ - 加载并向量化数据 - - 参数: - - vectorizer_type: 'tfidf' 或 'bow' - """ - if not download_dataset(data_dir): - raise RuntimeError("数据加载失败,请检查网络或手动下载数据集") - - print("正在加载数据...") - texts, labels = load_raw_data(data_dir) - print(f"总评论数: {len(texts)}, 正面: {sum(labels)}, 负面: {len(labels) - sum(labels)}") - - # 选择向量化器 - if vectorizer_type == 'tfidf': - vectorizer = TFIDFVectorizer(max_features, max_seq_len) - vec_name = "TF-IDF" - else: - vectorizer = BoWVectorizer(max_features, max_seq_len) - vec_name = "BoW" - - print(f"正在使用{vec_name}向量化...") - X = vectorizer.fit_transform(texts) - y = labels - - # 打乱并划分 - np.random.seed(42) - indices = np.random.permutation(len(X)) - X = X[indices] - y = y[indices] - - split_idx = int(len(X) * 0.8) - X_train, X_test = X[:split_idx], X[split_idx:] - y_train, y_test = y[:split_idx], y[split_idx:] - - print(f"训练集: {len(X_train)}条, 测试集: {len(X_test)}条") - - return X_train, y_train, X_test, y_test, vectorizer - - -if __name__ == '__main__': - # 测试 - print("=" * 60) - print("测试 TF-IDF 向量化") - print("=" * 60) - X_train, y_train, X_test, y_test, vec = load_data( - 'data/ChnSentiCorp', max_features=3000, max_seq_len=100, - vectorizer_type='tfidf' - ) - print(f"\nX_train shape: {X_train.shape}") - print(f"X_train sample (前5个特征): {X_train[0][:5]}") +# -*- coding: utf-8 -*- +""" +数据加载与向量化模块 + +支持两种向量化方法: +1. BoW (Bag of Words) - 词频向量 +2. TF-IDF - 词频-逆文档频率向量 + +TF-IDF 的优势: +- 降低常见词(如"的"、"是")的权重 +- 提升罕见词的信息量 +- 通常效果优于简单BoW +""" + +import os +import re +import csv +import math +import jieba +import numpy as np +from collections import Counter + +try: + import urllib.request + import ssl + DOWNLOAD_AVAILABLE = True +except ImportError: + DOWNLOAD_AVAILABLE = False + + +DATASET_URL = "https://raw.githubusercontent.com/SophonPlus/ChineseNlpCorpus/master/datasets/ChnSentiCorp_htl_all/ChnSentiCorp_htl_all.csv" + + +def download_dataset(data_dir): + """下载数据集(如果不存在)""" + csv_path = os.path.join(data_dir, 'ChnSentiCorp_htl_all.csv') + + if os.path.exists(csv_path): + print(f"数据已存在: {csv_path}") + return True + + if not DOWNLOAD_AVAILABLE: + return False + + print("正在下载数据集...") + ssl_context = ssl.create_default_context() + ssl_context.check_hostname = False + ssl_context.verify_mode = ssl.CERT_NONE + + try: + request = urllib.request.Request(DATASET_URL, headers={'User-Agent': 'Mozilla/5.0'}) + response = urllib.request.urlopen(request, timeout=120, context=ssl_context) + os.makedirs(data_dir, exist_ok=True) + with open(csv_path, 'wb') as f: + f.write(response.read()) + print(f"下载完成: {csv_path}") + return True + except Exception as e: + print(f"下载失败: {e}") + return False + + +def load_raw_data(data_dir): + """加载原始数据""" + csv_path = os.path.join(data_dir, 'ChnSentiCorp_htl_all.csv') + texts, labels = [], [] + + with open(csv_path, 'r', encoding='utf-8') as f: + reader = csv.reader(f) + for row in reader: + if len(row) < 2: + continue + try: + label = int(row[0]) + review = row[1].strip() + if review: + texts.append(review) + labels.append(label) + except (ValueError, IndexError): + continue + + return texts, np.array(labels) + + +def tokenize(text): + """中文分词""" + text = re.sub(r'[^\u4e00-\u9fa5a-zA-Z]', ' ', text) + words = jieba.lcut(text) + return [w for w in words if len(w) > 1] + + +# ==================== 向量化器 ==================== + +class BaseVectorizer: + """向量化器基类""" + def fit(self, texts): pass + def transform(self, texts): pass + def fit_transform(self, texts): pass + + +class BoWVectorizer(BaseVectorizer): + """ + 词袋模型 (Bag of Words) + + 原理:统计每个词在文本中出现的次数 + 向量维度 = 词表大小 + 每个维度 = 该词在本文本中出现的次数 + """ + + def __init__(self, max_features, max_seq_len): + self.max_features = max_features + self.max_seq_len = max_seq_len + self.vocab = {} + self.doc_freq = {} # 文档频率 + self.num_docs = 0 + + def fit(self, texts): + """构建词表(基于词频)""" + counter = Counter() + doc_counter = Counter() # 统计包含该词的文档数 + + for text in texts: + words = tokenize(text) + unique_words = set(words) + counter.update(words) + for w in unique_words: + doc_counter[w] += 1 + + self.num_docs = len(texts) + + # 取最高频的词 + most_common = counter.most_common(self.max_features) + self.vocab = {word: idx for idx, (word, _) in enumerate(most_common)} + + # 记录文档频率(用于TF-IDF) + self.doc_freq = {w: doc_counter[w] for w in self.vocab} + + print(f" BoW词表大小: {len(self.vocab)}") + return self + + def transform(self, texts): + """将文本转换为词频向量""" + vectors = [] + for text in texts: + words = tokenize(text) + freq = [0] * self.max_seq_len + for i, word in enumerate(words[:self.max_seq_len]): + if word in self.vocab: + freq[i] = 1 # 二值(出现=1,不出现=0) + vectors.append(freq) + return np.array(vectors, dtype=np.float32) + + def fit_transform(self, texts): + self.fit(texts) + return self.transform(texts) + + +class TFIDFVectorizer(BaseVectorizer): + """ + TF-IDF 向量器 + + 原理: + - TF(词频) = 词在本文本中出现的次数 + - IDF(逆文档频率) = log(总文档数 / 包含该词的文档数) + - TF-IDF = TF × IDF + + 优势: + - 降低常见无意义词的权重(如"的"、"是") + - 提升罕见但有信息量的词 + """ + + def __init__(self, max_features, max_seq_len): + self.max_features = max_features + self.max_seq_len = max_seq_len + self.vocab = {} + self.idf = {} # 存储每个词的IDF值 + self.num_docs = 0 + + def fit(self, texts): + """构建词表并计算IDF""" + counter = Counter() + doc_counter = Counter() + + for text in texts: + words = tokenize(text) + unique_words = set(words) + counter.update(words) + for w in unique_words: + doc_counter[w] += 1 + + self.num_docs = len(texts) + + # 计算每个词的IDF + # IDF = log(总文档数 / 包含该词的文档数) + idf_values = {} + for word, df in doc_counter.items(): + idf_values[word] = math.log(self.num_docs / (df + 1)) + 1 # 加1防零 + + # 取IDF值最高的词(信息量最大的词) + sorted_words = sorted(idf_values.items(), key=lambda x: x[1], reverse=True) + self.vocab = {word: idx for idx, (word, _) in enumerate(sorted_words[:self.max_features])} + + # 保存IDF值 + self.idf = {word: idf_values[word] for word in self.vocab} + + print(f" TF-IDF词表大小: {len(self.vocab)}") + print(f" 平均IDF: {np.mean(list(self.idf.values())):.3f}") + return self + + def transform(self, texts): + """将文本转换为TF-IDF向量""" + vectors = [] + for text in texts: + words = tokenize(text) + + # 计算TF + tf = Counter(words) + tf_sum = len(words) if words else 1 + + # 生成向量 + vec = [0.0] * self.max_seq_len + for i, word in enumerate(words[:self.max_seq_len]): + if word in self.vocab: + # TF × IDF + vec[i] = (tf[word] / tf_sum) * self.idf.get(word, 0) + vectors.append(vec) + + return np.array(vectors, dtype=np.float32) + + def fit_transform(self, texts): + self.fit(texts) + return self.transform(texts) + + +def load_data(data_dir, max_features, max_seq_len, vectorizer_type='tfidf'): + """ + 加载并向量化数据 + + 参数: + - vectorizer_type: 'tfidf' 或 'bow' + """ + if not download_dataset(data_dir): + raise RuntimeError("数据加载失败,请检查网络或手动下载数据集") + + print("正在加载数据...") + texts, labels = load_raw_data(data_dir) + print(f"总评论数: {len(texts)}, 正面: {sum(labels)}, 负面: {len(labels) - sum(labels)}") + + # 选择向量化器 + if vectorizer_type == 'tfidf': + vectorizer = TFIDFVectorizer(max_features, max_seq_len) + vec_name = "TF-IDF" + else: + vectorizer = BoWVectorizer(max_features, max_seq_len) + vec_name = "BoW" + + print(f"正在使用{vec_name}向量化...") + X = vectorizer.fit_transform(texts) + y = labels + + # 打乱并划分 + np.random.seed(42) + indices = np.random.permutation(len(X)) + X = X[indices] + y = y[indices] + + split_idx = int(len(X) * 0.8) + X_train, X_test = X[:split_idx], X[split_idx:] + y_train, y_test = y[:split_idx], y[split_idx:] + + print(f"训练集: {len(X_train)}条, 测试集: {len(X_test)}条") + + return X_train, y_train, X_test, y_test, vectorizer + + +if __name__ == '__main__': + # 测试 + print("=" * 60) + print("测试 TF-IDF 向量化") + print("=" * 60) + X_train, y_train, X_test, y_test, vec = load_data( + 'data/ChnSentiCorp', max_features=3000, max_seq_len=100, + vectorizer_type='tfidf' + ) + print(f"\nX_train shape: {X_train.shape}") + print(f"X_train sample (前5个特征): {X_train[0][:5]}") diff --git a/main.py b/main.py index eaeaadc..2dbbe1c 100644 --- a/main.py +++ b/main.py @@ -1,34 +1,34 @@ -# -*- coding: utf-8 -*- -""" -主程序入口 - -使用方式: - -1. 运行单个模型(默认): - python main.py - - 修改 config.py 中的 MODEL_TYPE 和 VECTORIZER_TYPE 来切换配置 - -2. 运行对比实验: - 修改 config.py 中 RUN_COMPARISON = True - - 这会依次运行: - - 实验1: BoW vs TF-IDF (固定LR模型) - - 实验2: LR vs MLP (固定TF-IDF) - - 实验3: 不同学习率对比 - - 实验4: 不同隐藏层大小对比 - - 最后输出汇总报告 -""" - -from train import main - -if __name__ == '__main__': - print("\n" + "=" * 70) - print("文本分类实验 - 纯NumPy实现") - print("数据集: ChnSentiCorp (中文酒店评论)") - print("模型: Logistic Regression / MLP") - print("向量化: BoW / TF-IDF") - print("=" * 70 + "\n") - - main() +# -*- coding: utf-8 -*- +""" +主程序入口 + +使用方式: + +1. 运行单个模型(默认): + python main.py + + 修改 config.py 中的 MODEL_TYPE 和 VECTORIZER_TYPE 来切换配置 + +2. 运行对比实验: + 修改 config.py 中 RUN_COMPARISON = True + + 这会依次运行: + - 实验1: BoW vs TF-IDF (固定LR模型) + - 实验2: LR vs MLP (固定TF-IDF) + - 实验3: 不同学习率对比 + - 实验4: 不同隐藏层大小对比 + + 最后输出汇总报告 +""" + +from train import main + +if __name__ == '__main__': + print("\n" + "=" * 70) + print("文本分类实验 - 纯NumPy实现") + print("数据集: ChnSentiCorp (中文酒店评论)") + print("模型: Logistic Regression / MLP") + print("向量化: BoW / TF-IDF") + print("=" * 70 + "\n") + + main() diff --git a/model_numpy.py b/model_numpy.py index e8d7adf..cdec28a 100644 --- a/model_numpy.py +++ b/model_numpy.py @@ -1,342 +1,342 @@ -# -*- coding: utf-8 -*- -""" -模型模块 - 纯NumPy实现 - -支持两种模型: -1. Logistic Regression(逻辑回归)- 线性模型 -2. MLP(多层感知机)- 两层全连接网络 - -设计思路: -- 两种模型都共享相同的接口,方便对比 -- 代码简洁,每行都有详细注释 -- 手动实现反向传播,原理透明 -""" - -import numpy as np - - -class BaseModel: - """模型基类""" - def fit(self, X, y, X_val=None, y_val=None, epochs=100, batch_size=32, verbose=True): pass - def predict(self, X): pass - def predict_proba(self, X): pass - def accuracy(self, X, y): pass - - -class LogisticRegression(BaseModel): - """ - 逻辑回归(线性分类器) - - 结构:输入 → 线性变换 → Softmax → 输出 - - 原理: - - 线性变换: z = X @ W + b - - Softmax: 将线性输出转为概率分布 - - 参数量:input_size × num_classes + num_classes - """ - - def __init__(self, input_size, num_classes=2, learning_rate=0.1, - class_weight=None, seed=42): - np.random.seed(seed) - - # 权重初始化(Xavier) - self.W = np.random.randn(input_size, num_classes) * np.sqrt(2.0 / input_size) - self.b = np.zeros(num_classes) - - self.lr = learning_rate - self.input_size = input_size - self.num_classes = num_classes - self.class_weight = class_weight # 类别权重 - - total_params = input_size * num_classes + num_classes - print(f"LogisticRegression: {input_size} -> {num_classes}, 参数量: {total_params}") - - def softmax(self, x): - """Softmax函数""" - x_shifted = x - np.max(x, axis=1, keepdims=True) - exp_x = np.exp(x_shifted) - return exp_x / np.sum(exp_x, axis=1, keepdims=True) - - def forward(self, X): - """前向传播""" - # 线性变换 - z = X @ self.W + self.b - # Softmax输出概率 - return self.softmax(z) - - def backward(self, X, y): - """反向传播(梯度下降)""" - batch_size = X.shape[0] - probs = self.forward(X) - - # Softmax + 交叉熵梯度 - d_z = probs.copy() - - # 应用类别权重:减去权重值而不是1 - # 公式: dL/dz_y = w_y * (p_y - 1) = w_y*p_y - w_y - if self.class_weight is not None: - for i in range(batch_size): - d_z[i, y[i]] -= self.class_weight[y[i]] - else: - d_z[np.arange(batch_size), y] -= 1 - - # 梯度 - d_W = X.T @ d_z - d_b = np.sum(d_z, axis=0) - - # 更新 - self.W -= self.lr * d_W / batch_size - self.b -= self.lr * d_b / batch_size - - def fit(self, X, y, X_val=None, y_val=None, epochs=100, batch_size=32, verbose=True): - """训练""" - num_samples = len(X) - num_batches = (num_samples + batch_size - 1) // batch_size - - for epoch in range(epochs): - # 打乱 - indices = np.random.permutation(num_samples) - X_shuffled = X[indices] - y_shuffled = y[indices] - - epoch_loss = 0 - for batch_idx in range(num_batches): - start = batch_idx * batch_size - end = min(start + batch_size, num_samples) - X_batch = X_shuffled[start:end] - y_batch = y_shuffled[start:end] - - # 前向 + 反向 - probs = self.forward(X_batch) - self.backward(X_batch, y_batch) - - # 损失 - loss = -np.mean(np.log(np.clip(probs[np.arange(len(y_batch)), y_batch], 1e-10, 1))) - epoch_loss += loss - - # 评估 - if verbose and (epoch + 1) % 20 == 0: - train_acc = self.accuracy(X, y) - msg = f"Epoch {epoch+1:3d}/{epochs} | Loss: {epoch_loss/num_batches:.4f} | 训练准确率: {train_acc:.4f}" - if X_val is not None: - val_acc = self.accuracy(X_val, y_val) - msg += f" | 测试准确率: {val_acc:.4f}" - print(msg) - - return self - - def predict(self, X): - return np.argmax(self.forward(X), axis=1) - - def predict_proba(self, X): - return self.forward(X) - - def accuracy(self, X, y): - return np.mean(self.predict(X) == y) - - def save(self, filepath): - """保存模型权重""" - np.save(filepath + '_W.npy', self.W) - np.save(filepath + '_b.npy', self.b) - print(f"模型已保存: {filepath}") - - @staticmethod - def load(filepath, input_size, num_classes=2, learning_rate=0.1): - """加载模型权重""" - model = LogisticRegression(input_size, num_classes, learning_rate) - model.W = np.load(filepath + '_W.npy') - model.b = np.load(filepath + '_b.npy') - print(f"模型已加载: {filepath}") - return model - - -class MLP(BaseModel): - """ - 多层感知机(神经网络) - - 结构:输入 → 线性变换 → ReLU → 线性变换 → Softmax → 输出 - - 和LogisticRegression的区别: - - 多了一层隐藏层 + 非线性激活 - - 可以学习非线性关系 - - 参数量更大 - - 参数量: - - W1: input_size × hidden_size - - b1: hidden_size - - W2: hidden_size × num_classes - - b2: num_classes - """ - - def __init__(self, input_size, hidden_size=64, num_classes=2, - learning_rate=0.1, keep_prob=1.0, class_weight=None, seed=42): - np.random.seed(seed) - - # 第一层权重 - self.W1 = np.random.randn(input_size, hidden_size) * np.sqrt(2.0 / input_size) - self.b1 = np.zeros(hidden_size) - - # 第二层权重 - self.W2 = np.random.randn(hidden_size, num_classes) * np.sqrt(2.0 / hidden_size) - self.b2 = np.zeros(num_classes) - - self.lr = learning_rate - self.keep_prob = keep_prob - self.hidden_size = hidden_size - self.input_size = input_size - self.num_classes = num_classes - self.class_weight = class_weight # 类别权重 - - total_params = (input_size * hidden_size + hidden_size + - hidden_size * num_classes + num_classes) - print(f"MLP: {input_size} -> {hidden_size} -> {num_classes}, 参数量: {total_params}") - - def relu(self, x): - """ReLU激活""" - return np.maximum(0, x) - - def relu_derivative(self, x): - """ReLU导数""" - return (x > 0).astype(float) - - def softmax(self, x): - """Softmax函数""" - x_shifted = x - np.max(x, axis=1, keepdims=True) - exp_x = np.exp(x_shifted) - return exp_x / np.sum(exp_x, axis=1, keepdims=True) - - def forward(self, X): - """前向传播""" - # 第一层 - self.z1 = X @ self.W1 + self.b1 - self.a1 = self.relu(self.z1) - - # Dropout(训练时) - if self.keep_prob < 1.0 and hasattr(self, 'training'): - self.d1 = (np.random.rand(*self.a1.shape) < self.keep_prob).astype(float) - self.a1 *= self.d1 - self.a1 /= self.keep_prob - - # 第二层 - self.z2 = self.a1 @ self.W2 + self.b2 - self.probs = self.softmax(self.z2) - - return self.probs - - def backward(self, X, y): - """反向传播""" - batch_size = X.shape[0] - - # 输出层梯度 - d_z2 = self.probs.copy() - - # 应用类别权重 - if self.class_weight is not None: - for i in range(batch_size): - d_z2[i, y[i]] -= self.class_weight[y[i]] - else: - d_z2[np.arange(batch_size), y] -= 1 - - # 第二层梯度 - d_W2 = self.a1.T @ d_z2 - d_b2 = np.sum(d_z2, axis=0) - - # 隐藏层梯度 - d_a1 = d_z2 @ self.W2.T - d_z1 = d_a1 * self.relu_derivative(self.z1) - - # Dropout梯度 - if self.keep_prob < 1.0 and hasattr(self, 'd1'): - d_z1 *= self.d1 - d_z1 /= self.keep_prob - - # 第一层梯度 - d_W1 = X.T @ d_z1 - d_b1 = np.sum(d_z1, axis=0) - - # 更新 - self.W1 -= self.lr * d_W1 / batch_size - self.b1 -= self.lr * d_b1 / batch_size - self.W2 -= self.lr * d_W2 / batch_size - self.b2 -= self.lr * d_b2 / batch_size - - def fit(self, X, y, X_val=None, y_val=None, epochs=100, batch_size=32, verbose=True): - """训练""" - num_samples = len(X) - num_batches = (num_samples + batch_size - 1) // batch_size - - for epoch in range(epochs): - # 打乱 - indices = np.random.permutation(num_samples) - X_shuffled = X[indices] - y_shuffled = y[indices] - - epoch_loss = 0 - self.training = True # 开启Dropout - - for batch_idx in range(num_batches): - start = batch_idx * batch_size - end = min(start + batch_size, num_samples) - X_batch = X_shuffled[start:end] - y_batch = y_shuffled[start:end] - - # 前向 + 反向 - probs = self.forward(X_batch) - self.backward(X_batch, y_batch) - - # 损失 - loss = -np.mean(np.log(np.clip(probs[np.arange(len(y_batch)), y_batch], 1e-10, 1))) - epoch_loss += loss - - self.training = False # 关闭Dropout - - # 评估 - if verbose and (epoch + 1) % 20 == 0: - train_acc = self.accuracy(X, y) - msg = f"Epoch {epoch+1:3d}/{epochs} | Loss: {epoch_loss/num_batches:.4f} | 训练准确率: {train_acc:.4f}" - if X_val is not None: - val_acc = self.accuracy(X_val, y_val) - msg += f" | 测试准确率: {val_acc:.4f}" - print(msg) - - return self - - def predict(self, X): - return np.argmax(self.forward(X), axis=1) - - def predict_proba(self, X): - return self.forward(X) - - def accuracy(self, X, y): - return np.mean(self.predict(X) == y) - - def save(self, filepath): - """保存模型权重""" - np.save(filepath + '_W1.npy', self.W1) - np.save(filepath + '_b1.npy', self.b1) - np.save(filepath + '_W2.npy', self.W2) - np.save(filepath + '_b2.npy', self.b2) - print(f"模型已保存: {filepath}") - - @staticmethod - def load(filepath, input_size, hidden_size=64, num_classes=2, learning_rate=0.1, keep_prob=1.0): - """加载模型权重""" - model = MLP(input_size, hidden_size, num_classes, learning_rate, keep_prob) - model.W1 = np.load(filepath + '_W1.npy') - model.b1 = np.load(filepath + '_b1.npy') - model.W2 = np.load(filepath + '_W2.npy') - model.b2 = np.load(filepath + '_b2.npy') - print(f"模型已加载: {filepath}") - return model - - -def create_model(model_type, input_size, hidden_size=64, num_classes=2, - learning_rate=0.1, keep_prob=1.0, class_weight=None): - """工厂函数:创建模型""" - if model_type == 'lr': - return LogisticRegression(input_size, num_classes, learning_rate, class_weight) - elif model_type == 'mlp': - return MLP(input_size, hidden_size, num_classes, learning_rate, keep_prob, class_weight) - else: - raise ValueError(f"未知模型类型: {model_type}") +# -*- coding: utf-8 -*- +""" +模型模块 - 纯NumPy实现 + +支持两种模型: +1. Logistic Regression(逻辑回归)- 线性模型 +2. MLP(多层感知机)- 两层全连接网络 + +设计思路: +- 两种模型都共享相同的接口,方便对比 +- 代码简洁,每行都有详细注释 +- 手动实现反向传播,原理透明 +""" + +import numpy as np + + +class BaseModel: + """模型基类""" + def fit(self, X, y, X_val=None, y_val=None, epochs=100, batch_size=32, verbose=True): pass + def predict(self, X): pass + def predict_proba(self, X): pass + def accuracy(self, X, y): pass + + +class LogisticRegression(BaseModel): + """ + 逻辑回归(线性分类器) + + 结构:输入 → 线性变换 → Softmax → 输出 + + 原理: + - 线性变换: z = X @ W + b + - Softmax: 将线性输出转为概率分布 + + 参数量:input_size × num_classes + num_classes + """ + + def __init__(self, input_size, num_classes=2, learning_rate=0.1, + class_weight=None, seed=42): + np.random.seed(seed) + + # 权重初始化(Xavier) + self.W = np.random.randn(input_size, num_classes) * np.sqrt(2.0 / input_size) + self.b = np.zeros(num_classes) + + self.lr = learning_rate + self.input_size = input_size + self.num_classes = num_classes + self.class_weight = class_weight # 类别权重 + + total_params = input_size * num_classes + num_classes + print(f"LogisticRegression: {input_size} -> {num_classes}, 参数量: {total_params}") + + def softmax(self, x): + """Softmax函数""" + x_shifted = x - np.max(x, axis=1, keepdims=True) + exp_x = np.exp(x_shifted) + return exp_x / np.sum(exp_x, axis=1, keepdims=True) + + def forward(self, X): + """前向传播""" + # 线性变换 + z = X @ self.W + self.b + # Softmax输出概率 + return self.softmax(z) + + def backward(self, X, y): + """反向传播(梯度下降)""" + batch_size = X.shape[0] + probs = self.forward(X) + + # Softmax + 交叉熵梯度 + d_z = probs.copy() + + # 应用类别权重:减去权重值而不是1 + # 公式: dL/dz_y = w_y * (p_y - 1) = w_y*p_y - w_y + if self.class_weight is not None: + for i in range(batch_size): + d_z[i, y[i]] -= self.class_weight[y[i]] + else: + d_z[np.arange(batch_size), y] -= 1 + + # 梯度 + d_W = X.T @ d_z + d_b = np.sum(d_z, axis=0) + + # 更新 + self.W -= self.lr * d_W / batch_size + self.b -= self.lr * d_b / batch_size + + def fit(self, X, y, X_val=None, y_val=None, epochs=100, batch_size=32, verbose=True): + """训练""" + num_samples = len(X) + num_batches = (num_samples + batch_size - 1) // batch_size + + for epoch in range(epochs): + # 打乱 + indices = np.random.permutation(num_samples) + X_shuffled = X[indices] + y_shuffled = y[indices] + + epoch_loss = 0 + for batch_idx in range(num_batches): + start = batch_idx * batch_size + end = min(start + batch_size, num_samples) + X_batch = X_shuffled[start:end] + y_batch = y_shuffled[start:end] + + # 前向 + 反向 + probs = self.forward(X_batch) + self.backward(X_batch, y_batch) + + # 损失 + loss = -np.mean(np.log(np.clip(probs[np.arange(len(y_batch)), y_batch], 1e-10, 1))) + epoch_loss += loss + + # 评估 + if verbose and (epoch + 1) % 20 == 0: + train_acc = self.accuracy(X, y) + msg = f"Epoch {epoch+1:3d}/{epochs} | Loss: {epoch_loss/num_batches:.4f} | 训练准确率: {train_acc:.4f}" + if X_val is not None: + val_acc = self.accuracy(X_val, y_val) + msg += f" | 测试准确率: {val_acc:.4f}" + print(msg) + + return self + + def predict(self, X): + return np.argmax(self.forward(X), axis=1) + + def predict_proba(self, X): + return self.forward(X) + + def accuracy(self, X, y): + return np.mean(self.predict(X) == y) + + def save(self, filepath): + """保存模型权重""" + np.save(filepath + '_W.npy', self.W) + np.save(filepath + '_b.npy', self.b) + print(f"模型已保存: {filepath}") + + @staticmethod + def load(filepath, input_size, num_classes=2, learning_rate=0.1): + """加载模型权重""" + model = LogisticRegression(input_size, num_classes, learning_rate) + model.W = np.load(filepath + '_W.npy') + model.b = np.load(filepath + '_b.npy') + print(f"模型已加载: {filepath}") + return model + + +class MLP(BaseModel): + """ + 多层感知机(神经网络) + + 结构:输入 → 线性变换 → ReLU → 线性变换 → Softmax → 输出 + + 和LogisticRegression的区别: + - 多了一层隐藏层 + 非线性激活 + - 可以学习非线性关系 + - 参数量更大 + + 参数量: + - W1: input_size × hidden_size + - b1: hidden_size + - W2: hidden_size × num_classes + - b2: num_classes + """ + + def __init__(self, input_size, hidden_size=64, num_classes=2, + learning_rate=0.1, keep_prob=1.0, class_weight=None, seed=42): + np.random.seed(seed) + + # 第一层权重 + self.W1 = np.random.randn(input_size, hidden_size) * np.sqrt(2.0 / input_size) + self.b1 = np.zeros(hidden_size) + + # 第二层权重 + self.W2 = np.random.randn(hidden_size, num_classes) * np.sqrt(2.0 / hidden_size) + self.b2 = np.zeros(num_classes) + + self.lr = learning_rate + self.keep_prob = keep_prob + self.hidden_size = hidden_size + self.input_size = input_size + self.num_classes = num_classes + self.class_weight = class_weight # 类别权重 + + total_params = (input_size * hidden_size + hidden_size + + hidden_size * num_classes + num_classes) + print(f"MLP: {input_size} -> {hidden_size} -> {num_classes}, 参数量: {total_params}") + + def relu(self, x): + """ReLU激活""" + return np.maximum(0, x) + + def relu_derivative(self, x): + """ReLU导数""" + return (x > 0).astype(float) + + def softmax(self, x): + """Softmax函数""" + x_shifted = x - np.max(x, axis=1, keepdims=True) + exp_x = np.exp(x_shifted) + return exp_x / np.sum(exp_x, axis=1, keepdims=True) + + def forward(self, X): + """前向传播""" + # 第一层 + self.z1 = X @ self.W1 + self.b1 + self.a1 = self.relu(self.z1) + + # Dropout(训练时) + if self.keep_prob < 1.0 and hasattr(self, 'training'): + self.d1 = (np.random.rand(*self.a1.shape) < self.keep_prob).astype(float) + self.a1 *= self.d1 + self.a1 /= self.keep_prob + + # 第二层 + self.z2 = self.a1 @ self.W2 + self.b2 + self.probs = self.softmax(self.z2) + + return self.probs + + def backward(self, X, y): + """反向传播""" + batch_size = X.shape[0] + + # 输出层梯度 + d_z2 = self.probs.copy() + + # 应用类别权重 + if self.class_weight is not None: + for i in range(batch_size): + d_z2[i, y[i]] -= self.class_weight[y[i]] + else: + d_z2[np.arange(batch_size), y] -= 1 + + # 第二层梯度 + d_W2 = self.a1.T @ d_z2 + d_b2 = np.sum(d_z2, axis=0) + + # 隐藏层梯度 + d_a1 = d_z2 @ self.W2.T + d_z1 = d_a1 * self.relu_derivative(self.z1) + + # Dropout梯度 + if self.keep_prob < 1.0 and hasattr(self, 'd1'): + d_z1 *= self.d1 + d_z1 /= self.keep_prob + + # 第一层梯度 + d_W1 = X.T @ d_z1 + d_b1 = np.sum(d_z1, axis=0) + + # 更新 + self.W1 -= self.lr * d_W1 / batch_size + self.b1 -= self.lr * d_b1 / batch_size + self.W2 -= self.lr * d_W2 / batch_size + self.b2 -= self.lr * d_b2 / batch_size + + def fit(self, X, y, X_val=None, y_val=None, epochs=100, batch_size=32, verbose=True): + """训练""" + num_samples = len(X) + num_batches = (num_samples + batch_size - 1) // batch_size + + for epoch in range(epochs): + # 打乱 + indices = np.random.permutation(num_samples) + X_shuffled = X[indices] + y_shuffled = y[indices] + + epoch_loss = 0 + self.training = True # 开启Dropout + + for batch_idx in range(num_batches): + start = batch_idx * batch_size + end = min(start + batch_size, num_samples) + X_batch = X_shuffled[start:end] + y_batch = y_shuffled[start:end] + + # 前向 + 反向 + probs = self.forward(X_batch) + self.backward(X_batch, y_batch) + + # 损失 + loss = -np.mean(np.log(np.clip(probs[np.arange(len(y_batch)), y_batch], 1e-10, 1))) + epoch_loss += loss + + self.training = False # 关闭Dropout + + # 评估 + if verbose and (epoch + 1) % 20 == 0: + train_acc = self.accuracy(X, y) + msg = f"Epoch {epoch+1:3d}/{epochs} | Loss: {epoch_loss/num_batches:.4f} | 训练准确率: {train_acc:.4f}" + if X_val is not None: + val_acc = self.accuracy(X_val, y_val) + msg += f" | 测试准确率: {val_acc:.4f}" + print(msg) + + return self + + def predict(self, X): + return np.argmax(self.forward(X), axis=1) + + def predict_proba(self, X): + return self.forward(X) + + def accuracy(self, X, y): + return np.mean(self.predict(X) == y) + + def save(self, filepath): + """保存模型权重""" + np.save(filepath + '_W1.npy', self.W1) + np.save(filepath + '_b1.npy', self.b1) + np.save(filepath + '_W2.npy', self.W2) + np.save(filepath + '_b2.npy', self.b2) + print(f"模型已保存: {filepath}") + + @staticmethod + def load(filepath, input_size, hidden_size=64, num_classes=2, learning_rate=0.1, keep_prob=1.0): + """加载模型权重""" + model = MLP(input_size, hidden_size, num_classes, learning_rate, keep_prob) + model.W1 = np.load(filepath + '_W1.npy') + model.b1 = np.load(filepath + '_b1.npy') + model.W2 = np.load(filepath + '_W2.npy') + model.b2 = np.load(filepath + '_b2.npy') + print(f"模型已加载: {filepath}") + return model + + +def create_model(model_type, input_size, hidden_size=64, num_classes=2, + learning_rate=0.1, keep_prob=1.0, class_weight=None): + """工厂函数:创建模型""" + if model_type == 'lr': + return LogisticRegression(input_size, num_classes, learning_rate, class_weight) + elif model_type == 'mlp': + return MLP(input_size, hidden_size, num_classes, learning_rate, keep_prob, class_weight) + else: + raise ValueError(f"未知模型类型: {model_type}")