你有没有想过,那些约会软件和推荐系统是怎么做到“比你还懂你”的?这背后没有玄学,而是一系列有趣而有效的算法。今天,我们化身“代码月老”,亲手揭开一种经典方法——K-近邻(KNN)的神秘面纱。

这篇教程完整走一遍机器学习的小项目:从原始文本数据出发,完成清洗与转换;用纯 Python 与 NumPy 实现 KNN 核心;再用专业工具评估模型表现,看看它到底有多靠谱。

故事背景:帮海伦找到她的“菜”

主角海伦在一个约会网站上想提高推荐质量。她把过往的约会体验分为三类:

  • 魅力十足 (largeDoses)
  • 感觉一般 (smallDoses)
  • 敬而远之 (didntLike)

她认为评价主要和三个特征相关:

  1. 每年获得的飞行常客里程数(是否热爱旅行)
  2. 玩视频游戏所耗时间百分比(娱乐偏好)
  3. 每周消费的冰淇淋公升数(生活习惯)

我们的目标:输入这三项特征,自动预测她对新对象的喜好类别。

数据探索与预处理

在机器学习里,数据就是“食材”。食材质量常常决定了模型的上限。我们的原始数据在 datingTestSet.txt

数据加载:从文本到矩阵

计算机不直接吃文本。我们先写个函数,把文件读出来,变成 NumPy 矩阵(特征)和列表(标签)。

1
2
3
4
5
6
7
8
9
10
11
12
def file2matrix(filename):
"""从文件读取数据并转为矩阵与标签。"""
with open(filename) as fr:
lines = fr.readlines()
m = len(lines)
data = np.zeros((m, 3))
labels = []
for i, line in enumerate(lines):
parts = line.strip().split('\t')
data[i, :] = parts[0:3]
labels.append(parts[-1])
return data, labels

数据归一化:消除“单位”的霸权

特征的数值范围差异巨大:飞行里程动辄上万,冰淇淋只有个位数。直接用欧氏距离时,量纲大的特征会“霸占话语权”。所以要做 min-max 归一化,把所有特征压到 [0, 1] 区间:

$$ v_{new} = \frac{v_{old} - v_{min}}{v_{max} - v_{min}} $$

1
2
3
4
5
6
7
8
def auto_norm(data_set):
"""按列做 min-max 归一化。"""
min_vals = data_set.min(0)
max_vals = data_set.max(0)
ranges = max_vals - min_vals
m = data_set.shape[0]
norm = (data_set - np.tile(min_vals, (m, 1))) / np.tile(ranges, (m, 1))
return norm, ranges, min_vals

构建“大脑”——KNN 核心算法

KNN 是“懒惰学习”:没有显式训练过程,预测时才用整个训练集做邻近搜索。

直觉与做法

  • 距离度量:用欧氏距离来衡量相似度
    $$ d(p, q)=\sqrt{\sum_{i=1}^n (p_i-q_i)^2} $$
  • 选择 K:K 太小易受噪声影响,太大会过度平滑。本文先用 K=3
  • 投票决策:取最近的 K 个样本,看哪个类别出现最频繁。
1
2
3
4
5
6
7
8
9
10
11
def classify0(in_x, data_set, labels, k):
"""KNN 分类器:返回预测标签与邻居投票统计。"""
diff = np.tile(in_x, (data_set.shape[0], 1)) - data_set
distances = (diff**2).sum(axis=1) ** 0.5
sorted_idx = distances.argsort()
votes = {}
for i in range(k):
lab = labels[sorted_idx[i]]
votes[lab] = votes.get(lab, 0) + 1
pred = sorted(votes.items(), key=operator.itemgetter(1), reverse=True)[0][0]
return pred, votes

万事俱备,只欠代码——完整实现

下面把前面的逻辑整合进一个脚本,并配上评估与可视化。

完整 knn.py 脚本

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
import numpy as np
import operator
import matplotlib.pyplot as plt
from sklearn.metrics import confusion_matrix, precision_recall_fscore_support, roc_curve, auc, precision_recall_curve, average_precision_score
from sklearn.preprocessing import label_binarize

def classify0(in_x, data_set, labels, k):
"""KNN 分类器:返回预测标签与邻居投票统计。"""
data_set_size = data_set.shape[0]
diff_mat = np.tile(in_x, (data_set_size, 1)) - data_set
distances = (diff_mat**2).sum(axis=1) ** 0.5
sorted_idx = distances.argsort()
votes = {}
for i in range(k):
lab = labels[sorted_idx[i]]
votes[lab] = votes.get(lab, 0) + 1
pred = sorted(votes.items(), key=operator.itemgetter(1), reverse=True)[0][0]
return pred, votes

def file2matrix(filename):
"""从文件读取数据并转为矩阵与标签。"""
with open(filename) as fr:
lines = fr.readlines()
m = len(lines)
data = np.zeros((m, 3))
labels = []
for i, line in enumerate(lines):
parts = line.strip().split('\t')
data[i, :] = parts[0:3]
labels.append(parts[-1])
return data, labels

def auto_norm(data_set):
"""按列做 min-max 归一化。"""
min_vals = data_set.min(0)
max_vals = data_set.max(0)
ranges = max_vals - min_vals
m = data_set.shape[0]
norm = (data_set - np.tile(min_vals, (m, 1))) / np.tile(ranges, (m, 1))
return norm, ranges, min_vals

def plot_confusion_matrix(y_true, y_pred, classes, title='Confusion matrix', cmap=plt.cm.Blues):
cm = confusion_matrix(y_true, y_pred)
plt.imshow(cm, interpolation='nearest', cmap=cmap)
plt.title(title)
plt.colorbar()
ticks = np.arange(len(classes))
plt.xticks(ticks, classes, rotation=45)
plt.yticks(ticks, classes)
fmt = 'd'
thresh = cm.max() / 2.0
for i, j in np.ndindex(cm.shape):
plt.text(j, i, format(cm[i, j], fmt),
ha="center", color="white" if cm[i, j] > thresh else "black")
plt.ylabel('True label')
plt.xlabel('Predicted label')
plt.tight_layout()
plt.savefig('confusion_matrix.png')
plt.close()

def plot_roc_curve(y_true, y_scores, classes):
y_true_bin = label_binarize(y_true, classes=np.unique(y_true))
n_classes = y_true_bin.shape[1]
fpr, tpr, roc_auc = {}, {}, {}
for i in range(n_classes):
fpr[i], tpr[i], _ = roc_curve(y_true_bin[:, i], y_scores[:, i])
roc_auc[i] = auc(fpr[i], tpr[i])
plt.figure()
colors = ['aqua', 'darkorange', 'cornflowerblue']
for i, color in zip(range(n_classes), colors):
plt.plot(fpr[i], tpr[i], color=color, lw=2,
label=f'ROC curve of class {i+1} (area = {roc_auc[i]:0.2f})')
plt.plot([0, 1], [0, 1], 'k--', lw=2)
plt.xlim([0.0, 1.0]); plt.ylim([0.0, 1.05])
plt.xlabel('False Positive Rate'); plt.ylabel('True Positive Rate')
plt.title('Receiver Operating Characteristic (ROC) Curve')
plt.legend(loc="lower right")
plt.savefig('roc_curve.png')
plt.close()

def plot_pr_curve(y_true, y_scores, classes):
y_true_bin = label_binarize(y_true, classes=np.unique(y_true))
n_classes = y_true_bin.shape[1]
precision, recall, ap = {}, {}, {}
for i in range(n_classes):
precision[i], recall[i], _ = precision_recall_curve(y_true_bin[:, i], y_scores[:, i])
ap[i] = average_precision_score(y_true_bin[:, i], y_scores[:, i])
plt.figure()
colors = ['aqua', 'darkorange', 'cornflowerblue']
for i, color in zip(range(n_classes), colors):
plt.plot(recall[i], precision[i], color=color, lw=2,
label=f'PR curve of class {i+1} (AP = {ap[i]:0.2f})')
plt.xlabel('Recall'); plt.ylabel('Precision'); plt.title('Precision-Recall Curve')
plt.legend(loc="lower left")
plt.savefig('pr_curve.png')
plt.close()

def dating_class_test():
"""主流程:加载数据 -> 归一化 -> 拆分 -> 预测 -> 评估与可视化。"""
ho_ratio = 0.10 # 测试集比例 10%
data_raw, label_str = file2matrix('datingTestSet.txt')

label_map = {'didntLike': 1, 'smallDoses': 2, 'largeDoses': 3}
labels = [label_map[x] for x in label_str]

data_norm, ranges, min_vals = auto_norm(data_raw)
m = data_norm.shape[0]
n_test = int(m * ho_ratio)

train_x = data_norm[n_test:, :]
train_y = labels[n_test:]
test_x = data_norm[:n_test, :]
test_y = labels[:n_test]

k = 3
preds, scores = [], []
for i in range(n_test):
pred, vote = classify0(test_x[i, :], train_x, train_y, k)
preds.append(pred)
s = np.zeros(len(label_map))
for lab, cnt in vote.items():
s[lab - 1] = cnt / k
scores.append(s)
scores = np.array(scores)

err = (np.array(preds) != np.array(test_y)).sum()
acc = 1 - err / float(n_test)
print(f"Accuracy: {acc:.2f}")

precision, recall, f1, _ = precision_recall_fscore_support(test_y, preds, average='macro')
print(f"Precision: {precision:.2f}, Recall: {recall:.2f}, F1-Score: {f1:.2f}")

class_names = list(label_map.keys())
plot_confusion_matrix(test_y, preds, classes=class_names)
plot_roc_curve(test_y, scores, classes=np.unique(labels))
plot_pr_curve(test_y, scores, classes=np.unique(labels))
print("评估图表 (confusion_matrix.png, roc_curve.png, pr_curve.png) 已保存。")

if __name__ == '__main__':
dating_class_test()

效果怎么样?——模型“体检报告”

运行脚本后,你会拿到一份简明的结果与三张图表。

核心指标

  • 准确率 (Accuracy)95%(示例数值,用于解释指标含义)
  • 精确率 (Precision)0.95
  • 召回率 (Recall)0.95
  • F1-Score0.95

这些数字说明:对一个从零实现的简单模型来说,表现已经相当可观。

可视化分析:看图说话

混淆矩阵:模型在哪儿犯了错?

混淆矩阵

这张图就像“错题本”。对角线越大越好,非对角线是分类混淆的来源,能帮助你定位偏差。

ROC 曲线:判别力如何?

ROC曲线

曲线越靠近左上角越理想;AUC 接近 1.0 表明判别能力强。

PR 曲线:在“求精”与“求全”之间找平衡

PR曲线

当类别不平衡时尤其有参考价值;AP 越高越稳健。

总结与展望

我们完成了从数据准备、算法实现到评估可视化的全流程。更重要的是,掌握了一个可复用的套路:定义问题 → 准备数据 → 构建模型 → 评估与优化

想亲自上手试试吗?

  1. 安装依赖

    1
    pip install numpy scikit-learn matplotlib
  2. 准备文件
    将上面完整的 knn.py 保存到本地,确保 datingTestSet.txt 与它位于同一目录。

  3. 运行脚本

    1
    python knn.py
  4. 查看结果
    终端会打印准确率、精确率、召回率与 F1;三张评估图表会生成在当前目录:confusion_matrix.pngroc_curve.pngpr_curve.png。快去看看你的成果吧!