[Alink-KMeans] Clustering Based on the Alink Algorithm Platform (Java Implementation)

圣杰 · 2022-10-27


1. Introduction

Alink is a general-purpose machine learning algorithm platform built on top of Flink. This article uses Alink's Java API to run K-Means clustering on the iris dataset, first locally and then on a Flink cluster.


1.1 The Data to Cluster



The example clusters the public iris dataset (iris.csv). Its schema contains five fields:

sepal_length double, sepal_width double, petal_length double, petal_width double, category string

1. The four numeric columns sepal_length, sepal_width, petal_length and petal_width are the features used for clustering; they are later assembled into a single vector column.
2. The category field is the original class label (a string); it is not used for training and is only kept in the output so the predicted clusters can be compared against it.
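To sanity-check the data before building the pipeline, the CSV can be loaded with this schema and printed. The sketch below is a minimal, stand-alone example: the class name IrisPreview is just an illustrative placeholder, the URL and schema string are the same ones used in AlinkDemo (section 2.1), and the import paths assume Alink 1.2's package layout.

import com.alibaba.alink.operator.batch.BatchOperator;
import com.alibaba.alink.operator.batch.source.CsvSourceBatchOp;

// Hypothetical helper class, used only to preview the iris data.
public class IrisPreview {
    public static void main(String[] args) throws Exception {
        BatchOperator data = new CsvSourceBatchOp()
                .setFilePath("https://alink-release.oss-cn-beijing.aliyuncs.com/data-files/iris.csv")
                .setSchemaStr("sepal_length double, sepal_width double, petal_length double, petal_width double, category string");

        // print() triggers the batch job and writes all 150 rows to stdout.
        data.print();
    }
}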

2. Java API

2.1 AlinkDemo.java

package com.wang.flink.alink;

import com.alibaba.alink.operator.batch.BatchOperator;
import com.alibaba.alink.operator.batch.source.CsvSourceBatchOp;
import com.alibaba.alink.pipeline.Pipeline;
import com.alibaba.alink.pipeline.clustering.KMeans;
import com.alibaba.alink.pipeline.dataproc.vector.VectorAssembler;

public class AlinkDemo {
    public static void main(String[] args) throws Exception {
        String URL = "https://alink-release.oss-cn-beijing.aliyuncs.com/data-files/iris.csv";
        String SCHEMA_STR = "sepal_length double, sepal_width double, petal_length double, petal_width double, category string";

        // Read the iris CSV into a batch source using the schema above.
        BatchOperator data = new CsvSourceBatchOp()
                .setFilePath(URL)
                .setSchemaStr(SCHEMA_STR);

        // Assemble the four numeric columns into a single vector column "features".
        VectorAssembler va = new VectorAssembler()
                .setSelectedCols(new String[]{"sepal_length", "sepal_width", "petal_length", "petal_width"})
                .setOutputCol("features");

        // K-Means with k = 3 clusters; keep only the original category column in the output
        // alongside the prediction columns.
        KMeans kMeans = new KMeans()
                .setVectorCol("features")
                .setK(3)
                .setPredictionCol("prediction_result")
                .setPredictionDetailCol("prediction_detail")
                .setReservedCols("category")
                .setMaxIter(100);

        // Chain assembler and K-Means, fit on the data, then score the same data and print it.
        Pipeline pipeline = new Pipeline().add(va).add(kMeans);
        pipeline.fit(data).transform(data).print();
    }
}
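If the fitted model needs to be reused, for example to score another table without re-fitting, the last line of main() can be split into two steps. This is a minimal variation of the code above, assuming the same Alink 1.2 pipeline API, in which Pipeline.fit returns a PipelineModel (import com.alibaba.alink.pipeline.PipelineModel).

// Fit once, keep the fitted model, then transform (and print) as often as needed.
PipelineModel model = pipeline.fit(data);
model.transform(data).print();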

2.2 pom.xml Dependencies

<dependency>
    <groupId>com.alibaba.alink</groupId>
    <artifactId>alink_core_flink-1.9_2.11</artifactId>
    <version>1.2.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-scala_2.11</artifactId>
    <version>1.9.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner_2.11</artifactId>
    <version>1.9.0</version>
</dependency>

2.3 Packaging Plugins

<build>
    <plugins>
        <!-- Java compiler plugin -->
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>3.6.0</version>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
                <encoding>UTF-8</encoding>
            </configuration>
        </plugin>
        <!-- Scala compiler plugin -->
        <plugin>
            <groupId>net.alchim31.maven</groupId>
            <artifactId>scala-maven-plugin</artifactId>
            <version>3.1.6</version>
            <configuration>
                <scalaCompatVersion>2.11</scalaCompatVersion>
                <scalaVersion>2.11.12</scalaVersion>
                <encoding>UTF-8</encoding>
            </configuration>
            <executions>
                <execution>
                    <id>compile-scala</id>
                    <phase>compile</phase>
                    <goals>
                        <goal>add-source</goal>
                        <goal>compile</goal>
                    </goals>
                </execution>
                <execution>
                    <id>test-compile-scala</id>
                    <phase>test-compile</phase>
                    <goals>
                        <goal>add-source</goal>
                        <goal>testCompile</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
        <!-- Assembly plugin: builds a fat jar that bundles all dependencies -->
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-assembly-plugin</artifactId>
            <version>2.6</version>
            <configuration>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
                <archive>
                    <manifest>
                        <!-- Main class of the jar (optional) -->
                        <mainClass>com.wang.flink.alink.AlinkDemo</mainClass>
                    </manifest>
                </archive>
            </configuration>
            <executions>
                <execution>
                    <id>make-assembly</id>
                    <phase>package</phase>
                    <goals>
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>

3. Local Execution Results

Running AlinkDemo locally prints the transformed table to the console: every row keeps the original category label together with the prediction_result cluster index and the prediction_detail column.
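Instead of printing, the scored rows can also be pulled back into the driver program for further processing. The fragment below is only a sketch, assuming Alink's BatchOperator.collect() is available (it executes the job and returns the result as a list of Flink Row objects); pipeline and data are the variables from section 2.1.

// Requires: import java.util.List; import org.apache.flink.types.Row;
List<Row> rows = pipeline.fit(data).transform(data).collect();
for (Row row : rows) {
    System.out.println(row);   // e.g. compare category with prediction_result per row
}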

4. Deploying to the Flink Cluster

4.1 Package the Project

mvn package


4.2 Upload to the Linux Server

sftp> put D:/APP/IDEA/workplace/FlinkTurbineFaultDiagnosis/target/Flink-TurbineFaultDiagnosis-1.0-SNAPSHOT-jar-with-dependencies.jar

4.3 Run the Job on Flink

bin/flink run -p 1 -c com.wang.flink.alink.AlinkDemo /root/Flink-TurbineFaultDiagnosis-1.0-SNAPSHOT-jar-with-dependencies.jar


4.4 Flink Cluster Overview

Open the cluster's Flink Web UI at http://202.206.212.189:8081/ to check the cluster overview and the submitted job.


