Upload dataset.py
This commit is contained in:
286
dataset.py
Normal file
286
dataset.py
Normal file
@@ -0,0 +1,286 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
"""
|
||||
数据加载与向量化模块
|
||||
|
||||
支持两种向量化方法:
|
||||
1. BoW (Bag of Words) - 词频向量
|
||||
2. TF-IDF - 词频-逆文档频率向量
|
||||
|
||||
TF-IDF 的优势:
|
||||
- 降低常见词(如"的"、"是")的权重
|
||||
- 提升罕见词的信息量
|
||||
- 通常效果优于简单BoW
|
||||
"""
|
||||
|
||||
import os
|
||||
import re
|
||||
import csv
|
||||
import math
|
||||
import jieba
|
||||
import numpy as np
|
||||
from collections import Counter
|
||||
|
||||
try:
|
||||
import urllib.request
|
||||
import ssl
|
||||
DOWNLOAD_AVAILABLE = True
|
||||
except ImportError:
|
||||
DOWNLOAD_AVAILABLE = False
|
||||
|
||||
|
||||
DATASET_URL = "https://raw.githubusercontent.com/SophonPlus/ChineseNlpCorpus/master/datasets/ChnSentiCorp_htl_all/ChnSentiCorp_htl_all.csv"
|
||||
|
||||
|
||||
def download_dataset(data_dir):
|
||||
"""下载数据集(如果不存在)"""
|
||||
csv_path = os.path.join(data_dir, 'ChnSentiCorp_htl_all.csv')
|
||||
|
||||
if os.path.exists(csv_path):
|
||||
print(f"数据已存在: {csv_path}")
|
||||
return True
|
||||
|
||||
if not DOWNLOAD_AVAILABLE:
|
||||
return False
|
||||
|
||||
print("正在下载数据集...")
|
||||
ssl_context = ssl.create_default_context()
|
||||
ssl_context.check_hostname = False
|
||||
ssl_context.verify_mode = ssl.CERT_NONE
|
||||
|
||||
try:
|
||||
request = urllib.request.Request(DATASET_URL, headers={'User-Agent': 'Mozilla/5.0'})
|
||||
response = urllib.request.urlopen(request, timeout=120, context=ssl_context)
|
||||
os.makedirs(data_dir, exist_ok=True)
|
||||
with open(csv_path, 'wb') as f:
|
||||
f.write(response.read())
|
||||
print(f"下载完成: {csv_path}")
|
||||
return True
|
||||
except Exception as e:
|
||||
print(f"下载失败: {e}")
|
||||
return False
|
||||
|
||||
|
||||
def load_raw_data(data_dir):
|
||||
"""加载原始数据"""
|
||||
csv_path = os.path.join(data_dir, 'ChnSentiCorp_htl_all.csv')
|
||||
texts, labels = [], []
|
||||
|
||||
with open(csv_path, 'r', encoding='utf-8') as f:
|
||||
reader = csv.reader(f)
|
||||
for row in reader:
|
||||
if len(row) < 2:
|
||||
continue
|
||||
try:
|
||||
label = int(row[0])
|
||||
review = row[1].strip()
|
||||
if review:
|
||||
texts.append(review)
|
||||
labels.append(label)
|
||||
except (ValueError, IndexError):
|
||||
continue
|
||||
|
||||
return texts, np.array(labels)
|
||||
|
||||
|
||||
def tokenize(text):
|
||||
"""中文分词"""
|
||||
text = re.sub(r'[^\u4e00-\u9fa5a-zA-Z]', ' ', text)
|
||||
words = jieba.lcut(text)
|
||||
return [w for w in words if len(w) > 1]
|
||||
|
||||
|
||||
# ==================== 向量化器 ====================
|
||||
|
||||
class BaseVectorizer:
|
||||
"""向量化器基类"""
|
||||
def fit(self, texts): pass
|
||||
def transform(self, texts): pass
|
||||
def fit_transform(self, texts): pass
|
||||
|
||||
|
||||
class BoWVectorizer(BaseVectorizer):
|
||||
"""
|
||||
词袋模型 (Bag of Words)
|
||||
|
||||
原理:统计每个词在文本中出现的次数
|
||||
向量维度 = 词表大小
|
||||
每个维度 = 该词在本文本中出现的次数
|
||||
"""
|
||||
|
||||
def __init__(self, max_features, max_seq_len):
|
||||
self.max_features = max_features
|
||||
self.max_seq_len = max_seq_len
|
||||
self.vocab = {}
|
||||
self.doc_freq = {} # 文档频率
|
||||
self.num_docs = 0
|
||||
|
||||
def fit(self, texts):
|
||||
"""构建词表(基于词频)"""
|
||||
counter = Counter()
|
||||
doc_counter = Counter() # 统计包含该词的文档数
|
||||
|
||||
for text in texts:
|
||||
words = tokenize(text)
|
||||
unique_words = set(words)
|
||||
counter.update(words)
|
||||
for w in unique_words:
|
||||
doc_counter[w] += 1
|
||||
|
||||
self.num_docs = len(texts)
|
||||
|
||||
# 取最高频的词
|
||||
most_common = counter.most_common(self.max_features)
|
||||
self.vocab = {word: idx for idx, (word, _) in enumerate(most_common)}
|
||||
|
||||
# 记录文档频率(用于TF-IDF)
|
||||
self.doc_freq = {w: doc_counter[w] for w in self.vocab}
|
||||
|
||||
print(f" BoW词表大小: {len(self.vocab)}")
|
||||
return self
|
||||
|
||||
def transform(self, texts):
|
||||
"""将文本转换为词频向量"""
|
||||
vectors = []
|
||||
for text in texts:
|
||||
words = tokenize(text)
|
||||
freq = [0] * self.max_seq_len
|
||||
for i, word in enumerate(words[:self.max_seq_len]):
|
||||
if word in self.vocab:
|
||||
freq[i] = 1 # 二值(出现=1,不出现=0)
|
||||
vectors.append(freq)
|
||||
return np.array(vectors, dtype=np.float32)
|
||||
|
||||
def fit_transform(self, texts):
|
||||
self.fit(texts)
|
||||
return self.transform(texts)
|
||||
|
||||
|
||||
class TFIDFVectorizer(BaseVectorizer):
|
||||
"""
|
||||
TF-IDF 向量器
|
||||
|
||||
原理:
|
||||
- TF(词频) = 词在本文本中出现的次数
|
||||
- IDF(逆文档频率) = log(总文档数 / 包含该词的文档数)
|
||||
- TF-IDF = TF × IDF
|
||||
|
||||
优势:
|
||||
- 降低常见无意义词的权重(如"的"、"是")
|
||||
- 提升罕见但有信息量的词
|
||||
"""
|
||||
|
||||
def __init__(self, max_features, max_seq_len):
|
||||
self.max_features = max_features
|
||||
self.max_seq_len = max_seq_len
|
||||
self.vocab = {}
|
||||
self.idf = {} # 存储每个词的IDF值
|
||||
self.num_docs = 0
|
||||
|
||||
def fit(self, texts):
|
||||
"""构建词表并计算IDF"""
|
||||
counter = Counter()
|
||||
doc_counter = Counter()
|
||||
|
||||
for text in texts:
|
||||
words = tokenize(text)
|
||||
unique_words = set(words)
|
||||
counter.update(words)
|
||||
for w in unique_words:
|
||||
doc_counter[w] += 1
|
||||
|
||||
self.num_docs = len(texts)
|
||||
|
||||
# 计算每个词的IDF
|
||||
# IDF = log(总文档数 / 包含该词的文档数)
|
||||
idf_values = {}
|
||||
for word, df in doc_counter.items():
|
||||
idf_values[word] = math.log(self.num_docs / (df + 1)) + 1 # 加1防零
|
||||
|
||||
# 取IDF值最高的词(信息量最大的词)
|
||||
sorted_words = sorted(idf_values.items(), key=lambda x: x[1], reverse=True)
|
||||
self.vocab = {word: idx for idx, (word, _) in enumerate(sorted_words[:self.max_features])}
|
||||
|
||||
# 保存IDF值
|
||||
self.idf = {word: idf_values[word] for word in self.vocab}
|
||||
|
||||
print(f" TF-IDF词表大小: {len(self.vocab)}")
|
||||
print(f" 平均IDF: {np.mean(list(self.idf.values())):.3f}")
|
||||
return self
|
||||
|
||||
def transform(self, texts):
|
||||
"""将文本转换为TF-IDF向量"""
|
||||
vectors = []
|
||||
for text in texts:
|
||||
words = tokenize(text)
|
||||
|
||||
# 计算TF
|
||||
tf = Counter(words)
|
||||
tf_sum = len(words) if words else 1
|
||||
|
||||
# 生成向量
|
||||
vec = [0.0] * self.max_seq_len
|
||||
for i, word in enumerate(words[:self.max_seq_len]):
|
||||
if word in self.vocab:
|
||||
# TF × IDF
|
||||
vec[i] = (tf[word] / tf_sum) * self.idf.get(word, 0)
|
||||
vectors.append(vec)
|
||||
|
||||
return np.array(vectors, dtype=np.float32)
|
||||
|
||||
def fit_transform(self, texts):
|
||||
self.fit(texts)
|
||||
return self.transform(texts)
|
||||
|
||||
|
||||
def load_data(data_dir, max_features, max_seq_len, vectorizer_type='tfidf'):
|
||||
"""
|
||||
加载并向量化数据
|
||||
|
||||
参数:
|
||||
- vectorizer_type: 'tfidf' 或 'bow'
|
||||
"""
|
||||
if not download_dataset(data_dir):
|
||||
raise RuntimeError("数据加载失败,请检查网络或手动下载数据集")
|
||||
|
||||
print("正在加载数据...")
|
||||
texts, labels = load_raw_data(data_dir)
|
||||
print(f"总评论数: {len(texts)}, 正面: {sum(labels)}, 负面: {len(labels) - sum(labels)}")
|
||||
|
||||
# 选择向量化器
|
||||
if vectorizer_type == 'tfidf':
|
||||
vectorizer = TFIDFVectorizer(max_features, max_seq_len)
|
||||
vec_name = "TF-IDF"
|
||||
else:
|
||||
vectorizer = BoWVectorizer(max_features, max_seq_len)
|
||||
vec_name = "BoW"
|
||||
|
||||
print(f"正在使用{vec_name}向量化...")
|
||||
X = vectorizer.fit_transform(texts)
|
||||
y = labels
|
||||
|
||||
# 打乱并划分
|
||||
np.random.seed(42)
|
||||
indices = np.random.permutation(len(X))
|
||||
X = X[indices]
|
||||
y = y[indices]
|
||||
|
||||
split_idx = int(len(X) * 0.8)
|
||||
X_train, X_test = X[:split_idx], X[split_idx:]
|
||||
y_train, y_test = y[:split_idx], y[split_idx:]
|
||||
|
||||
print(f"训练集: {len(X_train)}条, 测试集: {len(X_test)}条")
|
||||
|
||||
return X_train, y_train, X_test, y_test, vectorizer
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
# 测试
|
||||
print("=" * 60)
|
||||
print("测试 TF-IDF 向量化")
|
||||
print("=" * 60)
|
||||
X_train, y_train, X_test, y_test, vec = load_data(
|
||||
'data/ChnSentiCorp', max_features=3000, max_seq_len=100,
|
||||
vectorizer_type='tfidf'
|
||||
)
|
||||
print(f"\nX_train shape: {X_train.shape}")
|
||||
print(f"X_train sample (前5个特征): {X_train[0][:5]}")
|
||||
Reference in New Issue
Block a user