From 308de282a89c229663f548705b37ad329e3731bd Mon Sep 17 00:00:00 2001
From: 张兴宇 <2505155046@student.example.com>
Date: Thu, 30 Apr 2026 16:02:56 +0800
Subject: [PATCH] Upload file to /
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 49张兴宇.xls   | Bin 0 -> 24064 bytes
 config.py      |  80 +++---
 dataset.py     | 572 ++++++++++++++++++++---------------
 main.py        |  68 ++---
 model_numpy.py | 684 ++++++++++++++++++------------------
 5 files changed, 702 insertions(+), 702 deletions(-)
 create mode 100644 49张兴宇.xls

diff --git a/49张兴宇.xls b/49张兴宇.xls
new file mode 100644
index 0000000000000000000000000000000000000000..a74a26130b13fd021992c6481fad181d313af34e
GIT binary patch
literal 24064
[24064 bytes of base85-encoded binary literal data omitted]

literal 0
HcmV?d00001
z?Y7=De|10NQKZJkF{Q7<-p3saeRGW%33gm9&Uv(Z=bhV6yS3QDJMYBhFMLAR{fBZYmKEr%)Amb06ll3;UyBDmMj-=2!^bVs@u^)u+UK&yS#1*Fwa8{@ z-=3GA2{Z$ag%2z%Z2h!g;Leb1Yb?TdhGSvTmDCmt;($bwQSVH!!FN{ zNnchaADi(=>tw&tZl5>2y5_mcGqd61gPDIkJI%XOlaO@s6Ppt!4=waR+}m;1-bcqP z%byn>TXDvI@abJ%uCLR+ue(smzg^tn*OO0Td#Bg;7_y*c`h?$)-|Lmz;A>b~?s4xl zzrE|=`nFTG?#?T^u;5w3vcYv_4dthQ?*Fi|+tfBCBgehnt-ELG_hZsxldc>d9_SD_ zC)ly?tK8wPXjbJ(gE8VmchK30{6_!0Ez;zwJ!)7(Dd;$yP(nR#xVpF&bu+ zv46$n6ZwTl23((dPGgwI`nT;*j68VBB>T{sV%rkQtc`Z=b?0ikm2Y`t@xvoQ^j0_V zZ_6U=Z2ATt{zu!}7t%`a)SNz$(Q4%B&s*1d+F$A0)h9W&()h8{uSdL(jY^Qt+T(j$%No4kUzsO#nWmsH@dA%{s z%{QvrdhwpT(iQLKoJ%)dr|b8$TV&6L1FL%(Mw}J=WcF&uwf!frg{GW7`R+#O=I_ko zoZIuuYSZ@yEu`lUZJI{k3X^KXWRDr;sCp_ zfgREtjEAgBew}0Qb9U};Rt67vH`gxR+~3>ge*2C?OGjPka?LAvt5s3hgqDKpuRGq< z3eC8AuK(31HZ^4e?W?|rT-REEJyUb;rM)K`LRLn$+u~TId$n)wsn*H9@$Xt?8;;*I z&LPXn;Mt-L%RX_pc{XyF(JsmRtzFl9r(L|6eQ@53&j#)`HMTfo^pAGiN)CU!?$}qZ zE=P^W&&x8i>SR{kBS`Ot?xu?27q%@)t*o4Kse8~8_kpQZ?oJ~D!>?_PxuKiWeTU(N zoNgJjE38jncD~qZwNL$~U-q>by+6gz{*XX$ICu5B$E(&^nO;S0m6Iuu_!ad&Pn&&sVMopgtrJQ!w?`2N(~hL#%Up5*z% z6Ni3k`;~PYq27=0wN;OII3`A>NRm>iF;PBbl-$_x;2@-RhgD8||AOXmM$WnUm&ToP z(VhQv?vdG7G6S!kGVPewrKDg$mvxz>+J0t&wfQac$G7}TwOSk!&iVFf{dax;*p|~! z{pAC(Nzo>)Q}+gVUP!1;>w9cwcG=u-H*^$+rB9l-ZrZKAk%t4Wtm|{p)x2cLK9{cy zMGLC@4+m}Y9($!s|LU+ox4OpI%-pc#v&fs4g{#`$?e2T+l}XS|kEm_=8_T4-Pq~z@ z@;@ob2($ zIDUzb{cu~GAbV%m!jx8a!ZfF`dA^c2{fy0# z82xD#r0ueRBI)k;Jyit8r(E<0)em_$yVhw~@h&f6$4A+v zi%Zu|KYnh7jY(PMt_>sJ*85%!@!lvlNph^JIq)ET%(nWPHEE_x{}J+NL`1^ev6(d+ zH@|#)ztf=#2iE|N*>+oA@|H&2T;8VkzP?MRN9#i#t?Kzx+oehQ9lrfZ+vdAH{T3hJ zmG`wyl;@fOso_Q*XFrdz`62!M|af9xRb`#5(? za#;M%=V`sqi(mA7@yi>#`|bs~?th3fbb9L)HY7jx5AfmfjbNvYc{XI53KC%TKnzDu zSX$;V91XzjkMdVC@ZB0C*g0T4vT330W(v%Wh4zr;p}es9g&_p(+UoM)b21%JKpTjP z%|TsGY-)7CoaXTKu9%ZsO?K16{IqR-IP-HhL|Q0$g|VNC<)E6-!WD(0uBm5aHU@h_ zAjiNJA0-Th=|O;xCRNEg@P3OYw5|lo5pv;!kzCe%<}BS`KfSenMYs3*1=>+u>VYd`zgWxxkVW%or5w%b)};N zYQv9CKRd%Ns^0JVc+07C(&pxEm{C~-Z(XknOH8?ww=PWVpBt7CsOM)pjb>jyy=po_ z+2>taf-rUAfW0Wen$I#_X?KZBkX>G7zD|6rxPEaJ={wTf5Z>IpH-QE*bJFG%TE-8N6vyjISB2S6 zv5Q&|*$X7MoO&vM@-&31?AP1o(S0xI~s&R{JQX6LBM`A=bM}bkq7Y(>ocA>#Bg)y;Ecy7q0*k}25;Xmg& zgxY~K==r^g*p_1#x-U!;=o16%v{$qU369YX@m+wGv_j$+vnotK%YKTkR1k7N60`h$ z!Gg#}bmtMRLU>W@Bc@7*FVOSTntTwff^~@w(KEp@UeB*;M$N+AfvbWllHgT#+KDWq zD~mSF(9dc)Rac7jTs^URB9@dJ_A;#`)-|OCTIE1k>$u2R|518=bkXRxamhnxgAP#b zu4B5y5ip`%6q*~@dr#OHgessf8wM>0$HlU+a&p5K&(xKkiJVsWrJrS7X6Wf0sL@_y zUWe)V2}Ae=DcN=59sMpYK_^H|K@YWzMuE_TuA%1fP@{Aem&Krz7srnA$GXf6Ma%0- zH!nVr-!lF}@cV+z3k+fwhII-8bZkLAz`kjL7jqDI&@yzhvY4k?y3(qAtMPQh=(MKm_13SNZ*a1LJK74>9qOJ|#h;NVz z$-D2KUmYG3;2BBL9VmKd>X=PYvVeyP9dRF)Sny^+$_Z+v!x-*oQ`~c?qpghlp{Jd~ zwY}{6nP(@3>u-oP^u%Mamx4c(2_Ag+MZy1FPv*=c#jw;?EEj{~;y#Q33?a#+tVD=c zchnYS=HpA(^ePO#qyP^Od>jVx+Jf#(c_NC2H*6Y8#ik(!n})Vw)9{51HVu1iHVrCD ziG~WZ`QVvGiH6?5=EKPYn}(jlrs2m4Y#Q2%O~aSU*)+5un`Xd4!$%J`A3oKxY3RFb z8v2(a?LFoDP_sZMI#cVPSI zPaNl)RQ*u@)RU$lH`6*W2j&#P<`*GH;3WVJDhDCYJo^I$UmZmr#1p~WvotCUvMf{q z@HIt6JQ2M7MdyI6HAvU!jDpZ|z>C&Ec{5|&;mJ$WJ+TZ#2XAf6fuE2zm0U}y3`7?} zJEP0crgDsI<+SVTr*s+G3OS;vG7wz^Wzjic*&oW7_0@o!qz7~kp@LKiR1QQJQIl4* zjt-TBZ5_lDQEd}>u(r^4O)ZF?D}o-2wsK2O9`s}pta{Ttx|}?SAc8-d2R`-XC<_rp z)O++O4{RR*5AXP=UT_4d=o&=?5j^+NI3i9QYzZPjpsl4(@t}1#oTaU$PgyJdk*%w6 z%XG?Gh$o`vcIaycR1QV&!~VIcpCG!3n)slshMaO>s}fN^SAeMS*(P9YX*YUo?>n>{ z_-QeNI*F14+nk76oyHva5m{4T!ne$s9061gY>^`Brw}j)3}UPt#!YhEgss;EaPe<> z?WhkdZQ%Ks`LlrujQkI&bZS4oA=yW@HZLe^EJO?zP^tztT70C1-Ca1Zz@ttrR2n`!wDAgGFVWZ{=;Qu&kejGLRsoqL)1i~kWkE15t(|sH@nZEJksQGc! 
zls`NE|BaeBPTu_&XP1g| zae8)1uMp64arx{L{f(YoviJBMAVPrzsv&oyo2il*rf^};u0IyKQ4n|*Mq;kh)Yqp)Pw^SPUnz@OFg)h zgUtt<2hu)X= zb7?f*92|wRg4Mnss{e{ z>rnW4&p7HwJ&`gQG)xAH>HuGO@o?~(2W^+)s*o5u3hb2cyaL(6X-YV*1N)$-8~Fcd;WKf-{1geDq;H{efW;eh^= z1QF?Yjcv~d{Kp^KL<-d6Sg6HV@ELD7Cj#eKc-2M%DHzzkhwTa5G=7NJ4USImvX3i# X&4$s1I^&O7x)A(6{bTs|S>XQwT2*u8 literal 0 HcmV?d00001 diff --git a/config.py b/config.py index 12a552d..ec6ef16 100644 --- a/config.py +++ b/config.py @@ -1,40 +1,40 @@ -# -*- coding: utf-8 -*- -""" -配置文件 - 所有超参数集中管理 - -设计思路: -将超参数分门别类,学生可以单独修改某一类而不会影响其他 -""" - -# ==================== 数据相关 ==================== -DATA_DIR = 'data/ChnSentiCorp' # 数据集路径 -MAX_FEATURES = 3000 # 词表最大容量 -MAX_SEQ_LEN = 100 # 句子最大长度(词数) -VECTORIZER_TYPE = 'tfidf' # 'tfidf' 或 'bow'(向量化方式) - -# ==================== 模型相关 ==================== -MODEL_TYPE = 'mlp' # 'mlp' 或 'lr'(模型类型) -HIDDEN_SIZE = 64 # MLP隐藏层大小(LR忽略) -NUM_CLASSES = 2 # 类别数(正面/负面二分类) -KEEP_PROB = 1.0 # Dropout保留概率(LR忽略,设为1即可) - -# ==================== 训练相关 ==================== -LEARNING_RATE = 0.05 # 学习率 -NUM_EPOCHS = 100 # 训练轮数 -BATCH_SIZE = 64 # 批次大小 - -# ==================== 类别权重(解决数据不平衡问题)==================== -USE_CLASS_WEIGHT = True # True=启用类别权重, False=不启用(对比用) -# 权重计算公式: n_samples / (n_classes * n_class_i) -# 正面评论多所以权重小,负面评论少所以权重大 -CLASS_WEIGHT_POS = 0.73 # 正面类权重(自动计算) -CLASS_WEIGHT_NEG = 1.58 # 负面类权重(自动计算) - -# ==================== 实验相关 ==================== -RUN_COMPARISON = False # True=运行对比实验, False=运行单个模型 -COMPARE_MODELS = ['lr', 'mlp'] # 要对比的模型列表 -COMPARE_VECTORS = ['bow', 'tfidf'] # 要对比的向量化方式 - -# ==================== 其他 ==================== -RANDOM_SEED = 42 # 随机种子(保证可复现) -VERBOSE = True # 打印详细日志 +# -*- coding: utf-8 -*- +""" +配置文件 - 所有超参数集中管理 + +设计思路: +将超参数分门别类,学生可以单独修改某一类而不会影响其他 +""" + +# ==================== 数据相关 ==================== +DATA_DIR = 'data/ChnSentiCorp' # 数据集路径 +MAX_FEATURES = 3000 # 词表最大容量 +MAX_SEQ_LEN = 100 # 句子最大长度(词数) +VECTORIZER_TYPE = 'tfidf' # 'tfidf' 或 'bow'(向量化方式) + +# ==================== 模型相关 ==================== +MODEL_TYPE = 'mlp' # 'mlp' 或 'lr'(模型类型) +HIDDEN_SIZE = 60 # MLP隐藏层大小(LR忽略) +NUM_CLASSES = 2 # 类别数(正面/负面二分类) +KEEP_PROB = 1.0 # Dropout保留概率(LR忽略,设为1即可) + +# ==================== 训练相关 ==================== +LEARNING_RATE = 0.05 # 学习率 +NUM_EPOCHS = 100 # 训练轮数 +BATCH_SIZE = 50 # 批次大小 + +# ==================== 类别权重(解决数据不平衡问题)==================== +USE_CLASS_WEIGHT = True # True=启用类别权重, False=不启用(对比用) +# 权重计算公式: n_samples / (n_classes * n_class_i) +# 正面评论多所以权重小,负面评论少所以权重大 +CLASS_WEIGHT_POS = 1.66 # 正面类权重(自动计算) +CLASS_WEIGHT_NEG = 0.99 # 负面类权重(自动计算) + +# ==================== 实验相关 ==================== +RUN_COMPARISON = False # True=运行对比实验, False=运行单个模型 +COMPARE_MODELS = ['lr', 'mlp'] # 要对比的模型列表 +COMPARE_VECTORS = ['bow', 'tfidf'] # 要对比的向量化方式 + +# ==================== 其他 ==================== +RANDOM_SEED = 42 # 随机种子(保证可复现) +VERBOSE = True # 打印详细日志 diff --git a/dataset.py b/dataset.py index e554362..4f4163e 100644 --- a/dataset.py +++ b/dataset.py @@ -1,286 +1,286 @@ -# -*- coding: utf-8 -*- -""" -数据加载与向量化模块 - -支持两种向量化方法: -1. BoW (Bag of Words) - 词频向量 -2. 
diff --git a/dataset.py b/dataset.py
index e554362..4f4163e 100644
--- a/dataset.py
+++ b/dataset.py
@@ -1,286 +1,286 @@
[the removed (-) and re-added (+) bodies of this file are identical; content shown once]
# -*- coding: utf-8 -*-
"""
Data loading and vectorization module.

Two vectorization methods are supported:
1. BoW (Bag of Words) - term-presence vectors
2. TF-IDF - term frequency / inverse document frequency vectors

Advantages of TF-IDF:
- Down-weights very common words (e.g. "的", "是")
- Boosts the information content of rare words
- Usually outperforms plain BoW
"""

import os
import re
import csv
import math
import jieba
import numpy as np
from collections import Counter

try:
    import urllib.request
    import ssl
    DOWNLOAD_AVAILABLE = True
except ImportError:
    DOWNLOAD_AVAILABLE = False


DATASET_URL = "https://raw.githubusercontent.com/SophonPlus/ChineseNlpCorpus/master/datasets/ChnSentiCorp_htl_all/ChnSentiCorp_htl_all.csv"


def download_dataset(data_dir):
    """Download the dataset if it is not already present."""
    csv_path = os.path.join(data_dir, 'ChnSentiCorp_htl_all.csv')

    if os.path.exists(csv_path):
        print(f"Data already exists: {csv_path}")
        return True

    if not DOWNLOAD_AVAILABLE:
        return False

    print("Downloading dataset...")
    ssl_context = ssl.create_default_context()
    ssl_context.check_hostname = False
    ssl_context.verify_mode = ssl.CERT_NONE

    try:
        request = urllib.request.Request(DATASET_URL, headers={'User-Agent': 'Mozilla/5.0'})
        response = urllib.request.urlopen(request, timeout=120, context=ssl_context)
        os.makedirs(data_dir, exist_ok=True)
        with open(csv_path, 'wb') as f:
            f.write(response.read())
        print(f"Download finished: {csv_path}")
        return True
    except Exception as e:
        print(f"Download failed: {e}")
        return False


def load_raw_data(data_dir):
    """Load the raw data."""
    csv_path = os.path.join(data_dir, 'ChnSentiCorp_htl_all.csv')
    texts, labels = [], []

    with open(csv_path, 'r', encoding='utf-8') as f:
        reader = csv.reader(f)
        for row in reader:
            if len(row) < 2:
                continue
            try:
                label = int(row[0])
                review = row[1].strip()
                if review:
                    texts.append(review)
                    labels.append(label)
            except (ValueError, IndexError):
                continue

    return texts, np.array(labels)


def tokenize(text):
    """Chinese word segmentation."""
    text = re.sub(r'[^\u4e00-\u9fa5a-zA-Z]', ' ', text)
    words = jieba.lcut(text)
    return [w for w in words if len(w) > 1]


# ==================== Vectorizers ====================

class BaseVectorizer:
    """Vectorizer base class."""
    def fit(self, texts): pass
    def transform(self, texts): pass
    def fit_transform(self, texts): pass


class BoWVectorizer(BaseVectorizer):
    """
    Bag-of-words model.

    Idea: mark which in-vocabulary words appear in a text.
    Vector length = max_seq_len (one slot per token position);
    slot i = 1 if token i is in the vocabulary, 0 otherwise.
    """

    def __init__(self, max_features, max_seq_len):
        self.max_features = max_features
        self.max_seq_len = max_seq_len
        self.vocab = {}
        self.doc_freq = {}  # document frequencies
        self.num_docs = 0

    def fit(self, texts):
        """Build the vocabulary (by term frequency)."""
        counter = Counter()
        doc_counter = Counter()  # number of documents containing each word

        for text in texts:
            words = tokenize(text)
            unique_words = set(words)
            counter.update(words)
            for w in unique_words:
                doc_counter[w] += 1

        self.num_docs = len(texts)

        # Keep the most frequent words
        most_common = counter.most_common(self.max_features)
        self.vocab = {word: idx for idx, (word, _) in enumerate(most_common)}

        # Record document frequencies (used for TF-IDF)
        self.doc_freq = {w: doc_counter[w] for w in self.vocab}

        print(f"  BoW vocabulary size: {len(self.vocab)}")
        return self

    def transform(self, texts):
        """Convert texts to binary presence vectors."""
        vectors = []
        for text in texts:
            words = tokenize(text)
            freq = [0] * self.max_seq_len
            for i, word in enumerate(words[:self.max_seq_len]):
                if word in self.vocab:
                    freq[i] = 1  # binary (present = 1, absent = 0)
            vectors.append(freq)
        return np.array(vectors, dtype=np.float32)

    def fit_transform(self, texts):
        self.fit(texts)
        return self.transform(texts)

class TFIDFVectorizer(BaseVectorizer):
    """
    TF-IDF vectorizer.

    Idea:
    - TF (term frequency) = how often a word occurs in this text
    - IDF (inverse document frequency) = log(num documents / num documents containing the word)
    - TF-IDF = TF × IDF

    Advantages:
    - Down-weights common, uninformative words (e.g. "的", "是")
    - Boosts rare but informative words
    """

    def __init__(self, max_features, max_seq_len):
        self.max_features = max_features
        self.max_seq_len = max_seq_len
        self.vocab = {}
        self.idf = {}  # IDF value of each word
        self.num_docs = 0

    def fit(self, texts):
        """Build the vocabulary and compute IDF values."""
        counter = Counter()
        doc_counter = Counter()

        for text in texts:
            words = tokenize(text)
            unique_words = set(words)
            counter.update(words)
            for w in unique_words:
                doc_counter[w] += 1

        self.num_docs = len(texts)

        # Compute each word's IDF
        # IDF = log(num documents / num documents containing the word)
        idf_values = {}
        for word, df in doc_counter.items():
            idf_values[word] = math.log(self.num_docs / (df + 1)) + 1  # +1 guards against zero

        # Keep the words with the highest IDF (the most informative ones)
        sorted_words = sorted(idf_values.items(), key=lambda x: x[1], reverse=True)
        self.vocab = {word: idx for idx, (word, _) in enumerate(sorted_words[:self.max_features])}

        # Save the IDF values
        self.idf = {word: idf_values[word] for word in self.vocab}

        print(f"  TF-IDF vocabulary size: {len(self.vocab)}")
        print(f"  Mean IDF: {np.mean(list(self.idf.values())):.3f}")
        return self

    def transform(self, texts):
        """Convert texts to TF-IDF vectors."""
        vectors = []
        for text in texts:
            words = tokenize(text)

            # Compute TF
            tf = Counter(words)
            tf_sum = len(words) if words else 1

            # Build the vector (one slot per token position)
            vec = [0.0] * self.max_seq_len
            for i, word in enumerate(words[:self.max_seq_len]):
                if word in self.vocab:
                    # TF × IDF
                    vec[i] = (tf[word] / tf_sum) * self.idf.get(word, 0)
            vectors.append(vec)

        return np.array(vectors, dtype=np.float32)

    def fit_transform(self, texts):
        self.fit(texts)
        return self.transform(texts)


def load_data(data_dir, max_features, max_seq_len, vectorizer_type='tfidf'):
    """
    Load and vectorize the data.

    Parameters:
    - vectorizer_type: 'tfidf' or 'bow'
    """
    if not download_dataset(data_dir):
        raise RuntimeError("Data loading failed; check the network or download the dataset manually")

    print("Loading data...")
    texts, labels = load_raw_data(data_dir)
    print(f"Total reviews: {len(texts)}, positive: {sum(labels)}, negative: {len(labels) - sum(labels)}")

    # Choose the vectorizer
    if vectorizer_type == 'tfidf':
        vectorizer = TFIDFVectorizer(max_features, max_seq_len)
        vec_name = "TF-IDF"
    else:
        vectorizer = BoWVectorizer(max_features, max_seq_len)
        vec_name = "BoW"

    print(f"Vectorizing with {vec_name}...")
    X = vectorizer.fit_transform(texts)
    y = labels

    # Shuffle and split
    np.random.seed(42)
    indices = np.random.permutation(len(X))
    X = X[indices]
    y = y[indices]

    split_idx = int(len(X) * 0.8)
    X_train, X_test = X[:split_idx], X[split_idx:]
    y_train, y_test = y[:split_idx], y[split_idx:]

    print(f"Training set: {len(X_train)} samples, test set: {len(X_test)} samples")

    return X_train, y_train, X_test, y_test, vectorizer


if __name__ == '__main__':
    # Quick test
    print("=" * 60)
    print("Testing TF-IDF vectorization")
    print("=" * 60)
    X_train, y_train, X_test, y_test, vec = load_data(
        'data/ChnSentiCorp', max_features=3000, max_seq_len=100,
        vectorizer_type='tfidf'
    )
    print(f"\nX_train shape: {X_train.shape}")
    print(f"X_train sample (first 5 features): {X_train[0][:5]}")
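
To see concretely what the two transform() methods produce, here is a minimal sketch that runs
the vectorizers above on a toy corpus. It assumes jieba is installed and dataset.py is importable;
the example sentences are invented:

    from dataset import TFIDFVectorizer, BoWVectorizer

    corpus = [
        "房间很干净,服务很好",
        "房间太小,隔音很差",
        "位置不错,早餐一般",
    ]

    vec = TFIDFVectorizer(max_features=50, max_seq_len=10)
    X = vec.fit_transform(corpus)
    print(X.shape)  # (3, 10): one slot per token position, not per vocabulary entry
    print(X[0])     # nonzero slots are in-vocabulary tokens, value = TF * IDF

    bow = BoWVectorizer(max_features=50, max_seq_len=10)
    Xb = bow.fit_transform(corpus)
    print(Xb[0])    # binary: 1 where the token at that position is in the vocabulary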
diff --git a/main.py b/main.py
index eaeaadc..2dbbe1c 100644
--- a/main.py
+++ b/main.py
@@ -1,34 +1,34 @@
[the removed (-) and re-added (+) bodies of this file are identical; content shown once]
# -*- coding: utf-8 -*-
"""
Program entry point.

Usage:

1. Run a single model (default):
   python main.py

   Edit MODEL_TYPE and VECTORIZER_TYPE in config.py to switch configurations.

2. Run the comparison experiments:
   Set RUN_COMPARISON = True in config.py.

   This runs, in order:
   - Experiment 1: BoW vs TF-IDF (LR model fixed)
   - Experiment 2: LR vs MLP (TF-IDF fixed)
   - Experiment 3: different learning rates
   - Experiment 4: different hidden layer sizes

   and prints a summary report at the end.
"""

from train import main

if __name__ == '__main__':
    print("\n" + "=" * 70)
    print("Text classification experiment - pure NumPy implementation")
    print("Dataset: ChnSentiCorp (Chinese hotel reviews)")
    print("Model: Logistic Regression / MLP")
    print("Vectorization: BoW / TF-IDF")
    print("=" * 70 + "\n")

    main()
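
main.py delegates to train.main(), and train.py itself is not included in this patch. As a rough
sketch of how the pieces compose for a single-model run (the wiring below is an assumption; the
individual calls match the interfaces defined elsewhere in this patch):

    import config
    from dataset import load_data
    from model_numpy import create_model

    X_train, y_train, X_test, y_test, vec = load_data(
        config.DATA_DIR, config.MAX_FEATURES, config.MAX_SEQ_LEN,
        vectorizer_type=config.VECTORIZER_TYPE)

    class_weight = None
    if config.USE_CLASS_WEIGHT:
        # index by label: 0 = negative, 1 = positive
        class_weight = {0: config.CLASS_WEIGHT_NEG, 1: config.CLASS_WEIGHT_POS}

    model = create_model(config.MODEL_TYPE, input_size=X_train.shape[1],
                         hidden_size=config.HIDDEN_SIZE, num_classes=config.NUM_CLASSES,
                         learning_rate=config.LEARNING_RATE, keep_prob=config.KEEP_PROB,
                         class_weight=class_weight)
    model.fit(X_train, y_train, X_val=X_test, y_val=y_test,
              epochs=config.NUM_EPOCHS, batch_size=config.BATCH_SIZE,
              verbose=config.VERBOSE)
    print(f"Test accuracy: {model.accuracy(X_test, y_test):.4f}")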
diff --git a/model_numpy.py b/model_numpy.py
index e8d7adf..cdec28a 100644
--- a/model_numpy.py
+++ b/model_numpy.py
@@ -1,342 +1,342 @@
[the removed (-) and re-added (+) bodies of this file are identical; content shown once]
# -*- coding: utf-8 -*-
"""
Model module - pure NumPy implementation.

Two models are supported:
1. Logistic Regression - a linear model
2. MLP (multi-layer perceptron) - a two-layer fully connected network

Design notes:
- Both models share the same interface, which makes comparison easy
- The code is kept short, with detailed comments on every step
- Backpropagation is implemented by hand, so the math stays transparent
"""

import numpy as np


class BaseModel:
    """Model base class."""
    def fit(self, X, y, X_val=None, y_val=None, epochs=100, batch_size=32, verbose=True): pass
    def predict(self, X): pass
    def predict_proba(self, X): pass
    def accuracy(self, X, y): pass


class LogisticRegression(BaseModel):
    """
    Logistic regression (linear classifier).

    Structure: input → linear transform → softmax → output

    Idea:
    - Linear transform: z = X @ W + b
    - Softmax turns the linear output into a probability distribution

    Parameter count: input_size × num_classes + num_classes
    """

    def __init__(self, input_size, num_classes=2, learning_rate=0.1,
                 class_weight=None, seed=42):
        np.random.seed(seed)

        # Weight initialization (He-style scaled Gaussian, sqrt(2 / fan_in))
        self.W = np.random.randn(input_size, num_classes) * np.sqrt(2.0 / input_size)
        self.b = np.zeros(num_classes)

        self.lr = learning_rate
        self.input_size = input_size
        self.num_classes = num_classes
        self.class_weight = class_weight  # class weights

        total_params = input_size * num_classes + num_classes
        print(f"LogisticRegression: {input_size} -> {num_classes}, parameters: {total_params}")

    def softmax(self, x):
        """Softmax function."""
        x_shifted = x - np.max(x, axis=1, keepdims=True)
        exp_x = np.exp(x_shifted)
        return exp_x / np.sum(exp_x, axis=1, keepdims=True)

    def forward(self, X):
        """Forward pass."""
        # Linear transform
        z = X @ self.W + self.b
        # Softmax probabilities
        return self.softmax(z)

    def backward(self, X, y):
        """Backward pass (gradient descent)."""
        batch_size = X.shape[0]
        probs = self.forward(X)

        # Softmax + cross-entropy gradient
        d_z = probs.copy()

        # Apply class weights: subtract the weight w_y instead of 1 at the
        # true class, i.e. d_z = probs - w_y * onehot(y)
        if self.class_weight is not None:
            for i in range(batch_size):
                d_z[i, y[i]] -= self.class_weight[y[i]]
        else:
            d_z[np.arange(batch_size), y] -= 1

        # Gradients
        d_W = X.T @ d_z
        d_b = np.sum(d_z, axis=0)

        # Update
        self.W -= self.lr * d_W / batch_size
        self.b -= self.lr * d_b / batch_size

    def fit(self, X, y, X_val=None, y_val=None, epochs=100, batch_size=32, verbose=True):
        """Train the model."""
        num_samples = len(X)
        num_batches = (num_samples + batch_size - 1) // batch_size

        for epoch in range(epochs):
            # Shuffle
            indices = np.random.permutation(num_samples)
            X_shuffled = X[indices]
            y_shuffled = y[indices]

            epoch_loss = 0
            for batch_idx in range(num_batches):
                start = batch_idx * batch_size
                end = min(start + batch_size, num_samples)
                X_batch = X_shuffled[start:end]
                y_batch = y_shuffled[start:end]

                # Forward + backward
                probs = self.forward(X_batch)
                self.backward(X_batch, y_batch)

                # Loss
                loss = -np.mean(np.log(np.clip(probs[np.arange(len(y_batch)), y_batch], 1e-10, 1)))
                epoch_loss += loss

            # Evaluate
            if verbose and (epoch + 1) % 20 == 0:
                train_acc = self.accuracy(X, y)
                msg = f"Epoch {epoch+1:3d}/{epochs} | Loss: {epoch_loss/num_batches:.4f} | train acc: {train_acc:.4f}"
                if X_val is not None:
                    val_acc = self.accuracy(X_val, y_val)
                    msg += f" | test acc: {val_acc:.4f}"
                print(msg)

        return self

    def predict(self, X):
        return np.argmax(self.forward(X), axis=1)

    def predict_proba(self, X):
        return self.forward(X)

    def accuracy(self, X, y):
        return np.mean(self.predict(X) == y)

    def save(self, filepath):
        """Save the model weights."""
        np.save(filepath + '_W.npy', self.W)
        np.save(filepath + '_b.npy', self.b)
        print(f"Model saved: {filepath}")

    @staticmethod
    def load(filepath, input_size, num_classes=2, learning_rate=0.1):
        """Load model weights."""
        model = LogisticRegression(input_size, num_classes, learning_rate)
        model.W = np.load(filepath + '_W.npy')
        model.b = np.load(filepath + '_b.npy')
        print(f"Model loaded: {filepath}")
        return model

class MLP(BaseModel):
    """
    Multi-layer perceptron (neural network).

    Structure: input → linear → ReLU → linear → softmax → output

    Differences from LogisticRegression:
    - One extra hidden layer plus a nonlinear activation
    - Can learn nonlinear relationships
    - More parameters

    Parameter count:
    - W1: input_size × hidden_size
    - b1: hidden_size
    - W2: hidden_size × num_classes
    - b2: num_classes
    """

    def __init__(self, input_size, hidden_size=64, num_classes=2,
                 learning_rate=0.1, keep_prob=1.0, class_weight=None, seed=42):
        np.random.seed(seed)

        # First-layer weights (He-style initialization)
        self.W1 = np.random.randn(input_size, hidden_size) * np.sqrt(2.0 / input_size)
        self.b1 = np.zeros(hidden_size)

        # Second-layer weights
        self.W2 = np.random.randn(hidden_size, num_classes) * np.sqrt(2.0 / hidden_size)
        self.b2 = np.zeros(num_classes)

        self.lr = learning_rate
        self.keep_prob = keep_prob
        self.hidden_size = hidden_size
        self.input_size = input_size
        self.num_classes = num_classes
        self.class_weight = class_weight  # class weights

        total_params = (input_size * hidden_size + hidden_size +
                        hidden_size * num_classes + num_classes)
        print(f"MLP: {input_size} -> {hidden_size} -> {num_classes}, parameters: {total_params}")

    def relu(self, x):
        """ReLU activation."""
        return np.maximum(0, x)

    def relu_derivative(self, x):
        """ReLU derivative."""
        return (x > 0).astype(float)

    def softmax(self, x):
        """Softmax function."""
        x_shifted = x - np.max(x, axis=1, keepdims=True)
        exp_x = np.exp(x_shifted)
        return exp_x / np.sum(exp_x, axis=1, keepdims=True)

    def forward(self, X):
        """Forward pass."""
        # First layer
        self.z1 = X @ self.W1 + self.b1
        self.a1 = self.relu(self.z1)

        # Dropout (training only; getattr keeps it off during evaluation,
        # where a plain hasattr check would stay True once fit() has run)
        if self.keep_prob < 1.0 and getattr(self, 'training', False):
            self.d1 = (np.random.rand(*self.a1.shape) < self.keep_prob).astype(float)
            self.a1 *= self.d1
            self.a1 /= self.keep_prob

        # Second layer
        self.z2 = self.a1 @ self.W2 + self.b2
        self.probs = self.softmax(self.z2)

        return self.probs

    def backward(self, X, y):
        """Backward pass."""
        batch_size = X.shape[0]

        # Output-layer gradient
        d_z2 = self.probs.copy()

        # Apply class weights (same scheme as LogisticRegression)
        if self.class_weight is not None:
            for i in range(batch_size):
                d_z2[i, y[i]] -= self.class_weight[y[i]]
        else:
            d_z2[np.arange(batch_size), y] -= 1

        # Second-layer gradients
        d_W2 = self.a1.T @ d_z2
        d_b2 = np.sum(d_z2, axis=0)

        # Hidden-layer gradient
        d_a1 = d_z2 @ self.W2.T
        d_z1 = d_a1 * self.relu_derivative(self.z1)

        # Dropout gradient
        if self.keep_prob < 1.0 and hasattr(self, 'd1'):
            d_z1 *= self.d1
            d_z1 /= self.keep_prob

        # First-layer gradients
        d_W1 = X.T @ d_z1
        d_b1 = np.sum(d_z1, axis=0)

        # Update
        self.W1 -= self.lr * d_W1 / batch_size
        self.b1 -= self.lr * d_b1 / batch_size
        self.W2 -= self.lr * d_W2 / batch_size
        self.b2 -= self.lr * d_b2 / batch_size

    def fit(self, X, y, X_val=None, y_val=None, epochs=100, batch_size=32, verbose=True):
        """Train the model."""
        num_samples = len(X)
        num_batches = (num_samples + batch_size - 1) // batch_size

        for epoch in range(epochs):
            # Shuffle
            indices = np.random.permutation(num_samples)
            X_shuffled = X[indices]
            y_shuffled = y[indices]

            epoch_loss = 0
            self.training = True  # enable dropout

            for batch_idx in range(num_batches):
                start = batch_idx * batch_size
                end = min(start + batch_size, num_samples)
                X_batch = X_shuffled[start:end]
                y_batch = y_shuffled[start:end]

                # Forward + backward
                probs = self.forward(X_batch)
                self.backward(X_batch, y_batch)

                # Loss
                loss = -np.mean(np.log(np.clip(probs[np.arange(len(y_batch)), y_batch], 1e-10, 1)))
                epoch_loss += loss

            self.training = False  # disable dropout

            # Evaluate
            if verbose and (epoch + 1) % 20 == 0:
                train_acc = self.accuracy(X, y)
                msg = f"Epoch {epoch+1:3d}/{epochs} | Loss: {epoch_loss/num_batches:.4f} | train acc: {train_acc:.4f}"
                if X_val is not None:
                    val_acc = self.accuracy(X_val, y_val)
                    msg += f" | test acc: {val_acc:.4f}"
                print(msg)

        return self

    def predict(self, X):
        return np.argmax(self.forward(X), axis=1)

    def predict_proba(self, X):
        return self.forward(X)

    def accuracy(self, X, y):
        return np.mean(self.predict(X) == y)

    def save(self, filepath):
        """Save the model weights."""
        np.save(filepath + '_W1.npy', self.W1)
        np.save(filepath + '_b1.npy', self.b1)
        np.save(filepath + '_W2.npy', self.W2)
        np.save(filepath + '_b2.npy', self.b2)
        print(f"Model saved: {filepath}")

    @staticmethod
    def load(filepath, input_size, hidden_size=64, num_classes=2, learning_rate=0.1, keep_prob=1.0):
        """Load model weights."""
        model = MLP(input_size, hidden_size, num_classes, learning_rate, keep_prob)
        model.W1 = np.load(filepath + '_W1.npy')
        model.b1 = np.load(filepath + '_b1.npy')
        model.W2 = np.load(filepath + '_W2.npy')
        model.b2 = np.load(filepath + '_b2.npy')
        print(f"Model loaded: {filepath}")
        return model


def create_model(model_type, input_size, hidden_size=64, num_classes=2,
                 learning_rate=0.1, keep_prob=1.0, class_weight=None):
    """Factory function: create a model."""
    if model_type == 'lr':
        return LogisticRegression(input_size, num_classes, learning_rate, class_weight)
    elif model_type == 'mlp':
        return MLP(input_size, hidden_size, num_classes, learning_rate, keep_prob, class_weight)
    else:
        raise ValueError(f"Unknown model type: {model_type}")
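
In the unweighted case, backward() relies on the classic identity that the gradient of softmax
cross-entropy with respect to the logits is probs - onehot(y). A minimal, self-contained
finite-difference check of that step (standalone sketch, not part of the patch):

    import numpy as np

    def softmax(z):
        z = z - z.max(axis=1, keepdims=True)
        e = np.exp(z)
        return e / e.sum(axis=1, keepdims=True)

    def loss(z, y):
        p = softmax(z)
        return -np.mean(np.log(p[np.arange(len(y)), y]))

    rng = np.random.default_rng(0)
    z = rng.normal(size=(4, 2))
    y = np.array([0, 1, 1, 0])

    # Analytic gradient, averaged over the batch as in the update step
    p = softmax(z)
    g = p.copy()
    g[np.arange(len(y)), y] -= 1
    g /= len(y)

    # Numerical gradient via central differences
    eps = 1e-6
    g_num = np.zeros_like(z)
    for i in range(z.shape[0]):
        for j in range(z.shape[1]):
            zp, zm = z.copy(), z.copy()
            zp[i, j] += eps
            zm[i, j] -= eps
            g_num[i, j] = (loss(zp, y) - loss(zm, y)) / (2 * eps)

    print(np.max(np.abs(g - g_num)))  # tiny (~1e-9): the gradients agree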
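
The dropout in MLP.forward() is the "inverted" variant: activations are masked and rescaled by
1/keep_prob at training time, so no scaling is needed at inference. A minimal standalone sketch
of the same idea (function name and values are illustrative):

    import numpy as np

    def inverted_dropout(a, keep_prob, training, rng):
        """Mask activations during training and rescale so E[output] == input."""
        if not training or keep_prob >= 1.0:
            return a
        mask = (rng.random(a.shape) < keep_prob).astype(a.dtype)
        return a * mask / keep_prob

    rng = np.random.default_rng(42)
    a = np.ones((1, 100000))
    out = inverted_dropout(a, keep_prob=0.8, training=True, rng=rng)
    print(out.mean())        # ~1.0: expectation preserved by the 1/keep_prob rescale
    print((out == 0).mean()) # ~0.2 of units dropped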