0
点赞
收藏
分享

微信扫一扫

Spark从入门到入土(五):SparkSQL原理与实战

boom莎卡拉卡 2021-09-22 阅读 71

SparkSQL是spark家族中一个结构化或半结构化数据的处理模块。对SQL的处理跟关系型数据库SQL类似,将SQL解析成一棵树,通过规则的模式匹配,对树进行绑定、优化,得到查询结果。
SparkSQL提供了一种特殊的RDD-DataFrame,相当于关系型数据的一个表,在Java API中,由行(row)组成的数据集(DataSet)表示为一个DataFrame。
用户程序在执行过程中,下图表示了从SQL语句到DataFrame的整个执行过程。


SparkSession

在spark1.x时代,SparkSQL的入口都是通过SQLContext 或者HiveContext完成,从1.6以后,引入了SparkSession概念,替代了SQLContext,实现对数据的加载、转换、处理等工作。

可以通过SparkSession.builder来创建一个SparkSession,也可以通过stop停止

SparkSQL与MongoDB的集成

Mongo对Spark的支持可参见Mongo官方文档MongoDB

public static void main(String[] args) throws AnalysisException {
        logger.info("开始执行告警统计spark任务");

        SparkSession spark = SparkSession.builder()
                .master("local")
                .appName("alarmService")
                .config("spark.mongodb.input.uri", MONGODB_INPUT_URL)
                .config("spark.mongodb.output.uri", MONGODB_OUTPUT_URL)
                .getOrCreate();
        Dataset ds = MongoSpark.load(spark, EmAlarmBean.class);
        ds.registerTempTable("test");
        ds = spark.sql(getSql());
        MongoSpark.save(ds);
    }

    private static String getSql() {
        String sql = "select orgId,from_unixtime(createTimestamp, 'yyyyMMdd') AS statisticDate, " +
                "sum(if(levelDictId='4001',1,0)) level1," +
                "sum(if(levelDictId='4002',1,0)) level2," +
                "sum(if(levelDictId='4003',1,0)) level3" +
                " from test " +
                " where (deviceType=0 or deviceType=2 or deviceType=3) and createTimestamp is not null and orgId is not null " +
                " group by from_unixtime(createTimestamp, 'yyyyMMdd'),orgId";
        return sql;
    }
举报

相关推荐

0 条评论