目录
概述
环境准备
搭建Spark项目的代码工程
创建maven项目工程
创建scala测试类
整合spark环境
机器学习基础知识
机器学习流程
机器学习模型的分类
评估模型效果的指标
分类算法模型
回归算法模型
聚类算法模型
Spark基础知识
Spark是什么?
Spark支持哪些开发语言?
Spark运行模式
SparkContext
RDD
基于Spark的推荐引擎
推荐模型的分类
基于内容的过滤
协同过滤
矩阵分解
构建基于Spark的推荐引擎
电影数据集
提取特征
概述
最近自己在加强AI这块,以前做Java、大数据分析比较多,所以对CDH那套东西都比较熟悉,例如Hadoop、Spark。但Spark这块中的ML用得不是很多,以前项目中,涉及到算法部分,我们往往都会让python系的同事负责。在AI这方面,python语言确实会有比较大的优势。当然,如果你的数据环境是基于Hadoop这些大数据框架来搭建的,我们也会用上spark去做算法这块处理,主要就是用到Pyspark里的Spark ML做数据处理。用Scala写都会相对比较少,我们都知道Spark的源码是用Scala来写的,而且官网都推荐我们用Scala来做开发。自己在早几年就买了《Spark机器学习》这本书,当时只是简单看了一下,由于在项目中很少用到,就没怎么花时间去深入研究。最近在做一些AI的基础整理,所以又重新看回这本书,自己也写写笔记,和大家分享一下。
环境准备
做过Spark开发的朋友都会有这样的感受,需要花很多时间在环境上,各种版本冲突报错,而且还不好找问题。确实,在大数据这些框架,他们经常要整合在一起,不像一些框架,可以把所有组件一起封装起来打包。拿Spark来说,能让Spark跑起来,我们需要安装Jdk、Scala环境,构建工程还需要安装maven工具。如果还需要整合一些存储框架,如hdfs、hive、hbase那些,到时会更麻烦。所以我们在实际项目开发中,都会好好整理组件之间的版本信息。
我们先从简单到复杂,慢慢地搭建起来,先把jdk、scala、maven环境搞好。这里就不是详细介绍这几个环境的安装过程了,这些网上都很多类似的教程。我这里列一下我的环境版本信息:
JDK:java version "1.8.0_151"
Scala:version 2.11.12
maven:Apache Maven 3.6.3
搭建Spark项目的代码工程
创建maven项目工程
新建一个maven project。
选择相应版本的JDK,以及Scala环境。
填写相应信息:
然后下一步,配置maven环境。
最后就可以创建工程了。
稍微等一会,就可以创建好。
创建好后,但发现pom有报错,报错信息如下:
Cannot resolve plugin org.scala-tools:maven-scala-plugin:<unknown>
Cannot resolve plugin org.apache.maven.plugins:maven-eclipse-plugin:<unknown>
我们打开pom.xml文件看看。
好像没有看到标红的地方,那我们先忽略它,到时用得时候,出问题再看看。
创建scala测试类
建好工程后,我们尝试创建一下scala类,看能不能正常使用。
我们会发现,IDEA会默认帮我们创建好一个名为“App.scala"文件。我们直接运行一下,看看是什么效果。
执行运行后,但直接报错了,编译那里就没有通过。看来上面的pom那里报错,还是有影响的。那我们只好深入研究一下,把这个bug先解决了。
println方法标红报错提示。
报错信息:
Error:scalac: Scala compiler JARs not found (module 'SparkMLDemo'): D:\maven_repository_3.63\org\scala-lang\scala-reflect\2.7.0\scala-reflect-2.7.0.jar
详细看了一下,这里说找不到scala-reflect-2.7.0.jar包,这个scala的版本和我们本机装的版本好像不一样。那我们看回pom.xml文件。
看来IDEA没有用我们本地scala环境来生成工程,那我们手动改一下,把版本改回我们自己本机的scala版本,我这里装的是2.11.12,所以pom.xml文件里改成这样。
<properties>
<scala.version>2.11.12</scala.version>
</properties>
然后重新更新maven,根据以下步骤来操作。
pom.xml--》右键--》Mave--》Reimport
重新看回“App.scala”文件,println方法不报错了,但Application那里还是红色报错。
如果对scala比较熟的朋友,可能会了解这种“extends Application”写法,这是scala的Main方法第二种实现方式。但我们一般都不建议这样使用,如果采用这种方法,我们无法向主函数(Main方法)传入参数。所以我们把代码改一下,改成以下方式。
package com.ispeasant
/**
* Hello world!
*
*/
object App {
def main(args: Array[String]): Unit = {
println("Hello World!")
}
}
同时我们把“test”里的东西清掉,免得有其它乱七八糟的问题。
删除后,然后运行看看效果。
Hello World!
所以从目前的来看,Scala环境应该是搞好的。
整合spark环境
弄好scala环境后,我们就可以接着弄spark环境,我们现在pom.xml文件里,添加spark相关的jar包。
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
spark版本用的是2.4.0版本,所以需要在pom.xml文件前面添加spark版本变量。
<properties>
<scala.version>2.11.12</scala.version>
<spark.version>2.4.0</spark.version>
</properties>
添加好后,手动更新一下maven。
没问题后,我创建一个新scala文件来测试一下spark环境。
先实现最简单的代码逻辑,创建一个SparkContext,如果创建成功就打印出来。代码如下:
package com.ispeasant
import org.apache.spark.SparkContext
object SparkTest {
def main(args: Array[String]): Unit = {
val sc = new SparkContext("local[2]", "Spark Test")
if (sc != null)
println(sc.toString)
sc.stop()
}
}
然后,我们运行一下,看看结果。
E:\Java\jdk1.8.0_151\bin\java.exe "-javaagent:C:\Program Files\JetBrains\IntelliJ IDEA 2020.1\lib\idea_rt.jar=62938:C:\Program Files\JetBrains\IntelliJ IDEA 2020.1\bin" -Dfile.encoding=UTF-8 -classpath E:\Java\jdk1.8.0_151\jre\lib\charsets.jar;E:\Java\jdk1.8.0_151\jre\lib\deploy.jar;E:\Java\jdk1.8.0_151\jre\lib\ext\access-bridge-64.jar;E:\Java\jdk1.8.0_151\jre\lib\ext\cldrdata.jar;E:\Java\jdk1.8.0_151\jre\lib\ext\dnsns.jar;E:\Java\jdk1.8.0_151\jre\lib\ext\jaccess.jar;E:\Java\jdk1.8.0_151\jre\lib\ext\jfxrt.jar;E:\Java\jdk1.8.0_151\jre\lib\ext\localedata.jar;E:\Java\jdk1.8.0_151\jre\lib\ext\nashorn.jar;E:\Java\jdk1.8.0_151\jre\lib\ext\sunec.jar;E:\Java\jdk1.8.0_151\jre\lib\ext\sunjce_provider.jar;E:\Java\jdk1.8.0_151\jre\lib\ext\sunmscapi.jar;E:\Java\jdk1.8.0_151\jre\lib\ext\sunpkcs11.jar;E:\Java\jdk1.8.0_151\jre\lib\ext\zipfs.jar;E:\Java\jdk1.8.0_151\jre\lib\javaws.jar;E:\Java\jdk1.8.0_151\jre\lib\jce.jar;E:\Java\jdk1.8.0_151\jre\lib\jfr.jar;E:\Java\jdk1.8.0_151\jre\lib\jfxswt.jar;E:\Java\jdk1.8.0_151\jre\lib\jsse.jar;E:\Java\jdk1.8.0_151\jre\lib\management-agent.jar;E:\Java\jdk1.8.0_151\jre\lib\plugin.jar;E:\Java\jdk1.8.0_151\jre\lib\resources.jar;E:\Java\jdk1.8.0_151\jre\lib\rt.jar;E:\ispeasant\github\bwzydy4learn\Scala\SparkMLDemo\target\classes;D:\maven_repository_3.63\org\scala-lang\scala-library\2.11.12\scala-library-2.11.12.jar;D:\maven_repository_3.63\org\apache\spark\spark-core_2.11\2.4.0\spark-core_2.11-2.4.0.jar;D:\maven_repository_3.63\org\apache\avro\avro\1.8.2\avro-1.8.2.jar;D:\maven_repository_3.63\org\codehaus\jackson\jackson-core-asl\1.9.13\jackson-core-asl-1.9.13.jar;D:\maven_repository_3.63\org\codehaus\jackson\jackson-mapper-asl\1.9.13\jackson-mapper-asl-1.9.13.jar;D:\maven_repository_3.63\com\thoughtworks\paranamer\paranamer\2.7\paranamer-2.7.jar;D:\maven_repository_3.63\org\apache\commons\commons-compress\1.8.1\commons-compress-1.8.1.jar;D:\maven_repository_3.63\org\tukaani\xz\1.5\xz-1.5.jar;D:\maven_repository_3.63\org\apache\avro\avro-mapred\1.8.2\avro-mapred-1.8.2-hadoop2.jar;D:\maven_repository_3.63\org\apache\avro\avro-ipc\1.8.2\avro-ipc-1.8.2.jar;D:\maven_repository_3.63\commons-codec\commons-codec\1.9\commons-codec-1.9.jar;D:\maven_repository_3.63\com\twitter\chill_2.11\0.9.3\chill_2.11-0.9.3.jar;D:\maven_repository_3.63\com\esotericsoftware\kryo-shaded\4.0.2\kryo-shaded-4.0.2.jar;D:\maven_repository_3.63\com\esotericsoftware\minlog\1.3.0\minlog-1.3.0.jar;D:\maven_repository_3.63\org\objenesis\objenesis\2.5.1\objenesis-2.5.1.jar;D:\maven_repository_3.63\com\twitter\chill-java\0.9.3\chill-java-0.9.3.jar;D:\maven_repository_3.63\org\apache\xbean\xbean-asm6-shaded\4.8\xbean-asm6-shaded-4.8.jar;D:\maven_repository_3.63\org\apache\hadoop\hadoop-client\2.6.5\hadoop-client-2.6.5.jar;D:\maven_repository_3.63\org\apache\hadoop\hadoop-common\2.6.5\hadoop-common-2.6.5.jar;D:\maven_repository_3.63\commons-cli\commons-cli\1.2\commons-cli-1.2.jar;D:\maven_repository_3.63\xmlenc\xmlenc\0.52\xmlenc-0.52.jar;D:\maven_repository_3.63\commons-httpclient\commons-httpclient\3.1\commons-httpclient-3.1.jar;D:\maven_repository_3.63\commons-io\commons-io\2.4\commons-io-2.4.jar;D:\maven_repository_3.63\commons-collections\commons-collections\3.2.2\commons-collections-3.2.2.jar;D:\maven_repository_3.63\commons-lang\commons-lang\2.6\commons-lang-2.6.jar;D:\maven_repository_3.63\commons-configuration\commons-configuration\1.6\commons-configuration-1.6.jar;D:\maven_repository_3.63\commons-digester\commons-digester\1.8\commons-digester-1.8.jar;D:\maven_repository_3.63\commons-beanutils\commons-beanutils\1.7.0\commons-beanutils-1.7.0.jar;D:\maven_repository_3.63\commons-beanutils\commons-beanutils-core\1.8.0\commons-beanutils-core-1.8.0.jar;D:\maven_repository_3.63\com\google\protobuf\protobuf-java\2.5.0\protobuf-java-2.5.0.jar;D:\maven_repository_3.63\com\google\code\gson\gson\2.2.4\gson-2.2.4.jar;D:\maven_repository_3.63\org\apache\hadoop\hadoop-auth\2.6.5\hadoop-auth-2.6.5.jar;D:\maven_repository_3.63\org\apache\httpcomponents\httpclient\4.2.5\httpclient-4.2.5.jar;D:\maven_repository_3.63\org\apache\httpcomponents\httpcore\4.2.4\httpcore-4.2.4.jar;D:\maven_repository_3.63\org\apache\directory\server\apacheds-kerberos-codec\2.0.0-M15\apacheds-kerberos-codec-2.0.0-M15.jar;D:\maven_repository_3.63\org\apache\directory\server\apacheds-i18n\2.0.0-M15\apacheds-i18n-2.0.0-M15.jar;D:\maven_repository_3.63\org\apache\directory\api\api-asn1-api\1.0.0-M20\api-asn1-api-1.0.0-M20.jar;D:\maven_repository_3.63\org\apache\directory\api\api-util\1.0.0-M20\api-util-1.0.0-M20.jar;D:\maven_repository_3.63\org\apache\curator\curator-client\2.6.0\curator-client-2.6.0.jar;D:\maven_repository_3.63\org\htrace\htrace-core\3.0.4\htrace-core-3.0.4.jar;D:\maven_repository_3.63\org\apache\hadoop\hadoop-hdfs\2.6.5\hadoop-hdfs-2.6.5.jar;D:\maven_repository_3.63\org\mortbay\jetty\jetty-util\6.1.26\jetty-util-6.1.26.jar;D:\maven_repository_3.63\xerces\xercesImpl\2.9.1\xercesImpl-2.9.1.jar;D:\maven_repository_3.63\xml-apis\xml-apis\1.3.04\xml-apis-1.3.04.jar;D:\maven_repository_3.63\org\apache\hadoop\hadoop-mapreduce-client-app\2.6.5\hadoop-mapreduce-client-app-2.6.5.jar;D:\maven_repository_3.63\org\apache\hadoop\hadoop-mapreduce-client-common\2.6.5\hadoop-mapreduce-client-common-2.6.5.jar;D:\maven_repository_3.63\org\apache\hadoop\hadoop-yarn-client\2.6.5\hadoop-yarn-client-2.6.5.jar;D:\maven_repository_3.63\org\apache\hadoop\hadoop-yarn-server-common\2.6.5\hadoop-yarn-server-common-2.6.5.jar;D:\maven_repository_3.63\org\apache\hadoop\hadoop-mapreduce-client-shuffle\2.6.5\hadoop-mapreduce-client-shuffle-2.6.5.jar;D:\maven_repository_3.63\org\apache\hadoop\hadoop-yarn-api\2.6.5\hadoop-yarn-api-2.6.5.jar;D:\maven_repository_3.63\org\apache\hadoop\hadoop-mapreduce-client-core\2.6.5\hadoop-mapreduce-client-core-2.6.5.jar;D:\maven_repository_3.63\org\apache\hadoop\hadoop-yarn-common\2.6.5\hadoop-yarn-common-2.6.5.jar;D:\maven_repository_3.63\javax\xml\bind\jaxb-api\2.2.2\jaxb-api-2.2.2.jar;D:\maven_repository_3.63\javax\xml\stream\stax-api\1.0-2\stax-api-1.0-2.jar;D:\maven_repository_3.63\org\codehaus\jackson\jackson-jaxrs\1.9.13\jackson-jaxrs-1.9.13.jar;D:\maven_repository_3.63\org\codehaus\jackson\jackson-xc\1.9.13\jackson-xc-1.9.13.jar;D:\maven_repository_3.63\org\apache\hadoop\hadoop-mapreduce-client-jobclient\2.6.5\hadoop-mapreduce-client-jobclient-2.6.5.jar;D:\maven_repository_3.63\org\apache\hadoop\hadoop-annotations\2.6.5\hadoop-annotations-2.6.5.jar;D:\maven_repository_3.63\org\apache\spark\spark-launcher_2.11\2.4.0\spark-launcher_2.11-2.4.0.jar;D:\maven_repository_3.63\org\apache\spark\spark-kvstore_2.11\2.4.0\spark-kvstore_2.11-2.4.0.jar;D:\maven_repository_3.63\org\fusesource\leveldbjni\leveldbjni-all\1.8\leveldbjni-all-1.8.jar;D:\maven_repository_3.63\com\fasterxml\jackson\core\jackson-core\2.6.7\jackson-core-2.6.7.jar;D:\maven_repository_3.63\com\fasterxml\jackson\core\jackson-annotations\2.6.7\jackson-annotations-2.6.7.jar;D:\maven_repository_3.63\org\apache\spark\spark-network-common_2.11\2.4.0\spark-network-common_2.11-2.4.0.jar;D:\maven_repository_3.63\org\apache\spark\spark-network-shuffle_2.11\2.4.0\spark-network-shuffle_2.11-2.4.0.jar;D:\maven_repository_3.63\org\apache\spark\spark-unsafe_2.11\2.4.0\spark-unsafe_2.11-2.4.0.jar;D:\maven_repository_3.63\javax\activation\activation\1.1.1\activation-1.1.1.jar;D:\maven_repository_3.63\org\apache\curator\curator-recipes\2.6.0\curator-recipes-2.6.0.jar;D:\maven_repository_3.63\org\apache\curator\curator-framework\2.6.0\curator-framework-2.6.0.jar;D:\maven_repository_3.63\com\google\guava\guava\16.0.1\guava-16.0.1.jar;D:\maven_repository_3.63\org\apache\zookeeper\zookeeper\3.4.6\zookeeper-3.4.6.jar;D:\maven_repository_3.63\javax\servlet\javax.servlet-api\3.1.0\javax.servlet-api-3.1.0.jar;D:\maven_repository_3.63\org\apache\commons\commons-lang3\3.5\commons-lang3-3.5.jar;D:\maven_repository_3.63\org\apache\commons\commons-math3\3.4.1\commons-math3-3.4.1.jar;D:\maven_repository_3.63\com\google\code\findbugs\jsr305\1.3.9\jsr305-1.3.9.jar;D:\maven_repository_3.63\org\slf4j\slf4j-api\1.7.16\slf4j-api-1.7.16.jar;D:\maven_repository_3.63\org\slf4j\jul-to-slf4j\1.7.16\jul-to-slf4j-1.7.16.jar;D:\maven_repository_3.63\org\slf4j\jcl-over-slf4j\1.7.16\jcl-over-slf4j-1.7.16.jar;D:\maven_repository_3.63\log4j\log4j\1.2.17\log4j-1.2.17.jar;D:\maven_repository_3.63\org\slf4j\slf4j-log4j12\1.7.16\slf4j-log4j12-1.7.16.jar;D:\maven_repository_3.63\com\ning\compress-lzf\1.0.3\compress-lzf-1.0.3.jar;D:\maven_repository_3.63\org\xerial\snappy\snappy-java\1.1.7.1\snappy-java-1.1.7.1.jar;D:\maven_repository_3.63\org\lz4\lz4-java\1.4.0\lz4-java-1.4.0.jar;D:\maven_repository_3.63\com\github\luben\zstd-jni\1.3.2-2\zstd-jni-1.3.2-2.jar;D:\maven_repository_3.63\org\roaringbitmap\RoaringBitmap\0.5.11\RoaringBitmap-0.5.11.jar;D:\maven_repository_3.63\commons-net\commons-net\3.1\commons-net-3.1.jar;D:\maven_repository_3.63\org\json4s\json4s-jackson_2.11\3.5.3\json4s-jackson_2.11-3.5.3.jar;D:\maven_repository_3.63\org\json4s\json4s-core_2.11\3.5.3\json4s-core_2.11-3.5.3.jar;D:\maven_repository_3.63\org\json4s\json4s-ast_2.11\3.5.3\json4s-ast_2.11-3.5.3.jar;D:\maven_repository_3.63\org\json4s\json4s-scalap_2.11\3.5.3\json4s-scalap_2.11-3.5.3.jar;D:\maven_repository_3.63\org\scala-lang\modules\scala-xml_2.11\1.0.6\scala-xml_2.11-1.0.6.jar;D:\maven_repository_3.63\org\glassfish\jersey\core\jersey-client\2.22.2\jersey-client-2.22.2.jar;D:\maven_repository_3.63\javax\ws\rs\javax.ws.rs-api\2.0.1\javax.ws.rs-api-2.0.1.jar;D:\maven_repository_3.63\org\glassfish\hk2\hk2-api\2.4.0-b34\hk2-api-2.4.0-b34.jar;D:\maven_repository_3.63\org\glassfish\hk2\hk2-utils\2.4.0-b34\hk2-utils-2.4.0-b34.jar;D:\maven_repository_3.63\org\glassfish\hk2\external\aopalliance-repackaged\2.4.0-b34\aopalliance-repackaged-2.4.0-b34.jar;D:\maven_repository_3.63\org\glassfish\hk2\external\javax.inject\2.4.0-b34\javax.inject-2.4.0-b34.jar;D:\maven_repository_3.63\org\glassfish\hk2\hk2-locator\2.4.0-b34\hk2-locator-2.4.0-b34.jar;D:\maven_repository_3.63\org\javassist\javassist\3.18.1-GA\javassist-3.18.1-GA.jar;D:\maven_repository_3.63\org\glassfish\jersey\core\jersey-common\2.22.2\jersey-common-2.22.2.jar;D:\maven_repository_3.63\javax\annotation\javax.annotation-api\1.2\javax.annotation-api-1.2.jar;D:\maven_repository_3.63\org\glassfish\jersey\bundles\repackaged\jersey-guava\2.22.2\jersey-guava-2.22.2.jar;D:\maven_repository_3.63\org\glassfish\hk2\osgi-resource-locator\1.0.1\osgi-resource-locator-1.0.1.jar;D:\maven_repository_3.63\org\glassfish\jersey\core\jersey-server\2.22.2\jersey-server-2.22.2.jar;D:\maven_repository_3.63\org\glassfish\jersey\media\jersey-media-jaxb\2.22.2\jersey-media-jaxb-2.22.2.jar;D:\maven_repository_3.63\javax\validation\validation-api\1.1.0.Final\validation-api-1.1.0.Final.jar;D:\maven_repository_3.63\org\glassfish\jersey\containers\jersey-container-servlet\2.22.2\jersey-container-servlet-2.22.2.jar;D:\maven_repository_3.63\org\glassfish\jersey\containers\jersey-container-servlet-core\2.22.2\jersey-container-servlet-core-2.22.2.jar;D:\maven_repository_3.63\io\netty\netty-all\4.1.17.Final\netty-all-4.1.17.Final.jar;D:\maven_repository_3.63\io\netty\netty\3.9.9.Final\netty-3.9.9.Final.jar;D:\maven_repository_3.63\com\clearspring\analytics\stream\2.7.0\stream-2.7.0.jar;D:\maven_repository_3.63\io\dropwizard\metrics\metrics-core\3.1.5\metrics-core-3.1.5.jar;D:\maven_repository_3.63\io\dropwizard\metrics\metrics-jvm\3.1.5\metrics-jvm-3.1.5.jar;D:\maven_repository_3.63\io\dropwizard\metrics\metrics-json\3.1.5\metrics-json-3.1.5.jar;D:\maven_repository_3.63\io\dropwizard\metrics\metrics-graphite\3.1.5\metrics-graphite-3.1.5.jar;D:\maven_repository_3.63\com\fasterxml\jackson\core\jackson-databind\2.6.7.1\jackson-databind-2.6.7.1.jar;D:\maven_repository_3.63\com\fasterxml\jackson\module\jackson-module-scala_2.11\2.6.7.1\jackson-module-scala_2.11-2.6.7.1.jar;D:\maven_repository_3.63\org\scala-lang\scala-reflect\2.11.8\scala-reflect-2.11.8.jar;D:\maven_repository_3.63\com\fasterxml\jackson\module\jackson-module-paranamer\2.7.9\jackson-module-paranamer-2.7.9.jar;D:\maven_repository_3.63\org\apache\ivy\ivy\2.4.0\ivy-2.4.0.jar;D:\maven_repository_3.63\oro\oro\2.0.8\oro-2.0.8.jar;D:\maven_repository_3.63\net\razorvine\pyrolite\4.13\pyrolite-4.13.jar;D:\maven_repository_3.63\net\sf\py4j\py4j\0.10.7\py4j-0.10.7.jar;D:\maven_repository_3.63\org\apache\spark\spark-tags_2.11\2.4.0\spark-tags_2.11-2.4.0.jar;D:\maven_repository_3.63\org\apache\commons\commons-crypto\1.0.0\commons-crypto-1.0.0.jar;D:\maven_repository_3.63\org\spark-project\spark\unused\1.0.0\unused-1.0.0.jar com.ispeasant.SparkTest
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
20/11/03 16:29:25 INFO SparkContext: Running Spark version 2.4.0
20/11/03 16:29:25 INFO SparkContext: Submitted application: Spark Test
20/11/03 16:29:25 INFO SecurityManager: Changing view acls to: ispea
20/11/03 16:29:25 INFO SecurityManager: Changing modify acls to: ispea
20/11/03 16:29:25 INFO SecurityManager: Changing view acls groups to:
20/11/03 16:29:25 INFO SecurityManager: Changing modify acls groups to:
20/11/03 16:29:25 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(ispea); groups with view permissions: Set(); users with modify permissions: Set(ispea); groups with modify permissions: Set()
20/11/03 16:29:27 INFO Utils: Successfully started service 'sparkDriver' on port 62954.
20/11/03 16:29:27 INFO SparkEnv: Registering MapOutputTracker
20/11/03 16:29:27 INFO SparkEnv: Registering BlockManagerMaster
20/11/03 16:29:27 INFO BlockManagerMasterEndpoint: Using org.apache.spark.storage.DefaultTopologyMapper for getting topology information
20/11/03 16:29:27 INFO BlockManagerMasterEndpoint: BlockManagerMasterEndpoint up
20/11/03 16:29:27 INFO DiskBlockManager: Created local directory at C:\Users\ispea\AppData\Local\Temp\blockmgr-504280ad-fdce-4c5f-a5bd-ff7efb4a08cc
20/11/03 16:29:27 INFO MemoryStore: MemoryStore started with capacity 1984.5 MB
20/11/03 16:29:27 INFO SparkEnv: Registering OutputCommitCoordinator
20/11/03 16:29:27 INFO Utils: Successfully started service 'SparkUI' on port 4040.
20/11/03 16:29:27 INFO SparkUI: Bound SparkUI to 0.0.0.0, and started at http://DESKTOP-7KO4HHD:4040
20/11/03 16:29:27 INFO Executor: Starting executor ID driver on host localhost
20/11/03 16:29:27 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 62971.
20/11/03 16:29:27 INFO NettyBlockTransferService: Server created on DESKTOP-7KO4HHD:62971
20/11/03 16:29:27 INFO BlockManager: Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy
20/11/03 16:29:28 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(driver, DESKTOP-7KO4HHD, 62971, None)
20/11/03 16:29:28 INFO BlockManagerMasterEndpoint: Registering block manager DESKTOP-7KO4HHD:62971 with 1984.5 MB RAM, BlockManagerId(driver, DESKTOP-7KO4HHD, 62971, None)
20/11/03 16:29:28 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(driver, DESKTOP-7KO4HHD, 62971, None)
20/11/03 16:29:28 INFO BlockManager: Initialized BlockManager: BlockManagerId(driver, DESKTOP-7KO4HHD, 62971, None)
org.apache.spark.SparkContext@377c68c6
20/11/03 16:29:28 INFO SparkUI: Stopped Spark web UI at http://DESKTOP-7KO4HHD:4040
20/11/03 16:29:28 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
20/11/03 16:29:28 INFO MemoryStore: MemoryStore cleared
20/11/03 16:29:28 INFO BlockManager: BlockManager stopped
20/11/03 16:29:28 INFO BlockManagerMaster: BlockManagerMaster stopped
20/11/03 16:29:28 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
20/11/03 16:29:28 INFO SparkContext: Successfully stopped SparkContext
20/11/03 16:29:28 INFO ShutdownHookManager: Shutdown hook called
20/11/03 16:29:28 INFO ShutdownHookManager: Deleting directory C:\Users\ispea\AppData\Local\Temp\spark-937abb61-e9d8-4d12-a93f-e5ad27478f70
Process finished with exit code 0
所以我们的Spark环境也没问题了。
机器学习基础知识
这次主要是记录自己学习《Spark机器学习》这本书的学习过程,并不会长篇大论讲述机器学习的基础理论,这里简单点一下即可。
机器学习流程
机器学习流程主要可分为以下几部分:
- 数据收集
- 数据预处理
- 特征工程
- 模型训练
- 模型测试验证
- 模型部署上线
机器学习模型的分类
可以分为两大类:
监督学习:使用已标记数据来学习。“已标记”很关键,训练集必须是有标签的,不然就很难使用监督学习的模型。像经典的Kaggle的泰坦尼克号那个比赛就是典型的监督学习模型,还有一些推荐系统中用到的模型。监督学习一般都有2个主要任务:回归和分类。
回归:预测连续的、具体的数值。
分类:对各种食物分门别类,用于离散型预测。
常用的监督学习算法:
算法名称 | 算法分类 | 简介 |
朴素贝叶斯(Naive Bayesian) | 分类 | 朴素贝叶斯法是基于贝叶斯定理与特征条件独立假设的分类方法。贝叶斯方法是以贝叶斯原理为基础,使用概率统计的知识对样本数据集进行分类。应用场景:如文本分类,垃圾邮件的分类,信用评估,钓鱼网站检测等等。 |
决策树(Decision Tree) | 分类 | 决策树是一个预测模型;他代表的是对象属性与对象值之间的一种映射关系。树中每个节点表示某个对象,而每个分叉路径则代表的某个可能的属性值,而每个叶结点则对应从根节点到该叶节点所经历的路径所表示的对象的值。 |
支持向量机(Support Vector Machine, SVM) | 分类 | 支持向量机是一类按监督学习方式对数据进行二元分类的广义线性分类器,其决策边界是对学习样本求解的最大边距超平面。 |
逻辑回归(Logistic Regression) | 分类 | 是一种广义的线性回归分析模型,常用于数据挖掘,疾病自动诊断,经济预测等领域。例如,探讨引发疾病的危险因素,并根据危险因素预测疾病发生的概率等。 |
线性回归(Linear Regression) | 回归 | 线性回归是利用数理统计中回归分析,来确定两种或两种以上变量间相互依赖的定量关系的一种统计分析方法,运用十分广泛。其表达形式为y = w'x+e,e为误差服从均值为0的正态分布。 |
回归树(Regression Tree) | 回归 | 用树模型做回归问题,每一片叶子都输出一个预测值,预测值一般是该片叶子所含训练集元素输出的均值。 |
K邻近(k-Nearest Neighbor,KNN) | 分类 | 在特征空间中,如果一个样本附近的k个最近(即特征空间中最邻近)样本的大多数属于某一个类别,则该样本也属于这个类别。 |
无监督学习:刚好和监督学习相反,训练过程中,不需要已标记的数据。模型会根据数据进行特征提取。一般都会用到聚类算法。如:
- K均值聚类算法:是一种迭代求解的聚类分析算法,其步骤是,预将数据分为K组,则随机选取K个对象作为初始的聚类中心,然后计算每个对象与各个种子聚类中心之间的距离,把每个对象分配给距离它最近的聚类中心。聚类中心以及分配给它们的对象就代表一个聚类。每分配一个样本,聚类的聚类中心会根据聚类中现有的对象被重新计算。这个过程将不断重复直到满足某个终止条件。终止条件可以是没有(或最小数目)对象被重新分配给不同的聚类,没有(或最小数目)聚类中心再发生变化,误差平方和局部最小。
- 分层聚类算法:对给定数据对象的集合进行层次分解,根据分层分解采用的分解策略,分层聚类法又可以分为凝聚的(agglomerative)和分裂的(divisive)分层聚类。
- 凝聚的分层聚类:它采用自底向上的策略,首先将每一个对象作为一个类,然后根据某种度量(如2个当前类中心点的距离)将这些类合并为较大的类,直到所有的对象都在一个类中,或者是满足某个终止条件时为止,绝大多数分层聚类算法属于这一类,它们只是在类间相似度的定义上有所不同。
- 分裂的分层聚类:它采用与凝聚的分层聚类相反的策略——自顶向下,它首先将所有的对象置于一个类中,然后根据某种度量逐渐细分为较小的类,直到每一个对象自成一个类,或者达到某个终止条件(如达到希望的类个数,或者2个最近的类之间的距离超过了某个阈值)。
- 基于密度的聚类算法(DBSCAN,Density-Based Spatial Clustering of Applications with Noise):与划分和层次聚类方法不同,它将簇定义为密度相连的点的最大集合,能够把具有足够高密度的区域划分为簇,并可在噪声的空间数据库中发现任意形状的聚类。
评估模型效果的指标
分类算法模型
准确率(Accuracy)
准确率计算公式:
Accuracy=(TP+TN) / (TP+FN+FP+TN)
即正确预测的正反样本数 / 总数
说明:
真正(True Positive , TP):被模型预测为正的正样本。
假正(False Positive , FP):被模型预测为正的负样本。
假负(False Negative , FN):被模型预测为负的正样本。
真负(True Negative , TN):被模型预测为负的负样本。
精确率(Precision)
精确率和准确率是不一样的,精确率只统计预测正确的正样本,并不是所有预测正确的样本。表示为预测出是正的里面有多少真正是正的,可理解为查准率。
公式:
Precision=TP / (TP+FP)
即正确预测的正样本数 / 预测正样本总数
召回率(Recall)
在实际正样本中,模型能预测出多少正样本。
公式:
Recall=TP / (TP+FN)
即正确预测的正样本数 / 实际正样本总数
F1 Score
F1 Score,是统计学中用来衡量二分类模型精确度的一种指标。它同时兼顾了分类模型的精确率和召回率。F1分数可以看作是模型精确率和召回率的一种调和平均,它的最大值是1,最小值是0。
公式:
F1 Score=2 * (Precision * Recall) / (Precision + Recall)
ROC曲线
指在特定刺激条件下,以被试在不同判断标准下所得的虚报概率P(y/N)为横坐标,以击中概率P(y/SN)为纵坐标,画得的各点的连线。
ROC曲线指受试者工作特征曲线 / 接收器操作特性曲线(receiver operating characteristic curve), 是反映敏感性和特异性连续变量的综合指标,是用构图法揭示敏感性和特异性的相互关系,它通过将连续变量设定出多个不同的临界值,从而计算出一系列敏感性和特异性,再以敏感性为纵坐标、(1-特异性)为横坐标绘制成曲线,曲线下面积越大,诊断准确性越高。在ROC曲线上,最靠近坐标图左上方的点为敏感性和特异性均较高的临界值。
总之:ROC曲线越接近左上角,该算法模型的性能越好。而且一般来说,如果ROC曲线是光滑的,那该算法模型就没有太大的过拟合。
AUC
AUC(Area Under Curve)被定义为ROC曲线下与坐标轴围成的面积,显然这个面积的数值不会大于1。又由于ROC曲线一般都处于y=x这条直线的上方,所以AUC的取值范围在0.5和1之间。AUC越接近1.0,检测方法真实性越高;等于0.5时,则真实性最低,无应用价值。
PR曲线
PR曲线实则是以precision(精准率)和recall(召回率)这两个为变量而做出的曲线,其中recall为横坐标,precision为纵坐标。
回归算法模型
平均绝对误差(MAE)
平均绝对误差是所有单个观测值与算术平均值的偏差的绝对值的平均。平均绝对误差可以避免误差相互抵消的问题,因而可以准确反映实际预测误差的大小
均方误差(MSE)
均方误差(mean-square error, MSE)是反映估计量与被估计量之间差异程度的一种度量。设t是根据子样确定的总体参数θ的一个估计量,(θ-t)2的数学期望,称为估计量t的均方误差。它等于σ2+b2,其中σ2与b分别是t的方差与偏倚。
均方根误差(RMSE)
均方根误差亦称标准误差,其定义为 ,i=1,2,3,…n。在有限测量次数中,均方根误差常用下式表示:√[∑di^2/n]=Re,式中:n为测量次数;di为一组测量值与真值的偏差。如果误差统计分布是正态分布,那么随机误差落在±σ以内的概率为68%。
决定系数
与复相关系数类似的,表示一个随机变量与多个随机变量关系的数字特征,用来反映回归模式说明因变量变化可靠程度的一个统计指标,一般用符号“R”表示,可定义为已被模式中全部自变量说明的自变量的变差对自变量总变差的比值。
聚类算法模型
兰德系数(Rand index)
用于聚类模型的性能评估,取值范围为[0,1],值越大意味着聚类结果与真实情况越吻合。
互信息(Mutual Information)
互信息是信息论里一种有用的信息度量,它可以看成是一个随机变量中包含的关于另一个随机变量的信息量,或者说是一个随机变量由于已知另一个随机变量而减少的不肯定性。
轮廓系数
轮廓系数(Silhouette Coefficient),是聚类效果好坏的一种评价方式。它结合内聚度和分离度两种因素。可以用来在相同原始数据的基础上用来评价不同算法、或者算法不同运行方式对聚类结果所产生的影响。
Spark基础知识
这本书主要是讲Spark的机器学习内容,Spark的基础知识,讲得比较简单。如果大家需要更深入学习Spark,可以找Spark相关书籍再详细研究。这里我简单总结一下一些关键知识点。
Spark是什么?
官网(https://spark.apache.org/)描述:
Apache Spar is a unified analytics engine for large-scale data processing.
Apache Spark是一个开源集群运算框架,最初是由加州大学柏克莱分校AMPLab所开发。相对于Hadoop的MapReduce会在运行完工作后将中介资料存放到磁盘中,Spark使用了存储器内运算技术,能在资料尚未写入硬盘时即在存储器内分析运算。Spark在存储器内运行程序的运算速度能做到比Hadoop MapReduce的运算速度快上100倍,即便是运行程序于硬盘时,Spark也能快上10倍速度。Spark允许用户将资料加载至集群存储器,并多次对其进行查询,非常适合用于机器学习算法。
使用Spark需要搭配集群管理员和分布式存储系统。Spark支持独立模式(本地Spark集群)、Hadoop YARN或Apache Mesos的集群管理。在分布式存储方面,Spark可以和 Alluxio, HDFS、 Cassandra、OpenStack Swift和Amazon S3等接口搭载。 Spark也支持伪分布式(pseudo-distributed)本地模式,不过通常只用于开发或测试时以本机文件系统取代分布式存储系统。在这样的情况下,Spark仅在一台机器上使用每个CPU核心运行程序。
Spark支持哪些开发语言?
目前Spark支持Scala,Java,Python三种编程语言。那我们如何选择?
我的建议是:如果项目是独立出来的,我建议用Scala来完成。如果还涉及其它编程语言项目的集成,例如Java项目、Python项目,就和它们对应即可。
Spark运行模式
Spark支持以下几种运行模式:
- 本地单机模式
- 集群单机模式
- 基于Mesos
- 基于YARN
注:在本地模式的时候,local[N]的N表示要使用的线程数目。
SparkContext
Spark程序的入口是从SparkContext开始,SparkContext初始化时需要一个SparkConf对象,通过SparkConf来配置运行参数。因此,Spark程序初始化代码大致如下:
package com.ispeasant
import org.apache.spark.{SparkConf, SparkContext}
object SparkContextTest {
def main(args: Array[String]): Unit = {
// 创建一个SparkConf对象
val conf = new SparkConf()
// 配置相应的参数,如应用名称、运行模式
conf.setAppName("SparkContextTest")
.setMaster("local[2]")
// 创建SparkContext
val sparkContext = new SparkContext(conf)
// 打印看看是否创建成功
println("sparkContext:==" + sparkContext)
}
}
可以看看运行后的结果:
除了这种方式外,我们还有另外一种方式来创建SparkContext。我们可以看看SparkContext这个类的构造函数。
从上面截图可以看出,我们可以直接采取默认参数来构建,如下:
package com.ispeasant
import org.apache.spark.{SparkConf, SparkContext}
object SparkContextTest2 {
def main(args: Array[String]): Unit = {
// 创建SparkContext
val sparkContext = new SparkContext("local[2]","SparkContextTest2")
// 打印看看是否创建成功
println("sparkContext:==" + sparkContext)
}
}
我们看看运行结果:
因此,两种创建方式都是可以的,但我个人比较偏向于第一种,在实际开发中,我们经常配置多种参数,第二种未必能满足。
RDD
RDD,弹性分布式数据集(Resilient Distributed Dataset,RDD)是 Spark 中的核心概念。我觉得RDD的最重要的一个特性是容错性,可以这样理解,RDD是分布在集群的节点上,以函数式操作集合的方式进行各种并行操作,同时,当某个节点或者任务失败时,RDD会在余下的节点上自动重建,以便任务能最终完成。
RDD的操作分为转换(transformation)和执行(action)两种。(注,面试的时候,经常会考这个问题,大家可以深入研究一下。)
那我们写一个RDD的例子看看。
package com.ispeasant
import org.apache.spark.{SparkConf, SparkContext}
object TestRDD {
def main(args: Array[String]): Unit = {
// 创建一个SparkConf对象
val conf = new SparkConf()
// 配置相应的参数,如应用名称、运行模式
conf.setAppName("SparkContextTest")
.setMaster("local[2]")
// 创建SparkContext
val sparkContext = new SparkContext(conf)
val data = Array(1, 2, 3, 4, 5)
val distData = sparkContext.parallelize(data)
println("distData=====" + distData)
val cout = distData.count()
println("cout====" + cout)
}
}
运行结果如下:
基于Spark的推荐引擎
推荐模型的分类
基于内容的过滤
利用物品的内容或是属性信息以及某些相似度定义,来求出与该物品类似的物品。
协同过滤
协同过滤是利用已知的用户编好来估计用户对其它物品的喜好程度。
矩阵分解
这个模型,我自己接触不多,所以就着重看了一下。
分为两种,一种是显式矩阵分解,另一种是隐式矩阵分解。
显式矩阵分解
当要处理的数据是由用户提供的自身偏好数据,这些数据被称作显式偏好数据。将这些数据转换为二维矩阵,矩阵的每一个数据表示某个用户对特定物品的偏好。如下:
隐式矩阵分解
显式矩阵分解是面向显式偏好数据,隐式矩阵分解是通过显式偏好数据提取隐式反馈数据。书上举了一个这样的例子,两个矩阵,一个矩阵表示用户看了哪些电影,另一个矩阵表示用户看电影的次数。
最小二乘法
最小二乘法(Alternating Least Squares,ALS)是一种求解矩阵分解问题的最优化方法。
构建基于Spark的推荐引擎
前面准备了一些核心技术知识,下面就开始学习一些实战项目,我们先走读一下书里的代码,如果到时觉得效果不好,自己再尝试优化一下。
我相信大家对“推荐”这两个字一点都不陌生,在日常生活中,我们经常都会遇到与它相关的东西,如广告推送,这应该是推进最经典应用场景之一。系统根据用户的特征,推送与之相关的内容。不管是电商平台、还是内容社区平台,内容推送是运营中及其重要的一环。
大家回想一下,在我们实际生活中,举个例子,最近这几年比较火爆的抖音短视频app,大家日常在刷视频的时候,基本都会选用“推荐”模式,由平台来推送内容。如果细心的网友会发现,当我们第一次打开抖音app的时候,一般都会让你注册,平台做好唯一标识。由于你是平台的新用户,平台也不知道你喜欢什么类型的内容,所以平台会提前弄好一些推荐内容。我们一般把这个环节叫做“冷启动”,平台一般都会推荐平台的热门视频内容,先不分类型,不管是搞笑、汽车、美女、帅哥等内容都先推送给你,然后根据你看到视频后的操作进行用户行为获取,如:如果你看到你感兴趣的内容,你一般都会停留比较久,甚至看完全部,或者看多几遍,点个“小红心”。平台的后台就收集你的用户行为,开始根据你的行为特征提取特征,并根据这些特征给予你最新的视频内容,然后后台不断循环这个操作。
当然,只要是产品,就少不了运营。大家也会发现,很多时候我们经常都会刷到同类型的内容,特别是BGM一样的视频内容。我相信大家都会想到一个点,视频创作者,花钱去买了推广服务,进行流量刷新。站在我们用户角度,有时会觉得有点审美疲劳。之前我们团队开玩笑说,推荐系统就应该让用户爱恨交加,这样反而会更能留住用户。
推荐系统,在实际产品中,我们不能简单通过技术来实现。推荐系统属于运营中重要一部分,必须和产品运营相辅相成。之前和一个做电商平台的朋友聊天,他给我举了一个很现实的例子。
在电商平台中,平台推送商品主要是为了用户购买,但很多时候,用户看到这些推送内容,很大部分都不愿意点击去。那从技术层面来说,好像很难判断这次推送到底是成功还是没成功。但假如在运营方案中增加优惠活动,效果就完全不一样了。之前我们也遇到一个无奈的现象,特别是对于技术团队来说。有一次,做商品推荐,运营团队想减少成本,不希望总是通过烧钱模式去推广。希望技术团队通过技术手段深入推荐,就是我们需要深度分析用户想法,找到一些比较慷慨大方的用户,用户看到推送内容直接就会点击去,并且进行购买。
从技术来说,确实可以做到一点。因为在实际生活上,确实存在这样的用户。他们不会很计较一些优惠利益,只要是自己需要,而且价格在他们接受的范围内,基本都会进行购买。大家听起来,好像想到一个词“大数据杀熟”。
“大数据杀熟”,这两年,经常都会被媒体批评。这个到底是对还是错?是否违反?对于我这种技术人来说,我就不去评价了。
闲话聊了一大堆,我们开始看看代码吧。
电影数据集
书里的课程内容,用的是电影数据集(MovieLens 100k),首先我们先把数据集读取进来。
我们现在原来的项目工程里新建一个包,用于存放本次项目的代码。
先建一个名为“RecommenderApp”类,读取数据集,并打印第一行数据看看。
package com.ispeasant.recommender
import org.apache.spark.{SparkConf, SparkContext}
object RecommenderApp {
def main(args: Array[String]): Unit = {
// 创建一个SparkConf对象
val conf = new SparkConf()
// 配置相应的参数,如应用名称、运行模式
conf.setAppName("RecommenderApp")
.setMaster("local[2]")
// 创建SparkContext
val sc = new SparkContext(conf)
val mlData = sc.textFile("./data/ml-100k/u.data")
val str = mlData.first()
println(str)
}
}
结果:
196 242 3 881250949
或者我们也可以直接打开“u.data”文件,看看里面的内容。
196 242 3 881250949
186 302 3 891717742
22 377 1 878887116
244 51 2 880606923
166 346 1 886397596
猜测一下,每行数据的定义规则。
应该是用户ID,影片ID,星级和时间戳。
为了更好的数据处理,我们需要把每个字段数据提取出来。从数据样本来看,字段之间应该是用制表符进行分割,那我们尝试一下用制表符切分一下。
val strings = str.split("\t")
for (i <- 0 to strings.length){
println(strings(i))
}
结果:
196
242
3
881250949
提取特征
数据的样例我们也看了,现在开始准备下一步。开始提取特征,采用显示评级数据。我们先看看原始数据的格式:
用户ID、影片ID、星级、时间戳,如第一行数据如下:
196 242 3 881250949