Upload files to /
This commit is contained in:
config.py  80
@@ -1,40 +1,40 @@
# -*- coding: utf-8 -*-
"""
Configuration file - all hyperparameters in one place

Design idea:
Hyperparameters are grouped by category, so students can tweak one
group without affecting the others.
"""

# ==================== Data ====================
DATA_DIR = 'data/ChnSentiCorp'      # dataset path
MAX_FEATURES = 3000                 # maximum vocabulary size
MAX_SEQ_LEN = 100                   # maximum sentence length (in tokens)
VECTORIZER_TYPE = 'tfidf'           # 'tfidf' or 'bow' (vectorization method)

# ==================== Model ====================
MODEL_TYPE = 'mlp'                  # 'mlp' or 'lr' (model type)
HIDDEN_SIZE = 64                    # MLP hidden-layer size (ignored by LR)
NUM_CLASSES = 2                     # number of classes (positive/negative)
KEEP_PROB = 1.0                     # dropout keep probability (ignored by LR; 1.0 disables dropout)

# ==================== Training ====================
LEARNING_RATE = 0.05                # learning rate
NUM_EPOCHS = 100                    # number of training epochs
BATCH_SIZE = 64                     # batch size

# ==================== Class weights (to handle class imbalance) ====================
USE_CLASS_WEIGHT = True             # True = apply class weights, False = off (for comparison)
# Weight formula: n_samples / (n_classes * n_class_i)
# Positive reviews are the majority, so they get a small weight;
# negative reviews are the minority, so they get a large weight.
CLASS_WEIGHT_POS = 0.73             # positive-class weight (precomputed with the formula above)
CLASS_WEIGHT_NEG = 1.58             # negative-class weight (precomputed with the formula above)

# ==================== Experiments ====================
RUN_COMPARISON = False              # True = run the comparison experiments, False = run a single model
COMPARE_MODELS = ['lr', 'mlp']      # models to compare
COMPARE_VECTORS = ['bow', 'tfidf']  # vectorization methods to compare

# ==================== Misc ====================
RANDOM_SEED = 42                    # random seed (for reproducibility)
VERBOSE = True                      # print detailed logs
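As a sanity check on the two weight constants, here is a minimal sketch that recomputes them from the stated formula. The label counts below are assumptions (approximate ChnSentiCorp_htl_all totals), not values taken from this commit:

    # Recompute the class weights: n_samples / (n_classes * n_class_i)
    n_pos, n_neg = 5322, 2444        # assumed counts, for illustration only
    n_samples = n_pos + n_neg
    n_classes = 2

    w_pos = n_samples / (n_classes * n_pos)  # majority class -> small weight
    w_neg = n_samples / (n_classes * n_neg)  # minority class -> large weight
    print(f"CLASS_WEIGHT_POS ≈ {w_pos:.2f}, CLASS_WEIGHT_NEG ≈ {w_neg:.2f}")
    # -> CLASS_WEIGHT_POS ≈ 0.73, CLASS_WEIGHT_NEG ≈ 1.59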
dataset.py  572
@@ -1,286 +1,286 @@
# -*- coding: utf-8 -*-
"""
Data loading and vectorization module

Two vectorization methods are supported:
1. BoW (Bag of Words) - term-frequency style vectors
2. TF-IDF - term frequency-inverse document frequency vectors

Advantages of TF-IDF:
- Down-weights very common words (such as "的", "是")
- Boosts rare but informative words
- Usually outperforms plain BoW
"""

import os
import re
import csv
import math
import jieba
import numpy as np
from collections import Counter

try:
    import urllib.request
    import ssl
    DOWNLOAD_AVAILABLE = True
except ImportError:
    DOWNLOAD_AVAILABLE = False


DATASET_URL = "https://raw.githubusercontent.com/SophonPlus/ChineseNlpCorpus/master/datasets/ChnSentiCorp_htl_all/ChnSentiCorp_htl_all.csv"


def download_dataset(data_dir):
    """Download the dataset (if it is not already present)."""
    csv_path = os.path.join(data_dir, 'ChnSentiCorp_htl_all.csv')

    if os.path.exists(csv_path):
        print(f"Data already present: {csv_path}")
        return True

    if not DOWNLOAD_AVAILABLE:
        return False

    print("Downloading dataset...")
    ssl_context = ssl.create_default_context()
    ssl_context.check_hostname = False
    ssl_context.verify_mode = ssl.CERT_NONE

    try:
        request = urllib.request.Request(DATASET_URL, headers={'User-Agent': 'Mozilla/5.0'})
        response = urllib.request.urlopen(request, timeout=120, context=ssl_context)
        os.makedirs(data_dir, exist_ok=True)
        with open(csv_path, 'wb') as f:
            f.write(response.read())
        print(f"Download finished: {csv_path}")
        return True
    except Exception as e:
        print(f"Download failed: {e}")
        return False


def load_raw_data(data_dir):
    """Load the raw data."""
    csv_path = os.path.join(data_dir, 'ChnSentiCorp_htl_all.csv')
    texts, labels = [], []

    with open(csv_path, 'r', encoding='utf-8') as f:
        reader = csv.reader(f)
        for row in reader:
            if len(row) < 2:
                continue
            try:
                label = int(row[0])
                review = row[1].strip()
                if review:
                    texts.append(review)
                    labels.append(label)
            except (ValueError, IndexError):
                continue

    return texts, np.array(labels)


def tokenize(text):
    """Chinese word segmentation."""
    # Keep only CJK characters and ASCII letters, then segment with jieba.
    text = re.sub(r'[^\u4e00-\u9fa5a-zA-Z]', ' ', text)
    words = jieba.lcut(text)
    # Drop single-character tokens (mostly function words and noise).
    return [w for w in words if len(w) > 1]
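A quick illustration of what `tokenize` produces. This is a hedged sketch: the input sentence is hypothetical and the exact segmentation depends on the installed jieba version and its default dictionary:

    sample = "酒店的位置很好,服务也不错!Price: 300"
    print(tokenize(sample))
    # Something like: ['酒店', '位置', '很好', '服务', '不错', 'Price']
    # Punctuation and digits are stripped by the regex, and single-character
    # tokens such as "的" are removed by the length filter.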
# ==================== Vectorizers ====================

class BaseVectorizer:
    """Vectorizer base class."""
    def fit(self, texts): pass
    def transform(self, texts): pass
    def fit_transform(self, texts): pass


class BoWVectorizer(BaseVectorizer):
    """
    Bag-of-words style vectorizer.

    The vocabulary holds the max_features most frequent words. Note that
    transform() does not produce classic BoW count vectors (one dimension
    per vocabulary word): each text is encoded as a fixed-length binary
    sequence of max_seq_len positions, where position i is 1 if the i-th
    token of the text is in the vocabulary.
    """

    def __init__(self, max_features, max_seq_len):
        self.max_features = max_features
        self.max_seq_len = max_seq_len
        self.vocab = {}
        self.doc_freq = {}  # document frequency
        self.num_docs = 0

    def fit(self, texts):
        """Build the vocabulary (by term frequency)."""
        counter = Counter()
        doc_counter = Counter()  # number of documents containing each word

        for text in texts:
            words = tokenize(text)
            unique_words = set(words)
            counter.update(words)
            for w in unique_words:
                doc_counter[w] += 1

        self.num_docs = len(texts)

        # Keep the most frequent words.
        most_common = counter.most_common(self.max_features)
        self.vocab = {word: idx for idx, (word, _) in enumerate(most_common)}

        # Record document frequencies (used by TF-IDF).
        self.doc_freq = {w: doc_counter[w] for w in self.vocab}

        print(f"  BoW vocabulary size: {len(self.vocab)}")
        return self

    def transform(self, texts):
        """Convert texts to binary position vectors."""
        vectors = []
        for text in texts:
            words = tokenize(text)
            freq = [0] * self.max_seq_len
            for i, word in enumerate(words[:self.max_seq_len]):
                if word in self.vocab:
                    freq[i] = 1  # binary: 1 if the i-th token is in the vocabulary
            vectors.append(freq)
        return np.array(vectors, dtype=np.float32)

    def fit_transform(self, texts):
        self.fit(texts)
        return self.transform(texts)


class TFIDFVectorizer(BaseVectorizer):
    """
    TF-IDF vectorizer.

    Principle:
    - TF (term frequency) = how often a word occurs in this text
    - IDF (inverse document frequency) = log(num_docs / num_docs_containing_word)
    - TF-IDF = TF × IDF

    Advantages:
    - Down-weights common, uninformative words (such as "的", "是")
    - Boosts rare but informative words
    """

    def __init__(self, max_features, max_seq_len):
        self.max_features = max_features
        self.max_seq_len = max_seq_len
        self.vocab = {}
        self.idf = {}  # IDF value per word
        self.num_docs = 0

    def fit(self, texts):
        """Build the vocabulary and compute IDF values."""
        counter = Counter()
        doc_counter = Counter()

        for text in texts:
            words = tokenize(text)
            unique_words = set(words)
            counter.update(words)
            for w in unique_words:
                doc_counter[w] += 1

        self.num_docs = len(texts)

        # Compute each word's IDF:
        # IDF = log(num_docs / (df + 1)) + 1
        # The +1 in the denominator smooths the ratio; the +1 outside keeps
        # the IDF positive.
        idf_values = {}
        for word, df in doc_counter.items():
            idf_values[word] = math.log(self.num_docs / (df + 1)) + 1

        # Keep the words with the highest IDF (the rarest, most
        # document-specific words).
        sorted_words = sorted(idf_values.items(), key=lambda x: x[1], reverse=True)
        self.vocab = {word: idx for idx, (word, _) in enumerate(sorted_words[:self.max_features])}

        # Store the IDF values for the vocabulary.
        self.idf = {word: idf_values[word] for word in self.vocab}

        print(f"  TF-IDF vocabulary size: {len(self.vocab)}")
        print(f"  Mean IDF: {np.mean(list(self.idf.values())):.3f}")
        return self

    def transform(self, texts):
        """Convert texts to TF-IDF vectors."""
        vectors = []
        for text in texts:
            words = tokenize(text)

            # Term frequencies.
            tf = Counter(words)
            tf_sum = len(words) if words else 1

            # Build the vector (one slot per token position, like BoWVectorizer).
            vec = [0.0] * self.max_seq_len
            for i, word in enumerate(words[:self.max_seq_len]):
                if word in self.vocab:
                    # TF × IDF
                    vec[i] = (tf[word] / tf_sum) * self.idf.get(word, 0)
            vectors.append(vec)

        return np.array(vectors, dtype=np.float32)

    def fit_transform(self, texts):
        self.fit(texts)
        return self.transform(texts)
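To make the TF × IDF arithmetic concrete, here is a minimal hand-worked sketch on a three-document toy corpus (the documents are illustrative, not from the hotel-review data, and are given pre-tokenized so the length filter does not interfere):

    import math

    docs = [["酒店", "干净", "舒适"],
            ["酒店", "吵闹"],
            ["干净", "安静"]]
    num_docs = len(docs)

    # Document frequency of "酒店": it appears in 2 of the 3 documents.
    df = sum(1 for d in docs if "酒店" in d)   # 2
    idf = math.log(num_docs / (df + 1)) + 1    # log(3/3) + 1 = 1.0

    # In the first document, "酒店" is 1 of 3 tokens.
    tf = 1 / 3
    print(f"tf-idf = {tf * idf:.3f}")          # 0.333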
def load_data(data_dir, max_features, max_seq_len, vectorizer_type='tfidf'):
    """
    Load and vectorize the data.

    Parameters:
    - vectorizer_type: 'tfidf' or 'bow'
    """
    if not download_dataset(data_dir):
        raise RuntimeError("Data loading failed; check your network or download the dataset manually")

    print("Loading data...")
    texts, labels = load_raw_data(data_dir)
    print(f"Total reviews: {len(texts)}, positive: {sum(labels)}, negative: {len(labels) - sum(labels)}")

    # Pick a vectorizer.
    if vectorizer_type == 'tfidf':
        vectorizer = TFIDFVectorizer(max_features, max_seq_len)
        vec_name = "TF-IDF"
    else:
        vectorizer = BoWVectorizer(max_features, max_seq_len)
        vec_name = "BoW"

    print(f"Vectorizing with {vec_name}...")
    X = vectorizer.fit_transform(texts)
    y = labels

    # Shuffle and split.
    np.random.seed(42)
    indices = np.random.permutation(len(X))
    X = X[indices]
    y = y[indices]

    split_idx = int(len(X) * 0.8)
    X_train, X_test = X[:split_idx], X[split_idx:]
    y_train, y_test = y[:split_idx], y[split_idx:]

    print(f"Training set: {len(X_train)} samples, test set: {len(X_test)} samples")

    return X_train, y_train, X_test, y_test, vectorizer


if __name__ == '__main__':
    # Quick self-test.
    print("=" * 60)
    print("Testing TF-IDF vectorization")
    print("=" * 60)
    X_train, y_train, X_test, y_test, vec = load_data(
        'data/ChnSentiCorp', max_features=3000, max_seq_len=100,
        vectorizer_type='tfidf'
    )
    print(f"\nX_train shape: {X_train.shape}")
    print(f"X_train sample (first 5 features): {X_train[0][:5]}")
main.py  68
@@ -1,34 +1,34 @@
# -*- coding: utf-8 -*-
"""
Main entry point

Usage:

1. Run a single model (default):
   python main.py

   Edit MODEL_TYPE and VECTORIZER_TYPE in config.py to switch configurations.

2. Run the comparison experiments:
   set RUN_COMPARISON = True in config.py

   This runs, in order:
   - Experiment 1: BoW vs TF-IDF (LR model fixed)
   - Experiment 2: LR vs MLP (TF-IDF fixed)
   - Experiment 3: different learning rates
   - Experiment 4: different hidden-layer sizes

   and prints a summary report at the end.
"""

from train import main

if __name__ == '__main__':
    print("\n" + "=" * 70)
    print("Text classification experiments - pure NumPy implementation")
    print("Dataset: ChnSentiCorp (Chinese hotel reviews)")
    print("Models: Logistic Regression / MLP")
    print("Vectorization: BoW / TF-IDF")
    print("=" * 70 + "\n")

    main()
model_numpy.py  684
@@ -1,342 +1,342 @@
# -*- coding: utf-8 -*-
"""
Model module - pure NumPy implementation

Two models are supported:
1. Logistic Regression - a linear model
2. MLP (multi-layer perceptron) - a two-layer fully connected network

Design ideas:
- Both models share the same interface, which makes comparisons easy
- The code stays short, with detailed comments
- Backpropagation is implemented by hand, so the mechanics are transparent
"""

import numpy as np


class BaseModel:
    """Model base class."""
    def fit(self, X, y, X_val=None, y_val=None, epochs=100, batch_size=32, verbose=True): pass
    def predict(self, X): pass
    def predict_proba(self, X): pass
    def accuracy(self, X, y): pass


class LogisticRegression(BaseModel):
    """
    Logistic regression (linear classifier).

    Structure: input → linear layer → softmax → output

    Principle:
    - linear layer: z = X @ W + b
    - softmax: turns the linear outputs into a probability distribution

    Parameter count: input_size × num_classes + num_classes
    """

    def __init__(self, input_size, num_classes=2, learning_rate=0.1,
                 class_weight=None, seed=42):
        np.random.seed(seed)

        # Weight initialization (He-style scaling: std = sqrt(2 / fan_in))
        self.W = np.random.randn(input_size, num_classes) * np.sqrt(2.0 / input_size)
        self.b = np.zeros(num_classes)

        self.lr = learning_rate
        self.input_size = input_size
        self.num_classes = num_classes
        self.class_weight = class_weight  # per-class loss weights

        total_params = input_size * num_classes + num_classes
        print(f"LogisticRegression: {input_size} -> {num_classes}, parameters: {total_params}")

    def softmax(self, x):
        """Softmax function (numerically stabilized)."""
        x_shifted = x - np.max(x, axis=1, keepdims=True)
        exp_x = np.exp(x_shifted)
        return exp_x / np.sum(exp_x, axis=1, keepdims=True)

    def forward(self, X):
        """Forward pass."""
        # Linear layer
        z = X @ self.W + self.b
        # Softmax output probabilities
        return self.softmax(z)

    def backward(self, X, y):
        """Backward pass (one gradient-descent step)."""
        batch_size = X.shape[0]
        probs = self.forward(X)

        # Softmax + cross-entropy gradient: dL/dz = p - onehot(y)
        d_z = probs.copy()
        d_z[np.arange(batch_size), y] -= 1

        # Apply class weights by scaling each sample's gradient:
        # dL/dz = w_y * (p - onehot(y))
        if self.class_weight is not None:
            d_z *= np.asarray(self.class_weight)[y][:, None]

        # Gradients
        d_W = X.T @ d_z
        d_b = np.sum(d_z, axis=0)

        # Update
        self.W -= self.lr * d_W / batch_size
        self.b -= self.lr * d_b / batch_size

    def fit(self, X, y, X_val=None, y_val=None, epochs=100, batch_size=32, verbose=True):
        """Train the model."""
        num_samples = len(X)
        num_batches = (num_samples + batch_size - 1) // batch_size

        for epoch in range(epochs):
            # Shuffle
            indices = np.random.permutation(num_samples)
            X_shuffled = X[indices]
            y_shuffled = y[indices]

            epoch_loss = 0
            for batch_idx in range(num_batches):
                start = batch_idx * batch_size
                end = min(start + batch_size, num_samples)
                X_batch = X_shuffled[start:end]
                y_batch = y_shuffled[start:end]

                # Forward + backward
                probs = self.forward(X_batch)
                self.backward(X_batch, y_batch)

                # Loss (unweighted cross-entropy, for logging)
                loss = -np.mean(np.log(np.clip(probs[np.arange(len(y_batch)), y_batch], 1e-10, 1)))
                epoch_loss += loss

            # Evaluation
            if verbose and (epoch + 1) % 20 == 0:
                train_acc = self.accuracy(X, y)
                msg = f"Epoch {epoch+1:3d}/{epochs} | Loss: {epoch_loss/num_batches:.4f} | train acc: {train_acc:.4f}"
                if X_val is not None:
                    val_acc = self.accuracy(X_val, y_val)
                    msg += f" | test acc: {val_acc:.4f}"
                print(msg)

        return self

    def predict(self, X):
        return np.argmax(self.forward(X), axis=1)

    def predict_proba(self, X):
        return self.forward(X)

    def accuracy(self, X, y):
        return np.mean(self.predict(X) == y)

    def save(self, filepath):
        """Save model weights."""
        np.save(filepath + '_W.npy', self.W)
        np.save(filepath + '_b.npy', self.b)
        print(f"Model saved: {filepath}")

    @staticmethod
    def load(filepath, input_size, num_classes=2, learning_rate=0.1):
        """Load model weights."""
        model = LogisticRegression(input_size, num_classes, learning_rate)
        model.W = np.load(filepath + '_W.npy')
        model.b = np.load(filepath + '_b.npy')
        print(f"Model loaded: {filepath}")
        return model
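Since the backward pass is written by hand, it is worth verifying the softmax cross-entropy gradient dL/dz = p - onehot(y) numerically. A small self-contained sketch, independent of the classes above, using a central finite difference:

    import numpy as np

    def softmax(z):
        e = np.exp(z - z.max())
        return e / e.sum()

    def loss(z, y):
        return -np.log(softmax(z)[y])

    rng = np.random.default_rng(0)
    z = rng.normal(size=3)   # logits for one sample
    y = 1                    # true class index

    analytic = softmax(z).copy()
    analytic[y] -= 1         # p - onehot(y)

    eps = 1e-6
    numeric = np.array([(loss(z + eps * np.eye(3)[k], y) -
                         loss(z - eps * np.eye(3)[k], y)) / (2 * eps)
                        for k in range(3)])

    print(np.max(np.abs(analytic - numeric)))  # tiny (~1e-10): gradients agree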
class MLP(BaseModel):
    """
    Multi-layer perceptron (neural network).

    Structure: input → linear layer → ReLU → linear layer → softmax → output

    Differences from LogisticRegression:
    - one extra hidden layer plus a nonlinear activation
    - can learn nonlinear relationships
    - has more parameters

    Parameter count:
    - W1: input_size × hidden_size
    - b1: hidden_size
    - W2: hidden_size × num_classes
    - b2: num_classes
    """

    def __init__(self, input_size, hidden_size=64, num_classes=2,
                 learning_rate=0.1, keep_prob=1.0, class_weight=None, seed=42):
        np.random.seed(seed)

        # First-layer weights (He initialization, suited to ReLU)
        self.W1 = np.random.randn(input_size, hidden_size) * np.sqrt(2.0 / input_size)
        self.b1 = np.zeros(hidden_size)

        # Second-layer weights
        self.W2 = np.random.randn(hidden_size, num_classes) * np.sqrt(2.0 / hidden_size)
        self.b2 = np.zeros(num_classes)

        self.lr = learning_rate
        self.keep_prob = keep_prob
        self.hidden_size = hidden_size
        self.input_size = input_size
        self.num_classes = num_classes
        self.class_weight = class_weight  # per-class loss weights
        self.training = False             # eval mode by default; fit() toggles this

        total_params = (input_size * hidden_size + hidden_size +
                        hidden_size * num_classes + num_classes)
        print(f"MLP: {input_size} -> {hidden_size} -> {num_classes}, parameters: {total_params}")

    def relu(self, x):
        """ReLU activation."""
        return np.maximum(0, x)

    def relu_derivative(self, x):
        """ReLU derivative."""
        return (x > 0).astype(float)

    def softmax(self, x):
        """Softmax function (numerically stabilized)."""
        x_shifted = x - np.max(x, axis=1, keepdims=True)
        exp_x = np.exp(x_shifted)
        return exp_x / np.sum(exp_x, axis=1, keepdims=True)

    def forward(self, X):
        """Forward pass."""
        # First layer
        self.z1 = X @ self.W1 + self.b1
        self.a1 = self.relu(self.z1)

        # Inverted dropout (training only). Scaling by 1/keep_prob keeps the
        # expected activation unchanged, so nothing is needed at eval time.
        # Checking self.training (not hasattr) matters: the attribute still
        # exists after fit() sets it to False.
        if self.keep_prob < 1.0 and self.training:
            self.d1 = (np.random.rand(*self.a1.shape) < self.keep_prob).astype(float)
            self.a1 *= self.d1
            self.a1 /= self.keep_prob

        # Second layer
        self.z2 = self.a1 @ self.W2 + self.b2
        self.probs = self.softmax(self.z2)

        return self.probs

    def backward(self, X, y):
        """Backward pass."""
        batch_size = X.shape[0]

        # Output-layer gradient: dL/dz2 = p - onehot(y)
        d_z2 = self.probs.copy()
        d_z2[np.arange(batch_size), y] -= 1

        # Apply class weights by scaling each sample's gradient:
        # dL/dz2 = w_y * (p - onehot(y))
        if self.class_weight is not None:
            d_z2 *= np.asarray(self.class_weight)[y][:, None]

        # Second-layer gradients
        d_W2 = self.a1.T @ d_z2
        d_b2 = np.sum(d_z2, axis=0)

        # Hidden-layer gradient
        d_a1 = d_z2 @ self.W2.T
        d_z1 = d_a1 * self.relu_derivative(self.z1)

        # Dropout gradient (mirror the forward mask)
        if self.keep_prob < 1.0 and self.training:
            d_z1 *= self.d1
            d_z1 /= self.keep_prob

        # First-layer gradients
        d_W1 = X.T @ d_z1
        d_b1 = np.sum(d_z1, axis=0)

        # Update
        self.W1 -= self.lr * d_W1 / batch_size
        self.b1 -= self.lr * d_b1 / batch_size
        self.W2 -= self.lr * d_W2 / batch_size
        self.b2 -= self.lr * d_b2 / batch_size

    def fit(self, X, y, X_val=None, y_val=None, epochs=100, batch_size=32, verbose=True):
        """Train the model."""
        num_samples = len(X)
        num_batches = (num_samples + batch_size - 1) // batch_size

        for epoch in range(epochs):
            # Shuffle
            indices = np.random.permutation(num_samples)
            X_shuffled = X[indices]
            y_shuffled = y[indices]

            epoch_loss = 0
            self.training = True  # enable dropout

            for batch_idx in range(num_batches):
                start = batch_idx * batch_size
                end = min(start + batch_size, num_samples)
                X_batch = X_shuffled[start:end]
                y_batch = y_shuffled[start:end]

                # Forward + backward
                probs = self.forward(X_batch)
                self.backward(X_batch, y_batch)

                # Loss (unweighted cross-entropy, for logging)
                loss = -np.mean(np.log(np.clip(probs[np.arange(len(y_batch)), y_batch], 1e-10, 1)))
                epoch_loss += loss

            self.training = False  # disable dropout

            # Evaluation
            if verbose and (epoch + 1) % 20 == 0:
                train_acc = self.accuracy(X, y)
                msg = f"Epoch {epoch+1:3d}/{epochs} | Loss: {epoch_loss/num_batches:.4f} | train acc: {train_acc:.4f}"
                if X_val is not None:
                    val_acc = self.accuracy(X_val, y_val)
                    msg += f" | test acc: {val_acc:.4f}"
                print(msg)

        return self

    def predict(self, X):
        return np.argmax(self.forward(X), axis=1)

    def predict_proba(self, X):
        return self.forward(X)

    def accuracy(self, X, y):
        return np.mean(self.predict(X) == y)

    def save(self, filepath):
        """Save model weights."""
        np.save(filepath + '_W1.npy', self.W1)
        np.save(filepath + '_b1.npy', self.b1)
        np.save(filepath + '_W2.npy', self.W2)
        np.save(filepath + '_b2.npy', self.b2)
        print(f"Model saved: {filepath}")

    @staticmethod
    def load(filepath, input_size, hidden_size=64, num_classes=2, learning_rate=0.1, keep_prob=1.0):
        """Load model weights."""
        model = MLP(input_size, hidden_size, num_classes, learning_rate, keep_prob)
        model.W1 = np.load(filepath + '_W1.npy')
        model.b1 = np.load(filepath + '_b1.npy')
        model.W2 = np.load(filepath + '_W2.npy')
        model.b2 = np.load(filepath + '_b2.npy')
        print(f"Model loaded: {filepath}")
        return model


def create_model(model_type, input_size, hidden_size=64, num_classes=2,
                 learning_rate=0.1, keep_prob=1.0, class_weight=None):
    """Factory function: create a model."""
    if model_type == 'lr':
        return LogisticRegression(input_size, num_classes, learning_rate, class_weight)
    elif model_type == 'mlp':
        return MLP(input_size, hidden_size, num_classes, learning_rate, keep_prob, class_weight)
    else:
        raise ValueError(f"Unknown model type: {model_type}")
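A minimal end-to-end smoke test for the factory and the shared fit/accuracy interface, on synthetic linearly separable data (the shapes and labels here are hypothetical, not the hotel-review setup):

    import numpy as np

    rng = np.random.default_rng(0)
    X = rng.normal(size=(200, 20)).astype(np.float32)
    y = (X[:, 0] + X[:, 1] > 0).astype(int)   # labels depend on two features

    model = create_model('mlp', input_size=20, hidden_size=16,
                         learning_rate=0.1, class_weight=[1.0, 1.0])
    model.fit(X, y, epochs=40, batch_size=32, verbose=False)
    print(f"train accuracy: {model.accuracy(X, y):.2f}")  # well above 0.5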
predict.py  528
@@ -1,264 +1,264 @@
# -*- coding: utf-8 -*-
"""
Prediction script - load a trained model and try your own text

Usage:
   python predict.py

The program will:
1. list the saved models
2. let the student pick one
3. load the model and its vectorizer
4. read text from the student and predict its sentiment interactively
"""

import os
import re
import sys
import jieba
import numpy as np
import math
import csv
from collections import Counter

# Add this file's directory to the path so the imports below work.
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))

from config import DATA_DIR, MAX_FEATURES, MAX_SEQ_LEN, HIDDEN_SIZE


def find_saved_models():
    """Find the saved models."""
    models = {}
    for f in os.listdir('.'):
        if not f.startswith('model_') or not f.endswith('.npy'):
            continue
        # MLP: model_mlp_tfidf_W1.npy -> model_mlp_tfidf
        # LR:  model_lr_bow_W.npy    -> model_lr_bow
        # Match the exact suffix so that 'tfidf' is not truncated.
        for suffix in ['_W1.npy', '_b1.npy', '_W2.npy', '_b2.npy', '_W.npy', '_b.npy']:
            if f.endswith(suffix):
                name = f[:-len(suffix)]
                models[name] = True
                break
    return list(models.keys())


def tokenize(text):
    """Chinese word segmentation (same as dataset.py)."""
    text = re.sub(r'[^\u4e00-\u9fa5a-zA-Z]', ' ', text)
    words = jieba.lcut(text)
    return [w for w in words if len(w) > 1]


class BoWVectorizer:
    def __init__(self, max_seq_len, max_features=3000):
        self.max_seq_len = max_seq_len
        self.max_features = max_features
        self.vocab = {}

    def fit(self, texts):
        # Keep the most frequent words (same as BoWVectorizer.fit in dataset.py).
        counter = Counter()
        for text in texts:
            words = tokenize(text)
            counter.update(words)
        most_common = counter.most_common(self.max_features)
        self.vocab = {word: idx for idx, (word, _) in enumerate(most_common)}
        return self

    def transform(self, text):
        words = tokenize(text)
        vec = [0] * self.max_seq_len
        for i, word in enumerate(words[:self.max_seq_len]):
            if word in self.vocab:
                vec[i] = 1
        return np.array([vec], dtype=np.float32)


class TFIDFVectorizer:
    def __init__(self, max_seq_len, max_features=3000):
        self.max_seq_len = max_seq_len
        self.max_features = max_features
        self.vocab = {}
        self.idf = {}
        self.num_docs = 0

    def fit(self, texts):
        counter = Counter()
        doc_counter = Counter()
        for text in texts:
            words = tokenize(text)
            unique_words = set(words)
            counter.update(words)
            for w in unique_words:
                doc_counter[w] += 1

        self.num_docs = len(texts)

        # Compute the smoothed IDF for every word, then keep the words with
        # the highest IDF, so the vocabulary matches what
        # TFIDFVectorizer.fit in dataset.py built at training time.
        idf_values = {}
        for word, df in doc_counter.items():
            idf_values[word] = math.log(self.num_docs / (df + 1)) + 1

        sorted_words = sorted(idf_values.items(), key=lambda x: x[1], reverse=True)
        self.vocab = {word: idx for idx, (word, _) in enumerate(sorted_words[:self.max_features])}
        self.idf = {word: idf_values[word] for word in self.vocab}

        return self

    def transform(self, text):
        words = tokenize(text)
        tf = Counter(words)
        tf_sum = len(words) if words else 1
        vec = [0.0] * self.max_seq_len
        for i, word in enumerate(words[:self.max_seq_len]):
            if word in self.vocab:
                vec[i] = (tf[word] / tf_sum) * self.idf.get(word, 0)
        return np.array([vec], dtype=np.float32)


def load_vectorizer(vectorizer_type):
    """Rebuild the vectorizer from the training data."""
    csv_path = os.path.join(DATA_DIR, 'ChnSentiCorp_htl_all.csv')
    if not os.path.exists(csv_path):
        print("Data file not found; run main.py first to train a model")
        return None

    print("Loading data to rebuild the vocabulary...")
    texts, labels = [], []
    with open(csv_path, 'r', encoding='utf-8') as f:
        reader = csv.reader(f)
        for row in reader:
            if len(row) < 2:
                continue
            try:
                labels.append(int(row[0]))
                texts.append(row[1].strip())
            except (ValueError, IndexError):
                continue

    # Decide which vectorizer to build.
    is_tfidf = 'tfidf' in vectorizer_type.lower()

    if is_tfidf:
        vectorizer = TFIDFVectorizer(MAX_SEQ_LEN, MAX_FEATURES)
        vectorizer.fit(texts)
        print(f"  TF-IDF vocabulary size: {len(vectorizer.vocab)}")
    else:
        vectorizer = BoWVectorizer(MAX_SEQ_LEN, MAX_FEATURES)
        vectorizer.fit(texts)
        print(f"  BoW vocabulary size: {len(vectorizer.vocab)}")

    return vectorizer


def load_model(model_name, model_type, input_size, hidden_size):
    """Load a model."""
    from model_numpy import MLP, LogisticRegression

    if model_type == 'lr':
        model = LogisticRegression(input_size, 2, learning_rate=0.05)
        model.W = np.load(model_name + '_W.npy')
        model.b = np.load(model_name + '_b.npy')
    else:  # mlp
        model = MLP(input_size, hidden_size, 2, learning_rate=0.05, keep_prob=1.0)
        model.W1 = np.load(model_name + '_W1.npy')
        model.b1 = np.load(model_name + '_b1.npy')
        model.W2 = np.load(model_name + '_W2.npy')
        model.b2 = np.load(model_name + '_b2.npy')

    return model


def predict_text(model, vectorizer, text):
    """Predict a single text."""
    vec = vectorizer.transform(text)
    prob = model.forward(vec)[0]
    pred = np.argmax(prob)
    label = "positive" if pred == 1 else "negative"
    confidence = prob[pred] * 100
    return label, confidence, prob


def main():
    print("\n" + "=" * 60)
    print("Sentiment prediction - load a trained model")
    print("=" * 60)

    # Find the saved models.
    models = find_saved_models()

    if not models:
        print("\nNo saved models found!")
        print("Run python main.py first to train one")
        return

    # Let the user pick a model.
    print(f"\nFound {len(models)} model(s):")
    for i, name in enumerate(models, 1):
        print(f"  {i}. {name}")

    print(f"\nPick a model number (1-{len(models)}): ", end="", flush=True)
    try:
        choice = int(sys.stdin.readline().strip())
        if choice < 1 or choice > len(models):
            print("Invalid choice")
            return
        model_name = models[choice - 1]
    except ValueError:
        print("Invalid input")
        return

    # Parse the model name to recover its type.
    parts = model_name.split('_')
    model_type = parts[1]       # 'lr' or 'mlp'
    vectorizer_type = parts[2]  # 'bow' or 'tfidf'

    print(f"\nSelected model: {model_name}")
    print(f"Model type: {model_type.upper()}")
    print(f"Vectorization: {vectorizer_type.upper()}")

    # Load the vectorizer.
    print("\nLoading vectorizer...")
    vectorizer = load_vectorizer(vectorizer_type)
    if vectorizer is None:
        return

    # Load the model.
    print("Loading model...")
    model = load_model(model_name, model_type, MAX_SEQ_LEN, HIDDEN_SIZE)
    print("Model loaded!")

    # Prediction loop.
    print("\n" + "=" * 60)
    print("Start predicting (type some text and press Enter, q to quit)")
    print("=" * 60)

    while True:
        try:
            print("\nEnter a review: ", end="", flush=True)
            text = sys.stdin.readline().strip()

            if text.lower() == 'q':
                print("Goodbye!")
                break

            if not text:
                continue

            label, confidence, prob = predict_text(model, vectorizer, text)

            print(f"\nPrediction: {label}")
            print(f"Confidence: {confidence:.1f}%")
            print(f"Details: positive prob = {prob[1]*100:.1f}%, negative prob = {prob[0]*100:.1f}%")

        except KeyboardInterrupt:
            print("\n\nGoodbye!")
            break
        except Exception as e:
            print(f"Prediction error: {e}")


if __name__ == '__main__':
    main()
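For scripted (non-interactive) use, the same pieces compose directly. A hedged sketch, assuming the training data is present and a saved model named model_mlp_tfidf exists in the working directory (that name is an example, not guaranteed by this commit):

    from predict import load_vectorizer, load_model, predict_text
    from config import MAX_SEQ_LEN, HIDDEN_SIZE

    vectorizer = load_vectorizer('tfidf')
    model = load_model('model_mlp_tfidf', 'mlp', MAX_SEQ_LEN, HIDDEN_SIZE)

    for review in ["房间很干净,服务热情", "隔音太差,再也不来了"]:
        label, confidence, _ = predict_text(model, vectorizer, review)
        print(f"{review} -> {label} ({confidence:.1f}%)")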