0
点赞
收藏
分享

微信扫一扫

客快物流大数据项目(六十六):车辆主题



文章目录

​​车辆主题​​

​​一、背景介绍​​

​​二、指标明细​​

​​三、表关联关系​​

​​1、事实表​​

​​2、维度表​​

​​3、关联关系​​

​​四、车辆数据拉宽开发​​

​​1、拉宽后的字段​​

​​2、SQL语句​​

​​3、Spark实现​​

​​五、车辆数据指标开发​​

​​1、计算的字段​​

​​2、Spark实现​​

车辆主题

一、背景介绍

车辆主题主要是统计各个网点、区域、公司的发车情况,反映了区域或者公司的吞吐量及运营状况。

二、指标明细


指标列表



维度



发车次数



各网点发车次数



各区域发车次数



各公司发车次数



最大发车次数



各网点最大发车次数



各区域最大发车次数



各分公司最大发车次数



最小发车次数



各网点最小发车次数



各区域最小发车次数



各分公司最小发车次数



平均发车次数



各网点平均发车次数



各区域平均发车次数



各分公司平均发车次数


三、表关联关系

1、​​​​​​​​​​​​​​事实表


表名



描述



tbl_transport_tool



车辆事实表



tbl_warehouse_transport_tool



仓库车辆关联表


2、维度表


表名



描述



tbl_dot



网点表



tbl_company



公司表



tbl_warehouse



仓库表



tbl_company_warehouse_map



公司仓库关联表



tbl_transport_tool



车辆表


3、​​​​​​​​​​​​​​关联关系

车辆表与维度表的关联关系如下:

客快物流大数据项目(六十六):车辆主题_big data

四、​​​​​​​车辆数据拉宽开发

1、拉宽后的字段

1.1、拉宽网点车辆表




字段名



别名



字段描述



tbl_transport_tool



id



id



运输工具ID



tbl_transport_tool



brand



brand



运输工具品牌



tbl_transport_tool



model



model



运输工具型号



tbl_transport_tool



type



type



运输工具类型



tbl_codes



codeDesc/ttTypeName



type_name



车辆类型描述



tbl_transport_tool



givenLoad



given_load



额定载重



tbl_transport_tool



loadCnUnit



load_cn_unit



中文载重单位



tbl_transport_tool



loadEnUnit



load_en_unit



英文载重单位



tbl_transport_tool



buyDt



buy_dt



购买时间



tbl_transport_tool



licensePlate



license_plate



牌照



tbl_transport_tool



state



state



运输工具状态



tbl_codes



codeDesc/ttStateName



state_name



运输工具状态描述



tbl_transport_tool



cdt



cdt



创建时间



tbl_transport_tool



udt



udt



修改时间



tbl_transport_tool



remark



remark



备注



tbl_dot



id



dot_id



网点id



tbl_dot



dotNumber



dot_number



网点编号



tbl_dot



dotName



dot_name



网点名称



tbl_dot



dotAddr



dot_addr



网点地址



tbl_dot



dotGisAddr



dot_gis_addr



网点GIS地址



tbl_dot



dotTel



dot_tel



网点电话



tbl_dot



manageAreaId



manage_area_id



网点管理辖区ID



tbl_dot



manageAreaGis



manage_area_gis



网点管理辖区地理围栏



tbl_company



id



company_id



公司ID



tbl_company



companyName



company_name



公司名称



tbl_company



cityId



city_id



城市ID



tbl_company



companyNumber



company_number



公司编号



tbl_company



companyAddr



company_addr



公司地址



tbl_company



companyAddrGis



company_addr_gis



公司gis地址



tbl_company



companyTel



company_tel



公司电话



tbl_company



isSubCompany



is_sub_company



母公司ID



tbl_transport_tool



yyyyMMdd(cdt)



day



创建时间

年月日格式


1.2、拉宽仓库车辆表




字段名



别名



字段描述



tbl_transport_tool



id



id



运输工具ID



tbl_transport_tool



brand



brand



运输工具品牌



tbl_transport_tool



model



model



运输工具型号



tbl_transport_tool



type



type



运输工具类型



tbl_codes



codeDesc/ttTypeName



type_name



车辆类型描述



tbl_transport_tool



givenLoad



given_load



额定载重



tbl_transport_tool



loadCnUnit



load_cn_unit




中文载重单位



tbl_transport_tool



loadEnUnit



load_en_unit



英文载重单位



tbl_transport_tool



buyDt



buy_dt



购买时间



tbl_transport_tool



licensePlate



license_plate



牌照



tbl_transport_tool



state



state



运输工具状态



tbl_transport_tool



cdt



cdt



创建时间



tbl_transport_tool



udt



udt



修改时间



tbl_transport_tool



remark



remark



备注



tbl_warehouse



id



ws_id



仓库ID



tbl_warehouse



name



name



仓库名称



tbl_warehouse



addr



addr



仓库地址



tbl_warehouse



addrGis



addr_gis



仓库gis地址



tbl_warehouse



employeeId



employee_id



仓库负责人



tbl_warehouse



type



ws_type



仓库类型



tbl_warehouse



area



area



占地面积



tbl_warehouse



isLease



is_lease



是否租赁



tbl_company



id



company_id



公司ID



tbl_company



companyName



company_name



公司名称



tbl_company



cityId



city_id



城市ID



tbl_company



companyNumber



company_number



公司编号



tbl_company



companyAddr



company_addr



公司地址



tbl_company



companyAddrGis



company_addr_gis



公司gis地址



tbl_company



companyTel



company_tel



公司电话



tbl_company



isSubCompany



is_sub_company



母公司ID



tbl_transport_tool



yyyyMMdd(cdt)



day



创建时间

年月日格式


2、​​​​​​​​​​​​​​SQL语句

2.1、拉宽网点车辆表

SELECT 
ttl."id" ,
ttl."brand" ,
ttl."model" ,
ttl."type" ,
TTL ."given_load" ,
TTL ."load_cn_unit" ,
TTL ."load_en_unit" ,
TTL ."buy_dt" ,
TTL ."license_plate" ,
ttl."state" ,
ttl."cdt" ,
ttl."udt" ,
ttl."remark" ,
dot."id" AS dot_id,
dot."dot_name" ,
dot."dot_number" ,
dot."dot_addr" ,
dot."dot_gis_addr" ,
dot."dot_tel" ,
dot."manage_area_id" ,
dot."manage_area_gis" ,
COMPANY ."id" AS company_id,
COMPANY ."company_name",
COMPANY ."company_number",
COMPANY ."city_id",
COMPANY ."company_addr",
COMPANY ."company_addr_gis",
COMPANY ."company_tel",
COMPANY ."is_sub_company"
FROM "tbl_transport_tool" ttl
LEFT JOIN "tbl_dot_transport_tool" tdtl ON ttl."id" = tdtl."transport_tool_id"
LEFT JOIN "tbl_dot" dot ON DOT ."id" = TDTL ."dot_id"
LEFT JOIN "tbl_company_dot_map" companydot ON companydot."dot_id" = TDTL ."dot_id"
LEFT JOIN "tbl_company" company ON company."id" = companydot."company_id"

2.2、拉宽仓库车辆表

SELECT 
ttl."id" ,
ttl."brand" ,
ttl."model" ,
ttl."type" ,
ttl."given_load" ,
ttl."load_cn_unit" ,
ttl."load_en_unit" ,
ttl."buy_dt" ,
ttl."license_plate" ,
ttl."state" ,
ttl."cdt" ,
ttl."udt" ,
ttl."remark" ,
warehouse."id" ,
warehouse."name",
warehouse."addr",
warehouse."addr_gis",
warehouse."employee_id",
warehouse."type",
warehouse."area",
warehouse."is_lease",
COMPANY ."id" AS company_id,
COMPANY ."company_name",
COMPANY ."company_number",
COMPANY ."city_id",
COMPANY ."company_addr",
COMPANY ."company_addr_gis",
COMPANY ."company_tel",
COMPANY ."is_sub_company"
FROM "tbl_warehouse_transport_tool" twt
LEFT JOIN "tbl_transport_tool" ttl ON twt."transport_tool_id" = ttl."id"
LEFT JOIN "tbl_warehouse" warehouse ON WAREHOUSE ."id" = twt."warehouse_id"
LEFT JOIN "tbl_company_warehouse_map" warehouse_map ON warehouse_map."warehouse_id" = warehouse."id"
LEFT JOIN "tbl_company" company ON company."id" = warehouse_map."company_id"

3、Spark实现

实现步骤:

  • dwd目录下创建TransportToolDWD 单例对象,继承自OfflineApp特质
  • 初始化环境的参数,创建SparkSession对象
  • 获取运输工具表(tbl_transport_tool)数据,并缓存数据
  • 判断是否是首次运行,如果是首次运行的话,则全量装载数据(含历史数据)
  • 获取网点运输工具关联表(tbl_dot_transport_tool数据,并缓存数据
  • 获取网点表(tbl_dot)数据,并缓存数据
  • 获取公司网点关联表(tbl_company_dot_map)数据,并缓存数据
  • 获取仓库运输工具关联表(tbl_warehouse_transport_tool数据,并缓存数据
  • 获取公司仓库关联表(tbl_company_warehouse_map数据,并缓存数据
  • 获取仓库表(tbl_warehouse数据,并缓存数据
  • 获取公司表(tbl_company数据,并缓存数据
  • 根据以下方式拉宽仓库车辆明细数据
  • 根据交通工具id,在交通工具表中获取交通工具数据
  • 根据网点id,在网点表中获取网点数据
  • 根据公司id,在公司表中获取公司数据
  • 根据仓库id,在仓库表中获取仓库数据
  • 创建网点车辆明细宽表(若存在则不创建)
  • 创建仓库车辆明细宽表(若存在则不创建)
  • 将仓库车辆明细宽表数据写入到kudu数据表中
  • 删除缓存数据

3.1、初始化环境变量

初始化运单明细拉宽作业的环境变量

package cn.it.logistics.offline.dwd

import cn.it.logistics.common.{Configuration, SparkUtils}
import cn.it.logistics.offline.OfflineApp
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

/**
* 车辆主题开发
* 将车辆相关的表进行关联拉宽,将拉宽后的数据写入到车辆宽表中
* 1)网点车辆关联表->派送网点所拥有的车辆(三轮车)
* 2)仓库车辆关联表->仓库与仓库之间的运输工具(货车、货机)
*/
object TransportToolDWD extends OfflineApp{
//定义应用的名称
val appName = this.getClass.getSimpleName

/**
* 入口函数
* @param args
*/
def main(args: Array[String]): Unit = {
/**
* 实现步骤:
* 1)初始化sparkConf对象
* 2)创建sparkSession对象
* 3)加载kudu中的事实表和维度表的数据(将加载后的数据进行缓存)
* 4)定义维度表与事实表的关联
* 5)将拉宽后的数据再次写回到kudu数据库中(DWD明细层)
* 5.1:创建车辆明细宽表的schema表结构
* 5.2:创建车辆宽表(判断宽表是否存在,如果不存在则创建)
* 5.3:将数据写入到kudu中
* 6)将缓存的数据删除掉
* 7)停止任务
*/

//1)初始化sparkConf对象
val sparkConf: SparkConf = SparkUtils.autoSettingEnv(
SparkUtils.sparkConf(appName)
)

//2)创建sparkSession对象
val sparkSession: SparkSession = SparkUtils.getSparkSession(sparkConf)
sparkSession.sparkContext.setLogLevel(Configuration.LOG_OFF)

//数据处理
execute(sparkSession)
}
/**
* 数据处理
*
* @param sparkSession
*/
override def execute(sparkSession: SparkSession): Unit = {
sparkSession.stop()
}
}

 3.2、加载运输工具表及车辆相关的表并缓存

  • 加载运输工具表的时候,需要指定日期条件,因为车辆主题最终需要Azkaban定时调度执行,每天执行一次增量数据,因此需要指定日期。
  • 判断是否是首次运行,如果是首次运行的话,则全量装载数据(含历史数据)
//TODO 3)加载kudu中的事实表和维度表的数据(将加载后的数据进行缓存)
//加载车辆表数据(事实表)
val ttDF: DataFrame = getKuduSource(sparkSession, TableMapping.transportTool, Configuration.isFirstRunnable).persist(StorageLevel.DISK_ONLY_2)

//加载仓库车辆表数据
val ttDotDF: DataFrame = getKuduSource(sparkSession, TableMapping.dotTransportTool, true).persist(StorageLevel.DISK_ONLY_2)

//加载网点表的数据
val dotDF: DataFrame = getKuduSource(sparkSession, TableMapping.dot, true).persist(StorageLevel.DISK_ONLY_2)

//加载公司网点关联表的数据
val companyDotDF: DataFrame = getKuduSource(sparkSession, TableMapping.companyDotMap, true).persist(StorageLevel.DISK_ONLY_2)

//加载公司表的数据
val companyDF: DataFrame = getKuduSource(sparkSession, TableMapping.company, true).persist(StorageLevel.DISK_ONLY_2)

//加载仓库车辆关联表数据(事实表)
val ttWsDF: DataFrame = getKuduSource(sparkSession, TableMapping.warehouseTransportTool, Configuration.isFirstRunnable).persist(StorageLevel.DISK_ONLY_2)

//加载仓库公司关联表
val companyWareHouseMapDF: DataFrame = getKuduSource(sparkSession, TableMapping.companyWarehouseMap, true).persist(StorageLevel.DISK_ONLY_2)

//加载仓库表数据
val wsDF: DataFrame = getKuduSource(sparkSession, TableMapping.warehouse, true).persist(StorageLevel.DISK_ONLY_2)

//加载物流码表数据
val codesDF: DataFrame = getKuduSource(sparkSession, TableMapping.codes, true).persist(StorageLevel.DISK_ONLY_2)

import sparkSession.implicits._
//获取运输工具类型
val transportTypeDF: DataFrame = codesDF.where($"type" === CodeTypeMapping.TransportType).select($"code".as("ttType"), $"codeDesc".as("ttTypeName"))

//获取运输工具状态
val transportStatusDF: DataFrame = codesDF.where($"type" === CodeTypeMapping.TransportStatus).select($"code".as("ttStatus"), $"codeDesc".as("ttStateName"))

3.3、定义网点车辆宽表的关联关系

  • 为了在DWS层任务中方便的获取每日增量网点车辆表数据(根据日期),因此在DataFrame基础上动态增加列(day),指定日期格式为yyyyMMdd

代码如下:

//TODO 4)定义维度表与事实表的关联
val left_outer: String = "left_outer"
// 4.1:拉宽网点车辆表
val ttDotDetailDF = ttDotDF.join(ttDF, ttDotDF.col("transportToolId") === ttDF.col("id"), left_outer) //网点车辆表关联车辆表
.join(transportTypeDF, transportTypeDF("ttType") === ttDF("type"), left_outer) //车辆表类型关联字典表类型
.join(transportStatusDF, transportStatusDF("ttStatus") === ttDF("state"), left_outer) //车辆表状态管理字典表状态
.join(dotDF, dotDF.col("id") === ttDotDF.col("dotId"), left_outer) //网点车辆表关联网点
.join(companyDotDF, ttDotDF.col("dotId") === companyDotDF.col("dotId"), left_outer) //网点车辆管连网点公司关联表
.join(companyDF, companyDotDF.col("companyId") === companyDF.col("id"), left_outer) //网点车辆表关联公司表
.withColumn("day", date_format(ttDotDF("cdt"), "yyyyMMdd"))//虚拟列,可以根据这个日期列作为分区字段,可以保证同一天的数据保存在同一个分区中
.sort(ttDF.col("cdt").asc)
.select(
ttDF("id"), //车辆表id
ttDF("brand"), //车辆表brand
ttDF("model"), //车辆表model
ttDF("type").cast(IntegerType), //车辆表type
transportTypeDF("ttTypeName").as("type_name"), // 车辆表type对应字典表车辆类型的具体描述
ttDF("givenLoad").cast(IntegerType).as("given_load"), //车辆表given_load
ttDF("loadCnUnit").as("load_cn_unit"), //车辆表load_cn_unit
ttDF("loadEnUnit").as("load_en_unit"), //车辆表load_en_unit
ttDF("buyDt").as("buy_dt"), //车辆表buy_dt
ttDF("licensePlate").as("license_plate"), //车辆表license_plate
ttDF("state").cast(IntegerType), //车辆表state
transportStatusDF("ttStateName").as("state_name"), // 车辆表state对应字典表类型的具体描述
ttDF("cdt"), //车辆表cdt
ttDF("udt"), //车辆表udt
ttDF("remark"), //车辆表remark
dotDF("id").as("dot_id"), //网点表dot_id
dotDF("dotNumber").as("dot_number"), //网点表dot_number
dotDF("dotName").as("dot_name"), //网点表dot_name
dotDF("dotAddr").as("dot_addr"), //网点表dot_addr
dotDF("dotGisAddr").as("dot_gis_addr"), //网点表dot_gis_addr
dotDF("dotTel").as("dot_tel"), //网点表dot_tel
dotDF("manageAreaId").as("manage_area_id"), //网点表manage_area_id
dotDF("manageAreaGis").as("manage_area_gis"), //网点表manage_area_gis
companyDF("id").alias("company_id"), //公司表id
companyDF("companyName").as("company_name"), //公司表company_name
companyDF("cityId").as("city_id"), //公司表city_id
companyDF("companyNumber").as("company_number"), //公司表company_number
companyDF("companyAddr").as("company_addr"), //公司表company_addr
companyDF("companyAddrGis").as("company_addr_gis"), //公司表company_addr_gis
companyDF("companyTel").as("company_tel"), //公司表company_tel
companyDF("isSubCompany").as("is_sub_company"), //公司表is_sub_company
$"day"
)

3.4、创建网点车辆明细宽表并将网点车辆明细数据写入到kudu数据表中

网点车辆明细宽表数据需要保存到kudu中,因此在第一次执行网点车辆明细拉宽操作时,网点车辆明细宽表是不存在的,因此需要实现自动判断宽表是否存在,如果不存在则创建

实现步骤:

  • TransportToolDWD 单例对象中调用save方法

实现过程:

  • TransportToolDWD 单例对象Main方法中调用save方法
//TODO 5)将拉宽后的数据再次写回到kudu数据库中(DWD明细层)
save(ttDotDetailDF, OfflineTableDefine.dotTransportToolDetail)

3.5、定义仓库车辆宽表的关联关系

  • 为了在DWS层任务中方便的获取每日增量仓库车辆表数据(根据日期),因此在DataFrame基础上动态增加列(day),指定日期格式为yyyyMMdd

代码如下:

// 4.2:拉宽仓库车辆表
// 拉宽仓库车辆表
val ttWsDetailDF = ttWsDF.join(ttDF, ttWsDF.col("transportToolId") === ttDF.col("id"), left_outer) //仓库车辆表关联车辆表
.join(transportTypeDF, transportTypeDF("ttType") === ttDF("type"), left_outer) //车辆表类型关联字典表类型
.join(transportStatusDF, transportStatusDF("ttStatus") === ttDF("state"), left_outer) //车辆表状态管理字典表状态
.join(wsDF, wsDF.col("id") === ttWsDF.col("warehouseId"), left_outer) //仓库车辆表关联仓库
.join(companyWareHouseMapDF, ttWsDF.col("warehouseId") === companyWareHouseMapDF.col("warehouseId"), left_outer) //仓库车辆管连仓库公司关联表
.join(companyDF, companyDF.col("id") === companyWareHouseMapDF.col("companyId"), left_outer)
.withColumn("day", date_format(ttWsDF("cdt"), "yyyyMMdd"))//虚拟列,可以根据这个日期列作为分区字段,可以保证同一天的数据保存在同一个分区中
.sort(ttDF.col("cdt").asc)
.select(
ttDF("id"), //车辆表id
ttDF("brand"), //车辆表brand
ttDF("model"), //车辆表model
ttDF("type").cast(IntegerType), //车辆表type
transportTypeDF("ttTypeName").as("type_name"), // 车辆表type对应字典表车辆类型的具体描述
ttDF("givenLoad").as("given_load").cast(IntegerType), //车辆表given_load
ttDF("loadCnUnit").as("load_cn_unit"), //车辆表load_cn_unit
ttDF("loadEnUnit").as("load_en_unit"), //车辆表load_en_unit
ttDF("buyDt").as("buy_dt"), //车辆表buy_dt
ttDF("licensePlate").as("license_plate"), //车辆表license_plate
ttDF("state").cast(IntegerType), //车辆表state
transportStatusDF("ttStateName").as("state_name"), // 车辆表type对应字典表车辆类型的具体描述
ttDF("cdt"), //车辆表cdt
ttDF("udt"), //车辆表udt
ttDF("remark"), //车辆表remark
wsDF("id").as("ws_id"), //仓库表id
wsDF("name"), //仓库表name
wsDF("addr"), //仓库表addr
wsDF("addrGis").as("addr_gis"), //仓库表addr_gis
wsDF("employeeId").as("employee_id"), //仓库表employee_id
wsDF("type").as("ws_type").cast(IntegerType), //仓库表type
wsDF("area"), //仓库表area
wsDF("isLease").as("is_lease").cast(IntegerType), //仓库表is_lease
companyDF("id").alias("company_id"), //公司表id
companyDF("companyName").as("company_name"), //公司表company_name
companyDF("cityId").as("city_id"), //公司表city_id
companyDF("companyNumber").as("company_number"), //公司表company_number
companyDF("companyAddr").as("company_addr"), //公司表company_addr
companyDF("companyAddrGis").as("company_addr_gis"), //公司表company_addr_gis
companyDF("companyTel").as("company_tel"), //公司表company_tel
companyDF("isSubCompany").as("is_sub_company"), //公司表is_sub_company
$"day"
)

3.6、创建仓库车辆明细宽表并将仓库车辆明细数据写入到kudu数据表中

仓库车辆明细宽表数据需要保存到kudu中,因此在第一次执行仓库车辆明细拉宽操作时,仓库车辆明细宽表是不存在的,因此需要实现自动判断宽表是否存在,如果不存在则创建

实现步骤:

  • TransportToolDWD 单例对象中调用save方法

实现过程:

  • TransportToolDWD 单例对象Main方法中调用save方法
save(ttWsDetailDF, OfflineTableDefine.warehouseTransportToolDetail)

3.7、删除缓存数据

为了释放资源,车辆明细宽表数据计算完成以后,需要将缓存的源表数据删除。

//TODO 6)将缓存的数据删除掉
ttDF.unpersist()
ttDotDF.unpersist()
dotDF.unpersist()
companyDotDF.unpersist()
companyDF.unpersist()
ttWsDF.unpersist()
companyWareHouseMapDF.unpersist()
wsDF.unpersist()
codesDF.unpersist()

3.8、完整代码

package cn.it.logistics.offline.dwd

import cn.it.logistics.common.{CodeTypeMapping, Configuration, OfflineTableDefine, SparkUtils, TableMapping}
import cn.it.logistics.offline.OfflineApp
import org.apache.spark.SparkConf
import org.apache.spark.sql.functions.date_format
import org.apache.spark.sql.types.IntegerType
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.storage.StorageLevel

/**
* 车辆主题开发
* 将车辆相关的表进行关联拉宽,将拉宽后的数据写入到车辆宽表中
* 1)网点车辆关联表->派送网点所拥有的车辆(三轮车)
* 2)仓库车辆关联表->仓库与仓库之间的运输工具(货车、货机)
*/
object TransportToolDWD extends OfflineApp{
//定义应用的名称
val appName = this.getClass.getSimpleName

/**
* 入口函数
* @param args
*/
def main(args: Array[String]): Unit = {
/**
* 实现步骤:
* 1)初始化sparkConf对象
* 2)创建sparkSession对象
* 3)加载kudu中的事实表和维度表的数据(将加载后的数据进行缓存)
* 4)定义维度表与事实表的关联
* 5)将拉宽后的数据再次写回到kudu数据库中(DWD明细层)
* 5.1:创建车辆明细宽表的schema表结构
* 5.2:创建车辆宽表(判断宽表是否存在,如果不存在则创建)
* 5.3:将数据写入到kudu中
* 6)将缓存的数据删除掉
* 7)停止任务
*/

//1)初始化sparkConf对象
val sparkConf: SparkConf = SparkUtils.autoSettingEnv(
SparkUtils.sparkConf(appName)
)

//2)创建sparkSession对象
val sparkSession: SparkSession = SparkUtils.getSparkSession(sparkConf)
sparkSession.sparkContext.setLogLevel(Configuration.LOG_OFF)

//数据处理
execute(sparkSession)
}
/**
* 数据处理
*
* @param sparkSession
*/
override def execute(sparkSession: SparkSession): Unit = {
//TODO 3)加载kudu中的事实表和维度表的数据(将加载后的数据进行缓存)
//加载车辆表数据(事实表)
val ttDF: DataFrame = getKuduSource(sparkSession, TableMapping.transportTool, Configuration.isFirstRunnable).persist(StorageLevel.DISK_ONLY_2)

//加载仓库车辆表数据
val ttDotDF: DataFrame = getKuduSource(sparkSession, TableMapping.dotTransportTool, true).persist(StorageLevel.DISK_ONLY_2)

//加载网点表的数据
val dotDF: DataFrame = getKuduSource(sparkSession, TableMapping.dot, true).persist(StorageLevel.DISK_ONLY_2)

//加载公司网点关联表的数据
val companyDotDF: DataFrame = getKuduSource(sparkSession, TableMapping.companyDotMap, true).persist(StorageLevel.DISK_ONLY_2)

//加载公司表的数据
val companyDF: DataFrame = getKuduSource(sparkSession, TableMapping.company, true).persist(StorageLevel.DISK_ONLY_2)

//加载仓库车辆关联表数据(事实表)
val ttWsDF: DataFrame = getKuduSource(sparkSession, TableMapping.warehouseTransportTool, Configuration.isFirstRunnable).persist(StorageLevel.DISK_ONLY_2)

//加载仓库公司关联表
val companyWareHouseMapDF: DataFrame = getKuduSource(sparkSession, TableMapping.companyWarehouseMap, true).persist(StorageLevel.DISK_ONLY_2)

//加载仓库表数据
val wsDF: DataFrame = getKuduSource(sparkSession, TableMapping.warehouse, true).persist(StorageLevel.DISK_ONLY_2)

//加载物流码表数据
val codesDF: DataFrame = getKuduSource(sparkSession, TableMapping.codes, true).persist(StorageLevel.DISK_ONLY_2)

import sparkSession.implicits._
//获取运输工具类型
val transportTypeDF: DataFrame = codesDF.where($"type" === CodeTypeMapping.TransportType).select($"code".as("ttType"), $"codeDesc".as("ttTypeName"))

//获取运输工具状态
val transportStatusDF: DataFrame = codesDF.where($"type" === CodeTypeMapping.TransportStatus).select($"code".as("ttStatus"), $"codeDesc".as("ttStateName"))

//TODO 4)定义维度表与事实表的关联
val left_outer: String = "left_outer"
// 4.1:拉宽网点车辆表
val ttDotDetailDF = ttDotDF.join(ttDF, ttDotDF.col("transportToolId") === ttDF.col("id"), left_outer) //网点车辆表关联车辆表
.join(transportTypeDF, transportTypeDF("ttType") === ttDF("type"), left_outer) //车辆表类型关联字典表类型
.join(transportStatusDF, transportStatusDF("ttStatus") === ttDF("state"), left_outer) //车辆表状态管理字典表状态
.join(dotDF, dotDF.col("id") === ttDotDF.col("dotId"), left_outer) //网点车辆表关联网点
.join(companyDotDF, ttDotDF.col("dotId") === companyDotDF.col("dotId"), left_outer) //网点车辆管连网点公司关联表
.join(companyDF, companyDotDF.col("companyId") === companyDF.col("id"), left_outer) //网点车辆表关联公司表
.withColumn("day", date_format(ttDotDF("cdt"), "yyyyMMdd"))//虚拟列,可以根据这个日期列作为分区字段,可以保证同一天的数据保存在同一个分区中
.sort(ttDF.col("cdt").asc)
.select(
ttDF("id"), //车辆表id
ttDF("brand"), //车辆表brand
ttDF("model"), //车辆表model
ttDF("type").cast(IntegerType), //车辆表type
transportTypeDF("ttTypeName").as("type_name"), // 车辆表type对应字典表车辆类型的具体描述
ttDF("givenLoad").cast(IntegerType).as("given_load"), //车辆表given_load
ttDF("loadCnUnit").as("load_cn_unit"), //车辆表load_cn_unit
ttDF("loadEnUnit").as("load_en_unit"), //车辆表load_en_unit
ttDF("buyDt").as("buy_dt"), //车辆表buy_dt
ttDF("licensePlate").as("license_plate"), //车辆表license_plate
ttDF("state").cast(IntegerType), //车辆表state
transportStatusDF("ttStateName").as("state_name"), // 车辆表state对应字典表类型的具体描述
ttDF("cdt"), //车辆表cdt
ttDF("udt"), //车辆表udt
ttDF("remark"), //车辆表remark
dotDF("id").as("dot_id"), //网点表dot_id
dotDF("dotNumber").as("dot_number"), //网点表dot_number
dotDF("dotName").as("dot_name"), //网点表dot_name
dotDF("dotAddr").as("dot_addr"), //网点表dot_addr
dotDF("dotGisAddr").as("dot_gis_addr"), //网点表dot_gis_addr
dotDF("dotTel").as("dot_tel"), //网点表dot_tel
dotDF("manageAreaId").as("manage_area_id"), //网点表manage_area_id
dotDF("manageAreaGis").as("manage_area_gis"), //网点表manage_area_gis
companyDF("id").alias("company_id"), //公司表id
companyDF("companyName").as("company_name"), //公司表company_name
companyDF("cityId").as("city_id"), //公司表city_id
companyDF("companyNumber").as("company_number"), //公司表company_number
companyDF("companyAddr").as("company_addr"), //公司表company_addr
companyDF("companyAddrGis").as("company_addr_gis"), //公司表company_addr_gis
companyDF("companyTel").as("company_tel"), //公司表company_tel
companyDF("isSubCompany").as("is_sub_company"), //公司表is_sub_company
$"day"
)

// 4.2:拉宽仓库车辆表
// 拉宽仓库车辆表
val ttWsDetailDF = ttWsDF.join(ttDF, ttWsDF.col("transportToolId") === ttDF.col("id"), left_outer) //仓库车辆表关联车辆表
.join(transportTypeDF, transportTypeDF("ttType") === ttDF("type"), left_outer) //车辆表类型关联字典表类型
.join(transportStatusDF, transportStatusDF("ttStatus") === ttDF("state"), left_outer) //车辆表状态管理字典表状态
.join(wsDF, wsDF.col("id") === ttWsDF.col("warehouseId"), left_outer) //仓库车辆表关联仓库
.join(companyWareHouseMapDF, ttWsDF.col("warehouseId") === companyWareHouseMapDF.col("warehouseId"), left_outer) //仓库车辆管连仓库公司关联表
.join(companyDF, companyDF.col("id") === companyWareHouseMapDF.col("companyId"), left_outer)
.withColumn("day", date_format(ttWsDF("cdt"), "yyyyMMdd"))//虚拟列,可以根据这个日期列作为分区字段,可以保证同一天的数据保存在同一个分区中
.sort(ttDF.col("cdt").asc)
.select(
ttDF("id"), //车辆表id
ttDF("brand"), //车辆表brand
ttDF("model"), //车辆表model
ttDF("type").cast(IntegerType), //车辆表type
transportTypeDF("ttTypeName").as("type_name"), // 车辆表type对应字典表车辆类型的具体描述
ttDF("givenLoad").as("given_load").cast(IntegerType), //车辆表given_load
ttDF("loadCnUnit").as("load_cn_unit"), //车辆表load_cn_unit
ttDF("loadEnUnit").as("load_en_unit"), //车辆表load_en_unit
ttDF("buyDt").as("buy_dt"), //车辆表buy_dt
ttDF("licensePlate").as("license_plate"), //车辆表license_plate
ttDF("state").cast(IntegerType), //车辆表state
transportStatusDF("ttStateName").as("state_name"), // 车辆表type对应字典表车辆类型的具体描述
ttDF("cdt"), //车辆表cdt
ttDF("udt"), //车辆表udt
ttDF("remark"), //车辆表remark
wsDF("id").as("ws_id"), //仓库表id
wsDF("name"), //仓库表name
wsDF("addr"), //仓库表addr
wsDF("addrGis").as("addr_gis"), //仓库表addr_gis
wsDF("employeeId").as("employee_id"), //仓库表employee_id
wsDF("type").as("ws_type").cast(IntegerType), //仓库表type
wsDF("area"), //仓库表area
wsDF("isLease").as("is_lease").cast(IntegerType), //仓库表is_lease
companyDF("id").alias("company_id"), //公司表id
companyDF("companyName").as("company_name"), //公司表company_name
companyDF("cityId").as("city_id"), //公司表city_id
companyDF("companyNumber").as("company_number"), //公司表company_number
companyDF("companyAddr").as("company_addr"), //公司表company_addr
companyDF("companyAddrGis").as("company_addr_gis"), //公司表company_addr_gis
companyDF("companyTel").as("company_tel"), //公司表company_tel
companyDF("isSubCompany").as("is_sub_company"), //公司表is_sub_company
$"day"
)

//TODO 5)将拉宽后的数据再次写回到kudu数据库中(DWD明细层)
save(ttDotDetailDF, OfflineTableDefine.dotTransportToolDetail)
save(ttWsDetailDF, OfflineTableDefine.warehouseTransportToolDetail)

//TODO 6)将缓存的数据删除掉
ttDF.unpersist()
ttDotDF.unpersist()
dotDF.unpersist()
companyDotDF.unpersist()
companyDF.unpersist()
ttWsDF.unpersist()
companyWareHouseMapDF.unpersist()
wsDF.unpersist()
codesDF.unpersist()

sparkSession.stop()
}
}

五、车辆数据指标开发

1、​​​​​​​​​​​​​​计算的字段


字段名



字段描述



id



数据产生时间



ttDotTotalCount



网点总发车次数



maxTtDotTotalCount



各网点最大发车次数



minTtDotTotalCount



各网点最小发车次数



avgTtDotTotalCount



各网点平均发车次数



areaDotTotalCount



区域总发车次数



maxAreaDotTotalCount



各区域最大发车次数



minAreaDotTotalCount



各区域最小发车次数



avgAreaDotTotalCount



各区域平均发车次数



companyDotTotalCount



公司总发车次数



maxCompanyDotTotalCount



各公司最大发车次数



minCompanyDotTotalCount



各公司最小发车次数



avgCompanyDotTotalCount



各公司平均发车次数


2、​​​​​​​Spark实现

实现步骤:

  • 在dws目录下创建TransportToolDWS 单例对象,继承自OfflineApp特质
  • 初始化环境的参数,创建SparkSession对象
  • 根据指定的日期获取拉宽后的车辆主题宽表(tbl_dot_transport_tool_detail、tbl_dot_transport_tool_detail)增量数据,并缓存数据
  • 判断是否是首次运行,如果是首次运行的话,则全量装载数据(含历史数据)
  • 指标计算
  • 各网点发车次数
  • 各网点最大发车次数
  • 各网点最小发车次数
  • 各网点平均发车次数
  • 各区域发车次数
  • 各区域最大发车次数
  • 各区域最小发车次数
  • 各区域平均发车次数
  • 各公司发车次数
  • 各公司最大发车次数
  • 各公司最小发车次数
  • 各公司平均发车次数
  • 获取当前时间yyyyMMddHH
  • 构建要持久化的指标数据(需要判断计算的指标是否有值,若没有需要赋值默认值
  • 通过StructType构建指定Schema
  • 创建车辆主题指标数据表(若存在则不创建)
  • 持久化指标数据到kudu表

2.1、初始化环境变量

package cn.it.logistics.offline.dws

import cn.it.logistics.common.{Configuration, SparkUtils}
import cn.it.logistics.offline.OfflineApp
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

/**
* 车辆主题指标开发
*/
object TransportToolDWS extends OfflineApp{
//定义应用程序的名称
val appName = this.getClass.getSimpleName

/**
* 入口函数
* @param args
*/
def main(args: Array[String]): Unit = {
/**
* 实现步骤:
* 1)创建SparkConf对象
* 2)创建SparkSession对象
* 3)读取车辆明细宽表的数据
* 4)对车辆明细宽表的数据进行指标的计算
* 5)将计算好的指标数据写入到kudu数据库中
* 5.1:定义指标结果表的schema信息
* 5.2:组织需要写入到kudu表的数据
* 5.3:判断指标结果表是否存在,如果不存在则创建
* 5.4:将数据写入到kudu表中
* 6)删除缓存数据
* 7)停止任务,退出sparksession
*/

//TODO 1)创建SparkConf对象
val sparkConf: SparkConf = SparkUtils.autoSettingEnv(
SparkUtils.sparkConf(appName)
)

//TODO 2)创建SparkSession对象
val sparkSession: SparkSession = SparkUtils.getSparkSession(sparkConf)
sparkSession.sparkContext.setLogLevel(Configuration.LOG_OFF)

//处理数据
execute(sparkSession)
}

/**
* 数据处理
*
* @param sparkSession
*/
override def execute(sparkSession: SparkSession): Unit = {
sparkSession.stop()
}
}

2.2、加载车辆宽表增量数据并缓存

加载车辆宽表的时候,需要指定日期条件,因为车辆主题最终需要Azkaban定时调度执行,每天执行一次增量数据,因此需要指定日期。

//TODO 3)读取车辆明细宽表的数据
val ttDotDetailDF: DataFrame = getKuduSource(sparkSession, OfflineTableDefine.dotTransportToolDetail, Configuration.isFirstRunnable).persist(StorageLevel.DISK_ONLY_2)
val ttWarehouseDetailDF: DataFrame = getKuduSource(sparkSession, OfflineTableDefine.warehouseTransportToolDetail, Configuration.isFirstRunnable).persist(StorageLevel.DISK_ONLY_2)

2.3、指标计算

//根据网点车辆的日期进行分组
val ttDotDetailGroupByDayDF: DataFrame = ttDotDetailDF.select("day").groupBy("day").count().cache()

//导入隐式转换
import sparkSession.implicits._

//定义计算好的指标结果集合对象
val dotRows: ArrayBuffer[Row] = ArrayBuffer[Row]()
//循环遍历网点车辆每个日期的车辆明细宽表数据
ttDotDetailGroupByDayDF.collect().foreach(row=> {
//获取到要处理的数据所在的日期
val day: String = row.getAs[String](0)

//返回指定日期的仓库明细数据
val ttDotDetailByDayDF: DataFrame = ttDotDetailDF.where(col("day") === day).toDF().persist(StorageLevel.DISK_ONLY_2)

//各网点的发车次数(西三旗:10,西二旗:20)
val ttDotTotalCountDF: DataFrame = ttDotDetailByDayDF.groupBy($"dot_id").agg(count("dot_id").alias("cnt")).persist(StorageLevel.DISK_ONLY_2)
//计算各网点的总发车次数
val ttDotTotalCount: Row = ttDotTotalCountDF.agg(sum("cnt")).first()
//各网点的最大发车次数、最小发车次数、平均发车次数
val maxAndMinAndAvgTTDotTotalCount: Row = ttDotTotalCountDF.agg(max("cnt"), min("cnt"), avg("cnt")).first()
val maxTTDotTotalCount: Any = maxAndMinAndAvgTTDotTotalCount(0)
val minTTDotTotalCount: Any = maxAndMinAndAvgTTDotTotalCount(1)
val avgTTDotTotalCount: Any = maxAndMinAndAvgTTDotTotalCount(2)

// 各区域发车次数
val areaDotTotalCountDF = ttDotDetailByDayDF.groupBy("city_id").agg(count($"id").alias("cnt")).persist(StorageLevel.DISK_ONLY_2)
val areaDotTotalCount = areaDotTotalCountDF.agg(sum($"cnt")).first()
// 各区域最大发车次数、最小发车次数和平均发车次数
val maxAndMinAndAvgAreaDotTotalCount = areaDotTotalCountDF.agg(max($"cnt"), min($"cnt"), avg($"cnt")).first()
val maxAreaDotTotalCount = maxAndMinAndAvgAreaDotTotalCount(0)
val minAreaDotTotalCount = maxAndMinAndAvgAreaDotTotalCount(1)
val avgAreaDotTotalCount = maxAndMinAndAvgAreaDotTotalCount(2)

// 各公司发车次数
val companyDotTotalCountDF = ttDotDetailByDayDF.groupBy("company_id").agg(count($"id").alias("cnt")).persist(StorageLevel.DISK_ONLY_2)
val companyDotTotalCount = companyDotTotalCountDF.agg(sum($"cnt")).first()
// 各公司最大发车次数、最小发车次数和平均发车次数
val maxAndMinAndAvgCompanyDotTotalCount = companyDotTotalCountDF.agg(max($"cnt"), min($"cnt"), avg($"cnt")).first()
val maxCompanyDotTotalCount = maxAndMinAndAvgCompanyDotTotalCount(0)
val minCompanyDotTotalCount = maxAndMinAndAvgCompanyDotTotalCount(1)
val avgCompanyDotTotalCount = maxAndMinAndAvgCompanyDotTotalCount(2)

//将计算好的指标数据封装到row对象中
val rowInfo: Row = Row(
day,
if(ttDotTotalCount.isNullAt(0)) 0L else ttDotTotalCount.get(0).asInstanceOf[Number].longValue(),
if(maxTTDotTotalCount==null) 0L else maxTTDotTotalCount.asInstanceOf[Number].longValue(),
if(minTTDotTotalCount==null) 0L else minTTDotTotalCount.asInstanceOf[Number].longValue(),
if(avgTTDotTotalCount==null) 0L else avgTTDotTotalCount.asInstanceOf[Number].longValue(),
if(areaDotTotalCount.isNullAt(0)) 0L else areaDotTotalCount.get(0).asInstanceOf[Number].longValue(),
if(maxAreaDotTotalCount==null) 0L else maxAreaDotTotalCount.asInstanceOf[Number].longValue(),
if(minAreaDotTotalCount==null) 0L else minAreaDotTotalCount.asInstanceOf[Number].longValue(),
if(avgAreaDotTotalCount==null) 0L else avgAreaDotTotalCount.asInstanceOf[Number].longValue(),
if(companyDotTotalCount.isNullAt(0)) 0L else ttDotTotalCount.get(0).asInstanceOf[Number].longValue(),
if(maxCompanyDotTotalCount==null) 0L else maxCompanyDotTotalCount.asInstanceOf[Number].longValue(),
if(minCompanyDotTotalCount==null) 0L else minCompanyDotTotalCount.asInstanceOf[Number].longValue(),
if(avgCompanyDotTotalCount==null) 0L else avgCompanyDotTotalCount.asInstanceOf[Number].longValue()
)

dotRows.append(rowInfo)
println(rowInfo)

ttDotDetailByDayDF.unpersist()
ttDotTotalCountDF.unpersist()
areaDotTotalCountDF.unpersist()
companyDotTotalCountDF.unpersist()
})

//根据仓库车辆的日期进行分组
val ttWsDetailGroupByDayDF: DataFrame = ttWarehouseDetailDF.select("day").groupBy("day").count().cache()
//定义计算好的指标结果集合对象
val wsRows: ArrayBuffer[Row] = ArrayBuffer[Row]()
//循环遍历仓库车辆每个日期的车辆明细宽表数据
ttWsDetailGroupByDayDF.collect().foreach(row=> {
//获取到要处理的数据所在的日期
val day: String = row.getAs[String](0)

//返回指定日期的仓库明细数据
val ttWsDetailByDayDF: DataFrame = ttWarehouseDetailDF.where(col("day") === day).toDF().persist(StorageLevel.DISK_ONLY_2)

val whTransportToolTotalCountDF: DataFrame = ttWsDetailByDayDF.groupBy($"ws_id").agg(count("ws_id").alias("cnt")).persist(StorageLevel.DISK_ONLY_2)
//计算各仓库的总发车次数
val whTransportToolTotalCount: Row = whTransportToolTotalCountDF.agg(sum("cnt")).first()
//各仓库的最大发车次数、最小发车次数、平均发车次数
val maxAndMinAndAvgWhTransportToolTotalCount: Row = whTransportToolTotalCountDF.agg(max("cnt"), min("cnt"), avg("cnt")).first()
val maxTransportToolCount: Any = maxAndMinAndAvgWhTransportToolTotalCount(0)
val minTransportToolCount: Any = maxAndMinAndAvgWhTransportToolTotalCount(1)
val avgTransportToolCount: Any = maxAndMinAndAvgWhTransportToolTotalCount(2)

// 各区域发车次数
val areaTransportToolTotalCountDF = ttWsDetailByDayDF.groupBy("city_id").agg(count($"id").alias("cnt")).persist(StorageLevel.DISK_ONLY_2)
val areaTransportToolTotalCount = areaTransportToolTotalCountDF.agg(sum($"cnt")).first()
// 各区域最大发车次数、最小发车次数和平均发车次数
val maxAndMinAndAvgAreaTransportToolTotalCount = areaTransportToolTotalCountDF.agg(max($"cnt"), min($"cnt"), avg($"cnt")).first()
val maxAreaTransportToolTotalCount: Any = maxAndMinAndAvgAreaTransportToolTotalCount(0)
val minAreaTransportToolTotalCount = maxAndMinAndAvgAreaTransportToolTotalCount(1)
val avgAreaTransportToolTotalCount = maxAndMinAndAvgAreaTransportToolTotalCount(2)

// 各公司发车次数
val companyTransportToolTotalCountDF = ttWsDetailByDayDF.groupBy("company_id").agg(count($"id").alias("cnt")).persist(StorageLevel.DISK_ONLY_2)
val companyTransportToolTotalCount = companyTransportToolTotalCountDF.agg(sum($"cnt")).first()
// 各公司最大发车次数、最小发车次数和平均发车次数
val maxAndMinAndAvgCompanyTransportToolTotalCount = companyTransportToolTotalCountDF.agg(max($"cnt"), min($"cnt"), avg($"cnt")).first()
val maxCompanyTransportToolTotalCount = maxAndMinAndAvgCompanyTransportToolTotalCount(0)
val minCompanyTransportToolTotalCount = maxAndMinAndAvgCompanyTransportToolTotalCount(1)
val avgCompanyTransportToolTotalCount = maxAndMinAndAvgCompanyTransportToolTotalCount(2)

//将计算好的指标数据封装到row对象中
val rowInfo: Row = Row(
day,
if(whTransportToolTotalCount.isNullAt(0)) 0L else whTransportToolTotalCount.get(0).asInstanceOf[Number].longValue(),
if(maxTransportToolCount==null) 0L else maxTransportToolCount.asInstanceOf[Number].longValue(),
if(minTransportToolCount==null) 0L else minTransportToolCount.asInstanceOf[Number].longValue(),
if(avgTransportToolCount==null) 0L else avgTransportToolCount.asInstanceOf[Number].longValue(),
if(areaTransportToolTotalCount.isNullAt(0)) 0L else areaTransportToolTotalCount.get(0).asInstanceOf[Number].longValue(),
if(maxAreaTransportToolTotalCount==null) 0L else maxAreaTransportToolTotalCount.asInstanceOf[Number].longValue(),
if(minAreaTransportToolTotalCount==null) 0L else minAreaTransportToolTotalCount.asInstanceOf[Number].longValue(),
if(avgAreaTransportToolTotalCount==null) 0L else avgAreaTransportToolTotalCount.asInstanceOf[Number].longValue(),
if(companyTransportToolTotalCount.isNullAt(0)) 0L else companyTransportToolTotalCount.get(0).asInstanceOf[Number].longValue(),
if(maxCompanyTransportToolTotalCount==null) 0L else maxCompanyTransportToolTotalCount.asInstanceOf[Number].longValue(),
if(minCompanyTransportToolTotalCount==null) 0L else minCompanyTransportToolTotalCount.asInstanceOf[Number].longValue(),
if(avgCompanyTransportToolTotalCount==null) 0L else avgCompanyTransportToolTotalCount.asInstanceOf[Number].longValue()
)

wsRows.append(rowInfo)
println(rowInfo)

ttWsDetailByDayDF.unpersist()
areaTransportToolTotalCountDF.unpersist()
companyTransportToolTotalCountDF.unpersist()
whTransportToolTotalCountDF.unpersist()
})

2.4、通过StructType构建指定Schema

//定义指标结果表的shema信息
//网点车辆相关的表结构数据
val schemaDot: StructType = StructType(Array(
StructField("id", StringType, false, Metadata.empty),
StructField("ttDotTotalCount", LongType, true, Metadata.empty), //各网点发车次数
StructField("maxTtDotTotalCount", LongType, true, Metadata.empty), //各网点最大发车次数
StructField("minTtDotTotalCount", LongType, true, Metadata.empty), //各网点最小发车次数
StructField("avgTtDotTotalCount", LongType, true, Metadata.empty), //各网点平均发车次数
StructField("areaDotTotalCount", LongType, true, Metadata.empty), //各区域发车次数
StructField("maxAreaDotTotalCount", LongType, true, Metadata.empty), //各区域最大发车次数
StructField("minAreaDotTotalCount", LongType, true, Metadata.empty), //各区域最小发车次数
StructField("avgAreaDotTotalCount", LongType, true, Metadata.empty), //各区域平均发车次数
StructField("companyDotTotalCount", LongType, true, Metadata.empty), //各公司发车次数
StructField("maxCompanyDotTotalCount", LongType, true, Metadata.empty), //各公司最大发车次数
StructField("minCompanyDotTotalCount", LongType, true, Metadata.empty), //各公司最小发车次数
StructField("avgCompanyDotTotalCount", LongType, true, Metadata.empty) //各公司平均发车次数
))

//仓库车辆相关的表结构数据
val schemaWs: StructType = StructType(Array(
StructField("id", StringType, false, Metadata.empty),
StructField("whTransportToolTotalCount", LongType, true, Metadata.empty), //各仓库发车次数
StructField("maxWhTransportToolTotalCount", LongType, true, Metadata.empty), //各仓库最大发车次数
StructField("minWhTransportToolTotalCount", LongType, true, Metadata.empty), //各仓库最小发车次数
StructField("avgWhTransportToolTotalCount", LongType, true, Metadata.empty), //各仓库平均发车次数
StructField("areaTransportToolTotalCount", LongType, true, Metadata.empty), //各区域发车次数
StructField("maxAreaTransportToolTotalCount", LongType, true, Metadata.empty), //各区域最大发车次数
StructField("minAreaTransportToolTotalCount", LongType, true, Metadata.empty), //各区域最小发车次数
StructField("avgAreaTransportToolTotalCount", LongType, true, Metadata.empty), //各区域平均发车次数
StructField("companyTransportToolTotalCount", LongType, true, Metadata.empty), //各公司发车次数
StructField("maxCompanyTransportToolTotalCount", LongType, true, Metadata.empty), //各公司最大发车次数
StructField("minCompanyTransportToolTotalCount", LongType, true, Metadata.empty), //各公司最小发车次数
StructField("avgCompanyTransportToolTotalCount", LongType, true, Metadata.empty) //各公司平均发车次数
))

2.5、持久化指标数据到kudu表

//TODO 5)将计算好的指标数据写入到kudu数据库中
val dotRDD: RDD[Row] = sparkSession.sparkContext.makeRDD(dotRows)
val wsRDD: RDD[Row] = sparkSession.sparkContext.makeRDD(wsRows)

val dotDataFrame: DataFrame = sparkSession.createDataFrame(dotRDD, schemaDot)
val wsDataFrame: DataFrame = sparkSession.createDataFrame(wsRDD, schemaWs)

save(dotDataFrame, OfflineTableDefine.ttDotSummary)
save(wsDataFrame, OfflineTableDefine.ttWsSummary)

2.6、完整代码

package cn.it.logistics.offline.dws

import cn.it.logistics.common.{Configuration, DateHelper, OfflineTableDefine, SparkUtils}
import cn.it.logistics.offline.OfflineApp
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.sql.types.{DoubleType, LongType, Metadata, StringType, StructField, StructType}
import org.apache.spark.sql.functions._
import org.apache.spark.storage.StorageLevel

import scala.collection.mutable.ArrayBuffer

/**
* 车辆主题指标开发
*/
object TransportToolDWS extends OfflineApp{
//定义应用程序的名称
val appName = this.getClass.getSimpleName

/**
* 入口函数
* @param args
*/
def main(args: Array[String]): Unit = {
/**
* 实现步骤:
* 1)创建SparkConf对象
* 2)创建SparkSession对象
* 3)读取车辆明细宽表的数据
* 4)对车辆明细宽表的数据进行指标的计算
* 5)将计算好的指标数据写入到kudu数据库中
* 5.1:定义指标结果表的schema信息
* 5.2:组织需要写入到kudu表的数据
* 5.3:判断指标结果表是否存在,如果不存在则创建
* 5.4:将数据写入到kudu表中
* 6)删除缓存数据
* 7)停止任务,退出sparksession
*/

//TODO 1)创建SparkConf对象
val sparkConf: SparkConf = SparkUtils.autoSettingEnv(
SparkUtils.sparkConf(appName)
)

//TODO 2)创建SparkSession对象
val sparkSession: SparkSession = SparkUtils.getSparkSession(sparkConf)
sparkSession.sparkContext.setLogLevel(Configuration.LOG_OFF)

//处理数据
execute(sparkSession)
}
/**
* 数据处理
*
* @param sparkSession
*/
override def execute(sparkSession: SparkSession): Unit = {
//TODO 3)读取车辆明细宽表的数据
val ttDotDetailDF: DataFrame = getKuduSource(sparkSession, OfflineTableDefine.dotTransportToolDetail, Configuration.isFirstRunnable).persist(StorageLevel.DISK_ONLY_2)
val ttWarehouseDetailDF: DataFrame = getKuduSource(sparkSession, OfflineTableDefine.warehouseTransportToolDetail, Configuration.isFirstRunnable).persist(StorageLevel.DISK_ONLY_2)

//根据网点车辆的日期进行分组
val ttDotDetailGroupByDayDF: DataFrame = ttDotDetailDF.select("day").groupBy("day").count().cache()

//导入隐式转换
import sparkSession.implicits._

//定义计算好的指标结果集合对象
val dotRows: ArrayBuffer[Row] = ArrayBuffer[Row]()
//循环遍历网点车辆每个日期的车辆明细宽表数据
ttDotDetailGroupByDayDF.collect().foreach(row=> {
//获取到要处理的数据所在的日期
val day: String = row.getAs[String](0)

//返回指定日期的仓库明细数据
val ttDotDetailByDayDF: DataFrame = ttDotDetailDF.where(col("day") === day).toDF().persist(StorageLevel.DISK_ONLY_2)

//各网点的发车次数(西三旗:10,西二旗:20)
val ttDotTotalCountDF: DataFrame = ttDotDetailByDayDF.groupBy($"dot_id").agg(count("dot_id").alias("cnt")).persist(StorageLevel.DISK_ONLY_2)
//计算各网点的总发车次数
val ttDotTotalCount: Row = ttDotTotalCountDF.agg(sum("cnt")).first()
//各网点的最大发车次数、最小发车次数、平均发车次数
val maxAndMinAndAvgTTDotTotalCount: Row = ttDotTotalCountDF.agg(max("cnt"), min("cnt"), avg("cnt")).first()
val maxTTDotTotalCount: Any = maxAndMinAndAvgTTDotTotalCount(0)
val minTTDotTotalCount: Any = maxAndMinAndAvgTTDotTotalCount(1)
val avgTTDotTotalCount: Any = maxAndMinAndAvgTTDotTotalCount(2)

// 各区域发车次数
val areaDotTotalCountDF = ttDotDetailByDayDF.groupBy("city_id").agg(count($"id").alias("cnt")).persist(StorageLevel.DISK_ONLY_2)
val areaDotTotalCount = areaDotTotalCountDF.agg(sum($"cnt")).first()
// 各区域最大发车次数、最小发车次数和平均发车次数
val maxAndMinAndAvgAreaDotTotalCount = areaDotTotalCountDF.agg(max($"cnt"), min($"cnt"), avg($"cnt")).first()
val maxAreaDotTotalCount = maxAndMinAndAvgAreaDotTotalCount(0)
val minAreaDotTotalCount = maxAndMinAndAvgAreaDotTotalCount(1)
val avgAreaDotTotalCount = maxAndMinAndAvgAreaDotTotalCount(2)

// 各公司发车次数
val companyDotTotalCountDF = ttDotDetailByDayDF.groupBy("company_id").agg(count($"id").alias("cnt")).persist(StorageLevel.DISK_ONLY_2)
val companyDotTotalCount = companyDotTotalCountDF.agg(sum($"cnt")).first()
// 各公司最大发车次数、最小发车次数和平均发车次数
val maxAndMinAndAvgCompanyDotTotalCount = companyDotTotalCountDF.agg(max($"cnt"), min($"cnt"), avg($"cnt")).first()
val maxCompanyDotTotalCount = maxAndMinAndAvgCompanyDotTotalCount(0)
val minCompanyDotTotalCount = maxAndMinAndAvgCompanyDotTotalCount(1)
val avgCompanyDotTotalCount = maxAndMinAndAvgCompanyDotTotalCount(2)

//将计算好的指标数据封装到row对象中
val rowInfo: Row = Row(
day,
if(ttDotTotalCount.isNullAt(0)) 0L else ttDotTotalCount.get(0).asInstanceOf[Number].longValue(),
if(maxTTDotTotalCount==null) 0L else maxTTDotTotalCount.asInstanceOf[Number].longValue(),
if(minTTDotTotalCount==null) 0L else minTTDotTotalCount.asInstanceOf[Number].longValue(),
if(avgTTDotTotalCount==null) 0L else avgTTDotTotalCount.asInstanceOf[Number].longValue(),
if(areaDotTotalCount.isNullAt(0)) 0L else areaDotTotalCount.get(0).asInstanceOf[Number].longValue(),
if(maxAreaDotTotalCount==null) 0L else maxAreaDotTotalCount.asInstanceOf[Number].longValue(),
if(minAreaDotTotalCount==null) 0L else minAreaDotTotalCount.asInstanceOf[Number].longValue(),
if(avgAreaDotTotalCount==null) 0L else avgAreaDotTotalCount.asInstanceOf[Number].longValue(),
if(companyDotTotalCount.isNullAt(0)) 0L else ttDotTotalCount.get(0).asInstanceOf[Number].longValue(),
if(maxCompanyDotTotalCount==null) 0L else maxCompanyDotTotalCount.asInstanceOf[Number].longValue(),
if(minCompanyDotTotalCount==null) 0L else minCompanyDotTotalCount.asInstanceOf[Number].longValue(),
if(avgCompanyDotTotalCount==null) 0L else avgCompanyDotTotalCount.asInstanceOf[Number].longValue()
)

dotRows.append(rowInfo)
println(rowInfo)

ttDotDetailByDayDF.unpersist()
ttDotTotalCountDF.unpersist()
areaDotTotalCountDF.unpersist()
companyDotTotalCountDF.unpersist()
})

//根据仓库车辆的日期进行分组
val ttWsDetailGroupByDayDF: DataFrame = ttWarehouseDetailDF.select("day").groupBy("day").count().cache()
//定义计算好的指标结果集合对象
val wsRows: ArrayBuffer[Row] = ArrayBuffer[Row]()
//循环遍历仓库车辆每个日期的车辆明细宽表数据
ttWsDetailGroupByDayDF.collect().foreach(row=> {
//获取到要处理的数据所在的日期
val day: String = row.getAs[String](0)

//返回指定日期的仓库明细数据
val ttWsDetailByDayDF: DataFrame = ttWarehouseDetailDF.where(col("day") === day).toDF().persist(StorageLevel.DISK_ONLY_2)

val whTransportToolTotalCountDF: DataFrame = ttWsDetailByDayDF.groupBy($"ws_id").agg(count("ws_id").alias("cnt")).persist(StorageLevel.DISK_ONLY_2)
//计算各仓库的总发车次数
val whTransportToolTotalCount: Row = whTransportToolTotalCountDF.agg(sum("cnt")).first()
//各仓库的最大发车次数、最小发车次数、平均发车次数
val maxAndMinAndAvgWhTransportToolTotalCount: Row = whTransportToolTotalCountDF.agg(max("cnt"), min("cnt"), avg("cnt")).first()
val maxTransportToolCount: Any = maxAndMinAndAvgWhTransportToolTotalCount(0)
val minTransportToolCount: Any = maxAndMinAndAvgWhTransportToolTotalCount(1)
val avgTransportToolCount: Any = maxAndMinAndAvgWhTransportToolTotalCount(2)

// 各区域发车次数
val areaTransportToolTotalCountDF = ttWsDetailByDayDF.groupBy("city_id").agg(count($"id").alias("cnt")).persist(StorageLevel.DISK_ONLY_2)
val areaTransportToolTotalCount = areaTransportToolTotalCountDF.agg(sum($"cnt")).first()
// 各区域最大发车次数、最小发车次数和平均发车次数
val maxAndMinAndAvgAreaTransportToolTotalCount = areaTransportToolTotalCountDF.agg(max($"cnt"), min($"cnt"), avg($"cnt")).first()
val maxAreaTransportToolTotalCount: Any = maxAndMinAndAvgAreaTransportToolTotalCount(0)
val minAreaTransportToolTotalCount = maxAndMinAndAvgAreaTransportToolTotalCount(1)
val avgAreaTransportToolTotalCount = maxAndMinAndAvgAreaTransportToolTotalCount(2)

// 各公司发车次数
val companyTransportToolTotalCountDF = ttWsDetailByDayDF.groupBy("company_id").agg(count($"id").alias("cnt")).persist(StorageLevel.DISK_ONLY_2)
val companyTransportToolTotalCount = companyTransportToolTotalCountDF.agg(sum($"cnt")).first()
// 各公司最大发车次数、最小发车次数和平均发车次数
val maxAndMinAndAvgCompanyTransportToolTotalCount = companyTransportToolTotalCountDF.agg(max($"cnt"), min($"cnt"), avg($"cnt")).first()
val maxCompanyTransportToolTotalCount = maxAndMinAndAvgCompanyTransportToolTotalCount(0)
val minCompanyTransportToolTotalCount = maxAndMinAndAvgCompanyTransportToolTotalCount(1)
val avgCompanyTransportToolTotalCount = maxAndMinAndAvgCompanyTransportToolTotalCount(2)

//将计算好的指标数据封装到row对象中
val rowInfo: Row = Row(
day,
if(whTransportToolTotalCount.isNullAt(0)) 0L else whTransportToolTotalCount.get(0).asInstanceOf[Number].longValue(),
if(maxTransportToolCount==null) 0L else maxTransportToolCount.asInstanceOf[Number].longValue(),
if(minTransportToolCount==null) 0L else minTransportToolCount.asInstanceOf[Number].longValue(),
if(avgTransportToolCount==null) 0L else avgTransportToolCount.asInstanceOf[Number].longValue(),
if(areaTransportToolTotalCount.isNullAt(0)) 0L else areaTransportToolTotalCount.get(0).asInstanceOf[Number].longValue(),
if(maxAreaTransportToolTotalCount==null) 0L else maxAreaTransportToolTotalCount.asInstanceOf[Number].longValue(),
if(minAreaTransportToolTotalCount==null) 0L else minAreaTransportToolTotalCount.asInstanceOf[Number].longValue(),
if(avgAreaTransportToolTotalCount==null) 0L else avgAreaTransportToolTotalCount.asInstanceOf[Number].longValue(),
if(companyTransportToolTotalCount.isNullAt(0)) 0L else companyTransportToolTotalCount.get(0).asInstanceOf[Number].longValue(),
if(maxCompanyTransportToolTotalCount==null) 0L else maxCompanyTransportToolTotalCount.asInstanceOf[Number].longValue(),
if(minCompanyTransportToolTotalCount==null) 0L else minCompanyTransportToolTotalCount.asInstanceOf[Number].longValue(),
if(avgCompanyTransportToolTotalCount==null) 0L else avgCompanyTransportToolTotalCount.asInstanceOf[Number].longValue()
)

wsRows.append(rowInfo)
println(rowInfo)

ttWsDetailByDayDF.unpersist()
areaTransportToolTotalCountDF.unpersist()
companyTransportToolTotalCountDF.unpersist()
whTransportToolTotalCountDF.unpersist()
})

//定义指标结果表的shema信息
//网点车辆相关的表结构数据
val schemaDot: StructType = StructType(Array(
StructField("id", StringType, false, Metadata.empty),
StructField("ttDotTotalCount", LongType, true, Metadata.empty), //各网点发车次数
StructField("maxTtDotTotalCount", LongType, true, Metadata.empty), //各网点最大发车次数
StructField("minTtDotTotalCount", LongType, true, Metadata.empty), //各网点最小发车次数
StructField("avgTtDotTotalCount", LongType, true, Metadata.empty), //各网点平均发车次数
StructField("areaDotTotalCount", LongType, true, Metadata.empty), //各区域发车次数
StructField("maxAreaDotTotalCount", LongType, true, Metadata.empty), //各区域最大发车次数
StructField("minAreaDotTotalCount", LongType, true, Metadata.empty), //各区域最小发车次数
StructField("avgAreaDotTotalCount", LongType, true, Metadata.empty), //各区域平均发车次数
StructField("companyDotTotalCount", LongType, true, Metadata.empty), //各公司发车次数
StructField("maxCompanyDotTotalCount", LongType, true, Metadata.empty), //各公司最大发车次数
StructField("minCompanyDotTotalCount", LongType, true, Metadata.empty), //各公司最小发车次数
StructField("avgCompanyDotTotalCount", LongType, true, Metadata.empty) //各公司平均发车次数
))

//仓库车辆相关的表结构数据
val schemaWs: StructType = StructType(Array(
StructField("id", StringType, false, Metadata.empty),
StructField("whTransportToolTotalCount", LongType, true, Metadata.empty), //各仓库发车次数
StructField("maxWhTransportToolTotalCount", LongType, true, Metadata.empty), //各仓库最大发车次数
StructField("minWhTransportToolTotalCount", LongType, true, Metadata.empty), //各仓库最小发车次数
StructField("avgWhTransportToolTotalCount", LongType, true, Metadata.empty), //各仓库平均发车次数
StructField("areaTransportToolTotalCount", LongType, true, Metadata.empty), //各区域发车次数
StructField("maxAreaTransportToolTotalCount", LongType, true, Metadata.empty), //各区域最大发车次数
StructField("minAreaTransportToolTotalCount", LongType, true, Metadata.empty), //各区域最小发车次数
StructField("avgAreaTransportToolTotalCount", LongType, true, Metadata.empty), //各区域平均发车次数
StructField("companyTransportToolTotalCount", LongType, true, Metadata.empty), //各公司发车次数
StructField("maxCompanyTransportToolTotalCount", LongType, true, Metadata.empty), //各公司最大发车次数
StructField("minCompanyTransportToolTotalCount", LongType, true, Metadata.empty), //各公司最小发车次数
StructField("avgCompanyTransportToolTotalCount", LongType, true, Metadata.empty) //各公司平均发车次数
))

//TODO 5)将计算好的指标数据写入到kudu数据库中
val dotRDD: RDD[Row] = sparkSession.sparkContext.makeRDD(dotRows)
val wsRDD: RDD[Row] = sparkSession.sparkContext.makeRDD(wsRows)

val dotDataFrame: DataFrame = sparkSession.createDataFrame(dotRDD, schemaDot)
val wsDataFrame: DataFrame = sparkSession.createDataFrame(wsRDD, schemaWs)

save(dotDataFrame, OfflineTableDefine.ttDotSummary)
save(wsDataFrame, OfflineTableDefine.ttWsSummary)

// 6)删除缓存数据
ttDotDetailDF.unpersist()
ttWarehouseDetailDF.unpersist()
ttDotDetailGroupByDayDF.unpersist()
ttWsDetailGroupByDayDF.unpersist()

// 7)停止任务,退出sparksession
sparkSession.stop()
}
}
举报

相关推荐

0 条评论