From 5f0ba352b4ba0d7ea11bd980537ac175be958fd1 Mon Sep 17 00:00:00 2001 From: gitea_eternal <401029566@qq.com> Date: Mon, 27 Apr 2026 21:57:14 +0800 Subject: [PATCH] Upload dataset.py --- dataset.py | 286 +++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 286 insertions(+) create mode 100644 dataset.py diff --git a/dataset.py b/dataset.py new file mode 100644 index 0000000..e554362 --- /dev/null +++ b/dataset.py @@ -0,0 +1,286 @@ +# -*- coding: utf-8 -*- +""" +数据加载与向量化模块 + +支持两种向量化方法: +1. BoW (Bag of Words) - 词频向量 +2. TF-IDF - 词频-逆文档频率向量 + +TF-IDF 的优势: +- 降低常见词(如"的"、"是")的权重 +- 提升罕见词的信息量 +- 通常效果优于简单BoW +""" + +import os +import re +import csv +import math +import jieba +import numpy as np +from collections import Counter + +try: + import urllib.request + import ssl + DOWNLOAD_AVAILABLE = True +except ImportError: + DOWNLOAD_AVAILABLE = False + + +DATASET_URL = "https://raw.githubusercontent.com/SophonPlus/ChineseNlpCorpus/master/datasets/ChnSentiCorp_htl_all/ChnSentiCorp_htl_all.csv" + + +def download_dataset(data_dir): + """下载数据集(如果不存在)""" + csv_path = os.path.join(data_dir, 'ChnSentiCorp_htl_all.csv') + + if os.path.exists(csv_path): + print(f"数据已存在: {csv_path}") + return True + + if not DOWNLOAD_AVAILABLE: + return False + + print("正在下载数据集...") + ssl_context = ssl.create_default_context() + ssl_context.check_hostname = False + ssl_context.verify_mode = ssl.CERT_NONE + + try: + request = urllib.request.Request(DATASET_URL, headers={'User-Agent': 'Mozilla/5.0'}) + response = urllib.request.urlopen(request, timeout=120, context=ssl_context) + os.makedirs(data_dir, exist_ok=True) + with open(csv_path, 'wb') as f: + f.write(response.read()) + print(f"下载完成: {csv_path}") + return True + except Exception as e: + print(f"下载失败: {e}") + return False + + +def load_raw_data(data_dir): + """加载原始数据""" + csv_path = os.path.join(data_dir, 'ChnSentiCorp_htl_all.csv') + texts, labels = [], [] + + with open(csv_path, 'r', encoding='utf-8') as f: + reader = csv.reader(f) + for row in reader: + if len(row) < 2: + continue + try: + label = int(row[0]) + review = row[1].strip() + if review: + texts.append(review) + labels.append(label) + except (ValueError, IndexError): + continue + + return texts, np.array(labels) + + +def tokenize(text): + """中文分词""" + text = re.sub(r'[^\u4e00-\u9fa5a-zA-Z]', ' ', text) + words = jieba.lcut(text) + return [w for w in words if len(w) > 1] + + +# ==================== 向量化器 ==================== + +class BaseVectorizer: + """向量化器基类""" + def fit(self, texts): pass + def transform(self, texts): pass + def fit_transform(self, texts): pass + + +class BoWVectorizer(BaseVectorizer): + """ + 词袋模型 (Bag of Words) + + 原理:统计每个词在文本中出现的次数 + 向量维度 = 词表大小 + 每个维度 = 该词在本文本中出现的次数 + """ + + def __init__(self, max_features, max_seq_len): + self.max_features = max_features + self.max_seq_len = max_seq_len + self.vocab = {} + self.doc_freq = {} # 文档频率 + self.num_docs = 0 + + def fit(self, texts): + """构建词表(基于词频)""" + counter = Counter() + doc_counter = Counter() # 统计包含该词的文档数 + + for text in texts: + words = tokenize(text) + unique_words = set(words) + counter.update(words) + for w in unique_words: + doc_counter[w] += 1 + + self.num_docs = len(texts) + + # 取最高频的词 + most_common = counter.most_common(self.max_features) + self.vocab = {word: idx for idx, (word, _) in enumerate(most_common)} + + # 记录文档频率(用于TF-IDF) + self.doc_freq = {w: doc_counter[w] for w in self.vocab} + + print(f" BoW词表大小: {len(self.vocab)}") + return self + + def transform(self, texts): + """将文本转换为词频向量""" + vectors = [] + for text in texts: + words = tokenize(text) + freq = [0] * self.max_seq_len + for i, word in enumerate(words[:self.max_seq_len]): + if word in self.vocab: + freq[i] = 1 # 二值(出现=1,不出现=0) + vectors.append(freq) + return np.array(vectors, dtype=np.float32) + + def fit_transform(self, texts): + self.fit(texts) + return self.transform(texts) + + +class TFIDFVectorizer(BaseVectorizer): + """ + TF-IDF 向量器 + + 原理: + - TF(词频) = 词在本文本中出现的次数 + - IDF(逆文档频率) = log(总文档数 / 包含该词的文档数) + - TF-IDF = TF × IDF + + 优势: + - 降低常见无意义词的权重(如"的"、"是") + - 提升罕见但有信息量的词 + """ + + def __init__(self, max_features, max_seq_len): + self.max_features = max_features + self.max_seq_len = max_seq_len + self.vocab = {} + self.idf = {} # 存储每个词的IDF值 + self.num_docs = 0 + + def fit(self, texts): + """构建词表并计算IDF""" + counter = Counter() + doc_counter = Counter() + + for text in texts: + words = tokenize(text) + unique_words = set(words) + counter.update(words) + for w in unique_words: + doc_counter[w] += 1 + + self.num_docs = len(texts) + + # 计算每个词的IDF + # IDF = log(总文档数 / 包含该词的文档数) + idf_values = {} + for word, df in doc_counter.items(): + idf_values[word] = math.log(self.num_docs / (df + 1)) + 1 # 加1防零 + + # 取IDF值最高的词(信息量最大的词) + sorted_words = sorted(idf_values.items(), key=lambda x: x[1], reverse=True) + self.vocab = {word: idx for idx, (word, _) in enumerate(sorted_words[:self.max_features])} + + # 保存IDF值 + self.idf = {word: idf_values[word] for word in self.vocab} + + print(f" TF-IDF词表大小: {len(self.vocab)}") + print(f" 平均IDF: {np.mean(list(self.idf.values())):.3f}") + return self + + def transform(self, texts): + """将文本转换为TF-IDF向量""" + vectors = [] + for text in texts: + words = tokenize(text) + + # 计算TF + tf = Counter(words) + tf_sum = len(words) if words else 1 + + # 生成向量 + vec = [0.0] * self.max_seq_len + for i, word in enumerate(words[:self.max_seq_len]): + if word in self.vocab: + # TF × IDF + vec[i] = (tf[word] / tf_sum) * self.idf.get(word, 0) + vectors.append(vec) + + return np.array(vectors, dtype=np.float32) + + def fit_transform(self, texts): + self.fit(texts) + return self.transform(texts) + + +def load_data(data_dir, max_features, max_seq_len, vectorizer_type='tfidf'): + """ + 加载并向量化数据 + + 参数: + - vectorizer_type: 'tfidf' 或 'bow' + """ + if not download_dataset(data_dir): + raise RuntimeError("数据加载失败,请检查网络或手动下载数据集") + + print("正在加载数据...") + texts, labels = load_raw_data(data_dir) + print(f"总评论数: {len(texts)}, 正面: {sum(labels)}, 负面: {len(labels) - sum(labels)}") + + # 选择向量化器 + if vectorizer_type == 'tfidf': + vectorizer = TFIDFVectorizer(max_features, max_seq_len) + vec_name = "TF-IDF" + else: + vectorizer = BoWVectorizer(max_features, max_seq_len) + vec_name = "BoW" + + print(f"正在使用{vec_name}向量化...") + X = vectorizer.fit_transform(texts) + y = labels + + # 打乱并划分 + np.random.seed(42) + indices = np.random.permutation(len(X)) + X = X[indices] + y = y[indices] + + split_idx = int(len(X) * 0.8) + X_train, X_test = X[:split_idx], X[split_idx:] + y_train, y_test = y[:split_idx], y[split_idx:] + + print(f"训练集: {len(X_train)}条, 测试集: {len(X_test)}条") + + return X_train, y_train, X_test, y_test, vectorizer + + +if __name__ == '__main__': + # 测试 + print("=" * 60) + print("测试 TF-IDF 向量化") + print("=" * 60) + X_train, y_train, X_test, y_test, vec = load_data( + 'data/ChnSentiCorp', max_features=3000, max_seq_len=100, + vectorizer_type='tfidf' + ) + print(f"\nX_train shape: {X_train.shape}") + print(f"X_train sample (前5个特征): {X_train[0][:5]}")