1.k-means聚类
1.1.算法简介
1.1.1 牧师-村民模型
1.1.2 原理
1.2 计算步骤
1.3 K值选择
K 值的选取对 K-means 影响很大,这也是 K-means 最大的缺点,常见的选取 K 值的方法有:手肘法、Gap statistic 方法。
- 核心指标:SSE(sum of the squared errors,误差平方和)
- 核心思想:随着聚类数k的增大,样本划分会更加精细,每个簇的聚合程度会逐渐提高,那么误差平方和SSE自然会逐渐变小。当k小于真实聚类数时,由于k的增大会大幅增加每个簇的聚合程度,故SSE的下降幅度会很大,而当k到达真实聚类数时,再增加k所得到的聚合程度回报会迅速变小,所以SSE的下降幅度会骤减,然后随着k值的继续增大而趋于平缓,也就是说SSE和k的关系图是一个手肘的形状,而这个肘部对应的k值就是数据的真实聚类数
显然,肘部对于的k值为3(曲率最高),故对于这个数据集的聚类而言,最佳聚类数应该选3。
手肘法的缺点在于需要人工看不够自动化,所以我们又有了 Gap statistic 方法,此方法出自斯坦福大学的论文:“estimating the number of clusters in a dataset via the gap statistic”
其中Dk为损失函数,这里E(logDk)指的是logDk的期望。这个数值通常通过蒙特卡洛模拟产生,我们在样本里所在的区域中按照均匀分布随机产生和原始样本数一样多的随机样本,并对这个随机样本做 K-Means,从而得到一个Dk。如此往复多次,通常 20 次,我们可以得到 20 个logDk。对这 20 个数值求平均值,就得到了E(logDk)的近似值。最终可以计算 Gap Statisitc。而 Gap statistic 取得最大值所对应的 K 就是最佳的 K。
由图可见,当 K=5时,Gap(K) 取值最大,所以最佳的簇数是 K=5。
1.4 算法优缺点
简单而高效:K均值聚类是一种简单而直观的聚类算法,易于理解和实现。它的计算效率通常较高,特别适用于大规模数据集。
可扩展性:K均值聚类可以处理大规模数据集,并且在处理大型数据时具有较好的可扩展性。
相对较快的收敛:K均值聚类通常会在有限的迭代次数内收敛,因此在实践中运行时间较短。
对于具有明显分离簇的数据集,K均值聚类通常能够产生较好的聚类结果。
对初始聚类中心的敏感性:K均值聚类对初始聚类中心的选择非常敏感。不同的初始中心可能导致不同的聚类结果,因此需要进行多次运行以选择最佳结果。
对数据分布的假设:K均值聚类假设每个簇的形状是球状的,并且簇的大小相似。对于非球状、大小差异较大或者存在重叠的簇,K均值聚类的效果可能不佳。
不适用于处理噪声和异常值:K均值聚类对噪声和异常值敏感,这些数据点可能会显著影响聚类结果。
需要预先指定簇的数量:K均值聚类需要事先确定簇的数量K。在实际应用中,选择适当的K值可能是一项挑战,且错误选择K值可能导致不合理的聚类结果。
1.5 应用场景
1.6 Python实现
density,ratio
0.697,0.460
0.774,0.376
0.634,0.264
0.608,0.318
0.556,0.215
0.403,0.237
0.481,0.149
0.437,0.211
0.666,0.091
0.243,0.267
0.245,0.057
0.343,0.099
0.639,0.161
0.657,0.198
0.360,0.370
0.593,0.042
0.719,0.103
0.359,0.188
0.339,0.241
0.282,0.257
0.748,0.232
0.714,0.346
0.483,0.312
0.478,0.437
0.525,0.369
0.751,0.489
0.532,0.472
0.473,0.376
0.725,0.445
0.446,0.459
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import random
dataset = pd.read_csv('watermelon.csv', delimiter=",")
data = dataset.values
print(dataset)
def distance(x1, x2): # 计算距离
return sum((x1-x2)**2)
def Kmeans(D,K,maxIter):
m, n = np.shape(D)
if K >= m:
return D
initSet = set()
curK = K
while(curK>0): # 随机选取k个样本
randomInt = random.randint(0, m-1)
if randomInt not in initSet:
curK -= 1
initSet.add(randomInt)
U = D[list(initSet), :] # 均值向量,即质心
C = np.zeros(m)
curIter = maxIter # 最大的迭代次数
while curIter > 0:
curIter -= 1
# 计算样本到各均值向量的距离
for i in range(m):
p = 0
minDistance = distance(D[i], U[0])
for j in range(1, K):
if distance(D[i], U[j]) < minDistance:
p = j
minDistance = distance(D[i], U[j])
C[i] = p
newU = np.zeros((K, n))
cnt = np.zeros(K)
for i in range(m):
newU[int(C[i])] += D[i]
cnt[int(C[i])] += 1
changed = 0
# 判断质心是否发生变化,如果发生变化则继续迭代,否则结束
for i in range(K):
newU[i] /= cnt[i]
for j in range(n):
if U[i, j] != newU[i, j]:
changed = 1
U[i, j] = newU[i, j]
if changed == 0:
return U, C, maxIter-curIter
return U, C, maxIter-curIter
U, C, iter = Kmeans(data,3,20)
f1 = plt.figure(1)
plt.title('watermelon')
plt.xlabel('density')
plt.ylabel('ratio')
plt.scatter(data[:, 0], data[:, 1], marker='o', color='g', s=50)
plt.scatter(U[:, 0], U[:, 1], marker='o', color='r', s=100)
m, n = np.shape(data)
for i in range(m):
plt.plot([data[i, 0], U[int(C[i]), 0]], [data[i, 1], U[int(C[i]), 1]], "c--", linewidth=0.3)
plt.show()
2.K-modes聚类
2.1 算法简介
2.2 计算步骤
2.3 Python实现
import numpy as np
from kmodes import kmodes
'''生成互相无交集的离散属性样本集'''
data1 = np.random.randint(1,6,(10000,10))
data2 = np.random.randint(6,12,(10000,10))
data = np.concatenate((data1,data2))
'''进行K-modes聚类'''
km = kmodes.KModes(n_clusters=2)
clusters = km.fit_predict(data)
'''计算正确归类率'''
score = np.sum(clusters[:int(len(clusters)/2)])+(len(clusters)/2-np.sum(clusters[int(len(clusters)/2):]))
score = score/len(clusters)
if score >= 0.5:
print('正确率:'+ str(score))
else:
print('正确率:'+ str(1-score))
3.k-prototypes聚类
3.1 算法简介
3.2 算法步骤
3.3 python内嵌API
from kmodes.kprototypes import KPrototypes
3.4 K值选择方法
3.5 Python实现
3.5.1 内嵌函数实现
#!/usr/bin/env python
import timeit
import numpy as np
from kmodes.kprototypes import KPrototypes
# number of clusters
K = 20
# no. of points
N = int(1e5)
# no. of dimensions
M = 10
# no. of numerical dimensions
MN = 5
# no. of times test is repeated
T = 3
data = np.random.randint(1, 1000, (N, M))
def huang():
KPrototypes(n_clusters=K, init='Huang', n_init=1, verbose=2)\
.fit_predict(data, categorical=list(range(M - MN, M)))
def cao():
KPrototypes(n_clusters=K, init='Cao', verbose=2)\
.fit_predict(data, categorical=list(range(M - MN, M)))
if __name__ == '__main__':
for cm in ('huang', 'cao'):
print(cm.capitalize() + ': {:.2} seconds'.format(
timeit.timeit(cm + '()',
setup='from __main__ import ' + cm,
number=T)))
3.5.2 面向过程的实现
import numpy as np
import pandas as pd
def get_distances(cores, data: np.ndarray, Numerical, Type, gamma) -> np.ndarray:
'''
距离计算函数
'''
m, n = data.shape
k = cores.shape[0]
d1 = (np.abs(np.repeat(data[:, Numerical], k, axis=0).reshape(m, k, len(Numerical)) - cores[:, Numerical]))*np.array(gamma)[Numerical]
distance_1 = np.sum(d1, axis=2) # ndarray(m, k)
distance_2 = np.sum((np.repeat(data[:, Type], k, axis=0).reshape(m, k, len(Type)) != cores[:, Type])*np.array(gamma)[Type], axis=2)
return distance_1 + distance_2
def get_new_cores(result, data, cores):
'''
重新找聚类中心
'''
k = cores.shape[0]
for i in range(k): # 遍历质心集
items = data[result == i] # 找出对应当前质心的子样本集
cores[i, Numerical] = np.mean(items[:, Numerical], axis=0) # 更新数据型数据核心
for num in Type:
cores[i, num] = np.argmax(np.bincount(items[:, num])) # 更新分类型数据核心
return cores
def k_prototypes(data, k, Numerical, Type, gamma):
"""
k-means聚类算法
k - int,指定分簇数量
data - ndarray(m, n),m个样本的数据集,每个样本n个属性值
Numerical - list,数值型数据所在列
Type - list, 分类型数据所在的列
gamma - float, 分类属性权重。
- list, 若为list,保证 len(gamma)==len(Type),每个分类型数据的权重可单独设置
"""
m, n = data.shape # m:样本数量,n:每个样本的属性值个数
result = np.empty(m) # 生成容器,用于存放m个样本的聚类结果
cores = data[np.random.choice(np.arange(m), k, replace=False)] # 从m个数据样本中不重复地随机选择k个样本作为质心
while True: # 迭代计算
distance = get_distances(cores, data, Numerical, Type, gamma) # 计算距离矩阵
index_min = np.argmin(distance, axis=1) # 每个样本距离最近的质心索引序号
if (index_min == result).all(): # 如果样本聚类没有改变
return result, cores # 则返回聚类结果和质心数据
result[:] = index_min # 重新分类
cores = get_new_cores(result, data, cores) # 重新寻找聚类中心
data=pd.read_csv(r'/root/data_tmp/四川未成年人聚类.csv',header=0).values
# data数据共有8列,其中 0-5列的数据为分类型数据,6-7列为数值型数据。
Numerical=[6,7]
Type=[0,1,2,3,4,5]
gamma=[1,1,1,1,1,1,2/7,3/18] # 定义每项指标的占比
k=5 # 设置聚类
all_result={} # 用于保存运行结果,保存为json格式
# 运行10
for i in range(10):
result, cores = k_prototypes(data, k, Numerical, Type, gamma)
all_result[i]={}
all_result[i]['result']=result # 保存分类结果
all_result[i]['cores'] = cores # 保存分类的簇中心
cores_nums=[]
for one_k in range(k):
cores_nums.append(int(sum(result==one_k))) # 对数据进行转化,后续保存为json格式会报错
all_result[i]['cores_nums'] = cores_nums # 保存每个簇的数量
3.6 效果评价
import os
import sys
from pyspark import SparkConf
from pyspark.sql import SparkSession
# 如果当前代码文件运行测试需要加入修改路径,避免出现后导包问题
BASE_DIR = os.path.dirname(os.path.dirname(os.getcwd()))
sys.path.insert(0, os.path.join(BASE_DIR))
PYSPARK_PYTHON = "/opt/anaconda3/envs/pythonOnYarn/bin/python3.8"
# 当存在多个版本时,不指定很可能会导致出错
os.environ["PYSPARK_PYTHON"] = PYSPARK_PYTHON
os.environ["PYSPARK_DRIVER_PYTHON"] = PYSPARK_PYTHON
os.environ["HADOOP_USER_NAME"]="hdfs"
spark=SparkSession.builder.master("yarn").appName("chen") \
.config("spark.yarn.queue","q2") \
.config("spark.executor.instances",80) \
.config("spark.executor.cores",8) \
.config("spark.executor.memory","4g") \
.enableHiveSupport() \
.getOrCreate()
sc=spark.sparkContext
# 基本参数设置
Numerical=[6,7]
Type=[0,1,2,3,4,5]
k=5
broadcast=sc.broadcast({'k':k,'Num_list':Numerical,'Type_list':Type,'gamma':gamma}) # 广播变量
# 基础数据处理
import pandas as pd
data=pd.read_csv(r'/root/聚类数据.csv',header=0).values
data_list = data.tolist()
data_tuple = [(i, tuple(data_list[i])) for i in range(len(data_list))] # 将数据转化为元组
rdd = sc.parallelize(data_tuple, 75).cache() # 将数据转化为rdd,分区数设为75,
broad_rdd = sc.broadcast(data_tuple) # 将元数据进行广播
for key,value in all_result.items(): # all_result 是由之前代码运行的结果
result=np.array(value['result']) # 转成ndarray格式
cores=np.array(value['cores'])
broadresult=sc.broadcast(tuple(result.astype(int).tolist())) # 将当前分类结果进行广播
def get_distance_rdd(one_rdd):
broadcast_value=broadcast.value
broadResult=broadresult.value
broadrdd=broad_rdd.value
def distance(rdd1,rdd2):
sum_=0
for i in range(len(rdd1[1])):
if i in broadcast_value['Num_list']:
sum_+=abs(rdd1[1][i]-rdd2[1][i])*broadcast_value['gamma'][i]
elif i in broadcast_value['Type_list']:
sum_+=0 if rdd1[1][i]==rdd2[1][i] else broadcast_value['gamma'][i]
return sum_
cluster_in=[0,0] # [簇内的距离之和,簇内最远距离]
cluster_out=[float('inf')]*broadcast_value['k']
for one in broadrdd:
dis_tmp=distance(one_rdd,one)
if broadResult[one[0]] == broadResult[one_rdd[0]]: # 同一簇内
cluster_in[0]+=dis_tmp
if dis_tmp>cluster_in[1]:
cluster_in[1]=dis_tmp
else:
if cluster_out[broadResult[one[0]]]>dis_tmp:
cluster_out[broadResult[one[0]]]=dis_tmp
return (broadResult[one_rdd[0]],cluster_in,cluster_out) # (所属类别,(簇内平均距离,簇内最远距离),(不同簇间的最近的距离))
res=rdd.map(get_distance_rdd).cache()
target=res.map(lambda x:(x[0],x[1]+x[2])).reduceByKey(lambda x,y:[x[0]+y[0],x[1] if x[1]>=y[1] else y[1]]+[x[i] if x[i]<=y[i] else y[i] for i in range(2,len(x))]).sortByKey(ascending=True).collect()
avg=[]
diam=[]
dmin=[]
for one in target:
avg.append(one[1][0]/(sum(result==one[0])**2))
diam.append(one[1][1])
dmin.append(one[1][2:])
dcen=get_distances(cores, cores, Numerical, Type, gamma).astype(int).tolist()
DBI_list=[-float('inf')]*k
for i in range(k):
for j in range(k):
if i!=j:
tmp=(avg[i]+avg[j])/dcen[i][j]
if tmp>DBI_list[i]:
DBI_list[i]=tmp
DBI=sum(DBI_list)/k
DI=min([min(one) for one in dmin])/max(diam)
all_result[key]['avg']=avg
all_result[key]['diam'] = diam
all_result[key]['dmin'] = dmin
all_result[key]['dcen'] = dcen
all_result[key]['DBI'] = DBI
all_result[key]['DI'] = DI
# 将运行结果保存为json文件
import json
def dic_to_json(dic,save_path):
tf=open(save_path,"w")
json.dump(dic,tf)
tf.close()
dic_to_json(all_result,r'/root/k_prototypes_k_5结果.json')