From 016df0c747aeeafbc60b35e86c03d826091d6c42 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=99=88=E5=A9=89=E5=A7=97?= <2509165046@student.example.com> Date: Tue, 28 Apr 2026 11:05:27 +0800 Subject: [PATCH] =?UTF-8?q?=E4=B8=8A=E4=BC=A0=E6=96=87=E4=BB=B6=E8=87=B3?= =?UTF-8?q?=20/?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- config.py | 80 +++--- dataset.py | 572 ++++++++++++++++++++--------------------- main.py | 68 ++--- model_numpy.py | 684 ++++++++++++++++++++++++------------------------- predict.py | 528 +++++++++++++++++++------------------- 5 files changed, 966 insertions(+), 966 deletions(-) diff --git a/config.py b/config.py index 12a552d..d48f6e1 100644 --- a/config.py +++ b/config.py @@ -1,40 +1,40 @@ -# -*- coding: utf-8 -*- -""" -配置文件 - 所有超参数集中管理 - -设计思路: -将超参数分门别类,学生可以单独修改某一类而不会影响其他 -""" - -# ==================== 数据相关 ==================== -DATA_DIR = 'data/ChnSentiCorp' # 数据集路径 -MAX_FEATURES = 3000 # 词表最大容量 -MAX_SEQ_LEN = 100 # 句子最大长度(词数) -VECTORIZER_TYPE = 'tfidf' # 'tfidf' 或 'bow'(向量化方式) - -# ==================== 模型相关 ==================== -MODEL_TYPE = 'mlp' # 'mlp' 或 'lr'(模型类型) -HIDDEN_SIZE = 64 # MLP隐藏层大小(LR忽略) -NUM_CLASSES = 2 # 类别数(正面/负面二分类) -KEEP_PROB = 1.0 # Dropout保留概率(LR忽略,设为1即可) - -# ==================== 训练相关 ==================== -LEARNING_RATE = 0.05 # 学习率 -NUM_EPOCHS = 100 # 训练轮数 -BATCH_SIZE = 64 # 批次大小 - -# ==================== 类别权重(解决数据不平衡问题)==================== -USE_CLASS_WEIGHT = True # True=启用类别权重, False=不启用(对比用) -# 权重计算公式: n_samples / (n_classes * n_class_i) -# 正面评论多所以权重小,负面评论少所以权重大 -CLASS_WEIGHT_POS = 0.73 # 正面类权重(自动计算) -CLASS_WEIGHT_NEG = 1.58 # 负面类权重(自动计算) - -# ==================== 实验相关 ==================== -RUN_COMPARISON = False # True=运行对比实验, False=运行单个模型 -COMPARE_MODELS = ['lr', 'mlp'] # 要对比的模型列表 -COMPARE_VECTORS = ['bow', 'tfidf'] # 要对比的向量化方式 - -# ==================== 其他 ==================== -RANDOM_SEED = 42 # 随机种子(保证可复现) -VERBOSE = True # 打印详细日志 +# -*- coding: utf-8 -*- +""" +配置文件 - 所有超参数集中管理 + +设计思路: +将超参数分门别类,学生可以单独修改某一类而不会影响其他 +""" + +# ==================== 数据相关 ==================== +DATA_DIR = 'data/ChnSentiCorp' # 数据集路径 +MAX_FEATURES = 3000 # 词表最大容量 +MAX_SEQ_LEN = 100 # 句子最大长度(词数) +VECTORIZER_TYPE = 'tfidf' # 'tfidf' 或 'bow'(向量化方式) + +# ==================== 模型相关 ==================== +MODEL_TYPE = 'mlp' # 'mlp' 或 'lr'(模型类型) +HIDDEN_SIZE = 64 # MLP隐藏层大小(LR忽略) +NUM_CLASSES = 2 # 类别数(正面/负面二分类) +KEEP_PROB = 1.0 # Dropout保留概率(LR忽略,设为1即可) + +# ==================== 训练相关 ==================== +LEARNING_RATE = 0.05 # 学习率 +NUM_EPOCHS = 100 # 训练轮数 +BATCH_SIZE = 64 # 批次大小 + +# ==================== 类别权重(解决数据不平衡问题)==================== +USE_CLASS_WEIGHT = True # True=启用类别权重, False=不启用(对比用) +# 权重计算公式: n_samples / (n_classes * n_class_i) +# 正面评论多所以权重小,负面评论少所以权重大 +CLASS_WEIGHT_POS = 0.73 # 正面类权重(自动计算) +CLASS_WEIGHT_NEG = 1.58 # 负面类权重(自动计算) + +# ==================== 实验相关 ==================== +RUN_COMPARISON = False # True=运行对比实验, False=运行单个模型 +COMPARE_MODELS = ['lr', 'mlp'] # 要对比的模型列表 +COMPARE_VECTORS = ['bow', 'tfidf'] # 要对比的向量化方式 + +# ==================== 其他 ==================== +RANDOM_SEED = 42 # 随机种子(保证可复现) +VERBOSE = True # 打印详细日志 diff --git a/dataset.py b/dataset.py index e554362..4f4163e 100644 --- a/dataset.py +++ b/dataset.py @@ -1,286 +1,286 @@ -# -*- coding: utf-8 -*- -""" -数据加载与向量化模块 - -支持两种向量化方法: -1. BoW (Bag of Words) - 词频向量 -2. TF-IDF - 词频-逆文档频率向量 - -TF-IDF 的优势: -- 降低常见词(如"的"、"是")的权重 -- 提升罕见词的信息量 -- 通常效果优于简单BoW -""" - -import os -import re -import csv -import math -import jieba -import numpy as np -from collections import Counter - -try: - import urllib.request - import ssl - DOWNLOAD_AVAILABLE = True -except ImportError: - DOWNLOAD_AVAILABLE = False - - -DATASET_URL = "https://raw.githubusercontent.com/SophonPlus/ChineseNlpCorpus/master/datasets/ChnSentiCorp_htl_all/ChnSentiCorp_htl_all.csv" - - -def download_dataset(data_dir): - """下载数据集(如果不存在)""" - csv_path = os.path.join(data_dir, 'ChnSentiCorp_htl_all.csv') - - if os.path.exists(csv_path): - print(f"数据已存在: {csv_path}") - return True - - if not DOWNLOAD_AVAILABLE: - return False - - print("正在下载数据集...") - ssl_context = ssl.create_default_context() - ssl_context.check_hostname = False - ssl_context.verify_mode = ssl.CERT_NONE - - try: - request = urllib.request.Request(DATASET_URL, headers={'User-Agent': 'Mozilla/5.0'}) - response = urllib.request.urlopen(request, timeout=120, context=ssl_context) - os.makedirs(data_dir, exist_ok=True) - with open(csv_path, 'wb') as f: - f.write(response.read()) - print(f"下载完成: {csv_path}") - return True - except Exception as e: - print(f"下载失败: {e}") - return False - - -def load_raw_data(data_dir): - """加载原始数据""" - csv_path = os.path.join(data_dir, 'ChnSentiCorp_htl_all.csv') - texts, labels = [], [] - - with open(csv_path, 'r', encoding='utf-8') as f: - reader = csv.reader(f) - for row in reader: - if len(row) < 2: - continue - try: - label = int(row[0]) - review = row[1].strip() - if review: - texts.append(review) - labels.append(label) - except (ValueError, IndexError): - continue - - return texts, np.array(labels) - - -def tokenize(text): - """中文分词""" - text = re.sub(r'[^\u4e00-\u9fa5a-zA-Z]', ' ', text) - words = jieba.lcut(text) - return [w for w in words if len(w) > 1] - - -# ==================== 向量化器 ==================== - -class BaseVectorizer: - """向量化器基类""" - def fit(self, texts): pass - def transform(self, texts): pass - def fit_transform(self, texts): pass - - -class BoWVectorizer(BaseVectorizer): - """ - 词袋模型 (Bag of Words) - - 原理:统计每个词在文本中出现的次数 - 向量维度 = 词表大小 - 每个维度 = 该词在本文本中出现的次数 - """ - - def __init__(self, max_features, max_seq_len): - self.max_features = max_features - self.max_seq_len = max_seq_len - self.vocab = {} - self.doc_freq = {} # 文档频率 - self.num_docs = 0 - - def fit(self, texts): - """构建词表(基于词频)""" - counter = Counter() - doc_counter = Counter() # 统计包含该词的文档数 - - for text in texts: - words = tokenize(text) - unique_words = set(words) - counter.update(words) - for w in unique_words: - doc_counter[w] += 1 - - self.num_docs = len(texts) - - # 取最高频的词 - most_common = counter.most_common(self.max_features) - self.vocab = {word: idx for idx, (word, _) in enumerate(most_common)} - - # 记录文档频率(用于TF-IDF) - self.doc_freq = {w: doc_counter[w] for w in self.vocab} - - print(f" BoW词表大小: {len(self.vocab)}") - return self - - def transform(self, texts): - """将文本转换为词频向量""" - vectors = [] - for text in texts: - words = tokenize(text) - freq = [0] * self.max_seq_len - for i, word in enumerate(words[:self.max_seq_len]): - if word in self.vocab: - freq[i] = 1 # 二值(出现=1,不出现=0) - vectors.append(freq) - return np.array(vectors, dtype=np.float32) - - def fit_transform(self, texts): - self.fit(texts) - return self.transform(texts) - - -class TFIDFVectorizer(BaseVectorizer): - """ - TF-IDF 向量器 - - 原理: - - TF(词频) = 词在本文本中出现的次数 - - IDF(逆文档频率) = log(总文档数 / 包含该词的文档数) - - TF-IDF = TF × IDF - - 优势: - - 降低常见无意义词的权重(如"的"、"是") - - 提升罕见但有信息量的词 - """ - - def __init__(self, max_features, max_seq_len): - self.max_features = max_features - self.max_seq_len = max_seq_len - self.vocab = {} - self.idf = {} # 存储每个词的IDF值 - self.num_docs = 0 - - def fit(self, texts): - """构建词表并计算IDF""" - counter = Counter() - doc_counter = Counter() - - for text in texts: - words = tokenize(text) - unique_words = set(words) - counter.update(words) - for w in unique_words: - doc_counter[w] += 1 - - self.num_docs = len(texts) - - # 计算每个词的IDF - # IDF = log(总文档数 / 包含该词的文档数) - idf_values = {} - for word, df in doc_counter.items(): - idf_values[word] = math.log(self.num_docs / (df + 1)) + 1 # 加1防零 - - # 取IDF值最高的词(信息量最大的词) - sorted_words = sorted(idf_values.items(), key=lambda x: x[1], reverse=True) - self.vocab = {word: idx for idx, (word, _) in enumerate(sorted_words[:self.max_features])} - - # 保存IDF值 - self.idf = {word: idf_values[word] for word in self.vocab} - - print(f" TF-IDF词表大小: {len(self.vocab)}") - print(f" 平均IDF: {np.mean(list(self.idf.values())):.3f}") - return self - - def transform(self, texts): - """将文本转换为TF-IDF向量""" - vectors = [] - for text in texts: - words = tokenize(text) - - # 计算TF - tf = Counter(words) - tf_sum = len(words) if words else 1 - - # 生成向量 - vec = [0.0] * self.max_seq_len - for i, word in enumerate(words[:self.max_seq_len]): - if word in self.vocab: - # TF × IDF - vec[i] = (tf[word] / tf_sum) * self.idf.get(word, 0) - vectors.append(vec) - - return np.array(vectors, dtype=np.float32) - - def fit_transform(self, texts): - self.fit(texts) - return self.transform(texts) - - -def load_data(data_dir, max_features, max_seq_len, vectorizer_type='tfidf'): - """ - 加载并向量化数据 - - 参数: - - vectorizer_type: 'tfidf' 或 'bow' - """ - if not download_dataset(data_dir): - raise RuntimeError("数据加载失败,请检查网络或手动下载数据集") - - print("正在加载数据...") - texts, labels = load_raw_data(data_dir) - print(f"总评论数: {len(texts)}, 正面: {sum(labels)}, 负面: {len(labels) - sum(labels)}") - - # 选择向量化器 - if vectorizer_type == 'tfidf': - vectorizer = TFIDFVectorizer(max_features, max_seq_len) - vec_name = "TF-IDF" - else: - vectorizer = BoWVectorizer(max_features, max_seq_len) - vec_name = "BoW" - - print(f"正在使用{vec_name}向量化...") - X = vectorizer.fit_transform(texts) - y = labels - - # 打乱并划分 - np.random.seed(42) - indices = np.random.permutation(len(X)) - X = X[indices] - y = y[indices] - - split_idx = int(len(X) * 0.8) - X_train, X_test = X[:split_idx], X[split_idx:] - y_train, y_test = y[:split_idx], y[split_idx:] - - print(f"训练集: {len(X_train)}条, 测试集: {len(X_test)}条") - - return X_train, y_train, X_test, y_test, vectorizer - - -if __name__ == '__main__': - # 测试 - print("=" * 60) - print("测试 TF-IDF 向量化") - print("=" * 60) - X_train, y_train, X_test, y_test, vec = load_data( - 'data/ChnSentiCorp', max_features=3000, max_seq_len=100, - vectorizer_type='tfidf' - ) - print(f"\nX_train shape: {X_train.shape}") - print(f"X_train sample (前5个特征): {X_train[0][:5]}") +# -*- coding: utf-8 -*- +""" +数据加载与向量化模块 + +支持两种向量化方法: +1. BoW (Bag of Words) - 词频向量 +2. TF-IDF - 词频-逆文档频率向量 + +TF-IDF 的优势: +- 降低常见词(如"的"、"是")的权重 +- 提升罕见词的信息量 +- 通常效果优于简单BoW +""" + +import os +import re +import csv +import math +import jieba +import numpy as np +from collections import Counter + +try: + import urllib.request + import ssl + DOWNLOAD_AVAILABLE = True +except ImportError: + DOWNLOAD_AVAILABLE = False + + +DATASET_URL = "https://raw.githubusercontent.com/SophonPlus/ChineseNlpCorpus/master/datasets/ChnSentiCorp_htl_all/ChnSentiCorp_htl_all.csv" + + +def download_dataset(data_dir): + """下载数据集(如果不存在)""" + csv_path = os.path.join(data_dir, 'ChnSentiCorp_htl_all.csv') + + if os.path.exists(csv_path): + print(f"数据已存在: {csv_path}") + return True + + if not DOWNLOAD_AVAILABLE: + return False + + print("正在下载数据集...") + ssl_context = ssl.create_default_context() + ssl_context.check_hostname = False + ssl_context.verify_mode = ssl.CERT_NONE + + try: + request = urllib.request.Request(DATASET_URL, headers={'User-Agent': 'Mozilla/5.0'}) + response = urllib.request.urlopen(request, timeout=120, context=ssl_context) + os.makedirs(data_dir, exist_ok=True) + with open(csv_path, 'wb') as f: + f.write(response.read()) + print(f"下载完成: {csv_path}") + return True + except Exception as e: + print(f"下载失败: {e}") + return False + + +def load_raw_data(data_dir): + """加载原始数据""" + csv_path = os.path.join(data_dir, 'ChnSentiCorp_htl_all.csv') + texts, labels = [], [] + + with open(csv_path, 'r', encoding='utf-8') as f: + reader = csv.reader(f) + for row in reader: + if len(row) < 2: + continue + try: + label = int(row[0]) + review = row[1].strip() + if review: + texts.append(review) + labels.append(label) + except (ValueError, IndexError): + continue + + return texts, np.array(labels) + + +def tokenize(text): + """中文分词""" + text = re.sub(r'[^\u4e00-\u9fa5a-zA-Z]', ' ', text) + words = jieba.lcut(text) + return [w for w in words if len(w) > 1] + + +# ==================== 向量化器 ==================== + +class BaseVectorizer: + """向量化器基类""" + def fit(self, texts): pass + def transform(self, texts): pass + def fit_transform(self, texts): pass + + +class BoWVectorizer(BaseVectorizer): + """ + 词袋模型 (Bag of Words) + + 原理:统计每个词在文本中出现的次数 + 向量维度 = 词表大小 + 每个维度 = 该词在本文本中出现的次数 + """ + + def __init__(self, max_features, max_seq_len): + self.max_features = max_features + self.max_seq_len = max_seq_len + self.vocab = {} + self.doc_freq = {} # 文档频率 + self.num_docs = 0 + + def fit(self, texts): + """构建词表(基于词频)""" + counter = Counter() + doc_counter = Counter() # 统计包含该词的文档数 + + for text in texts: + words = tokenize(text) + unique_words = set(words) + counter.update(words) + for w in unique_words: + doc_counter[w] += 1 + + self.num_docs = len(texts) + + # 取最高频的词 + most_common = counter.most_common(self.max_features) + self.vocab = {word: idx for idx, (word, _) in enumerate(most_common)} + + # 记录文档频率(用于TF-IDF) + self.doc_freq = {w: doc_counter[w] for w in self.vocab} + + print(f" BoW词表大小: {len(self.vocab)}") + return self + + def transform(self, texts): + """将文本转换为词频向量""" + vectors = [] + for text in texts: + words = tokenize(text) + freq = [0] * self.max_seq_len + for i, word in enumerate(words[:self.max_seq_len]): + if word in self.vocab: + freq[i] = 1 # 二值(出现=1,不出现=0) + vectors.append(freq) + return np.array(vectors, dtype=np.float32) + + def fit_transform(self, texts): + self.fit(texts) + return self.transform(texts) + + +class TFIDFVectorizer(BaseVectorizer): + """ + TF-IDF 向量器 + + 原理: + - TF(词频) = 词在本文本中出现的次数 + - IDF(逆文档频率) = log(总文档数 / 包含该词的文档数) + - TF-IDF = TF × IDF + + 优势: + - 降低常见无意义词的权重(如"的"、"是") + - 提升罕见但有信息量的词 + """ + + def __init__(self, max_features, max_seq_len): + self.max_features = max_features + self.max_seq_len = max_seq_len + self.vocab = {} + self.idf = {} # 存储每个词的IDF值 + self.num_docs = 0 + + def fit(self, texts): + """构建词表并计算IDF""" + counter = Counter() + doc_counter = Counter() + + for text in texts: + words = tokenize(text) + unique_words = set(words) + counter.update(words) + for w in unique_words: + doc_counter[w] += 1 + + self.num_docs = len(texts) + + # 计算每个词的IDF + # IDF = log(总文档数 / 包含该词的文档数) + idf_values = {} + for word, df in doc_counter.items(): + idf_values[word] = math.log(self.num_docs / (df + 1)) + 1 # 加1防零 + + # 取IDF值最高的词(信息量最大的词) + sorted_words = sorted(idf_values.items(), key=lambda x: x[1], reverse=True) + self.vocab = {word: idx for idx, (word, _) in enumerate(sorted_words[:self.max_features])} + + # 保存IDF值 + self.idf = {word: idf_values[word] for word in self.vocab} + + print(f" TF-IDF词表大小: {len(self.vocab)}") + print(f" 平均IDF: {np.mean(list(self.idf.values())):.3f}") + return self + + def transform(self, texts): + """将文本转换为TF-IDF向量""" + vectors = [] + for text in texts: + words = tokenize(text) + + # 计算TF + tf = Counter(words) + tf_sum = len(words) if words else 1 + + # 生成向量 + vec = [0.0] * self.max_seq_len + for i, word in enumerate(words[:self.max_seq_len]): + if word in self.vocab: + # TF × IDF + vec[i] = (tf[word] / tf_sum) * self.idf.get(word, 0) + vectors.append(vec) + + return np.array(vectors, dtype=np.float32) + + def fit_transform(self, texts): + self.fit(texts) + return self.transform(texts) + + +def load_data(data_dir, max_features, max_seq_len, vectorizer_type='tfidf'): + """ + 加载并向量化数据 + + 参数: + - vectorizer_type: 'tfidf' 或 'bow' + """ + if not download_dataset(data_dir): + raise RuntimeError("数据加载失败,请检查网络或手动下载数据集") + + print("正在加载数据...") + texts, labels = load_raw_data(data_dir) + print(f"总评论数: {len(texts)}, 正面: {sum(labels)}, 负面: {len(labels) - sum(labels)}") + + # 选择向量化器 + if vectorizer_type == 'tfidf': + vectorizer = TFIDFVectorizer(max_features, max_seq_len) + vec_name = "TF-IDF" + else: + vectorizer = BoWVectorizer(max_features, max_seq_len) + vec_name = "BoW" + + print(f"正在使用{vec_name}向量化...") + X = vectorizer.fit_transform(texts) + y = labels + + # 打乱并划分 + np.random.seed(42) + indices = np.random.permutation(len(X)) + X = X[indices] + y = y[indices] + + split_idx = int(len(X) * 0.8) + X_train, X_test = X[:split_idx], X[split_idx:] + y_train, y_test = y[:split_idx], y[split_idx:] + + print(f"训练集: {len(X_train)}条, 测试集: {len(X_test)}条") + + return X_train, y_train, X_test, y_test, vectorizer + + +if __name__ == '__main__': + # 测试 + print("=" * 60) + print("测试 TF-IDF 向量化") + print("=" * 60) + X_train, y_train, X_test, y_test, vec = load_data( + 'data/ChnSentiCorp', max_features=3000, max_seq_len=100, + vectorizer_type='tfidf' + ) + print(f"\nX_train shape: {X_train.shape}") + print(f"X_train sample (前5个特征): {X_train[0][:5]}") diff --git a/main.py b/main.py index eaeaadc..2dbbe1c 100644 --- a/main.py +++ b/main.py @@ -1,34 +1,34 @@ -# -*- coding: utf-8 -*- -""" -主程序入口 - -使用方式: - -1. 运行单个模型(默认): - python main.py - - 修改 config.py 中的 MODEL_TYPE 和 VECTORIZER_TYPE 来切换配置 - -2. 运行对比实验: - 修改 config.py 中 RUN_COMPARISON = True - - 这会依次运行: - - 实验1: BoW vs TF-IDF (固定LR模型) - - 实验2: LR vs MLP (固定TF-IDF) - - 实验3: 不同学习率对比 - - 实验4: 不同隐藏层大小对比 - - 最后输出汇总报告 -""" - -from train import main - -if __name__ == '__main__': - print("\n" + "=" * 70) - print("文本分类实验 - 纯NumPy实现") - print("数据集: ChnSentiCorp (中文酒店评论)") - print("模型: Logistic Regression / MLP") - print("向量化: BoW / TF-IDF") - print("=" * 70 + "\n") - - main() +# -*- coding: utf-8 -*- +""" +主程序入口 + +使用方式: + +1. 运行单个模型(默认): + python main.py + + 修改 config.py 中的 MODEL_TYPE 和 VECTORIZER_TYPE 来切换配置 + +2. 运行对比实验: + 修改 config.py 中 RUN_COMPARISON = True + + 这会依次运行: + - 实验1: BoW vs TF-IDF (固定LR模型) + - 实验2: LR vs MLP (固定TF-IDF) + - 实验3: 不同学习率对比 + - 实验4: 不同隐藏层大小对比 + + 最后输出汇总报告 +""" + +from train import main + +if __name__ == '__main__': + print("\n" + "=" * 70) + print("文本分类实验 - 纯NumPy实现") + print("数据集: ChnSentiCorp (中文酒店评论)") + print("模型: Logistic Regression / MLP") + print("向量化: BoW / TF-IDF") + print("=" * 70 + "\n") + + main() diff --git a/model_numpy.py b/model_numpy.py index e8d7adf..cdec28a 100644 --- a/model_numpy.py +++ b/model_numpy.py @@ -1,342 +1,342 @@ -# -*- coding: utf-8 -*- -""" -模型模块 - 纯NumPy实现 - -支持两种模型: -1. Logistic Regression(逻辑回归)- 线性模型 -2. MLP(多层感知机)- 两层全连接网络 - -设计思路: -- 两种模型都共享相同的接口,方便对比 -- 代码简洁,每行都有详细注释 -- 手动实现反向传播,原理透明 -""" - -import numpy as np - - -class BaseModel: - """模型基类""" - def fit(self, X, y, X_val=None, y_val=None, epochs=100, batch_size=32, verbose=True): pass - def predict(self, X): pass - def predict_proba(self, X): pass - def accuracy(self, X, y): pass - - -class LogisticRegression(BaseModel): - """ - 逻辑回归(线性分类器) - - 结构:输入 → 线性变换 → Softmax → 输出 - - 原理: - - 线性变换: z = X @ W + b - - Softmax: 将线性输出转为概率分布 - - 参数量:input_size × num_classes + num_classes - """ - - def __init__(self, input_size, num_classes=2, learning_rate=0.1, - class_weight=None, seed=42): - np.random.seed(seed) - - # 权重初始化(Xavier) - self.W = np.random.randn(input_size, num_classes) * np.sqrt(2.0 / input_size) - self.b = np.zeros(num_classes) - - self.lr = learning_rate - self.input_size = input_size - self.num_classes = num_classes - self.class_weight = class_weight # 类别权重 - - total_params = input_size * num_classes + num_classes - print(f"LogisticRegression: {input_size} -> {num_classes}, 参数量: {total_params}") - - def softmax(self, x): - """Softmax函数""" - x_shifted = x - np.max(x, axis=1, keepdims=True) - exp_x = np.exp(x_shifted) - return exp_x / np.sum(exp_x, axis=1, keepdims=True) - - def forward(self, X): - """前向传播""" - # 线性变换 - z = X @ self.W + self.b - # Softmax输出概率 - return self.softmax(z) - - def backward(self, X, y): - """反向传播(梯度下降)""" - batch_size = X.shape[0] - probs = self.forward(X) - - # Softmax + 交叉熵梯度 - d_z = probs.copy() - - # 应用类别权重:减去权重值而不是1 - # 公式: dL/dz_y = w_y * (p_y - 1) = w_y*p_y - w_y - if self.class_weight is not None: - for i in range(batch_size): - d_z[i, y[i]] -= self.class_weight[y[i]] - else: - d_z[np.arange(batch_size), y] -= 1 - - # 梯度 - d_W = X.T @ d_z - d_b = np.sum(d_z, axis=0) - - # 更新 - self.W -= self.lr * d_W / batch_size - self.b -= self.lr * d_b / batch_size - - def fit(self, X, y, X_val=None, y_val=None, epochs=100, batch_size=32, verbose=True): - """训练""" - num_samples = len(X) - num_batches = (num_samples + batch_size - 1) // batch_size - - for epoch in range(epochs): - # 打乱 - indices = np.random.permutation(num_samples) - X_shuffled = X[indices] - y_shuffled = y[indices] - - epoch_loss = 0 - for batch_idx in range(num_batches): - start = batch_idx * batch_size - end = min(start + batch_size, num_samples) - X_batch = X_shuffled[start:end] - y_batch = y_shuffled[start:end] - - # 前向 + 反向 - probs = self.forward(X_batch) - self.backward(X_batch, y_batch) - - # 损失 - loss = -np.mean(np.log(np.clip(probs[np.arange(len(y_batch)), y_batch], 1e-10, 1))) - epoch_loss += loss - - # 评估 - if verbose and (epoch + 1) % 20 == 0: - train_acc = self.accuracy(X, y) - msg = f"Epoch {epoch+1:3d}/{epochs} | Loss: {epoch_loss/num_batches:.4f} | 训练准确率: {train_acc:.4f}" - if X_val is not None: - val_acc = self.accuracy(X_val, y_val) - msg += f" | 测试准确率: {val_acc:.4f}" - print(msg) - - return self - - def predict(self, X): - return np.argmax(self.forward(X), axis=1) - - def predict_proba(self, X): - return self.forward(X) - - def accuracy(self, X, y): - return np.mean(self.predict(X) == y) - - def save(self, filepath): - """保存模型权重""" - np.save(filepath + '_W.npy', self.W) - np.save(filepath + '_b.npy', self.b) - print(f"模型已保存: {filepath}") - - @staticmethod - def load(filepath, input_size, num_classes=2, learning_rate=0.1): - """加载模型权重""" - model = LogisticRegression(input_size, num_classes, learning_rate) - model.W = np.load(filepath + '_W.npy') - model.b = np.load(filepath + '_b.npy') - print(f"模型已加载: {filepath}") - return model - - -class MLP(BaseModel): - """ - 多层感知机(神经网络) - - 结构:输入 → 线性变换 → ReLU → 线性变换 → Softmax → 输出 - - 和LogisticRegression的区别: - - 多了一层隐藏层 + 非线性激活 - - 可以学习非线性关系 - - 参数量更大 - - 参数量: - - W1: input_size × hidden_size - - b1: hidden_size - - W2: hidden_size × num_classes - - b2: num_classes - """ - - def __init__(self, input_size, hidden_size=64, num_classes=2, - learning_rate=0.1, keep_prob=1.0, class_weight=None, seed=42): - np.random.seed(seed) - - # 第一层权重 - self.W1 = np.random.randn(input_size, hidden_size) * np.sqrt(2.0 / input_size) - self.b1 = np.zeros(hidden_size) - - # 第二层权重 - self.W2 = np.random.randn(hidden_size, num_classes) * np.sqrt(2.0 / hidden_size) - self.b2 = np.zeros(num_classes) - - self.lr = learning_rate - self.keep_prob = keep_prob - self.hidden_size = hidden_size - self.input_size = input_size - self.num_classes = num_classes - self.class_weight = class_weight # 类别权重 - - total_params = (input_size * hidden_size + hidden_size + - hidden_size * num_classes + num_classes) - print(f"MLP: {input_size} -> {hidden_size} -> {num_classes}, 参数量: {total_params}") - - def relu(self, x): - """ReLU激活""" - return np.maximum(0, x) - - def relu_derivative(self, x): - """ReLU导数""" - return (x > 0).astype(float) - - def softmax(self, x): - """Softmax函数""" - x_shifted = x - np.max(x, axis=1, keepdims=True) - exp_x = np.exp(x_shifted) - return exp_x / np.sum(exp_x, axis=1, keepdims=True) - - def forward(self, X): - """前向传播""" - # 第一层 - self.z1 = X @ self.W1 + self.b1 - self.a1 = self.relu(self.z1) - - # Dropout(训练时) - if self.keep_prob < 1.0 and hasattr(self, 'training'): - self.d1 = (np.random.rand(*self.a1.shape) < self.keep_prob).astype(float) - self.a1 *= self.d1 - self.a1 /= self.keep_prob - - # 第二层 - self.z2 = self.a1 @ self.W2 + self.b2 - self.probs = self.softmax(self.z2) - - return self.probs - - def backward(self, X, y): - """反向传播""" - batch_size = X.shape[0] - - # 输出层梯度 - d_z2 = self.probs.copy() - - # 应用类别权重 - if self.class_weight is not None: - for i in range(batch_size): - d_z2[i, y[i]] -= self.class_weight[y[i]] - else: - d_z2[np.arange(batch_size), y] -= 1 - - # 第二层梯度 - d_W2 = self.a1.T @ d_z2 - d_b2 = np.sum(d_z2, axis=0) - - # 隐藏层梯度 - d_a1 = d_z2 @ self.W2.T - d_z1 = d_a1 * self.relu_derivative(self.z1) - - # Dropout梯度 - if self.keep_prob < 1.0 and hasattr(self, 'd1'): - d_z1 *= self.d1 - d_z1 /= self.keep_prob - - # 第一层梯度 - d_W1 = X.T @ d_z1 - d_b1 = np.sum(d_z1, axis=0) - - # 更新 - self.W1 -= self.lr * d_W1 / batch_size - self.b1 -= self.lr * d_b1 / batch_size - self.W2 -= self.lr * d_W2 / batch_size - self.b2 -= self.lr * d_b2 / batch_size - - def fit(self, X, y, X_val=None, y_val=None, epochs=100, batch_size=32, verbose=True): - """训练""" - num_samples = len(X) - num_batches = (num_samples + batch_size - 1) // batch_size - - for epoch in range(epochs): - # 打乱 - indices = np.random.permutation(num_samples) - X_shuffled = X[indices] - y_shuffled = y[indices] - - epoch_loss = 0 - self.training = True # 开启Dropout - - for batch_idx in range(num_batches): - start = batch_idx * batch_size - end = min(start + batch_size, num_samples) - X_batch = X_shuffled[start:end] - y_batch = y_shuffled[start:end] - - # 前向 + 反向 - probs = self.forward(X_batch) - self.backward(X_batch, y_batch) - - # 损失 - loss = -np.mean(np.log(np.clip(probs[np.arange(len(y_batch)), y_batch], 1e-10, 1))) - epoch_loss += loss - - self.training = False # 关闭Dropout - - # 评估 - if verbose and (epoch + 1) % 20 == 0: - train_acc = self.accuracy(X, y) - msg = f"Epoch {epoch+1:3d}/{epochs} | Loss: {epoch_loss/num_batches:.4f} | 训练准确率: {train_acc:.4f}" - if X_val is not None: - val_acc = self.accuracy(X_val, y_val) - msg += f" | 测试准确率: {val_acc:.4f}" - print(msg) - - return self - - def predict(self, X): - return np.argmax(self.forward(X), axis=1) - - def predict_proba(self, X): - return self.forward(X) - - def accuracy(self, X, y): - return np.mean(self.predict(X) == y) - - def save(self, filepath): - """保存模型权重""" - np.save(filepath + '_W1.npy', self.W1) - np.save(filepath + '_b1.npy', self.b1) - np.save(filepath + '_W2.npy', self.W2) - np.save(filepath + '_b2.npy', self.b2) - print(f"模型已保存: {filepath}") - - @staticmethod - def load(filepath, input_size, hidden_size=64, num_classes=2, learning_rate=0.1, keep_prob=1.0): - """加载模型权重""" - model = MLP(input_size, hidden_size, num_classes, learning_rate, keep_prob) - model.W1 = np.load(filepath + '_W1.npy') - model.b1 = np.load(filepath + '_b1.npy') - model.W2 = np.load(filepath + '_W2.npy') - model.b2 = np.load(filepath + '_b2.npy') - print(f"模型已加载: {filepath}") - return model - - -def create_model(model_type, input_size, hidden_size=64, num_classes=2, - learning_rate=0.1, keep_prob=1.0, class_weight=None): - """工厂函数:创建模型""" - if model_type == 'lr': - return LogisticRegression(input_size, num_classes, learning_rate, class_weight) - elif model_type == 'mlp': - return MLP(input_size, hidden_size, num_classes, learning_rate, keep_prob, class_weight) - else: - raise ValueError(f"未知模型类型: {model_type}") +# -*- coding: utf-8 -*- +""" +模型模块 - 纯NumPy实现 + +支持两种模型: +1. Logistic Regression(逻辑回归)- 线性模型 +2. MLP(多层感知机)- 两层全连接网络 + +设计思路: +- 两种模型都共享相同的接口,方便对比 +- 代码简洁,每行都有详细注释 +- 手动实现反向传播,原理透明 +""" + +import numpy as np + + +class BaseModel: + """模型基类""" + def fit(self, X, y, X_val=None, y_val=None, epochs=100, batch_size=32, verbose=True): pass + def predict(self, X): pass + def predict_proba(self, X): pass + def accuracy(self, X, y): pass + + +class LogisticRegression(BaseModel): + """ + 逻辑回归(线性分类器) + + 结构:输入 → 线性变换 → Softmax → 输出 + + 原理: + - 线性变换: z = X @ W + b + - Softmax: 将线性输出转为概率分布 + + 参数量:input_size × num_classes + num_classes + """ + + def __init__(self, input_size, num_classes=2, learning_rate=0.1, + class_weight=None, seed=42): + np.random.seed(seed) + + # 权重初始化(Xavier) + self.W = np.random.randn(input_size, num_classes) * np.sqrt(2.0 / input_size) + self.b = np.zeros(num_classes) + + self.lr = learning_rate + self.input_size = input_size + self.num_classes = num_classes + self.class_weight = class_weight # 类别权重 + + total_params = input_size * num_classes + num_classes + print(f"LogisticRegression: {input_size} -> {num_classes}, 参数量: {total_params}") + + def softmax(self, x): + """Softmax函数""" + x_shifted = x - np.max(x, axis=1, keepdims=True) + exp_x = np.exp(x_shifted) + return exp_x / np.sum(exp_x, axis=1, keepdims=True) + + def forward(self, X): + """前向传播""" + # 线性变换 + z = X @ self.W + self.b + # Softmax输出概率 + return self.softmax(z) + + def backward(self, X, y): + """反向传播(梯度下降)""" + batch_size = X.shape[0] + probs = self.forward(X) + + # Softmax + 交叉熵梯度 + d_z = probs.copy() + + # 应用类别权重:减去权重值而不是1 + # 公式: dL/dz_y = w_y * (p_y - 1) = w_y*p_y - w_y + if self.class_weight is not None: + for i in range(batch_size): + d_z[i, y[i]] -= self.class_weight[y[i]] + else: + d_z[np.arange(batch_size), y] -= 1 + + # 梯度 + d_W = X.T @ d_z + d_b = np.sum(d_z, axis=0) + + # 更新 + self.W -= self.lr * d_W / batch_size + self.b -= self.lr * d_b / batch_size + + def fit(self, X, y, X_val=None, y_val=None, epochs=100, batch_size=32, verbose=True): + """训练""" + num_samples = len(X) + num_batches = (num_samples + batch_size - 1) // batch_size + + for epoch in range(epochs): + # 打乱 + indices = np.random.permutation(num_samples) + X_shuffled = X[indices] + y_shuffled = y[indices] + + epoch_loss = 0 + for batch_idx in range(num_batches): + start = batch_idx * batch_size + end = min(start + batch_size, num_samples) + X_batch = X_shuffled[start:end] + y_batch = y_shuffled[start:end] + + # 前向 + 反向 + probs = self.forward(X_batch) + self.backward(X_batch, y_batch) + + # 损失 + loss = -np.mean(np.log(np.clip(probs[np.arange(len(y_batch)), y_batch], 1e-10, 1))) + epoch_loss += loss + + # 评估 + if verbose and (epoch + 1) % 20 == 0: + train_acc = self.accuracy(X, y) + msg = f"Epoch {epoch+1:3d}/{epochs} | Loss: {epoch_loss/num_batches:.4f} | 训练准确率: {train_acc:.4f}" + if X_val is not None: + val_acc = self.accuracy(X_val, y_val) + msg += f" | 测试准确率: {val_acc:.4f}" + print(msg) + + return self + + def predict(self, X): + return np.argmax(self.forward(X), axis=1) + + def predict_proba(self, X): + return self.forward(X) + + def accuracy(self, X, y): + return np.mean(self.predict(X) == y) + + def save(self, filepath): + """保存模型权重""" + np.save(filepath + '_W.npy', self.W) + np.save(filepath + '_b.npy', self.b) + print(f"模型已保存: {filepath}") + + @staticmethod + def load(filepath, input_size, num_classes=2, learning_rate=0.1): + """加载模型权重""" + model = LogisticRegression(input_size, num_classes, learning_rate) + model.W = np.load(filepath + '_W.npy') + model.b = np.load(filepath + '_b.npy') + print(f"模型已加载: {filepath}") + return model + + +class MLP(BaseModel): + """ + 多层感知机(神经网络) + + 结构:输入 → 线性变换 → ReLU → 线性变换 → Softmax → 输出 + + 和LogisticRegression的区别: + - 多了一层隐藏层 + 非线性激活 + - 可以学习非线性关系 + - 参数量更大 + + 参数量: + - W1: input_size × hidden_size + - b1: hidden_size + - W2: hidden_size × num_classes + - b2: num_classes + """ + + def __init__(self, input_size, hidden_size=64, num_classes=2, + learning_rate=0.1, keep_prob=1.0, class_weight=None, seed=42): + np.random.seed(seed) + + # 第一层权重 + self.W1 = np.random.randn(input_size, hidden_size) * np.sqrt(2.0 / input_size) + self.b1 = np.zeros(hidden_size) + + # 第二层权重 + self.W2 = np.random.randn(hidden_size, num_classes) * np.sqrt(2.0 / hidden_size) + self.b2 = np.zeros(num_classes) + + self.lr = learning_rate + self.keep_prob = keep_prob + self.hidden_size = hidden_size + self.input_size = input_size + self.num_classes = num_classes + self.class_weight = class_weight # 类别权重 + + total_params = (input_size * hidden_size + hidden_size + + hidden_size * num_classes + num_classes) + print(f"MLP: {input_size} -> {hidden_size} -> {num_classes}, 参数量: {total_params}") + + def relu(self, x): + """ReLU激活""" + return np.maximum(0, x) + + def relu_derivative(self, x): + """ReLU导数""" + return (x > 0).astype(float) + + def softmax(self, x): + """Softmax函数""" + x_shifted = x - np.max(x, axis=1, keepdims=True) + exp_x = np.exp(x_shifted) + return exp_x / np.sum(exp_x, axis=1, keepdims=True) + + def forward(self, X): + """前向传播""" + # 第一层 + self.z1 = X @ self.W1 + self.b1 + self.a1 = self.relu(self.z1) + + # Dropout(训练时) + if self.keep_prob < 1.0 and hasattr(self, 'training'): + self.d1 = (np.random.rand(*self.a1.shape) < self.keep_prob).astype(float) + self.a1 *= self.d1 + self.a1 /= self.keep_prob + + # 第二层 + self.z2 = self.a1 @ self.W2 + self.b2 + self.probs = self.softmax(self.z2) + + return self.probs + + def backward(self, X, y): + """反向传播""" + batch_size = X.shape[0] + + # 输出层梯度 + d_z2 = self.probs.copy() + + # 应用类别权重 + if self.class_weight is not None: + for i in range(batch_size): + d_z2[i, y[i]] -= self.class_weight[y[i]] + else: + d_z2[np.arange(batch_size), y] -= 1 + + # 第二层梯度 + d_W2 = self.a1.T @ d_z2 + d_b2 = np.sum(d_z2, axis=0) + + # 隐藏层梯度 + d_a1 = d_z2 @ self.W2.T + d_z1 = d_a1 * self.relu_derivative(self.z1) + + # Dropout梯度 + if self.keep_prob < 1.0 and hasattr(self, 'd1'): + d_z1 *= self.d1 + d_z1 /= self.keep_prob + + # 第一层梯度 + d_W1 = X.T @ d_z1 + d_b1 = np.sum(d_z1, axis=0) + + # 更新 + self.W1 -= self.lr * d_W1 / batch_size + self.b1 -= self.lr * d_b1 / batch_size + self.W2 -= self.lr * d_W2 / batch_size + self.b2 -= self.lr * d_b2 / batch_size + + def fit(self, X, y, X_val=None, y_val=None, epochs=100, batch_size=32, verbose=True): + """训练""" + num_samples = len(X) + num_batches = (num_samples + batch_size - 1) // batch_size + + for epoch in range(epochs): + # 打乱 + indices = np.random.permutation(num_samples) + X_shuffled = X[indices] + y_shuffled = y[indices] + + epoch_loss = 0 + self.training = True # 开启Dropout + + for batch_idx in range(num_batches): + start = batch_idx * batch_size + end = min(start + batch_size, num_samples) + X_batch = X_shuffled[start:end] + y_batch = y_shuffled[start:end] + + # 前向 + 反向 + probs = self.forward(X_batch) + self.backward(X_batch, y_batch) + + # 损失 + loss = -np.mean(np.log(np.clip(probs[np.arange(len(y_batch)), y_batch], 1e-10, 1))) + epoch_loss += loss + + self.training = False # 关闭Dropout + + # 评估 + if verbose and (epoch + 1) % 20 == 0: + train_acc = self.accuracy(X, y) + msg = f"Epoch {epoch+1:3d}/{epochs} | Loss: {epoch_loss/num_batches:.4f} | 训练准确率: {train_acc:.4f}" + if X_val is not None: + val_acc = self.accuracy(X_val, y_val) + msg += f" | 测试准确率: {val_acc:.4f}" + print(msg) + + return self + + def predict(self, X): + return np.argmax(self.forward(X), axis=1) + + def predict_proba(self, X): + return self.forward(X) + + def accuracy(self, X, y): + return np.mean(self.predict(X) == y) + + def save(self, filepath): + """保存模型权重""" + np.save(filepath + '_W1.npy', self.W1) + np.save(filepath + '_b1.npy', self.b1) + np.save(filepath + '_W2.npy', self.W2) + np.save(filepath + '_b2.npy', self.b2) + print(f"模型已保存: {filepath}") + + @staticmethod + def load(filepath, input_size, hidden_size=64, num_classes=2, learning_rate=0.1, keep_prob=1.0): + """加载模型权重""" + model = MLP(input_size, hidden_size, num_classes, learning_rate, keep_prob) + model.W1 = np.load(filepath + '_W1.npy') + model.b1 = np.load(filepath + '_b1.npy') + model.W2 = np.load(filepath + '_W2.npy') + model.b2 = np.load(filepath + '_b2.npy') + print(f"模型已加载: {filepath}") + return model + + +def create_model(model_type, input_size, hidden_size=64, num_classes=2, + learning_rate=0.1, keep_prob=1.0, class_weight=None): + """工厂函数:创建模型""" + if model_type == 'lr': + return LogisticRegression(input_size, num_classes, learning_rate, class_weight) + elif model_type == 'mlp': + return MLP(input_size, hidden_size, num_classes, learning_rate, keep_prob, class_weight) + else: + raise ValueError(f"未知模型类型: {model_type}") diff --git a/predict.py b/predict.py index c05d86e..b48389b 100644 --- a/predict.py +++ b/predict.py @@ -1,264 +1,264 @@ -# -*- coding: utf-8 -*- -""" -预测脚本 - 加载训练好的模型,测试自定义文本 - -使用方法: - python predict.py - -程序会: -1. 列出已保存的模型 -2. 让学生选择模型 -3. 加载模型和向量化器 -4. 学生输入文本,实时预测情感 -""" - -import os -import re -import sys -import jieba -import numpy as np -import math -import csv -from collections import Counter - -# 添加父目录到路径以便导入 -sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) - -from config import DATA_DIR, MAX_FEATURES, MAX_SEQ_LEN, HIDDEN_SIZE - - -def find_saved_models(): - """查找已保存的模型""" - models = {} - for f in os.listdir('.'): - if not f.startswith('model_') or not f.endswith('.npy'): - continue - # MLP: model_mlp_tfidf_W1.npy -> model_mlp_tfidf - # LR: model_lr_bow_W.npy -> model_lr_bow - # 需要精确匹配,避免 tfidf 被截断 - for suffix in ['_W1.npy', '_b1.npy', '_W2.npy', '_b2.npy', '_W.npy', '_b.npy']: - if f.endswith(suffix): - name = f[:-len(suffix)] - models[name] = True - break - return list(models.keys()) - - -def tokenize(text): - """中文分词""" - text = re.sub(r'[^\u4e00-\u9fa5a-zA-Z]', ' ', text) - words = jieba.lcut(text) - return [w for w in words if len(w) > 1] - - -class BoWVectorizer: - def __init__(self, max_seq_len, max_features=3000): - self.max_seq_len = max_seq_len - self.max_features = max_features - self.vocab = {} - - def fit(self, texts): - counter = Counter() - for text in texts: - words = tokenize(text) - counter.update(words) - most_common = counter.most_common(self.max_features) - self.vocab = {word: idx for idx, (word, _) in enumerate(most_common)} - return self - - def transform(self, text): - words = tokenize(text) - vec = [0] * self.max_seq_len - for i, word in enumerate(words[:self.max_seq_len]): - if word in self.vocab: - vec[i] = 1 - return np.array([vec], dtype=np.float32) - - -class TFIDFVectorizer: - def __init__(self, max_seq_len, max_features=3000): - self.max_seq_len = max_seq_len - self.max_features = max_features - self.vocab = {} - self.idf = {} - self.num_docs = 0 - - def fit(self, texts): - counter = Counter() - doc_counter = Counter() - for text in texts: - words = tokenize(text) - unique_words = set(words) - counter.update(words) - for w in unique_words: - doc_counter[w] += 1 - - self.num_docs = len(texts) - - # 按词频取最高频的词(和训练时dataset.py一致) - most_common = counter.most_common(self.max_features) - self.vocab = {word: idx for idx, (word, _) in enumerate(most_common)} - - # 计算IDF(只对词表中的词) - self.idf = {} - for word in self.vocab: - df = doc_counter.get(word, 1) - self.idf[word] = math.log(self.num_docs / (df + 1)) + 1 - - return self - - def transform(self, text): - words = tokenize(text) - tf = Counter(words) - tf_sum = len(words) if words else 1 - vec = [0.0] * self.max_seq_len - for i, word in enumerate(words[:self.max_seq_len]): - if word in self.vocab: - vec[i] = (tf[word] / tf_sum) * self.idf.get(word, 0) - return np.array([vec], dtype=np.float32) - - -def load_vectorizer(vectorizer_type): - """加载向量化器""" - csv_path = os.path.join(DATA_DIR, 'ChnSentiCorp_htl_all.csv') - if not os.path.exists(csv_path): - print("数据文件不存在,请先运行 main.py 训练模型") - return None - - print("正在加载数据构建词表...") - texts, labels = [], [] - with open(csv_path, 'r', encoding='utf-8') as f: - reader = csv.reader(f) - for row in reader: - if len(row) < 2: - continue - try: - labels.append(int(row[0])) - texts.append(row[1].strip()) - except: - continue - - # 判断向量化器类型 - is_tfidf = 'tfidf' in vectorizer_type.lower() - - if is_tfidf: - vectorizer = TFIDFVectorizer(MAX_SEQ_LEN) - vectorizer.fit(texts) - print(f" TF-IDF词表大小: {len(vectorizer.vocab)}") - else: - vectorizer = BoWVectorizer(MAX_SEQ_LEN) - vectorizer.fit(texts) - print(f" BoW词表大小: {len(vectorizer.vocab)}") - - return vectorizer - - -def load_model(model_name, model_type, input_size, hidden_size): - """加载模型""" - from model_numpy import MLP, LogisticRegression - - if model_type == 'lr': - model = LogisticRegression(input_size, 2, learning_rate=0.05) - model.W = np.load(model_name + '_W.npy') - model.b = np.load(model_name + '_b.npy') - else: # mlp - model = MLP(input_size, hidden_size, 2, learning_rate=0.05, keep_prob=1.0) - model.W1 = np.load(model_name + '_W1.npy') - model.b1 = np.load(model_name + '_b1.npy') - model.W2 = np.load(model_name + '_W2.npy') - model.b2 = np.load(model_name + '_b2.npy') - - return model - - -def predict_text(model, vectorizer, text): - """预测单条文本""" - vec = vectorizer.transform(text) - prob = model.forward(vec)[0] - pred = np.argmax(prob) - label = "正面" if pred == 1 else "负面" - confidence = prob[pred] * 100 - return label, confidence, prob - - -def main(): - print("\n" + "=" * 60) - print("文本情感预测 - 加载已训练模型") - print("=" * 60) - - # 查找已保存的模型 - models = find_saved_models() - - if not models: - print("\n未找到已保存的模型!") - print("请先运行 python main.py 训练模型") - return - - # 让用户选择模型 - print(f"\n已找到 {len(models)} 个模型:") - for i, name in enumerate(models, 1): - print(f" {i}. {name}") - - print(f"\n请选择模型编号 (1-{len(models)}): ", end="", flush=True) - try: - choice = int(sys.stdin.readline().strip()) - if choice < 1 or choice > len(models): - print("无效选择") - return - model_name = models[choice - 1] - except: - print("无效输入") - return - - # 解析模型名称获取类型 - parts = model_name.split('_') - model_type = parts[1] # 'lr' 或 'mlp' - vectorizer_type = parts[2] # 'bow' 或 'tfidf' - - print(f"\n选中的模型: {model_name}") - print(f"模型类型: {model_type.upper()}") - print(f"向量化方式: {vectorizer_type.upper()}") - - # 加载向量化器 - print("\n正在加载向量化器...") - vectorizer = load_vectorizer(vectorizer_type) - if vectorizer is None: - return - - # 加载模型 - print("正在加载模型...") - model = load_model(model_name, model_type, MAX_SEQ_LEN, HIDDEN_SIZE) - print("模型加载成功!") - - # 预测循环 - print("\n" + "=" * 60) - print("开始预测(输入文本后按回车,q退出)") - print("=" * 60) - - while True: - try: - print("\n请输入评论文本: ", end="", flush=True) - text = sys.stdin.readline().strip() - - if text.lower() == 'q': - print("再见!") - break - - if not text: - continue - - label, confidence, prob = predict_text(model, vectorizer, text) - - print(f"\n预测结果: {label}") - print(f"置信度: {confidence:.1f}%") - print(f"详细: 正面概率={prob[1]*100:.1f}%, 负面概率={prob[0]*100:.1f}%") - - except KeyboardInterrupt: - print("\n\n再见!") - break - except Exception as e: - print(f"预测出错: {e}") - - -if __name__ == '__main__': - main() +# -*- coding: utf-8 -*- +""" +预测脚本 - 加载训练好的模型,测试自定义文本 + +使用方法: + python predict.py + +程序会: +1. 列出已保存的模型 +2. 让学生选择模型 +3. 加载模型和向量化器 +4. 学生输入文本,实时预测情感 +""" + +import os +import re +import sys +import jieba +import numpy as np +import math +import csv +from collections import Counter + +# 添加父目录到路径以便导入 +sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) + +from config import DATA_DIR, MAX_FEATURES, MAX_SEQ_LEN, HIDDEN_SIZE + + +def find_saved_models(): + """查找已保存的模型""" + models = {} + for f in os.listdir('.'): + if not f.startswith('model_') or not f.endswith('.npy'): + continue + # MLP: model_mlp_tfidf_W1.npy -> model_mlp_tfidf + # LR: model_lr_bow_W.npy -> model_lr_bow + # 需要精确匹配,避免 tfidf 被截断 + for suffix in ['_W1.npy', '_b1.npy', '_W2.npy', '_b2.npy', '_W.npy', '_b.npy']: + if f.endswith(suffix): + name = f[:-len(suffix)] + models[name] = True + break + return list(models.keys()) + + +def tokenize(text): + """中文分词""" + text = re.sub(r'[^\u4e00-\u9fa5a-zA-Z]', ' ', text) + words = jieba.lcut(text) + return [w for w in words if len(w) > 1] + + +class BoWVectorizer: + def __init__(self, max_seq_len, max_features=3000): + self.max_seq_len = max_seq_len + self.max_features = max_features + self.vocab = {} + + def fit(self, texts): + counter = Counter() + for text in texts: + words = tokenize(text) + counter.update(words) + most_common = counter.most_common(self.max_features) + self.vocab = {word: idx for idx, (word, _) in enumerate(most_common)} + return self + + def transform(self, text): + words = tokenize(text) + vec = [0] * self.max_seq_len + for i, word in enumerate(words[:self.max_seq_len]): + if word in self.vocab: + vec[i] = 1 + return np.array([vec], dtype=np.float32) + + +class TFIDFVectorizer: + def __init__(self, max_seq_len, max_features=3000): + self.max_seq_len = max_seq_len + self.max_features = max_features + self.vocab = {} + self.idf = {} + self.num_docs = 0 + + def fit(self, texts): + counter = Counter() + doc_counter = Counter() + for text in texts: + words = tokenize(text) + unique_words = set(words) + counter.update(words) + for w in unique_words: + doc_counter[w] += 1 + + self.num_docs = len(texts) + + # 按词频取最高频的词(和训练时dataset.py一致) + most_common = counter.most_common(self.max_features) + self.vocab = {word: idx for idx, (word, _) in enumerate(most_common)} + + # 计算IDF(只对词表中的词) + self.idf = {} + for word in self.vocab: + df = doc_counter.get(word, 1) + self.idf[word] = math.log(self.num_docs / (df + 1)) + 1 + + return self + + def transform(self, text): + words = tokenize(text) + tf = Counter(words) + tf_sum = len(words) if words else 1 + vec = [0.0] * self.max_seq_len + for i, word in enumerate(words[:self.max_seq_len]): + if word in self.vocab: + vec[i] = (tf[word] / tf_sum) * self.idf.get(word, 0) + return np.array([vec], dtype=np.float32) + + +def load_vectorizer(vectorizer_type): + """加载向量化器""" + csv_path = os.path.join(DATA_DIR, 'ChnSentiCorp_htl_all.csv') + if not os.path.exists(csv_path): + print("数据文件不存在,请先运行 main.py 训练模型") + return None + + print("正在加载数据构建词表...") + texts, labels = [], [] + with open(csv_path, 'r', encoding='utf-8') as f: + reader = csv.reader(f) + for row in reader: + if len(row) < 2: + continue + try: + labels.append(int(row[0])) + texts.append(row[1].strip()) + except: + continue + + # 判断向量化器类型 + is_tfidf = 'tfidf' in vectorizer_type.lower() + + if is_tfidf: + vectorizer = TFIDFVectorizer(MAX_SEQ_LEN) + vectorizer.fit(texts) + print(f" TF-IDF词表大小: {len(vectorizer.vocab)}") + else: + vectorizer = BoWVectorizer(MAX_SEQ_LEN) + vectorizer.fit(texts) + print(f" BoW词表大小: {len(vectorizer.vocab)}") + + return vectorizer + + +def load_model(model_name, model_type, input_size, hidden_size): + """加载模型""" + from model_numpy import MLP, LogisticRegression + + if model_type == 'lr': + model = LogisticRegression(input_size, 2, learning_rate=0.05) + model.W = np.load(model_name + '_W.npy') + model.b = np.load(model_name + '_b.npy') + else: # mlp + model = MLP(input_size, hidden_size, 2, learning_rate=0.05, keep_prob=1.0) + model.W1 = np.load(model_name + '_W1.npy') + model.b1 = np.load(model_name + '_b1.npy') + model.W2 = np.load(model_name + '_W2.npy') + model.b2 = np.load(model_name + '_b2.npy') + + return model + + +def predict_text(model, vectorizer, text): + """预测单条文本""" + vec = vectorizer.transform(text) + prob = model.forward(vec)[0] + pred = np.argmax(prob) + label = "正面" if pred == 1 else "负面" + confidence = prob[pred] * 100 + return label, confidence, prob + + +def main(): + print("\n" + "=" * 60) + print("文本情感预测 - 加载已训练模型") + print("=" * 60) + + # 查找已保存的模型 + models = find_saved_models() + + if not models: + print("\n未找到已保存的模型!") + print("请先运行 python main.py 训练模型") + return + + # 让用户选择模型 + print(f"\n已找到 {len(models)} 个模型:") + for i, name in enumerate(models, 1): + print(f" {i}. {name}") + + print(f"\n请选择模型编号 (1-{len(models)}): ", end="", flush=True) + try: + choice = int(sys.stdin.readline().strip()) + if choice < 1 or choice > len(models): + print("无效选择") + return + model_name = models[choice - 1] + except: + print("无效输入") + return + + # 解析模型名称获取类型 + parts = model_name.split('_') + model_type = parts[1] # 'lr' 或 'mlp' + vectorizer_type = parts[2] # 'bow' 或 'tfidf' + + print(f"\n选中的模型: {model_name}") + print(f"模型类型: {model_type.upper()}") + print(f"向量化方式: {vectorizer_type.upper()}") + + # 加载向量化器 + print("\n正在加载向量化器...") + vectorizer = load_vectorizer(vectorizer_type) + if vectorizer is None: + return + + # 加载模型 + print("正在加载模型...") + model = load_model(model_name, model_type, MAX_SEQ_LEN, HIDDEN_SIZE) + print("模型加载成功!") + + # 预测循环 + print("\n" + "=" * 60) + print("开始预测(输入文本后按回车,q退出)") + print("=" * 60) + + while True: + try: + print("\n请输入评论文本: ", end="", flush=True) + text = sys.stdin.readline().strip() + + if text.lower() == 'q': + print("再见!") + break + + if not text: + continue + + label, confidence, prob = predict_text(model, vectorizer, text) + + print(f"\n预测结果: {label}") + print(f"置信度: {confidence:.1f}%") + print(f"详细: 正面概率={prob[1]*100:.1f}%, 负面概率={prob[0]*100:.1f}%") + + except KeyboardInterrupt: + print("\n\n再见!") + break + except Exception as e: + print(f"预测出错: {e}") + + +if __name__ == '__main__': + main()