客快物流大数据项目(六十六):车辆主题(代码片段)

Lansonli Lansonli     2022-12-04     723

关键词:

文章目录

车辆主题

一、背景介绍

二、指标明细

三、表关联关系

1、​​​​​​​​​​​​​​事实表

2、​​​​​​​维度表

3、​​​​​​​​​​​​​​关联关系

四、​​​​​​​车辆数据拉宽开发

1、拉宽后的字段

2、​​​​​​​​​​​​​​SQL语句

3、Spark实现

五、车辆数据指标开发

1、​​​​​​​​​​​​​​计算的字段

2、​​​​​​​Spark实现


车辆主题

一、背景介绍

车辆主题主要是统计各个网点、区域、公司的发车情况,反映了区域或者公司的吞吐量及运营状况。

​​​​​​​二、指标明细

指标列表

维度

发车次数

各网点发车次数

各区域发车次数

各公司发车次数

最大发车次数

各网点最大发车次数

各区域最大发车次数

各分公司最大发车次数

最小发车次数

各网点最小发车次数

各区域最小发车次数

各分公司最小发车次数

平均发车次数

各网点平均发车次数

各区域平均发车次数

各分公司平均发车次数

三、​​​​​​​表关联关系

1、​​​​​​​​​​​​​​事实表

表名

描述

tbl_transport_tool

车辆事实表

tbl_warehouse_transport_tool

仓库车辆关联表

2、​​​​​​​维度表

表名

描述

tbl_dot

网点表

tbl_company

公司表

tbl_warehouse

仓库表

tbl_company_warehouse_map

公司仓库关联表

tbl_transport_tool

车辆表

3、​​​​​​​​​​​​​​关联关系

车辆表与维度表的关联关系如下:

四、​​​​​​​车辆数据拉宽开发

1、拉宽后的字段

1.1、拉宽网点车辆表

字段名

别名

字段描述

tbl_transport_tool

id

id

运输工具ID

tbl_transport_tool

brand

brand

运输工具品牌

tbl_transport_tool

model

model

运输工具型号

tbl_transport_tool

type

type

运输工具类型

tbl_codes

codeDesc/ttTypeName

type_name

车辆类型描述

tbl_transport_tool

givenLoad

given_load

额定载重

tbl_transport_tool

loadCnUnit

load_cn_unit

中文载重单位

tbl_transport_tool

loadEnUnit

load_en_unit

英文载重单位

tbl_transport_tool

buyDt

buy_dt

购买时间

tbl_transport_tool

licensePlate

license_plate

牌照

tbl_transport_tool

state

state

运输工具状态

tbl_codes

codeDesc/ttStateName

state_name

运输工具状态描述

tbl_transport_tool

cdt

cdt

创建时间

tbl_transport_tool

udt

udt

修改时间

tbl_transport_tool

remark

remark

备注

tbl_dot

id

dot_id

网点id

tbl_dot

dotNumber

dot_number

网点编号

tbl_dot

dotName

dot_name

网点名称

tbl_dot

dotAddr

dot_addr

网点地址

tbl_dot

dotGisAddr

dot_gis_addr

网点GIS地址

tbl_dot

dotTel

dot_tel

网点电话

tbl_dot

manageAreaId

manage_area_id

网点管理辖区ID

tbl_dot

manageAreaGis

manage_area_gis

网点管理辖区地理围栏

tbl_company

id

company_id

公司ID

tbl_company

companyName

company_name

公司名称

tbl_company

cityId

city_id

城市ID

tbl_company

companyNumber

company_number

公司编号

tbl_company

companyAddr

company_addr

公司地址

tbl_company

companyAddrGis

company_addr_gis

公司gis地址

tbl_company

companyTel

company_tel

公司电话

tbl_company

isSubCompany

is_sub_company

母公司ID

tbl_transport_tool

yyyyMMdd(cdt)

day

创建时间

年月日格式

​​​​​​​1.2、拉宽仓库车辆表

字段名

别名

字段描述

tbl_transport_tool

id

id

运输工具ID

tbl_transport_tool

brand

brand

运输工具品牌

tbl_transport_tool

model

model

运输工具型号

tbl_transport_tool

type

type

运输工具类型

tbl_codes

codeDesc/ttTypeName

type_name

车辆类型描述

tbl_transport_tool

givenLoad

given_load

额定载重

tbl_transport_tool

loadCnUnit

load_cn_unit

中文载重单位

tbl_transport_tool

loadEnUnit

load_en_unit

英文载重单位

tbl_transport_tool

buyDt

buy_dt

购买时间

tbl_transport_tool

licensePlate

license_plate

牌照

tbl_transport_tool

state

state

运输工具状态

tbl_transport_tool

cdt

cdt

创建时间

tbl_transport_tool

udt

udt

修改时间

tbl_transport_tool

remark

remark

备注

tbl_warehouse

id

ws_id

仓库ID

tbl_warehouse

name

name

仓库名称

tbl_warehouse

addr

addr

仓库地址

tbl_warehouse

addrGis

addr_gis

仓库gis地址

tbl_warehouse

employeeId

employee_id

仓库负责人

tbl_warehouse

type

ws_type

仓库类型

tbl_warehouse

area

area

占地面积

tbl_warehouse

isLease

is_lease

是否租赁

tbl_company

id

company_id

公司ID

tbl_company

companyName

company_name

公司名称

tbl_company

cityId

city_id

城市ID

tbl_company

companyNumber

company_number

公司编号

tbl_company

companyAddr

company_addr

公司地址

tbl_company

companyAddrGis

company_addr_gis

公司gis地址

tbl_company

companyTel

company_tel

公司电话

tbl_company

isSubCompany

is_sub_company

母公司ID

tbl_transport_tool

yyyyMMdd(cdt)

day

创建时间

年月日格式

2、​​​​​​​​​​​​​​SQL语句

​​​​​​​​​​​​​​2.1、拉宽网点车辆表

SELECT 
ttl."id" ,
ttl."brand" ,
ttl."model" ,
ttl."type" ,
TTL ."given_load" ,
TTL ."load_cn_unit" ,
TTL ."load_en_unit" ,
TTL ."buy_dt" ,
TTL ."license_plate" ,
ttl."state" ,
ttl."cdt" ,
ttl."udt" ,
ttl."remark" ,
dot."id" AS dot_id,
dot."dot_name" ,
dot."dot_number" ,
dot."dot_addr" ,
dot."dot_gis_addr" ,
dot."dot_tel" ,
dot."manage_area_id" ,
dot."manage_area_gis" ,
COMPANY ."id" AS company_id,
COMPANY ."company_name",
COMPANY ."company_number",
COMPANY ."city_id",
COMPANY ."company_addr",
COMPANY ."company_addr_gis",
COMPANY ."company_tel",
COMPANY ."is_sub_company"
FROM "tbl_transport_tool"  ttl
LEFT JOIN "tbl_dot_transport_tool" tdtl ON ttl."id" = tdtl."transport_tool_id" 
LEFT JOIN "tbl_dot" dot ON DOT ."id" = TDTL ."dot_id"
LEFT JOIN "tbl_company_dot_map" companydot ON companydot."dot_id" = TDTL ."dot_id"
LEFT JOIN "tbl_company" company ON company."id" = companydot."company_id" 

2.2、​​​​​​​​​​​​​​拉宽仓库车辆表

SELECT 
ttl."id" ,
ttl."brand" ,
ttl."model" ,
ttl."type" ,
ttl."given_load" ,
ttl."load_cn_unit" ,
ttl."load_en_unit" ,
ttl."buy_dt" ,
ttl."license_plate" ,
ttl."state" ,
ttl."cdt" ,
ttl."udt" ,
ttl."remark" ,
warehouse."id" ,
warehouse."name",
warehouse."addr",
warehouse."addr_gis",
warehouse."employee_id",
warehouse."type",
warehouse."area",
warehouse."is_lease",
COMPANY ."id" AS company_id,
COMPANY ."company_name",
COMPANY ."company_number",
COMPANY ."city_id",
COMPANY ."company_addr",
COMPANY ."company_addr_gis",
COMPANY ."company_tel",
COMPANY ."is_sub_company"
FROM "tbl_warehouse_transport_tool"  twt
LEFT JOIN "tbl_transport_tool" ttl ON twt."transport_tool_id" = ttl."id" 
LEFT JOIN "tbl_warehouse" warehouse ON WAREHOUSE ."id" = twt."warehouse_id"
LEFT JOIN "tbl_company_warehouse_map" warehouse_map ON warehouse_map."warehouse_id" = warehouse."id" 
LEFT JOIN "tbl_company" company ON company."id" = warehouse_map."company_id"

3、Spark实现

实现步骤:

  • dwd目录下创建 TransportToolDWD 单例对象,继承自OfflineApp特质
  • 初始化环境的参数,创建SparkSession对象
  • 获取运输工具表(tbl_transport_tool)数据,并缓存数据
  • 判断是否是首次运行,如果是首次运行的话,则全量装载数据(含历史数据)
  • 获取网点运输工具关联表(tbl_dot_transport_tool数据,并缓存数据
  • 获取网点表(tbl_dot)数据,并缓存数据
  • 获取公司网点关联表(tbl_company_dot_map)数据,并缓存数据
  • 获取仓库运输工具关联表(tbl_warehouse_transport_tool数据,并缓存数据
  • 获取公司仓库关联表(tbl_company_warehouse_map数据,并缓存数据
  • 获取仓库表(tbl_warehouse数据,并缓存数据
  • 获取公司表(tbl_company数据,并缓存数据
  • 根据以下方式拉宽仓库车辆明细数据
    • 根据交通工具id,在交通工具表中获取交通工具数据
    • 根据网点id,在网点表中获取网点数据
    • 根据公司id,在公司表中获取公司数据
    • 根据仓库id,在仓库表中获取仓库数据
  • 创建网点车辆明细宽表(若存在则不创建)
  • 创建仓库车辆明细宽表(若存在则不创建)
  • 将仓库车辆明细宽表数据写入到kudu数据表中
  • 删除缓存数据

3.1、初始化环境变量

初始化运单明细拉宽作业的环境变量

package cn.it.logistics.offline.dwd

import cn.it.logistics.common.Configuration, SparkUtils
import cn.it.logistics.offline.OfflineApp
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

/**
 * 车辆主题开发
 * 将车辆相关的表进行关联拉宽,将拉宽后的数据写入到车辆宽表中
 * 1)网点车辆关联表->派送网点所拥有的车辆(三轮车)
 * 2)仓库车辆关联表->仓库与仓库之间的运输工具(货车、货机)
 */
object TransportToolDWD extends OfflineApp
  //定义应用的名称
  val appName = this.getClass.getSimpleName

  /**
   * 入口函数
   * @param args
   */
  def main(args: Array[String]): Unit = 
    /**
     * 实现步骤:
     * 1)初始化sparkConf对象
     * 2)创建sparkSession对象
     * 3)加载kudu中的事实表和维度表的数据(将加载后的数据进行缓存)
     * 4)定义维度表与事实表的关联
     * 5)将拉宽后的数据再次写回到kudu数据库中(DWD明细层)
     * 5.1:创建车辆明细宽表的schema表结构
     * 5.2:创建车辆宽表(判断宽表是否存在,如果不存在则创建)
     * 5.3:将数据写入到kudu中
     * 6)将缓存的数据删除掉
     * 7)停止任务
     */

    //1)初始化sparkConf对象
    val sparkConf: SparkConf = SparkUtils.autoSettingEnv(
      SparkUtils.sparkConf(appName)
    )

    //2)创建sparkSession对象
    val sparkSession: SparkSession = SparkUtils.getSparkSession(sparkConf)
    sparkSession.sparkContext.setLogLevel(Configuration.LOG_OFF)

    //数据处理
    execute(sparkSession)
  
  /**
   * 数据处理
   *
   * @param sparkSession
   */
  override def execute(sparkSession: SparkSession): Unit = 
    sparkSession.stop()
  

 ​​​​3.2、加载运输工具表及车辆相关的表并缓存

  • 加载运输工具表的时候,需要指定日期条件,因为车辆主题最终需要Azkaban定时调度执行,每天执行一次增量数据,因此需要指定日期。
  • 判断是否是首次运行,如果是首次运行的话,则全量装载数据(含历史数据)
//TODO 3)加载kudu中的事实表和维度表的数据(将加载后的数据进行缓存)
//加载车辆表数据(事实表)
val ttDF: DataFrame = getKuduSource(sparkSession, TableMapping.transportTool, Configuration.isFirstRunnable).persist(StorageLevel.DISK_ONLY_2)

//加载仓库车辆表数据
val ttDotDF: DataFrame = getKuduSource(sparkSession, TableMapping.dotTransportTool, true).persist(StorageLevel.DISK_ONLY_2)

//加载网点表的数据
val dotDF: DataFrame = getKuduSource(sparkSession, TableMapping.dot, true).persist(StorageLevel.DISK_ONLY_2)

//加载公司网点关联表的数据
val companyDotDF: DataFrame = getKuduSource(sparkSession, TableMapping.companyDotMap, true).persist(StorageLevel.DISK_ONLY_2)

//加载公司表的数据
val companyDF: DataFrame = getKuduSource(sparkSession, TableMapping.company, true).persist(StorageLevel.DISK_ONLY_2)

//加载仓库车辆关联表数据(事实表)
val ttWsDF: DataFrame = getKuduSource(sparkSession, TableMapping.warehouseTransportTool, Configuration.isFirstRunnable).persist(StorageLevel.DISK_ONLY_2)

//加载仓库公司关联表
val companyWareHouseMapDF: DataFrame = getKuduSource(sparkSession, TableMapping.companyWarehouseMap, true).persist(StorageLevel.DISK_ONLY_2)

//加载仓库表数据
val wsDF: DataFrame = getKuduSource(sparkSession, TableMapping.warehouse, true).persist(StorageLevel.DISK_ONLY_2)

//加载物流码表数据
val codesDF: DataFrame = getKuduSource(sparkSession, TableMapping.codes, true).persist(StorageLevel.DISK_ONLY_2)

import sparkSession.implicits._
//获取运输工具类型
val transportTypeDF: DataFrame = codesDF.where($"type" === CodeTypeMapping.TransportType).select($"code".as("ttType"), $"codeDesc".as("ttTypeName"))

//获取运输工具状态
val transportStatusDF: DataFrame = codesDF.where($"type" === CodeTypeMapping.TransportStatus).select($"code".as("ttStatus"), $"codeDesc".as("ttStateName"))

3.3、​​​​​​​定义网点车辆宽表的关联关系

  • 为了在DWS层任务中方便的获取每日增量网点车辆表数据(根据日期),因此在DataFrame基础上动态增加列(day),指定日期格式为yyyyMMdd

代码如下:

//TODO 4)定义维度表与事实表的关联
val left_outer: String = "left_outer"
// 4.1:拉宽网点车辆表
val ttDotDetailDF = ttDotDF.join(ttDF, ttDotDF.col("transportToolId") === ttDF.col("id"), left_outer) //网点车辆表关联车辆表
  .join(transportTypeDF, transportTypeDF("ttType") === ttDF("type"), left_outer) //车辆表类型关联字典表类型
  .join(transportStatusDF, transportStatusDF("ttStatus") === ttDF("state"), left_outer) //车辆表状态管理字典表状态
  .join(dotDF, dotDF.col("id") === ttDotDF.col("dotId"), left_outer) //网点车辆表关联网点
  .join(companyDotDF, ttDotDF.col("dotId") === companyDotDF.col("dotId"), left_outer) //网点车辆管连网点公司关联表
  .join(companyDF, companyDotDF.col("companyId") === companyDF.col("id"), left_outer) //网点车辆表关联公司表
  .withColumn("day", date_format(ttDotDF("cdt"), "yyyyMMdd"))//虚拟列,可以根据这个日期列作为分区字段,可以保证同一天的数据保存在同一个分区中
  .sort(ttDF.col("cdt").asc)
  .select(
    ttDF("id"), //车辆表id
    ttDF("brand"), //车辆表brand
    ttDF("model"), //车辆表model
    ttDF("type").cast(IntegerType), //车辆表type
    transportTypeDF("ttTypeName").as("type_name"), // 车辆表type对应字典表车辆类型的具体描述
    ttDF("givenLoad").cast(IntegerType).as("given_load"), //车辆表given_load
    ttDF("loadCnUnit").as("load_cn_unit"), //车辆表load_cn_unit
    ttDF("loadEnUnit").as("load_en_unit"), //车辆表load_en_unit
    ttDF("buyDt").as("buy_dt"), //车辆表buy_dt
    ttDF("licensePlate").as("license_plate"), //车辆表license_plate
    ttDF("state").cast(IntegerType), //车辆表state
    transportStatusDF("ttStateName").as("state_name"), // 车辆表state对应字典表类型的具体描述
    ttDF("cdt"), //车辆表cdt
    ttDF("udt"), //车辆表udt
    ttDF("remark"), //车辆表remark
    dotDF("id").as("dot_id"), //网点表dot_id
    dotDF("dotNumber").as("dot_number"), //网点表dot_number
    dotDF("dotName").as("dot_name"), //网点表dot_name
    dotDF("dotAddr").as("dot_addr"), //网点表dot_addr
    dotDF("dotGisAddr").as("dot_gis_addr"), //网点表dot_gis_addr
    dotDF("dotTel").as("dot_tel"), //网点表dot_tel
    dotDF("manageAreaId").as("manage_area_id"), //网点表manage_area_id
    dotDF("manageAreaGis").as("manage_area_gis"), //网点表manage_area_gis
    companyDF("id").alias("company_id"), //公司表id
    companyDF("companyName").as("company_name"), //公司表company_name
    companyDF("cityId").as("city_id"), //公司表city_id
    companyDF("companyNumber").as("company_number"), //公司表company_number
    companyDF("companyAddr").as("company_addr"), //公司表company_addr
    companyDF("companyAddrGis").as("company_addr_gis"), //公司表company_addr_gis
    companyDF("companyTel").as("company_tel"), //公司表company_tel
    companyDF("isSubCompany").as("is_sub_company"), //公司表is_sub_company
    $"day"
  )

3.4、​​​​​​​创建网点车辆明细宽表并将网点车辆明细数据写入到kudu数据表中

网点车辆明细宽表数据需要保存到kudu中,因此在第一次执行网点车辆明细拉宽操作时,网点车辆明细宽表是不存在的,因此需要实现自动判断宽表是否存在,如果不存在则创建

实现步骤:

  • TransportToolDWD 单例对象中调用save方法

实现过程:

  • TransportToolDWD 单例对象Main方法中调用save方法
//TODO 5)将拉宽后的数据再次写回到kudu数据库中(DWD明细层)
save(ttDotDetailDF, OfflineTableDefine.dotTransportToolDetail)

3.5、​​​​​​​​​​​​​​定义仓库车辆宽表的关联关系

  • 为了在DWS层任务中方便的获取每日增量仓库车辆表数据(根据日期),因此在DataFrame基础上动态增加列(day),指定日期格式为yyyyMMdd

代码如下:

// 4.2:拉宽仓库车辆表
// 拉宽仓库车辆表
val ttWsDetailDF = ttWsDF.join(ttDF, ttWsDF.col("transportToolId") === ttDF.col("id"), left_outer) //仓库车辆表关联车辆表
  .join(transportTypeDF, transportTypeDF("ttType") === ttDF("type"), left_outer) //车辆表类型关联字典表类型
  .join(transportStatusDF, transportStatusDF("ttStatus") === ttDF("state"), left_outer) //车辆表状态管理字典表状态
  .join(wsDF, wsDF.col("id") === ttWsDF.col("warehouseId"), left_outer) //仓库车辆表关联仓库
  .join(companyWareHouseMapDF, ttWsDF.col("warehouseId") === companyWareHouseMapDF.col("warehouseId"), left_outer) //仓库车辆管连仓库公司关联表
  .join(companyDF, companyDF.col("id") === companyWareHouseMapDF.col("companyId"), left_outer)
  .withColumn("day", date_format(ttWsDF("cdt"), "yyyyMMdd"))//虚拟列,可以根据这个日期列作为分区字段,可以保证同一天的数据保存在同一个分区中
  .sort(ttDF.col("cdt").asc)
  .select(
    ttDF("id"), //车辆表id
    ttDF("brand"), //车辆表brand
    ttDF("model"), //车辆表model
    ttDF("type").cast(IntegerType), //车辆表type
    transportTypeDF("ttTypeName").as("type_name"), // 车辆表type对应字典表车辆类型的具体描述
    ttDF("givenLoad").as("given_load").cast(IntegerType), //车辆表given_load
    ttDF("loadCnUnit").as("load_cn_unit"), //车辆表load_cn_unit
    ttDF("loadEnUnit").as("load_en_unit"), //车辆表load_en_unit
    ttDF("buyDt").as("buy_dt"), //车辆表buy_dt
    ttDF("licensePlate").as("license_plate"), //车辆表license_plate
    ttDF("state").cast(IntegerType), //车辆表state
    transportStatusDF("ttStateName").as("state_name"), // 车辆表type对应字典表车辆类型的具体描述
    ttDF("cdt"), //车辆表cdt
    ttDF("udt"), //车辆表udt
    ttDF("remark"), //车辆表remark
    wsDF("id").as("ws_id"), //仓库表id
    wsDF("name"), //仓库表name
    wsDF("addr"), //仓库表addr
    wsDF("addrGis").as("addr_gis"), //仓库表addr_gis
    wsDF("employeeId").as("employee_id"), //仓库表employee_id
    wsDF("type").as("ws_type").cast(IntegerType), //仓库表type
    wsDF("area"), //仓库表area
    wsDF("isLease").as("is_lease").cast(IntegerType), //仓库表is_lease
    companyDF("id").alias("company_id"), //公司表id
    companyDF("companyName").as("company_name"), //公司表company_name
    companyDF("cityId").as("city_id"), //公司表city_id
    companyDF("companyNumber").as("company_number"), //公司表company_number
    companyDF("companyAddr").as("company_addr"), //公司表company_addr
    companyDF("companyAddrGis").as("company_addr_gis"), //公司表company_addr_gis
    companyDF("companyTel").as("company_tel"), //公司表company_tel
    companyDF("isSubCompany").as("is_sub_company"), //公司表is_sub_company
    $"day"
  )

3.6、创建仓库车辆明细宽表并将仓库车辆明细数据写入到kudu数据表中

仓库车辆明细宽表数据需要保存到kudu中,因此在第一次执行仓库车辆明细拉宽操作时,仓库车辆明细宽表是不存在的,因此需要实现自动判断宽表是否存在,如果不存在则创建

实现步骤:

  • TransportToolDWD 单例对象中调用save方法

实现过程:

  • TransportToolDWD 单例对象Main方法中调用save方法
save(ttWsDetailDF, OfflineTableDefine.warehouseTransportToolDetail)

3.7、​​​​​​​​​​​​​​删除缓存数据

为了释放资源,车辆明细宽表数据计算完成以后,需要将缓存的源表数据删除。

//TODO 6)将缓存的数据删除掉
ttDF.unpersist()
ttDotDF.unpersist()
dotDF.unpersist()
companyDotDF.unpersist()
companyDF.unpersist()
ttWsDF.unpersist()
companyWareHouseMapDF.unpersist()
wsDF.unpersist()
codesDF.unpersist()

3.8、​​​​​​​​​​​​​​完整代码

package cn.it.logistics.offline.dwd

import cn.it.logistics.common.CodeTypeMapping, Configuration, OfflineTableDefine, SparkUtils, TableMapping
import cn.it.logistics.offline.OfflineApp
import org.apache.spark.SparkConf
import org.apache.spark.sql.functions.date_format
import org.apache.spark.sql.types.IntegerType
import org.apache.spark.sql.DataFrame, SparkSession
import org.apache.spark.storage.StorageLevel

/**
 * 车辆主题开发
 * 将车辆相关的表进行关联拉宽,将拉宽后的数据写入到车辆宽表中
 * 1)网点车辆关联表->派送网点所拥有的车辆(三轮车)
 * 2)仓库车辆关联表->仓库与仓库之间的运输工具(货车、货机)
 */
object TransportToolDWD extends OfflineApp
  //定义应用的名称
  val appName = this.getClass.getSimpleName

  /**
   * 入口函数
   * @param args
   */
  def main(args: Array[String]): Unit = 
    /**
     * 实现步骤:
     * 1)初始化sparkConf对象
     * 2)创建sparkSession对象
     * 3)加载kudu中的事实表和维度表的数据(将加载后的数据进行缓存)
     * 4)定义维度表与事实表的关联
     * 5)将拉宽后的数据再次写回到kudu数据库中(DWD明细层)
     * 5.1:创建车辆明细宽表的schema表结构
     * 5.2:创建车辆宽表(判断宽表是否存在,如果不存在则创建)
     * 5.3:将数据写入到kudu中
     * 6)将缓存的数据删除掉
     * 7)停止任务
     */

    //1)初始化sparkConf对象
    val sparkConf: SparkConf = SparkUtils.autoSettingEnv(
      SparkUtils.sparkConf(appName)
    )

    //2)创建sparkSession对象
    val sparkSession: SparkSession = SparkUtils.getSparkSession(sparkConf)
    sparkSession.sparkContext.setLogLevel(Configuration.LOG_OFF)

    //数据处理
    execute(sparkSession)
  
  /**
   * 数据处理
   *
   * @param sparkSession
   */
  override def execute(sparkSession: SparkSession): Unit = 
    //TODO 3)加载kudu中的事实表和维度表的数据(将加载后的数据进行缓存)
    //加载车辆表数据(事实表)
    val ttDF: DataFrame = getKuduSource(sparkSession, TableMapping.transportTool, Configuration.isFirstRunnable).persist(StorageLevel.DISK_ONLY_2)

    //加载仓库车辆表数据
    val ttDotDF: DataFrame = getKuduSource(sparkSession, TableMapping.dotTransportTool, true).persist(StorageLevel.DISK_ONLY_2)

    //加载网点表的数据
    val dotDF: DataFrame = getKuduSource(sparkSession, TableMapping.dot, true).persist(StorageLevel.DISK_ONLY_2)

    //加载公司网点关联表的数据
    val companyDotDF: DataFrame = getKuduSource(sparkSession, TableMapping.companyDotMap, true).persist(StorageLevel.DISK_ONLY_2)

    //加载公司表的数据
    val companyDF: DataFrame = getKuduSource(sparkSession, TableMapping.company, true).persist(StorageLevel.DISK_ONLY_2)

    //加载仓库车辆关联表数据(事实表)
    val ttWsDF: DataFrame = getKuduSource(sparkSession, TableMapping.warehouseTransportTool, Configuration.isFirstRunnable).persist(StorageLevel.DISK_ONLY_2)

    //加载仓库公司关联表
    val companyWareHouseMapDF: DataFrame = getKuduSource(sparkSession, TableMapping.companyWarehouseMap, true).persist(StorageLevel.DISK_ONLY_2)

    //加载仓库表数据
    val wsDF: DataFrame = getKuduSource(sparkSession, TableMapping.warehouse, true).persist(StorageLevel.DISK_ONLY_2)

    //加载物流码表数据
    val codesDF: DataFrame = getKuduSource(sparkSession, TableMapping.codes, true).persist(StorageLevel.DISK_ONLY_2)

    import sparkSession.implicits._
    //获取运输工具类型
    val transportTypeDF: DataFrame = codesDF.where($"type" === CodeTypeMapping.TransportType).select($"code".as("ttType"), $"codeDesc".as("ttTypeName"))

    //获取运输工具状态
    val transportStatusDF: DataFrame = codesDF.where($"type" === CodeTypeMapping.TransportStatus).select($"code".as("ttStatus"), $"codeDesc".as("ttStateName"))

    //TODO 4)定义维度表与事实表的关联
    val left_outer: String = "left_outer"
    // 4.1:拉宽网点车辆表
    val ttDotDetailDF = ttDotDF.join(ttDF, ttDotDF.col("transportToolId") === ttDF.col("id"), left_outer) //网点车辆表关联车辆表
      .join(transportTypeDF, transportTypeDF("ttType") === ttDF("type"), left_outer) //车辆表类型关联字典表类型
      .join(transportStatusDF, transportStatusDF("ttStatus") === ttDF("state"), left_outer) //车辆表状态管理字典表状态
      .join(dotDF, dotDF.col("id") === ttDotDF.col("dotId"), left_outer) //网点车辆表关联网点
      .join(companyDotDF, ttDotDF.col("dotId") === companyDotDF.col("dotId"), left_outer) //网点车辆管连网点公司关联表
      .join(companyDF, companyDotDF.col("companyId") === companyDF.col("id"), left_outer) //网点车辆表关联公司表
      .withColumn("day", date_format(ttDotDF("cdt"), "yyyyMMdd"))//虚拟列,可以根据这个日期列作为分区字段,可以保证同一天的数据保存在同一个分区中
      .sort(ttDF.col("cdt").asc)
      .select(
        ttDF("id"), //车辆表id
        ttDF("brand"), //车辆表brand
        ttDF("model"), //车辆表model
        ttDF("type").cast(IntegerType), //车辆表type
        transportTypeDF("ttTypeName").as("type_name"), // 车辆表type对应字典表车辆类型的具体描述
        ttDF("givenLoad").cast(IntegerType).as("given_load"), //车辆表given_load
        ttDF("loadCnUnit").as("load_cn_unit"), //车辆表load_cn_unit
        ttDF("loadEnUnit").as("load_en_unit"), //车辆表load_en_unit
        ttDF("buyDt").as("buy_dt"), //车辆表buy_dt
        ttDF("licensePlate").as("license_plate"), //车辆表license_plate
        ttDF("state").cast(IntegerType), //车辆表state
        transportStatusDF("ttStateName").as("state_name"), // 车辆表state对应字典表类型的具体描述
        ttDF("cdt"), //车辆表cdt
        ttDF("udt"), //车辆表udt
        ttDF("remark"), //车辆表remark
        dotDF("id").as("dot_id"), //网点表dot_id
        dotDF("dotNumber").as("dot_number"), //网点表dot_number
        dotDF("dotName").as("dot_name"), //网点表dot_name
        dot

客快物流大数据项目(九十六):clickhouse的versionedcollapsingmergetree深入了解(代码片段)

文章目录ClickHouse的VersionedCollapsingMergeTree深入了解一、创建VersionedCollapsingMergeTree引擎表的语法二、折叠数据三、使用示例ClickHouse的VersionedCollapsingMergeTree深入了解该引擎继承自 MergeTree 并将折叠行的逻辑添加到合并数据部分的算... 查看详情

客快物流大数据项目(六十):将消费的kafka数据转换成bean对象(代码片段)

目录将消费的kafka数据转换成bean对象一、将OGG数据转换成bean对象二、​​​​​​​将Canal数据转换成bean对象三、完整代码将消费的kafka数据转换成bean对象一、​​​​​​​将OGG数据转换成bean对象实现步骤:消费kafka的log... 查看详情

客快物流大数据项目(七十六):使用impala对kudu进行dml操作

文章目录使用Impala对kudu进行DML操作一、将数据插入Kudu表 查看详情

客快物流大数据项目(八十六):clickhouse的深入了解

文章目录ClickHouse的深入了解一、介绍二、特性三、优势四、​​​​​​​​​​​​​​劣势五、​​​​​​​​​​​​​​基准测试六、应用场景ClickHouse的深入了解一、介绍ClickHouse是俄罗斯的Yandex于2016年开源的面向OL... 查看详情

客快物流大数据项目(七十二):impalasql语法(代码片段)

文章目录Impalasql语法一、数据库特定语言1、创建数据库2、删除数据库二、​​​​​​​表特定语句1、createtable语句2、insert语句3、select语句4、describe语句5、altertable6、delete、truncatetable7、view视图8、orderby子句9、groupby子句10、ha... 查看详情

客快物流大数据项目(八十九):clickhouse的数据类型支持(代码片段)

文章目录ClickHouse的数据类型支持一、整型二、​​​​​​​浮点型三、​​​​​​​​​​​​​​Decimal四、布尔型五、字符串类型六、​​​​​​​​​​​​​​UUID七、​​​​​​​Date类型八、​​​​​​​Da... 查看详情

客快物流大数据项目(四十二):java代码操作kudu(代码片段)

目录Java代码操作Kudu一、构建maven工程二、导入依赖三、​​​​​​​创建包结构四、​​​​​​​初始化方法五、​​​​​​​创建表六、​​​​​​​插入数据七、​​​​​​​查询数据八、修改数据九、​​​... 查看详情

客快物流大数据项目(八十八):clickhouse快速入门(代码片段)

文章目录ClickHouse快速入门一、​​​​​​​​​​​​​​安装ClickHouse(单机)1、安装yum-utils工具包2、添加ClickHouse的yum源3、安装ClickHouse的服务端和客户端4、关于安装的说明5、查看ClickHouse的版本信息二、在命令行... 查看详情

客快物流大数据项目(四十三):kudu的分区方式(代码片段)

目录kudu的分区方式一、HashPartitioning(哈希分区)二、RangePartitioning(范围分区) 三、​​​​​​​MultilevelPartitioning(多级分区)kudu的分区方式为了提供可扩展性,Kudu表被划分为称为tablets的单元,并分布在许多tabletservers上。... 查看详情

客快物流大数据项目(九十七):clickhouse的sql语法(代码片段)

文章目录ClickHouse的SQL语法一、​​​​​​​常用的SQL命令二、​​​​​​​​​​​​​​select查询语法三、insertinto语法四、​​​​​​​​​​​​​​alter语法ClickHouse的SQL语法一、​​​​​​​常用的SQL命令作用S... 查看详情

客快物流大数据项目:docker的迁移与备份(代码片段)

Docker的迁移与备份一、容器保存为镜像可以通过以下命令将容器保存为镜像dockercommitmynginxmynginx_image基于新创建的镜像创建容器dockerrun-di--name=mynginx2-p81:80mynginx_image访问81端口二、镜像备份可以通过以下命令将镜像保存为tar文... 查看详情

客快物流大数据项目(七十九):impala映射kudu表(代码片段)

文章目录Impala映射kudu表一、​​​​​​​​​​​​​​登录Hue页面1、选择Impala2、登录Hue页面二、选择Impala执行引擎1、选择Impala执行引擎2、进入编写执行sql语句窗口三、执行sql语句映射Kudu表Impala映射kudu表一、​​​​​... 查看详情

客快物流大数据项目学习框架

文章目录客快物流大数据项目学习框架前言一、项目简介二、功能介绍三、项目背景四、服务器资源规划五、技术亮点及价值六、智慧物流大数据平台客快物流大数据项目学习框架前言利用框架的力量,看懂游戏规则,... 查看详情

客快物流大数据项目(九十二):clickhouse的mergetree系列引擎介绍和mergetree深入了解(代码片段)

文章目录ClickHouse的MergeTree系列引擎介绍和MergeTree深入了解一、MergeTree系列引擎介绍二、​​​​​​​MergeTree深入了解1、创建MergeTree表的说明2、创建MergeTree引擎的表3、删除MergeTree引擎的表ClickHouse的MergeTree系列引擎介绍和MergeTr... 查看详情

客快物流大数据项目(一百):clickhouse的使用

文章目录ClickHouse的使用一、使用Java操作ClickHouse1、构建maven工程 查看详情

客快物流大数据项目(一百零五):启动elasticsearch

文章目录启动ElasticSearch一、启动ES服务端二、​​​​​​​启动Kibana启动ElasticSearch 查看详情

客快物流大数据项目(一百零五):启动elasticsearch

文章目录启动ElasticSearch一、启动ES服务端二、​​​​​​​启动Kibana启动ElasticSearch 查看详情

客快物流大数据项目(七十七):使用impala对kudu更改表属性操作(代码片段)

文字目录使用Impala对kudu更改表属性操作一、重命名Impala映射表二、重新命名内部表的基础Kudu表三、​​​​​​​将外部表重新映射到不同的Kudu表四、更改KuduMaster地址五、将内部管理的表更改为外部使用Impala对kudu更改表属性... 查看详情