diff --git a/4.30.py b/4.30.py new file mode 100644 index 0000000..bff9ba4 --- /dev/null +++ b/4.30.py @@ -0,0 +1,700 @@ +""" +数据加载与向量化模块 + +支持两种向量化方法: +1. BoW (Bag of Words) - 词频向量 +2. TF-IDF - 词频-逆文档频率向量 + +TF-IDF 的优势: +- 降低常见词(如"的"、"是")的权重 +- 提升罕见词的信息量 +- 通常效果优于简单BoW +""" + +import os +import re +import csv +import math +import jieba +import numpy as np +from collections import Counter + +try: + import urllib.request + import ssl + DOWNLOAD_AVAILABLE = True +except ImportError: + DOWNLOAD_AVAILABLE = False + + +DATASET_URL = "https://raw.githubusercontent.com/SophonPlus/ChineseNlpCorpus/master/datasets/ChnSentiCorp_htl_all/ChnSentiCorp_htl_all.csv" + + +def download_dataset(data_dir): + """下载数据集(如果不存在)""" + csv_path = os.path.join(data_dir, 'ChnSentiCorp_htl_all.csv') + + if os.path.exists(csv_path): + print(f"数据已存在: {csv_path}") + return True + + if not DOWNLOAD_AVAILABLE: + return False + + print("正在下载数据集...") + ssl_context = ssl.create_default_context() + ssl_context.check_hostname = False + ssl_context.verify_mode = ssl.CERT_NONE + + try: + request = urllib.request.Request(DATASET_URL, headers={'User-Agent': 'Mozilla/5.0'}) + response = urllib.request.urlopen(request, timeout=120, context=ssl_context) + os.makedirs(data_dir, exist_ok=True) + with open(csv_path, 'wb') as f: + f.write(response.read()) + print(f"下载完成: {csv_path}") + return True + except Exception as e: + print(f"下载失败: {e}") + return False + + +def load_raw_data(data_dir): + """加载原始数据""" + csv_path = os.path.join(data_dir, 'ChnSentiCorp_htl_all.csv') + texts, labels = [], [] + + with open(csv_path, 'r', encoding='utf-8') as f: + reader = csv.reader(f) + for row in reader: + if len(row) < 2: + continue + try: + label = int(row[0]) + review = row[1].strip() + if review: + texts.append(review) + labels.append(label) + except (ValueError, IndexError): + continue + + return texts, np.array(labels) + + +def tokenize(text): + """中文分词""" + text = re.sub(r'[^\u4e00-\u9fa5a-zA-Z]', ' ', text) + words = jieba.lcut(text) + return [w for w in words if len(w) > 1] + + +# ==================== 向量化器 ==================== + +class BaseVectorizer: + """向量化器基类""" + def fit(self, texts): pass + def transform(self, texts): pass + def fit_transform(self, texts): pass + + +class BoWVectorizer(BaseVectorizer): + """ + 词袋模型 (Bag of Words) + + 原理:统计每个词在文本中出现的次数 + 向量维度 = 词表大小 + 每个维度 = 该词在本文本中出现的次数 + """ + + def __init__(self, max_features, max_seq_len): + self.max_features = max_features + self.max_seq_len = max_seq_len + self.vocab = {} + self.doc_freq = {} # 文档频率 + self.num_docs = 0 + + def fit(self, texts): + """构建词表(基于词频)""" + counter = Counter() + doc_counter = Counter() # 统计包含该词的文档数 + + for text in texts: + words = tokenize(text) + unique_words = set(words) + counter.update(words) + for w in unique_words: + doc_counter[w] += 1 + + self.num_docs = len(texts) + + # 取最高频的词 + most_common = counter.most_common(self.max_features) + self.vocab = {word: idx for idx, (word, _) in enumerate(most_common)} + + # 记录文档频率(用于TF-IDF) + self.doc_freq = {w: doc_counter[w] for w in self.vocab} + + print(f" BoW词表大小: {len(self.vocab)}") + return self + + def transform(self, texts): + """将文本转换为词频向量""" + vectors = [] + for text in texts: + words = tokenize(text) + freq = [0] * self.max_seq_len + for i, word in enumerate(words[:self.max_seq_len]): + if word in self.vocab: + freq[i] = 1 # 二值(出现=1,不出现=0) + vectors.append(freq) + return np.array(vectors, dtype=np.float32) + + def fit_transform(self, texts): + self.fit(texts) + return self.transform(texts) + + +class TFIDFVectorizer(BaseVectorizer): + """ + TF-IDF 向量器 + + 原理: + - TF(词频) = 词在本文本中出现的次数 + - IDF(逆文档频率) = log(总文档数 / 包含该词的文档数) + - TF-IDF = TF × IDF + + 优势: + - 降低常见无意义词的权重(如"的"、"是") + - 提升罕见但有信息量的词 + """ + + def __init__(self, max_features, max_seq_len): + self.max_features = max_features + self.max_seq_len = max_seq_len + self.vocab = {} + self.idf = {} # 存储每个词的IDF值 + self.num_docs = 0 + + def fit(self, texts): + """构建词表并计算IDF""" + counter = Counter() + doc_counter = Counter() + + for text in texts: + words = tokenize(text) + unique_words = set(words) + counter.update(words) + for w in unique_words: + doc_counter[w] += 1 + + self.num_docs = len(texts) + + # 计算每个词的IDF + # IDF = log(总文档数 / 包含该词的文档数) + idf_values = {} + for word, df in doc_counter.items(): + idf_values[word] = math.log(self.num_docs / (df + 1)) + 1 # 加1防零 + + # 取IDF值最高的词(信息量最大的词) + sorted_words = sorted(idf_values.items(), key=lambda x: x[1], reverse=True) + self.vocab = {word: idx for idx, (word, _) in enumerate(sorted_words[:self.max_features])} + + # 保存IDF值 + self.idf = {word: idf_values[word] for word in self.vocab} + + print(f" TF-IDF词表大小: {len(self.vocab)}") + print(f" 平均IDF: {np.mean(list(self.idf.values())):.3f}") + return self + + def transform(self, texts): + """将文本转换为TF-IDF向量""" + vectors = [] + for text in texts: + words = tokenize(text) + + # 计算TF + tf = Counter(words) + tf_sum = len(words) if words else 1 + + # 生成向量 + vec = [0.0] * self.max_seq_len + for i, word in enumerate(words[:self.max_seq_len]): + if word in self.vocab: + # TF × IDF + vec[i] = (tf[word] / tf_sum) * self.idf.get(word, 0) + vectors.append(vec) + + return np.array(vectors, dtype=np.float32) + + def fit_transform(self, texts): + self.fit(texts) + return self.transform(texts) + + +def load_data(data_dir, max_features, max_seq_len, vectorizer_type='tfidf'): + """ + 加载并向量化数据 + + 参数: + - vectorizer_type: 'tfidf' 或 'bow' + """ + if not download_dataset(data_dir): + raise RuntimeError("数据加载失败,请检查网络或手动下载数据集") + + print("正在加载数据...") + texts, labels = load_raw_data(data_dir) + print(f"总评论数: {len(texts)}, 正面: {sum(labels)}, 负面: {len(labels) - sum(labels)}") + + # 选择向量化器 + if vectorizer_type == 'tfidf': + vectorizer = TFIDFVectorizer(max_features, max_seq_len) + vec_name = "TF-IDF" + else: + vectorizer = BoWVectorizer(max_features, max_seq_len) + vec_name = "BoW" + + print(f"正在使用{vec_name}向量化...") + X = vectorizer.fit_transform(texts) + y = labels + + # 打乱并划分 + np.random.seed(42) + indices = np.random.permutation(len(X)) + X = X[indices] + y = y[indices] + + split_idx = int(len(X) * 0.8) + X_train, X_test = X[:split_idx], X[split_idx:] + y_train, y_test = y[:split_idx], y[split_idx:] + + print(f"训练集: {len(X_train)}条, 测试集: {len(X_test)}条") + + return X_train, y_train, X_test, y_test, vectorizer + + +if __name__ == '__main__': + # 测试 + print("=" * 60) + print("测试 TF-IDF 向量化") + print("=" * 60) + X_train, y_train, X_test, y_test, vec = load_data( + 'data/ChnSentiCorp', max_features=3000, max_seq_len=100, + vectorizer_type='tfidf' + ) + print(f"\nX_train shape: {X_train.shape}") + print(f"X_train sample (前5个特征): {X_train[0][:5]}") +# -*- coding: utf-8 -*- +""" +配置文件 - 所有超参数集中管理 + +设计思路: +将超参数分门别类,学生可以单独修改某一类而不会影响其他 +""" + +# ==================== 数据相关 ==================== +DATA_DIR = 'data/ChnSentiCorp' # 数据集路径 +MAX_FEATURES = 3000 # 词表最大容量 +MAX_SEQ_LEN = 100 # 句子最大长度(词数) +VECTORIZER_TYPE = 'tfidf' # 'tfidf' 或 'bow'(向量化方式) + +# ==================== 模型相关 ==================== +MODEL_TYPE = 'mlp' # 'mlp' 或 'lr'(模型类型) +HIDDEN_SIZE = 64 # MLP隐藏层大小(LR忽略) +NUM_CLASSES = 2 # 类别数(正面/负面二分类) +KEEP_PROB = 1.0 # Dropout保留概率(LR忽略,设为1即可) + +# ==================== 训练相关 ==================== +LEARNING_RATE = 0.05 # 学习率 +NUM_EPOCHS = 100 # 训练轮数 +BATCH_SIZE = 64 # 批次大小 + +# ==================== 类别权重(解决数据不平衡问题)==================== +USE_CLASS_WEIGHT = True # True=启用类别权重, False=不启用(对比用) +# 权重计算公式: n_samples / (n_classes * n_class_i) +# 正面评论多所以权重小,负面评论少所以权重大 +CLASS_WEIGHT_POS = 0.73 # 正面类权重(自动计算) +CLASS_WEIGHT_NEG = 1.58 # 负面类权重(自动计算) + +# ==================== 实验相关 ==================== +RUN_COMPARISON = False # True=运行对比实验, False=运行单个模型 +COMPARE_MODELS = ['lr', 'mlp'] # 要对比的模型列表 +COMPARE_VECTORS = ['bow', 'tfidf'] # 要对比的向量化方式 + +# ==================== 其他 ==================== +RANDOM_SEED = 42 # 随机种子(保证可复现) +VERBOSE = True # 打印详细日志 +# -*- coding: utf-8 -*- +""" +主程序入口 + +使用方式: + +1. 运行单个模型(默认): + python main.py + + 修改 config.py 中的 MODEL_TYPE 和 VECTORIZER_TYPE 来切换配置 + +2. 运行对比实验: + 修改 config.py 中 RUN_COMPARISON = True + + 这会依次运行: + - 实验1: BoW vs TF-IDF (固定LR模型) + - 实验2: LR vs MLP (固定TF-IDF) + - 实验3: 不同学习率对比 + - 实验4: 不同隐藏层大小对比 + + 最后输出汇总报告 +""" + +from train import main + +if __name__ == '__main__': + print("\n" + "=" * 70) + print("文本分类实验 - 纯NumPy实现") + print("数据集: ChnSentiCorp (中文酒店评论)") + print("模型: Logistic Regression / MLP") + print("向量化: BoW / TF-IDF") + print("=" * 70 + "\n") + + main() +""" +模型模块 - 纯NumPy实现 + +支持两种模型: +1. Logistic Regression(逻辑回归)- 线性模型 +2. MLP(多层感知机)- 两层全连接网络 + +设计思路: +- 两种模型都共享相同的接口,方便对比 +- 代码简洁,每行都有详细注释 +- 手动实现反向传播,原理透明 +""" + +import numpy as np + + +class BaseModel: + """模型基类""" + def fit(self, X, y, X_val=None, y_val=None, epochs=100, batch_size=32, verbose=True): pass + def predict(self, X): pass + def predict_proba(self, X): pass + def accuracy(self, X, y): pass + + +class LogisticRegression(BaseModel): + """ + 逻辑回归(线性分类器) + + 结构:输入 → 线性变换 → Softmax → 输出 + + 原理: + - 线性变换: z = X @ W + b + - Softmax: 将线性输出转为概率分布 + + 参数量:input_size × num_classes + num_classes + """ + + def __init__(self, input_size, num_classes=2, learning_rate=0.1, + class_weight=None, seed=42): + np.random.seed(seed) + + # 权重初始化(Xavier) + self.W = np.random.randn(input_size, num_classes) * np.sqrt(2.0 / input_size) + self.b = np.zeros(num_classes) + + self.lr = learning_rate + self.input_size = input_size + self.num_classes = num_classes + self.class_weight = class_weight # 类别权重 + + total_params = input_size * num_classes + num_classes + print(f"LogisticRegression: {input_size} -> {num_classes}, 参数量: {total_params}") + + def softmax(self, x): + """Softmax函数""" + x_shifted = x - np.max(x, axis=1, keepdims=True) + exp_x = np.exp(x_shifted) + return exp_x / np.sum(exp_x, axis=1, keepdims=True) + + def forward(self, X): + """前向传播""" + # 线性变换 + z = X @ self.W + self.b + # Softmax输出概率 + return self.softmax(z) + + def backward(self, X, y): + """反向传播(梯度下降)""" + batch_size = X.shape[0] + probs = self.forward(X) + + # Softmax + 交叉熵梯度 + d_z = probs.copy() + + # 应用类别权重:减去权重值而不是1 + # 公式: dL/dz_y = w_y * (p_y - 1) = w_y*p_y - w_y + if self.class_weight is not None: + for i in range(batch_size): + d_z[i, y[i]] -= self.class_weight[y[i]] + else: + d_z[np.arange(batch_size), y] -= 1 + + # 梯度 + d_W = X.T @ d_z + d_b = np.sum(d_z, axis=0) + + # 更新 + self.W -= self.lr * d_W / batch_size + self.b -= self.lr * d_b / batch_size + + def fit(self, X, y, X_val=None, y_val=None, epochs=100, batch_size=32, verbose=True): + """训练""" + num_samples = len(X) + num_batches = (num_samples + batch_size - 1) // batch_size + + for epoch in range(epochs): + # 打乱 + indices = np.random.permutation(num_samples) + X_shuffled = X[indices] + y_shuffled = y[indices] + + epoch_loss = 0 + for batch_idx in range(num_batches): + start = batch_idx * batch_size + end = min(start + batch_size, num_samples) + X_batch = X_shuffled[start:end] + y_batch = y_shuffled[start:end] + + # 前向 + 反向 + probs = self.forward(X_batch) + self.backward(X_batch, y_batch) + + # 损失 + loss = -np.mean(np.log(np.clip(probs[np.arange(len(y_batch)), y_batch], 1e-10, 1))) + epoch_loss += loss + + # 评估 + if verbose and (epoch + 1) % 20 == 0: + train_acc = self.accuracy(X, y) + msg = f"Epoch {epoch+1:3d}/{epochs} | Loss: {epoch_loss/num_batches:.4f} | 训练准确率: {train_acc:.4f}" + if X_val is not None: + val_acc = self.accuracy(X_val, y_val) + msg += f" | 测试准确率: {val_acc:.4f}" + print(msg) + + return self + + def predict(self, X): + return np.argmax(self.forward(X), axis=1) + + def predict_proba(self, X): + return self.forward(X) + + def accuracy(self, X, y): + return np.mean(self.predict(X) == y) + + def save(self, filepath): + """保存模型权重""" + np.save(filepath + '_W.npy', self.W) + np.save(filepath + '_b.npy', self.b) + print(f"模型已保存: {filepath}") + + @staticmethod + def load(filepath, input_size, num_classes=2, learning_rate=0.1): + """加载模型权重""" + model = LogisticRegression(input_size, num_classes, learning_rate) + model.W = np.load(filepath + '_W.npy') + model.b = np.load(filepath + '_b.npy') + print(f"模型已加载: {filepath}") + return model + + +class MLP(BaseModel): + """ + 多层感知机(神经网络) + + 结构:输入 → 线性变换 → ReLU → 线性变换 → Softmax → 输出 + + 和LogisticRegression的区别: + - 多了一层隐藏层 + 非线性激活 + - 可以学习非线性关系 + - 参数量更大 + + 参数量: + - W1: input_size × hidden_size + - b1: hidden_size + - W2: hidden_size × num_classes + - b2: num_classes + """ + + def __init__(self, input_size, hidden_size=64, num_classes=2, + learning_rate=0.1, keep_prob=1.0, class_weight=None, seed=42): + np.random.seed(seed) + + # 第一层权重 + self.W1 = np.random.randn(input_size, hidden_size) * np.sqrt(2.0 / input_size) + self.b1 = np.zeros(hidden_size) + + # 第二层权重 + self.W2 = np.random.randn(hidden_size, num_classes) * np.sqrt(2.0 / hidden_size) + self.b2 = np.zeros(num_classes) + + self.lr = learning_rate + self.keep_prob = keep_prob + self.hidden_size = hidden_size + self.input_size = input_size + self.num_classes = num_classes + self.class_weight = class_weight # 类别权重 + + total_params = (input_size * hidden_size + hidden_size + + hidden_size * num_classes + num_classes) + print(f"MLP: {input_size} -> {hidden_size} -> {num_classes}, 参数量: {total_params}") + + def relu(self, x): + """ReLU激活""" + return np.maximum(0, x) + + def relu_derivative(self, x): + """ReLU导数""" + return (x > 0).astype(float) + + def softmax(self, x): + """Softmax函数""" + x_shifted = x - np.max(x, axis=1, keepdims=True) + exp_x = np.exp(x_shifted) + return exp_x / np.sum(exp_x, axis=1, keepdims=True) + + def forward(self, X): + """前向传播""" + # 第一层 + self.z1 = X @ self.W1 + self.b1 + self.a1 = self.relu(self.z1) + + # Dropout(训练时) + if self.keep_prob < 1.0 and hasattr(self, 'training'): + self.d1 = (np.random.rand(*self.a1.shape) < self.keep_prob).astype(float) + self.a1 *= self.d1 + self.a1 /= self.keep_prob + + # 第二层 + self.z2 = self.a1 @ self.W2 + self.b2 + self.probs = self.softmax(self.z2) + + return self.probs + + def backward(self, X, y): + """反向传播""" + batch_size = X.shape[0] + + # 输出层梯度 + d_z2 = self.probs.copy() + + # 应用类别权重 + if self.class_weight is not None: + for i in range(batch_size): + d_z2[i, y[i]] -= self.class_weight[y[i]] + else: + d_z2[np.arange(batch_size), y] -= 1 + + # 第二层梯度 + d_W2 = self.a1.T @ d_z2 + d_b2 = np.sum(d_z2, axis=0) + + # 隐藏层梯度 + d_a1 = d_z2 @ self.W2.T + d_z1 = d_a1 * self.relu_derivative(self.z1) + + # Dropout梯度 + if self.keep_prob < 1.0 and hasattr(self, 'd1'): + d_z1 *= self.d1 + d_z1 /= self.keep_prob + + # 第一层梯度 + d_W1 = X.T @ d_z1 + d_b1 = np.sum(d_z1, axis=0) + + # 更新 + self.W1 -= self.lr * d_W1 / batch_size + self.b1 -= self.lr * d_b1 / batch_size + self.W2 -= self.lr * d_W2 / batch_size + self.b2 -= self.lr * d_b2 / batch_size + + def fit(self, X, y, X_val=None, y_val=None, epochs=100, batch_size=32, verbose=True): + """训练""" + num_samples = len(X) + num_batches = (num_samples + batch_size - 1) // batch_size + + for epoch in range(epochs): + # 打乱 + indices = np.random.permutation(num_samples) + X_shuffled = X[indices] + y_shuffled = y[indices] + + epoch_loss = 0 + self.training = True # 开启Dropout + + for batch_idx in range(num_batches): + start = batch_idx * batch_size + end = min(start + batch_size, num_samples) + X_batch = X_shuffled[start:end] + y_batch = y_shuffled[start:end] + + # 前向 + 反向 + probs = self.forward(X_batch) + self.backward(X_batch, y_batch) + + # 损失 + loss = -np.mean(np.log(np.clip(probs[np.arange(len(y_batch)), y_batch], 1e-10, 1))) + epoch_loss += loss + + self.training = False # 关闭Dropout + + # 评估 + if verbose and (epoch + 1) % 20 == 0: + train_acc = self.accuracy(X, y) + msg = f"Epoch {epoch+1:3d}/{epochs} | Loss: {epoch_loss/num_batches:.4f} | 训练准确率: {train_acc:.4f}" + if X_val is not None: + val_acc = self.accuracy(X_val, y_val) + msg += f" | 测试准确率: {val_acc:.4f}" + print(msg) + + return self + + def predict(self, X): + return np.argmax(self.forward(X), axis=1) + + def predict_proba(self, X): + return self.forward(X) + + def accuracy(self, X, y): + return np.mean(self.predict(X) == y) + + def save(self, filepath): + """保存模型权重""" + np.save(filepath + '_W1.npy', self.W1) + np.save(filepath + '_b1.npy', self.b1) + np.save(filepath + '_W2.npy', self.W2) + np.save(filepath + '_b2.npy', self.b2) + print(f"模型已保存: {filepath}") + + @staticmethod + def load(filepath, input_size, hidden_size=64, num_classes=2, learning_rate=0.1, keep_prob=1.0): + """加载模型权重""" + model = MLP(input_size, hidden_size, num_classes, learning_rate, keep_prob) + model.W1 = np.load(filepath + '_W1.npy') + model.b1 = np.load(filepath + '_b1.npy') + model.W2 = np.load(filepath + '_W2.npy') + model.b2 = np.load(filepath + '_b2.npy') + print(f"模型已加载: {filepath}") + return model + + +def create_model(model_type, input_size, hidden_size=64, num_classes=2, + learning_rate=0.1, keep_prob=1.0, class_weight=None): + """工厂函数:创建模型""" + if model_type == 'lr': + return LogisticRegression(input_size, num_classes, learning_rate, class_weight) + elif model_type == 'mlp': + return MLP(input_size, hidden_size, num_classes, learning_rate, keep_prob, class_weight) + else: + raise ValueError(f"未知模型类型: {model_type}") diff --git a/新建 XLS 工作表.xls b/新建 XLS 工作表.xls new file mode 100644 index 0000000..1b4a4e1 Binary files /dev/null and b/新建 XLS 工作表.xls differ