在机器学习中分为监督学习和无监督学习。其中聚类算法就是无监督学习。聚类算法就是根据某种相似性将相似的样本划分为一个类型。比如,最简单的k-mean算法的相似规则就是空间中的位置,两个样本点空间中位置越接近表示越相似。
K-means算法
在Spark mllib中实现了k-mean和k-mean++算法,下面是在saprk mllib中可调的参数:
- k:分类的数量。注意在Spark mlib所实现的k-mean算法中,实际分出的类型的数量可以小于k。比如,最后结果少于k个不同的类。
- maxIterations:最大的迭代计算数。
- initializationModel:指定随机初始化或者通过k-mean++初始化。
- epsilon:决定k-mean已经收敛的距离闸值。
- initalModel:用于初始化的一组可选的集群中心。如果提供这个参数,则只会迭代计算一次。
下面的例子,我们将数据聚合成两个族。然后计算平方误差之和。
package com.kmeanTest
import org.apache.spark.mllib.clustering.{KMeans, KMeansModel}
import org.apache.spark.mllib.linalg
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object Main {
def main(args: Array[String]): Unit = {
//TODO 创建环境
val conf: SparkConf = new SparkConf().setAppName("test").setMaster("local[*]")
val sc = new SparkContext(conf)
//TODO 数据操作
//读取数据
val path = "src/main/resources/data/mllib/kmeans_data.txt"
val line: RDD[String] = sc.textFile(path)
val parsedData: RDD[linalg.Vector] = line.map(s => Vectors.dense(s.split(' ').map(_.toDouble))).cache()
//设置KMean的参数
val numClusters = 2 //分成2个簇
val numIterations = 20
val clusters: KMeansModel = KMeans.train(parsedData, numClusters, numIterations)
//计算出误差
val WSSSE: Double = clusters.computeCost(parsedData)
println(s"Within Set Sum of Squared Error = $WSSSE")
//TODO 关闭环境
sc.stop()
}
}
高斯混合模型
高斯混合模型是一种复合模型,比如\(p(x)=\pi_1p_1(x)+\pi_2p_2(x)+...+\pi_kp_k(x)\)其中\(\sum_{i=1}^{k}{\pi_i}=1\)和\(p_i(x)\)中\(x \sim N(\mu_i, \sigma^2_i)\)
在Spark mllib中有以下参数来调整模型:
- k:需要分簇的数量
- convergenceTol:表示收敛时,两次计算期望的的差的最大值。
- maxIterations:表示达不到收敛的时候,可以计算迭代的最大次数。
- initialModel:一个可选的起点,用来初始化EM算法。(因为EM算法对初始化敏感)如果不设置该参数,会随机生成起始点。
下面示例中,我们将会将数据分为两个簇,然后输出高斯混合模型的参数。
部分数据:
2.59470454e+00 2.12298217e+00
1.15807024e+00 -1.46498723e-01
2.46206638e+00 6.19556894e-01
代码:
幂迭代聚类
潜在狄利克雷分配
二分k-means
二分k-means是k-means的一个变形,或者说改进版。k-means的初始质心是随机的。不同的初始质心会导致最后的结果不同。也就是说k-means对初始化比较敏感。而二分k-means就解决了这个问题。二分k-means的算法步骤如下:
假设D是待分簇的数据集,令\(D=\{D_{0,0}\}\),初始质心\(p_{0,0}\)。
- 步骤1:计算\({D_{0,0}}\)的所有样本x与该簇所在质心的SSE值,\(SSE_{i,j}=\sum_{i=1}^{n}{\sum_{j=1}^{m}{dis(x_{i,j}, p_{j})}}\),其中\(SSE_{i,j}\)表示第i次二分的第j个簇,\(p_{i}\)表示第j个簇的质心。
- 步骤2:从\(\{D_{n,0}, D_{n, 1}...,D_{n, n} \}, n < k-1\)选择出SEE最大的簇。
- 步骤3:假设选出的簇为\(D_{n,s}\),将其进行d次k为2的普通k-means分簇。将\(D_{n,s}\)分为\(\{B_1,B_2,..,B_d\}, B_i={C_{i,0},C_{i,1}}\)。计算出\(SSE(B_i)=\sum_{j=0}^{1}{SSE(C_{i,j})}\),然后选出最小的\(SSE(B_i)\),然后让\(\{C_{i,0},C_{i,1}\}\)代替\(D_{n,s}\),
- 步骤4:重复步骤2和步骤3,直到n=k-1(因为我是从0开始的)为止。
二分k-means是一种层次聚类。层次聚类是最常用的聚类分析方法之一,主要目的是建立聚类的层次结构。层次聚类的策略一般分为两类: - 聚合式:这是一种“自上而下”的方法,每个观察值都是在自身的簇中计算出的,随着层次的提升,按照某种标准不断的合并簇形成新的簇。
- 分裂式:这是一种“自下而上”的方法,所有的观察值都是从整体所形成的簇开始的,然后按照某个标准将一个簇分成2个或者多个簇。
在Spark MLlib中提供了如下的参数来调整模型: - k:分簇的数量(默认值为4),注意,模型的结果可以小于k值。
- maxIterations: 每轮进行2分的k-means的次数。
- minDivisibleClusterSize:如果是大于1,则规定了可分簇的最小样本数量,比如设置为5,当一个簇的样本为5,而且SSE最大,则算法结束。如果小于1,则规定了可分簇的最小比例。默认值为1,表示簇样本数为1时不可再分。
package com.BisectingKmeansTest
import org.apache.spark.mllib.clustering.{BisectingKMeans, BisectingKMeansModel}
import org.apache.spark.mllib.linalg
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object Main {
def main(args: Array[String]): Unit = {
//TODO 创建环境
val conf: SparkConf = new SparkConf().setAppName("test").setMaster("local[*]")
val sc = new SparkContext(conf)
//TODO 数据操作
//读取数据
val path = "src/main/resources/data/mllib/kmeans_data.txt"
val line: RDD[String] = sc.textFile(path)
val data: RDD[linalg.Vector] = line.map(s => {
Vectors.dense(s.split(' ').map(_.toDouble))
})
//设置参数
val k = 6
val maxIterations = 10
val minDivisibleClusterSize = 5
//创建模型
val bisectingKmeans: BisectingKMeans = new BisectingKMeans().setK(k)
.setMaxIterations(maxIterations)
.setMinDivisibleClusterSize(minDivisibleClusterSize)
val model: BisectingKMeansModel = bisectingKmeans.run(data)
//输出簇的质心
model.clusterCenters.zipWithIndex.foreach(r=>{
println(s"Cluster Center ${r._2}: ${r._1}")
})
//TODO 关闭环境
sc.stop()
}
}
流式K-means
在大数据中,常常需要面对的数据是流式数据。在Spark MLlib中实现了伪K-mean的流式处理。流式K-means的关键就是对簇的质心的更新操作,也就是说新的数据到达之后,质心的变化情况。在Spark MLlib是通过以下公式来更新质心的:
\[\vec{c_{t+1}}=\frac{\vec{c_t}n_t\alpha+\vec{x_t}m_t}{n_t\alpha+m_t} \]
\[n_{t+1}=n_t+m_t \]
其中\(\vec{c_t},\vec{x_t}\)分别表示原始簇与新添加数据所形成簇的质心。\(n_t,m_t\)分别是原始簇的样本数量和新添簇的样本数量,\(\alpha\)是衰变因子。
举个例子就能明白质心是如何被更新的:
假设原始簇只有一个样本(-1,-1),新添加的样本为(1,0),(2,1),(1,2)。则\(\vec{c_0}=(-1,-1),n_0=1,\vec{x_0}=(1,1),m_t=3\).
- 当\(\alpha=0\)时:\(c_1=\frac{(-1,-1)*1*0+(1,1)*3}{0+3}=(1,1)\)
- 当\(\alpha=0.5\)时:\(c_1=\frac{(-1,-1)*1*0.5+(1,1)*3}{0.5+3}=(5/7,5/7)\)
- 当\(\alpha=1\)时:\(c_1=\frac{(-1,-1)*1*1+(1,1)*3}{1+3}=(1/2,1/2)\)
是不是\(\alpha\)越大就越靠近原始的簇的质心,其实在大数据中\(n_t\)往往很大,所以\(\alpha\)取小于1的数才比较有价值。
package com.StreamKmeanTest
import org.apache.spark.SparkConf
import org.apache.spark.mllib.clustering.StreamingKMeans
import org.apache.spark.mllib.linalg
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.{Seconds, StreamingContext}
object Main {
def main(args: Array[String]): Unit = {
//TODO 创建环境
val conf: SparkConf = new SparkConf().setAppName("test").setMaster("local[*]")
val ssc = new StreamingContext(conf, Seconds(args(2).toLong))
//TODO 数据操作
//读取数据
val trainData: DStream[linalg.Vector] = ssc.textFileStream(args(0)).map(Vectors.parse)
val testData: DStream[LabeledPoint] = ssc.textFileStream(args(1)).map(LabeledPoint.parse)
//配置模型
val k: Int = args(3).toInt
val alpha = 1.0
val randomCenters: Int = args(4).toInt
//训练模型
val model: StreamingKMeans = new StreamingKMeans().setK(k).setDecayFactor(alpha).setRandomCenters(randomCenters, 0.0)
model.trainOn(trainData)
model.predictOnValues(testData.map(r=>(r.label, r.features))).print()
//TODO 执行程序
ssc.start()
ssc.awaitTermination()
}
}