0
点赞
收藏
分享

微信扫一扫

PySpark SQL 加载使用 tab 键分隔的文件

PySpark SQL 加载使用 tab 键分隔的文件

数据文件准备

为了方便后面的实验, 先生成数据文件 ​​data.txt​​, Python 代码如下:

data = [
'x1\t1\t2',
'x2\t2\t2',
'x3\t3\t2',
'x4\t4\t2',
'x5\t5\t2',
]

with open('data.txt', 'w') as f:
for i in data:
f.write('{}\n'.format(i))

Spark 以及 PySpark 安装

首先要安装好 Spark 以及 PySpark, 其中安装 PySpark 需要 Python >= 3.6. Spark 的安装比较简单, 从 ​​https://spark.apache.org/downloads.html​​​ 选择期望的版本, 比如 ​​spark-3.0.0-preview2-bin-hadoop2.7.tgz​​​, 下载解压后, 设置好 ​​SPARK_HOME​​​ 以及 ​​PATH​​ 两个环境变量, 比如:

export SPARK_HOME=~/spark-3.0.0-preview2-bin-hadoop2.7
export PATH=$PATH:$SPARK_HOME/bin

再使用 ​​source ~/.bashrc​​​, 之后应该能在终端启动 ​​spark-shell​​​. 为了使用 ​​pyspark​​, 进入到:

cd ~/spark-3.0.0-preview2-bin-hadoop2.7/python
python setup.py install

使用上面的命令安装 ​​pyspark​​​ (注意 python 的版本至少要 3.6). 安装完成后, 应该能在终端启动 ​​pyspark​​.

PySpark SQL 加载数据文件

OK, 下面介绍使用 PySpark SQL 加载数据文件, 代码如下.

其中 ​​warehouse_location​​ 指定数据库和表的默认位置. 两句删除代码:

os.system("rm -rf {}".format(warehouse_location))
os.system("rm -rf {}".format('metastore_db'))

是因为第一次运行代码后, 数据就缓存在 ​​metastore_db​​ 中, 第二次运行代码获得的还是第一次运行代码所得到的结果, 因此这两行代码相当于删除缓存的作用.

之后创建 ​​SparkSession​​​ 时, 需要注意加上 ​​enableHiveSupport()​​, 以支持 Hive 语句的使用.

import os
from os.path import abspath
from pyspark.sql import SparkSession, Row

# warehouse_location points to the default location for managed databases and tables
warehouse_location = abspath('spark-warehouse')
os.system("rm -rf {}".format(warehouse_location))
os.system("rm -rf {}".format('metastore_db'))

spark = SparkSession.builder \
.appName("sql") \
.config('master', 'local') \
.config('spark.sql.warehouse.dir', warehouse_location) \
.enableHiveSupport() \
.getOrCreate()


data_file = 'data.txt'
table = 'Info'

cmd = "create table if not exists {}".format(table) + \
" (id STRING, age INT, number INT)" + \
r" row format delimited fields terminated by '\t'"
print(cmd)
spark.sql(cmd)
spark.sql("load data local inpath '{}' into table {}".format(data_file, table))
spark.sql("select * from {} limit 2".format(table)).show()
spark.stop()

此外, 将 ​​cmd​​ 中的

r" row format delimited fields terminated by '\t'"

改为:

r" using hive options(fileFormat 'textfile', fieldDelim '\t')"

也能达到同样的目的, 注意 ​​options​​​ 中的 ​​fileFormat​​ 必须填写, 否则报错.

结果如下:


PySpark SQL 加载使用 tab 键分隔的文件_sql


举报

相关推荐

0 条评论