Requirement 1: Implement a SparkCore WordCount in PyCharm and save the result to HDFS
from pyspark import SparkConf, SparkContext
import os
os.environ['SPARK_HOME'] = '/export/server/spark'
PYSPARK_PYTHON = "/root/anaconda3/bin/python3.8"
# When multiple Python versions are installed, omitting this setting is likely to cause errors
os.environ["PYSPARK_PYTHON"] = PYSPARK_PYTHON
os.environ["PYSPARK_DRIVER_PYTHON"] = PYSPARK_PYTHON
if __name__ == '__main__':
    # 1. Create the context, specifying the application name and which resources to run on
    conf = SparkConf().setAppName("first_wordcount").setMaster("local[*]")
    sc = SparkContext(conf=conf)
    # 2. Load words.txt into an RDD; each element of the RDD is one line of the text
    rdd1 = sc.textFile('file:///export/servers/data/words.txt')
    # Filter out empty lines
    rdd1 = rdd1.filter(lambda line: len(line.strip()) > 0)
    # 3. Split each line further into individual words
    rdd2 = rdd1.flatMap(lambda line: line.split(" "))
    # 4. Mark each word with a 1, forming (word, 1) tuples; the key-value structure makes the byKey operations easy
    rdd3 = rdd2.map(lambda word: (word, 1))
    # 5. reduceByKey to sum the counts and obtain the wordcount result
    rdd4 = rdd3.reduceByKey(lambda x, y: x + y)
    # 6. Print the result to the console
    arr = rdd4.collect()
    print('wordcount result:', arr)
    # 7. Save the result to HDFS
    rdd4.saveAsTextFile("hdfs://node1:8020/output/file1")
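As a quick check, the saved output can be read back with the same SparkContext. This is a minimal sketch: it assumes the job above has already written to hdfs://node1:8020/output/file1, and note that saveAsTextFile fails if that output directory already exists, so the path must be removed or changed before rerunning.
    # Optional verification: read the HDFS output back and print it
    verify_rdd = sc.textFile("hdfs://node1:8020/output/file1")
    print(verify_rdd.collect())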
Requirement 2: Implement WordCount with SparkSQL in PyCharm, using both the DSL and the SQL style
from pyspark.sql import SparkSession, Row
from pyspark.sql.types import *
import pyspark.sql.functions as F
import os
os.environ['SPARK_HOME'] = '/export/server/spark'
PYSPARK_PYTHON = "/root/anaconda3/bin/python3.8"
# When multiple Python versions are installed, omitting this setting is likely to cause errors
os.environ["PYSPARK_PYTHON"] = PYSPARK_PYTHON
os.environ["PYSPARK_DRIVER_PYTHON"] = PYSPARK_PYTHON
if __name__ == '__main__':
    # 1. Create the SparkSession
    spark = SparkSession.builder.appName('test').master('local[*]').getOrCreate()
    # 2. Read words.txt as a DataFrame
    file_df = spark.read.text('file:///export/pyworkspace/pyspark_sz26/pyspark-sparksql-3.1.2/data/words.txt')
    file_df.printSchema()
    file_df.show(truncate=False)
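    # Note: spark.read.text yields a DataFrame with a single string column named "value"
    # (one row per input line), which is why both the SQL and the DSL below split "value" into words.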
    # 3. Register the DataFrame as a temporary view
    file_df.createOrReplaceTempView('words_t')
    # 4. Run the wordcount
    print('WordCount in SQL style')
    spark.sql('''
        select t.word,
               count(*) as cnt
        from
          (select explode(split(value,' ')) as word from words_t) t
        group by t.word
        order by cnt desc ''').show()
    print('WordCount in DSL style')
    file_df.select(F.explode(F.split('value', ' ')).alias('word')) \
        .groupBy('word') \
        .count() \
        .orderBy('count', ascending=False) \
        .show()
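    # Optional sketch: instead of only calling .show(), the word counts could also be persisted,
    # e.g. to HDFS as CSV. The output path below is hypothetical and assumes the node1:8020
    # namenode from Requirement 1:
    # wc_df = file_df.select(F.explode(F.split('value', ' ')).alias('word')).groupBy('word').count()
    # wc_df.write.mode('overwrite').csv('hdfs://node1:8020/output/wordcount_sql')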
    spark.stop()
Requirement 3: Read JSON data with PySpark, query its fields in multiple ways, and perform statistical analysis
from pyspark.sql import SparkSession, Row
from pyspark.sql.types import *
import os
os.environ['SPARK_HOME'] = '/export/server/spark'
PYSPARK_PYTHON = "/root/anaconda3/bin/python3.8"
# When multiple Python versions are installed, omitting this setting is likely to cause errors
os.environ["PYSPARK_PYTHON"] = PYSPARK_PYTHON
os.environ["PYSPARK_DRIVER_PYTHON"] = PYSPARK_PYTHON
if __name__ == '__main__':
    # 1. Create the SparkSession
    spark = SparkSession.builder.appName('test').master('local[*]').getOrCreate()
    sc = spark.sparkContext
    # 2. Read the employee data as a DataFrame
    df = spark.read.json('file:///export/pyworkspace/pyspark_sz26/pyspark-sparksql-3.1.2/data/employee.json')
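    # Note: by default spark.read.json expects JSON Lines, i.e. one JSON object per line,
    # e.g. a hypothetical employee.json record: {"id": 1, "name": "Alice", "age": 30}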
    # (1) Query all rows
    df.show()
    # (2) Query all rows with duplicates removed
    df.distinct().show()
    # (3) Query all rows, dropping the id column from the output
    df.drop('id').show()
    # (4) Filter the rows where age > 30
    df.where('age>30').show()
    # (5) Group the rows by age
    df.groupBy('age').count().show()
    # (6) Sort the rows by name in ascending order
    df.orderBy('name').show()
    # (7) Take the first 3 rows
    print(df.take(3))
    # (8) Select the name column of every row, aliased as username
    df.select(df.name.alias('username')).show()
    # (9) Compute the average age
    from pyspark.sql.functions import avg
    df.agg(avg('age')).show()
    # (10) Compute the minimum age
    # (note: this import shadows Python's built-in min within this scope)
    from pyspark.sql.functions import min
    df.agg(min('age')).show()
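The two aggregations can also be combined into a single agg call. A minimal sketch, where the alias names avg_age and min_age are illustrative:
import pyspark.sql.functions as F
df.agg(F.avg('age').alias('avg_age'), F.min('age').alias('min_age')).show()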