Files
task-3-2-2-text-classification/4.30.py
2509165029 47877a2159 4.30
2026-04-30 15:56:23 +08:00

701 lines
22 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
数据加载与向量化模块
支持两种向量化方法:
1. BoW (Bag of Words) - 词频向量
2. TF-IDF - 词频-逆文档频率向量
TF-IDF 的优势:
- 降低常见词(如"""")的权重
- 提升罕见词的信息量
- 通常效果优于简单BoW
"""
import os
import re
import csv
import math
import jieba
import numpy as np
from collections import Counter
try:
import urllib.request
import ssl
DOWNLOAD_AVAILABLE = True
except ImportError:
DOWNLOAD_AVAILABLE = False
DATASET_URL = "https://raw.githubusercontent.com/SophonPlus/ChineseNlpCorpus/master/datasets/ChnSentiCorp_htl_all/ChnSentiCorp_htl_all.csv"
def download_dataset(data_dir):
"""下载数据集(如果不存在)"""
csv_path = os.path.join(data_dir, 'ChnSentiCorp_htl_all.csv')
if os.path.exists(csv_path):
print(f"数据已存在: {csv_path}")
return True
if not DOWNLOAD_AVAILABLE:
return False
print("正在下载数据集...")
ssl_context = ssl.create_default_context()
ssl_context.check_hostname = False
ssl_context.verify_mode = ssl.CERT_NONE
try:
request = urllib.request.Request(DATASET_URL, headers={'User-Agent': 'Mozilla/5.0'})
response = urllib.request.urlopen(request, timeout=120, context=ssl_context)
os.makedirs(data_dir, exist_ok=True)
with open(csv_path, 'wb') as f:
f.write(response.read())
print(f"下载完成: {csv_path}")
return True
except Exception as e:
print(f"下载失败: {e}")
return False
def load_raw_data(data_dir):
"""加载原始数据"""
csv_path = os.path.join(data_dir, 'ChnSentiCorp_htl_all.csv')
texts, labels = [], []
with open(csv_path, 'r', encoding='utf-8') as f:
reader = csv.reader(f)
for row in reader:
if len(row) < 2:
continue
try:
label = int(row[0])
review = row[1].strip()
if review:
texts.append(review)
labels.append(label)
except (ValueError, IndexError):
continue
return texts, np.array(labels)
def tokenize(text):
"""中文分词"""
text = re.sub(r'[^\u4e00-\u9fa5a-zA-Z]', ' ', text)
words = jieba.lcut(text)
return [w for w in words if len(w) > 1]
# ==================== 向量化器 ====================
class BaseVectorizer:
"""向量化器基类"""
def fit(self, texts): pass
def transform(self, texts): pass
def fit_transform(self, texts): pass
class BoWVectorizer(BaseVectorizer):
"""
词袋模型 (Bag of Words)
原理:统计每个词在文本中出现的次数
向量维度 = 词表大小
每个维度 = 该词在本文本中出现的次数
"""
def __init__(self, max_features, max_seq_len):
self.max_features = max_features
self.max_seq_len = max_seq_len
self.vocab = {}
self.doc_freq = {} # 文档频率
self.num_docs = 0
def fit(self, texts):
"""构建词表(基于词频)"""
counter = Counter()
doc_counter = Counter() # 统计包含该词的文档数
for text in texts:
words = tokenize(text)
unique_words = set(words)
counter.update(words)
for w in unique_words:
doc_counter[w] += 1
self.num_docs = len(texts)
# 取最高频的词
most_common = counter.most_common(self.max_features)
self.vocab = {word: idx for idx, (word, _) in enumerate(most_common)}
# 记录文档频率用于TF-IDF
self.doc_freq = {w: doc_counter[w] for w in self.vocab}
print(f" BoW词表大小: {len(self.vocab)}")
return self
def transform(self, texts):
"""将文本转换为词频向量"""
vectors = []
for text in texts:
words = tokenize(text)
freq = [0] * self.max_seq_len
for i, word in enumerate(words[:self.max_seq_len]):
if word in self.vocab:
freq[i] = 1 # 二值(出现=1不出现=0
vectors.append(freq)
return np.array(vectors, dtype=np.float32)
def fit_transform(self, texts):
self.fit(texts)
return self.transform(texts)
class TFIDFVectorizer(BaseVectorizer):
"""
TF-IDF 向量器
原理:
- TF(词频) = 词在本文本中出现的次数
- IDF(逆文档频率) = log(总文档数 / 包含该词的文档数)
- TF-IDF = TF × IDF
优势:
- 降低常见无意义词的权重(如""""
- 提升罕见但有信息量的词
"""
def __init__(self, max_features, max_seq_len):
self.max_features = max_features
self.max_seq_len = max_seq_len
self.vocab = {}
self.idf = {} # 存储每个词的IDF值
self.num_docs = 0
def fit(self, texts):
"""构建词表并计算IDF"""
counter = Counter()
doc_counter = Counter()
for text in texts:
words = tokenize(text)
unique_words = set(words)
counter.update(words)
for w in unique_words:
doc_counter[w] += 1
self.num_docs = len(texts)
# 计算每个词的IDF
# IDF = log(总文档数 / 包含该词的文档数)
idf_values = {}
for word, df in doc_counter.items():
idf_values[word] = math.log(self.num_docs / (df + 1)) + 1 # 加1防零
# 取IDF值最高的词信息量最大的词
sorted_words = sorted(idf_values.items(), key=lambda x: x[1], reverse=True)
self.vocab = {word: idx for idx, (word, _) in enumerate(sorted_words[:self.max_features])}
# 保存IDF值
self.idf = {word: idf_values[word] for word in self.vocab}
print(f" TF-IDF词表大小: {len(self.vocab)}")
print(f" 平均IDF: {np.mean(list(self.idf.values())):.3f}")
return self
def transform(self, texts):
"""将文本转换为TF-IDF向量"""
vectors = []
for text in texts:
words = tokenize(text)
# 计算TF
tf = Counter(words)
tf_sum = len(words) if words else 1
# 生成向量
vec = [0.0] * self.max_seq_len
for i, word in enumerate(words[:self.max_seq_len]):
if word in self.vocab:
# TF × IDF
vec[i] = (tf[word] / tf_sum) * self.idf.get(word, 0)
vectors.append(vec)
return np.array(vectors, dtype=np.float32)
def fit_transform(self, texts):
self.fit(texts)
return self.transform(texts)
def load_data(data_dir, max_features, max_seq_len, vectorizer_type='tfidf'):
"""
加载并向量化数据
参数:
- vectorizer_type: 'tfidf''bow'
"""
if not download_dataset(data_dir):
raise RuntimeError("数据加载失败,请检查网络或手动下载数据集")
print("正在加载数据...")
texts, labels = load_raw_data(data_dir)
print(f"总评论数: {len(texts)}, 正面: {sum(labels)}, 负面: {len(labels) - sum(labels)}")
# 选择向量化器
if vectorizer_type == 'tfidf':
vectorizer = TFIDFVectorizer(max_features, max_seq_len)
vec_name = "TF-IDF"
else:
vectorizer = BoWVectorizer(max_features, max_seq_len)
vec_name = "BoW"
print(f"正在使用{vec_name}向量化...")
X = vectorizer.fit_transform(texts)
y = labels
# 打乱并划分
np.random.seed(42)
indices = np.random.permutation(len(X))
X = X[indices]
y = y[indices]
split_idx = int(len(X) * 0.8)
X_train, X_test = X[:split_idx], X[split_idx:]
y_train, y_test = y[:split_idx], y[split_idx:]
print(f"训练集: {len(X_train)}条, 测试集: {len(X_test)}")
return X_train, y_train, X_test, y_test, vectorizer
if __name__ == '__main__':
# 测试
print("=" * 60)
print("测试 TF-IDF 向量化")
print("=" * 60)
X_train, y_train, X_test, y_test, vec = load_data(
'data/ChnSentiCorp', max_features=3000, max_seq_len=100,
vectorizer_type='tfidf'
)
print(f"\nX_train shape: {X_train.shape}")
print(f"X_train sample (前5个特征): {X_train[0][:5]}")
# -*- coding: utf-8 -*-
"""
配置文件 - 所有超参数集中管理
设计思路:
将超参数分门别类,学生可以单独修改某一类而不会影响其他
"""
# ==================== 数据相关 ====================
DATA_DIR = 'data/ChnSentiCorp' # 数据集路径
MAX_FEATURES = 3000 # 词表最大容量
MAX_SEQ_LEN = 100 # 句子最大长度(词数)
VECTORIZER_TYPE = 'tfidf' # 'tfidf' 或 'bow'(向量化方式)
# ==================== 模型相关 ====================
MODEL_TYPE = 'mlp' # 'mlp' 或 'lr'(模型类型)
HIDDEN_SIZE = 64 # MLP隐藏层大小LR忽略
NUM_CLASSES = 2 # 类别数(正面/负面二分类)
KEEP_PROB = 1.0 # Dropout保留概率LR忽略设为1即可
# ==================== 训练相关 ====================
LEARNING_RATE = 0.05 # 学习率
NUM_EPOCHS = 100 # 训练轮数
BATCH_SIZE = 64 # 批次大小
# ==================== 类别权重(解决数据不平衡问题)====================
USE_CLASS_WEIGHT = True # True=启用类别权重, False=不启用(对比用)
# 权重计算公式: n_samples / (n_classes * n_class_i)
# 正面评论多所以权重小,负面评论少所以权重大
CLASS_WEIGHT_POS = 0.73 # 正面类权重(自动计算)
CLASS_WEIGHT_NEG = 1.58 # 负面类权重(自动计算)
# ==================== 实验相关 ====================
RUN_COMPARISON = False # True=运行对比实验, False=运行单个模型
COMPARE_MODELS = ['lr', 'mlp'] # 要对比的模型列表
COMPARE_VECTORS = ['bow', 'tfidf'] # 要对比的向量化方式
# ==================== 其他 ====================
RANDOM_SEED = 42 # 随机种子(保证可复现)
VERBOSE = True # 打印详细日志
# -*- coding: utf-8 -*-
"""
主程序入口
使用方式:
1. 运行单个模型(默认):
python main.py
修改 config.py 中的 MODEL_TYPE 和 VECTORIZER_TYPE 来切换配置
2. 运行对比实验:
修改 config.py 中 RUN_COMPARISON = True
这会依次运行:
- 实验1: BoW vs TF-IDF (固定LR模型)
- 实验2: LR vs MLP (固定TF-IDF)
- 实验3: 不同学习率对比
- 实验4: 不同隐藏层大小对比
最后输出汇总报告
"""
from train import main
if __name__ == '__main__':
print("\n" + "=" * 70)
print("文本分类实验 - 纯NumPy实现")
print("数据集: ChnSentiCorp (中文酒店评论)")
print("模型: Logistic Regression / MLP")
print("向量化: BoW / TF-IDF")
print("=" * 70 + "\n")
main()
"""
模型模块 - 纯NumPy实现
支持两种模型:
1. Logistic Regression(逻辑回归)- 线性模型
2. MLP(多层感知机)- 两层全连接网络
设计思路:
- 两种模型都共享相同的接口,方便对比
- 代码简洁,每行都有详细注释
- 手动实现反向传播,原理透明
"""
import numpy as np
class BaseModel:
"""模型基类"""
def fit(self, X, y, X_val=None, y_val=None, epochs=100, batch_size=32, verbose=True): pass
def predict(self, X): pass
def predict_proba(self, X): pass
def accuracy(self, X, y): pass
class LogisticRegression(BaseModel):
"""
逻辑回归(线性分类器)
结构:输入 → 线性变换 → Softmax → 输出
原理:
- 线性变换: z = X @ W + b
- Softmax: 将线性输出转为概率分布
参数量:input_size × num_classes + num_classes
"""
def __init__(self, input_size, num_classes=2, learning_rate=0.1,
class_weight=None, seed=42):
np.random.seed(seed)
# 权重初始化(Xavier)
self.W = np.random.randn(input_size, num_classes) * np.sqrt(2.0 / input_size)
self.b = np.zeros(num_classes)
self.lr = learning_rate
self.input_size = input_size
self.num_classes = num_classes
self.class_weight = class_weight # 类别权重
total_params = input_size * num_classes + num_classes
print(f"LogisticRegression: {input_size} -> {num_classes}, 参数量: {total_params}")
def softmax(self, x):
"""Softmax函数"""
x_shifted = x - np.max(x, axis=1, keepdims=True)
exp_x = np.exp(x_shifted)
return exp_x / np.sum(exp_x, axis=1, keepdims=True)
def forward(self, X):
"""前向传播"""
# 线性变换
z = X @ self.W + self.b
# Softmax输出概率
return self.softmax(z)
def backward(self, X, y):
"""反向传播(梯度下降)"""
batch_size = X.shape[0]
probs = self.forward(X)
# Softmax + 交叉熵梯度
d_z = probs.copy()
# 应用类别权重:减去权重值而不是1
# 公式: dL/dz_y = w_y * (p_y - 1) = w_y*p_y - w_y
if self.class_weight is not None:
for i in range(batch_size):
d_z[i, y[i]] -= self.class_weight[y[i]]
else:
d_z[np.arange(batch_size), y] -= 1
# 梯度
d_W = X.T @ d_z
d_b = np.sum(d_z, axis=0)
# 更新
self.W -= self.lr * d_W / batch_size
self.b -= self.lr * d_b / batch_size
def fit(self, X, y, X_val=None, y_val=None, epochs=100, batch_size=32, verbose=True):
"""训练"""
num_samples = len(X)
num_batches = (num_samples + batch_size - 1) // batch_size
for epoch in range(epochs):
# 打乱
indices = np.random.permutation(num_samples)
X_shuffled = X[indices]
y_shuffled = y[indices]
epoch_loss = 0
for batch_idx in range(num_batches):
start = batch_idx * batch_size
end = min(start + batch_size, num_samples)
X_batch = X_shuffled[start:end]
y_batch = y_shuffled[start:end]
# 前向 + 反向
probs = self.forward(X_batch)
self.backward(X_batch, y_batch)
# 损失
loss = -np.mean(np.log(np.clip(probs[np.arange(len(y_batch)), y_batch], 1e-10, 1)))
epoch_loss += loss
# 评估
if verbose and (epoch + 1) % 20 == 0:
train_acc = self.accuracy(X, y)
msg = f"Epoch {epoch+1:3d}/{epochs} | Loss: {epoch_loss/num_batches:.4f} | 训练准确率: {train_acc:.4f}"
if X_val is not None:
val_acc = self.accuracy(X_val, y_val)
msg += f" | 测试准确率: {val_acc:.4f}"
print(msg)
return self
def predict(self, X):
return np.argmax(self.forward(X), axis=1)
def predict_proba(self, X):
return self.forward(X)
def accuracy(self, X, y):
return np.mean(self.predict(X) == y)
def save(self, filepath):
"""保存模型权重"""
np.save(filepath + '_W.npy', self.W)
np.save(filepath + '_b.npy', self.b)
print(f"模型已保存: {filepath}")
@staticmethod
def load(filepath, input_size, num_classes=2, learning_rate=0.1):
"""加载模型权重"""
model = LogisticRegression(input_size, num_classes, learning_rate)
model.W = np.load(filepath + '_W.npy')
model.b = np.load(filepath + '_b.npy')
print(f"模型已加载: {filepath}")
return model
class MLP(BaseModel):
"""
多层感知机(神经网络)
结构:输入 → 线性变换 → ReLU → 线性变换 → Softmax → 输出
和LogisticRegression的区别:
- 多了一层隐藏层 + 非线性激活
- 可以学习非线性关系
- 参数量更大
参数量:
- W1: input_size × hidden_size
- b1: hidden_size
- W2: hidden_size × num_classes
- b2: num_classes
"""
def __init__(self, input_size, hidden_size=64, num_classes=2,
learning_rate=0.1, keep_prob=1.0, class_weight=None, seed=42):
np.random.seed(seed)
# 第一层权重
self.W1 = np.random.randn(input_size, hidden_size) * np.sqrt(2.0 / input_size)
self.b1 = np.zeros(hidden_size)
# 第二层权重
self.W2 = np.random.randn(hidden_size, num_classes) * np.sqrt(2.0 / hidden_size)
self.b2 = np.zeros(num_classes)
self.lr = learning_rate
self.keep_prob = keep_prob
self.hidden_size = hidden_size
self.input_size = input_size
self.num_classes = num_classes
self.class_weight = class_weight # 类别权重
total_params = (input_size * hidden_size + hidden_size +
hidden_size * num_classes + num_classes)
print(f"MLP: {input_size} -> {hidden_size} -> {num_classes}, 参数量: {total_params}")
def relu(self, x):
"""ReLU激活"""
return np.maximum(0, x)
def relu_derivative(self, x):
"""ReLU导数"""
return (x > 0).astype(float)
def softmax(self, x):
"""Softmax函数"""
x_shifted = x - np.max(x, axis=1, keepdims=True)
exp_x = np.exp(x_shifted)
return exp_x / np.sum(exp_x, axis=1, keepdims=True)
def forward(self, X):
"""前向传播"""
# 第一层
self.z1 = X @ self.W1 + self.b1
self.a1 = self.relu(self.z1)
# Dropout(训练时)
if self.keep_prob < 1.0 and hasattr(self, 'training'):
self.d1 = (np.random.rand(*self.a1.shape) < self.keep_prob).astype(float)
self.a1 *= self.d1
self.a1 /= self.keep_prob
# 第二层
self.z2 = self.a1 @ self.W2 + self.b2
self.probs = self.softmax(self.z2)
return self.probs
def backward(self, X, y):
"""反向传播"""
batch_size = X.shape[0]
# 输出层梯度
d_z2 = self.probs.copy()
# 应用类别权重
if self.class_weight is not None:
for i in range(batch_size):
d_z2[i, y[i]] -= self.class_weight[y[i]]
else:
d_z2[np.arange(batch_size), y] -= 1
# 第二层梯度
d_W2 = self.a1.T @ d_z2
d_b2 = np.sum(d_z2, axis=0)
# 隐藏层梯度
d_a1 = d_z2 @ self.W2.T
d_z1 = d_a1 * self.relu_derivative(self.z1)
# Dropout梯度
if self.keep_prob < 1.0 and hasattr(self, 'd1'):
d_z1 *= self.d1
d_z1 /= self.keep_prob
# 第一层梯度
d_W1 = X.T @ d_z1
d_b1 = np.sum(d_z1, axis=0)
# 更新
self.W1 -= self.lr * d_W1 / batch_size
self.b1 -= self.lr * d_b1 / batch_size
self.W2 -= self.lr * d_W2 / batch_size
self.b2 -= self.lr * d_b2 / batch_size
def fit(self, X, y, X_val=None, y_val=None, epochs=100, batch_size=32, verbose=True):
"""训练"""
num_samples = len(X)
num_batches = (num_samples + batch_size - 1) // batch_size
for epoch in range(epochs):
# 打乱
indices = np.random.permutation(num_samples)
X_shuffled = X[indices]
y_shuffled = y[indices]
epoch_loss = 0
self.training = True # 开启Dropout
for batch_idx in range(num_batches):
start = batch_idx * batch_size
end = min(start + batch_size, num_samples)
X_batch = X_shuffled[start:end]
y_batch = y_shuffled[start:end]
# 前向 + 反向
probs = self.forward(X_batch)
self.backward(X_batch, y_batch)
# 损失
loss = -np.mean(np.log(np.clip(probs[np.arange(len(y_batch)), y_batch], 1e-10, 1)))
epoch_loss += loss
self.training = False # 关闭Dropout
# 评估
if verbose and (epoch + 1) % 20 == 0:
train_acc = self.accuracy(X, y)
msg = f"Epoch {epoch+1:3d}/{epochs} | Loss: {epoch_loss/num_batches:.4f} | 训练准确率: {train_acc:.4f}"
if X_val is not None:
val_acc = self.accuracy(X_val, y_val)
msg += f" | 测试准确率: {val_acc:.4f}"
print(msg)
return self
def predict(self, X):
return np.argmax(self.forward(X), axis=1)
def predict_proba(self, X):
return self.forward(X)
def accuracy(self, X, y):
return np.mean(self.predict(X) == y)
def save(self, filepath):
"""保存模型权重"""
np.save(filepath + '_W1.npy', self.W1)
np.save(filepath + '_b1.npy', self.b1)
np.save(filepath + '_W2.npy', self.W2)
np.save(filepath + '_b2.npy', self.b2)
print(f"模型已保存: {filepath}")
@staticmethod
def load(filepath, input_size, hidden_size=64, num_classes=2, learning_rate=0.1, keep_prob=1.0):
"""加载模型权重"""
model = MLP(input_size, hidden_size, num_classes, learning_rate, keep_prob)
model.W1 = np.load(filepath + '_W1.npy')
model.b1 = np.load(filepath + '_b1.npy')
model.W2 = np.load(filepath + '_W2.npy')
model.b2 = np.load(filepath + '_b2.npy')
print(f"模型已加载: {filepath}")
return model
def create_model(model_type, input_size, hidden_size=64, num_classes=2,
learning_rate=0.1, keep_prob=1.0, class_weight=None):
"""工厂函数:创建模型"""
if model_type == 'lr':
return LogisticRegression(input_size, num_classes, learning_rate, class_weight)
elif model_type == 'mlp':
return MLP(input_size, hidden_size, num_classes, learning_rate, keep_prob, class_weight)
else:
raise ValueError(f"未知模型类型: {model_type}")