客快物流大数据项目(六十七):客户主题(代码片段)

Lansonli Lansonli     2022-12-04     635

关键词:

文章目录

客户主题

一、背景介绍

​​​​​​​二、指标明细

三、​​​​​​​表关联关系

1、​​​​​​​事实表

2、​​​​​​​​​​​​​​维度表

3、​​​​​​​​​​​​​​关联关系

四、客户数据拉宽开发

1、​​​​​​​​​​​​​​拉宽后的字段

2、​​​​​​​​​​​​​​SQL语句

3、​​​​​​​​​​​​​​Spark实现

五、​​​​​​​​​​​​​​客户数据指标开发

1、​​​​​​​​​​​​​​计算的字段

2、Spark实现


客户主题

一、​​​​​​​背景介绍

客户主题主要是通过分析用户的下单情况构建用户画像

​​​​​​​二、指标明细

指标列表

总客户数

今日新增客户数

留存率(超过180天未下单表示已流失,否则表示留存)

活跃用户数(近10天内有发件的客户表示活跃用户)

月度新老用户数(应该是月度新用户!)

沉睡用户数(3个月~6个月之间的用户表示已沉睡)

流失用户数(9个月未下单表示已流失)

客单数

客单价

平均客单数

普通用户数

三、​​​​​​​表关联关系

1、​​​​​​​事实表

表名

描述

tbl_customer

用户表

2、​​​​​​​​​​​​​​维度表

表名

描述

tbl_codes

物流系统码表

tbl_consumer_sender_info

客户寄件信息表

tbl_express_package

快递包裹表

 

3、​​​​​​​​​​​​​​关联关系

用户表与维度表的关联关系如下:

四、客户数据拉宽开发

1、​​​​​​​​​​​​​​拉宽后的字段

字段名

别名

字段描述

tbl_customer

id

id

客户ID

tbl_customer

name

name

客户姓名

tbl_customer

tel

tel

客户电话

tbl_customer

mobile

mobile

客户手机

tbl_customer

email

email

客户邮箱

tbl_customer

type

type

客户类型ID

tbl_codes

codeDesc

type_name

客户类型名称

tbl_customer

isownreg

is_own_reg

是否自行注册

tbl_customer

regdt

regdt

注册时间

tbl_customer

regchannelid

reg_channel_id

注册渠道ID

tbl_customer

state

state

客户状态ID

tbl_customer

cdt

cdt

创建时间

tbl_customer

udt

udt

修改时间

tbl_customer

lastlogindt

last_login_dt

最后登录时间

tbl_customer

remark

remark

备注

tbl_consumer_sender_info

cdt

first_cdt

首次下单时间

tbl_consumer_sender_info

cdt

last_cdt

尾次下单时间

tbl_express_package

billCount

billCount

下单总数

tbl_express_package

totalAmount

totalAmount

累计下单金额

tbl_customer

yyyyMMdd(cdt)

day

创建时间

年月日格式

2、​​​​​​​​​​​​​​SQL语句

SELECT 
TC."id" ,
TC."name" ,
TC."tel",
TC."mobile",
TC."email",
TC."type",
TC."is_own_reg",
TC."reg_dt",
TC."reg_channel_id",
TC."state",
TC."cdt",
TC."udt",
TC."last_login_dt",
TC."remark",
customercodes."code_desc",
sender_info.first_cdt AS first_sender_cdt ,
sender_info.last_cdt AS last_sender_cdt, 
sender_info.billCount AS billCount, 
sender_info.totalAmount AS totalAmount
FROM "tbl_customer" tc 
LEFT JOIN (
SELECT 
	"ciid", min(sender_info."id") first_id, max(sender_info."id") last_id, min(sender_info."cdt") first_cdt, max(sender_info."cdt") last_cdt,COUNT(sender_info."id" ) billCount,sum(express_package."actual_amount") totalAmount
	FROM "tbl_consumer_sender_info" sender_info
	LEFT JOIN "tbl_express_package" express_package
		ON SENDER_INFO."pkg_id" =express_package."id"
	GROUP BY sender_info."ciid"
) sender_info
	ON	tc."id" = sender_info."ciid"
LEFT JOIN "tbl_codes" customercodes ON customercodes."type" =16 AND tc."type" =customercodes."code" 

 

3、​​​​​​​​​​​​​​Spark实现

实现步骤:

  • dwd目录下创建 CustomerDWD 单例对象,继承自OfflineApp特质
  • 初始化环境的参数,创建SparkSession对象
  • 获取客户表(tbl_customer)数据,并缓存数据
  • 判断是否是首次运行,如果是首次运行的话,则全量装载数据(含历史数据)
  • 获取客户寄件信息表(tbl_consumer_sender_info数据,并缓存数据
  • 获取客户包裹表(tbl_express_package数据,并缓存数据
  • 获取物流字典码表(tbl_codes)数据,并缓存数据
  • 根据以下方式拉宽仓库车辆明细数据
    • 根据客户id,在客户表中获取客户数据
    • 根据包裹id,在包裹表中获取包裹数据
    • 根据客户类型id,在物流字典码表中获取客户类型名称数据
  • 创建客户明细宽表(若存在则不创建)
  • 将客户明细宽表数据写入到kudu数据表中
  • 删除缓存数据

3.1、​​​​​​​​​​​​​​初始化环境变量

初始化客户明细拉宽作业的环境变量

package cn.it.logistics.offline.dwd

import cn.it.logistics.common.CodeTypeMapping, Configuration, OfflineTableDefine, SparkUtils
import cn.it.logistics.offline.OfflineApp
import org.apache.spark.SparkConf
import org.apache.spark.sql.DataFrame, SparkSession
import org.apache.spark.storage.StorageLevel
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.IntegerType

/**
 * 客户主题数据的拉宽操作
 */
object CustomerDWD extends OfflineApp 
  //定义应用的名称
  val appName = this.getClass.getSimpleName

  def main(args: Array[String]): Unit = 
    /**
     * 实现步骤:
     * 1)初始化sparkConf对象
     * 2)创建sparkSession对象
     * 3)加载kudu中的事实表和维度表的数据(将加载后的数据进行缓存)
     * 4)定义维度表与事实表的关联
     * 5)将拉宽后的数据再次写回到kudu数据库中(DWD明细层)
     * 5.1:创建车辆明细宽表的schema表结构
     * 5.2:创建车辆宽表(判断宽表是否存在,如果不存在则创建)
     * 5.3:将数据写入到kudu中
     * 6)将缓存的数据删除掉
     * 7)停止任务
     */

    //1)初始化sparkConf对象
    val sparkConf: SparkConf = SparkUtils.autoSettingEnv(
      SparkUtils.sparkConf(appName)
    )

    //2)创建sparkSession对象
    val sparkSession: SparkSession = SparkUtils.getSparkSession(sparkConf)
    sparkSession.sparkContext.setLogLevel(Configuration.LOG_OFF)

    //数据处理
    execute(sparkSession)
  

  /**
   * 数据处理
   *
   * @param sparkSession
   */
  override def execute(sparkSession: SparkSession): Unit = 
    sparkSession.stop()
  

 

3.2、​​​​​​​​​​​​​​加载客户相关的表并缓存

  • 加载客户表的时候,需要指定日期条件,因为客户主题最终需要Azkaban定时调度执行,每天执行一次增量数据,因此需要指定日期。
  • 判断是否是首次运行,如果是首次运行的话,则全量装载数据(含历史数据)
//导入隐士转换
import sparkSession.implicits._
val customerSenderInfoDF: DataFrame = getKuduSource(sparkSession, TableMapping.consumerSenderInfo, Configuration.isFirstRunnable).persist(StorageLevel.DISK_ONLY_2)
val customerDF = getKuduSource(sparkSession, TableMapping.customer, true).persist(StorageLevel.DISK_ONLY_2)
val expressPageageDF = getKuduSource(sparkSession, TableMapping.expressPackage, true).persist(StorageLevel.DISK_ONLY_2)
val codesDF: DataFrame = getKuduSource(sparkSession, TableMapping.codes, true).persist(StorageLevel.DISK_ONLY_2)
val customerTypeDF = codesDF.where($"type" === CodeTypeMapping.CustomType)

3.3、​​​​​​​​​​​​​​定义表的关联关系

  • 为了在DWS层任务中方便的获取每日增量客户表数据(根据日期),因此在DataFrame基础上动态增加列(day),指定日期格式为yyyyMMdd

代码如下:

//TODO 4)定义维度表与事实表的关联关系
val left_outer = "left_outer"

/**
 * 获取每个用户的首尾单发货信息及发货件数和总金额
 */
val customerSenderDetailInfoDF: DataFrame = customerSenderInfoDF.join(expressPageageDF, expressPageageDF("id") === customerSenderInfoDF("pkgId"), left_outer)
  .groupBy(customerSenderInfoDF("ciid"))
  .agg(min(customerSenderInfoDF("id")).alias("first_id"),
    max(customerSenderInfoDF("id")).alias("last_id"),
    min(expressPageageDF("cdt")).alias("first_cdt"),
    max(expressPageageDF("cdt")).alias("last_cdt"),
    count(customerSenderInfoDF("id")).alias("totalCount"),
    sum(expressPageageDF("actualAmount")).alias("totalAmount")
  )

val customerDetailDF: DataFrame = customerDF
  .join(customerSenderDetailInfoDF, customerDF("id") === customerSenderInfoDF("ciid"), left_outer)
  .join(customerTypeDF, customerDF("type") === customerTypeDF("code").cast(IntegerType), left_outer)
  .sort(customerDF("cdt").asc)
  .select(
    customerDF("id"),
    customerDF("name"),
    customerDF("tel"),
    customerDF("mobile"),
    customerDF("type").cast(IntegerType),
    customerTypeDF("codeDesc").as("type_name"),
    customerDF("isownreg").as("is_own_reg"),
    customerDF("regdt").as("regdt"),
    customerDF("regchannelid").as("reg_channel_id"),
    customerDF("state"),
    customerDF("cdt"),
    customerDF("udt"),
    customerDF("lastlogindt").as("last_login_dt"),
    customerDF("remark"),
    customerSenderDetailInfoDF("first_id").as("first_sender_id"), //首次寄件id
    customerSenderDetailInfoDF("last_id").as("last_sender_id"), //尾次寄件id
    customerSenderDetailInfoDF("first_cdt").as("first_sender_cdt"), //首次寄件时间
    customerSenderDetailInfoDF("last_cdt").as("last_sender_cdt"), //尾次寄件时间
    customerSenderDetailInfoDF("totalCount"), //寄件总次数
    customerSenderDetailInfoDF("totalAmount") //总金额
  )

 

3.4、​​​​​​​​​​​​​​创建客户明细宽表并将客户明细数据写入到kudu数据表中

客户明细宽表数据需要保存到kudu中,因此在第一次执行客户明细拉宽操作时,客户明细宽表是不存在的,因此需要实现自动判断宽表是否存在,如果不存在则创建

实现步骤:

  • CustomerDWD 单例对象中调用save方法

实实现过程:

  • CustomerDWD 单例对象Main方法中调用save方法
save(customerDetailDF, OfflineTableDefine.customerDetail)

3.5、​​​​​​​​​​​​​​删除缓存数据

为了释放资源,客户明细宽表数据计算完成以后,需要将缓存的源表数据删除。

//移除缓存
customerDetailDF.unpersist
codesDF.unpersist
expressPackageDF.unpersist
customerSenderDF.unpersist
customerDF.unpersist

3.6、完整代码

package cn.it.logistics.offline.dwd

import cn.it.logistics.common.CodeTypeMapping, Configuration, OfflineTableDefine, SparkUtils, TableMapping
import cn.it.logistics.offline.OfflineApp
import org.apache.spark.SparkConf
import org.apache.spark.sql.DataFrame, SparkSession
import org.apache.spark.storage.StorageLevel
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.IntegerType

/**
 * 客户主题数据的拉宽操作
 */
object CustomerDWD extends OfflineApp 
  //定义应用的名称
  val appName = this.getClass.getSimpleName

  def main(args: Array[String]): Unit = 
    /**
     * 实现步骤:
     * 1)初始化sparkConf对象
     * 2)创建sparkSession对象
     * 3)加载kudu中的事实表和维度表的数据(将加载后的数据进行缓存)
     * 4)定义维度表与事实表的关联
     * 5)将拉宽后的数据再次写回到kudu数据库中(DWD明细层)
     * 5.1:创建车辆明细宽表的schema表结构
     * 5.2:创建车辆宽表(判断宽表是否存在,如果不存在则创建)
     * 5.3:将数据写入到kudu中
     * 6)将缓存的数据删除掉
     * 7)停止任务
     */

    //1)初始化sparkConf对象
    val sparkConf: SparkConf = SparkUtils.autoSettingEnv(
      SparkUtils.sparkConf(appName)
    )

    //2)创建sparkSession对象
    val sparkSession: SparkSession = SparkUtils.getSparkSession(sparkConf)
    sparkSession.sparkContext.setLogLevel(Configuration.LOG_OFF)

    //数据处理
    execute(sparkSession)
  

  /**
   * 数据处理
   *
   * @param sparkSession
   */
  override def execute(sparkSession: SparkSession): Unit = 

    //导入隐士转换
    import sparkSession.implicits._
    val customerSenderInfoDF: DataFrame = getKuduSource(sparkSession, TableMapping.consumerSenderInfo, Configuration.isFirstRunnable).persist(StorageLevel.DISK_ONLY_2)
    val customerDF = getKuduSource(sparkSession, TableMapping.customer, true).persist(StorageLevel.DISK_ONLY_2)
    val expressPageageDF = getKuduSource(sparkSession, TableMapping.expressPackage, true).persist(StorageLevel.DISK_ONLY_2)
    val codesDF: DataFrame = getKuduSource(sparkSession, TableMapping.codes, true).persist(StorageLevel.DISK_ONLY_2)
    val customerTypeDF = codesDF.where($"type" === CodeTypeMapping.CustomType)

    //TODO 4)定义维度表与事实表的关联关系
    val left_outer = "left_outer"

    /**
     * 获取每个用户的首尾单发货信息及发货件数和总金额
     */
    val customerSenderDetailInfoDF: DataFrame = customerSenderInfoDF.join(expressPageageDF, expressPageageDF("id") === customerSenderInfoDF("pkgId"), left_outer)
      .groupBy(customerSenderInfoDF("ciid"))
      .agg(min(customerSenderInfoDF("id")).alias("first_id"),
        max(customerSenderInfoDF("id")).alias("last_id"),
        min(expressPageageDF("cdt")).alias("first_cdt"),
        max(expressPageageDF("cdt")).alias("last_cdt"),
        count(customerSenderInfoDF("id")).alias("totalCount"),
        sum(expressPageageDF("actualAmount")).alias("totalAmount")
      )

    val customerDetailDF: DataFrame = customerDF
      .join(customerSenderDetailInfoDF, customerDF("id") === customerSenderInfoDF("ciid"), left_outer)
      .join(customerTypeDF, customerDF("type") === customerTypeDF("code").cast(IntegerType), left_outer)
      .sort(customerDF("cdt").asc)
      .select(
        customerDF("id"),
        customerDF("name"),
        customerDF("tel"),
        customerDF("mobile"),
        customerDF("type").cast(IntegerType),
        customerTypeDF("codeDesc").as("type_name"),
        customerDF("isownreg").as("is_own_reg"),
        customerDF("regdt").as("regdt"),
        customerDF("regchannelid").as("reg_channel_id"),
        customerDF("state"),
        customerDF("cdt"),
        customerDF("udt"),
        customerDF("lastlogindt").as("last_login_dt"),
        customerDF("remark"),
        customerSenderDetailInfoDF("first_id").as("first_sender_id"), //首次寄件id
        customerSenderDetailInfoDF("last_id").as("last_sender_id"), //尾次寄件id
        customerSenderDetailInfoDF("first_cdt").as("first_sender_cdt"), //首次寄件时间
        customerSenderDetailInfoDF("last_cdt").as("last_sender_cdt"), //尾次寄件时间
        customerSenderDetailInfoDF("totalCount"), //寄件总次数
        customerSenderDetailInfoDF("totalAmount") //总金额
      )

    save(customerDetailDF, OfflineTableDefine.customerDetail)
    // 5.4:将缓存的数据删除掉
    customerDF.unpersist()
    customerSenderInfoDF.unpersist()
    expressPageageDF.unpersist()
    customerTypeDF.unpersist()
    
    sparkSession.stop()
  

五、​​​​​​​​​​​​​​客户数据指标开发

1、​​​​​​​​​​​​​​计算的字段

字段名

字段描述

id

主键id(数据产生时间)

customerTotalCount

总客户数

addtionTotalCount

今日新增客户数(注册时间为今天)

lostCustomerTotalCount

留存数(超过180天未下单表示已流失,否则表示留存)

lostRate

留存率

activeCount

活跃用户数(10天内有发件的客户表示活跃用户)

monthOfNewCustomerCount

月度新老用户数(应该是月度新用户!)

sleepCustomerCount

沉睡用户数(3个月~6个月之间的用户表示已沉睡)

loseCustomerCount

流失用户数(9个月未下单表示已流失)

customerBillCount

客单数

customerAvgAmount

客单价

avgCustomerBillCount

平均客单数

2、Spark实现

实现步骤:

  • dws目录下创建 ConsumerDWS 单例对象,继承自OfflineApp特质
  • 初始化环境的参数,创建SparkSession对象
  • 根据指定的日期获取拉宽后的用户宽表(tbl_customer_detail)增量数据,并缓存数据
  • 判断是否是首次运行,如果是首次运行的话,则全量装载数据(含历史数据)
  • 指标计算
    • 总客户数
    • 今日新增客户数(注册时间为今天)
    • 留存数(超过180天未下单表示已流失,否则表示留存)
    • 留存率
    • 活跃用户数(近10天内有发件的客户表示活跃用户)
    • 月度新老用户数(应该是月度新用户!)
    • 沉睡用户数(3个月~6个月之间的用户表示已沉睡)
    • 流失用户数(9个月未下单表示已流失)
    • 客单数
    • 客单价
    • 平均客单数
    • 普通用户数
    • 获取当前时间yyyyMMddHH
  • 构建要持久化的指标数据(需要判断计算的指标是否有值,若没有需要赋值默认值
  • 通过StructType构建指定Schema
  • 创建客户指标数据表(若存在则不创建)
  • 持久化指标数据到kudu表

2.1、​​​​​​​​​​​​​​初始化环境变量

package cn.it.logistics.offline.dws

import cn.it.logistics.common.Configuration, DateHelper, OfflineTableDefine, SparkUtils
import cn.it.logistics.offline.OfflineApp
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.DataFrame, Row, SparkSession
import org.apache.spark.sql.types.DoubleType, LongType, Metadata, StringType, StructField, StructType
import org.apache.spark.sql.functions._
import scala.collection.mutable.ArrayBuffer

/**
 * 客户主题指标计算
 */
object CustomerDWS  extends  OfflineApp 
  //定义应用程序的名称
  val appName = this.getClass.getSimpleName

  def main(args: Array[String]): Unit = 
    /**
     * 实现步骤:
     * 1)创建SparkConf对象
     * 2)创建SparkSession对象
     * 3)读取客户明细宽表的数据
     * 4)对客户明细宽表的数据进行指标的计算
     * 5)将计算好的指标数据写入到kudu数据库中
     * 5.1:定义指标结果表的schema信息
     * 5.2:组织需要写入到kudu表的数据
     * 5.3:判断指标结果表是否存在,如果不存在则创建
     * 5.4:将数据写入到kudu表中
     * 6)删除缓存数据
     * 7)停止任务,退出sparksession
     */

    //TODO 1)创建SparkConf对象
    val sparkConf: SparkConf = SparkUtils.autoSettingEnv(
      SparkUtils.sparkConf(appName)
    )

    //TODO 2)创建SparkSession对象
    val sparkSession: SparkSession = SparkUtils.getSparkSession(sparkConf)
    sparkSession.sparkContext.setLogLevel(Configuration.LOG_OFF)

    //处理数据
    execute(sparkSession)
  

  /**
   * 数据处理
   *
   * @param sparkSession
   */
  override def execute(sparkSession: SparkSession): Unit = 
sparkSession.stop()
  

2.2、加载客户宽表增量数据并缓存

加载客户宽表的时候,需要指定日期条件,因为客户主题最终需要Azkaban定时调度执行,每天执行一次增量数据,因此需要指定日期。

//TODO 3)读取客户明细宽表的数据(用户主题的数据不需要按照天进行增量更新,而是每天全量运行)
val customerDetailDF = getKuduSource(sparkSession, OfflineTableDefine.customerDetail, Configuration.isFirstRunnable)

2.3、​​​​​​​​​​​​​​指标计算

//定义数据集合
val rows: ArrayBuffer[Row] = ArrayBuffer[Row]()

//TODO 4)对客户明细宽表的数据进行指标的计算
val customerTotalCount: Row = customerDetailDF.agg(count($"id").alias("total_count")).first()

//今日新增客户数
val addTotalCount: Long = customerDetailDF.where(date_format($"regDt", "yyyy-MM-dd").equalTo(DateHelper.getyesterday("yyyy-MM-dd"))).agg(count($"id")).first().getLong(0)

//留存率(超过180天未下单表示已经流失,否则表示留存)
//留存用户数
//val lostCustomerTotalCount: Long = customerDetailDF.join(customerSenderInfoDF.where("cdt >= date_sub(now(), 180)"), customerDetailDF("id") === customerSenderInfoDF("ciid")).count()
val lostCustomerTotalCount: Long = customerDetailDF.where("last_sender_cdt >= date_sub(now(), 180)").count()
println(lostCustomerTotalCount)

//留存率,超过180天未下单的用户数/所有的用户数
val lostRate: Double = (lostCustomerTotalCount / (if (customerTotalCount.isNullAt(0)) 1D else customerTotalCount.getLong(0))).asInstanceOf[Number].doubleValue()
println(lostRate)

// 活跃用户数(近10天内有发件的客户表示活跃用户)
val activeCount = customerDetailDF.where("last_sender_cdt>=date_sub(now(), 10)").count

// 月度新老用户数(应该是月度新用户!)
val monthOfNewCustomerCount = customerDetailDF.where($"regDt".between(trunc($"regDt", "MM"), date_format(current_date(), "yyyy-MM-dd"))).count

// 沉睡用户数(3个月~6个月之间的用户表示已沉睡)
val sleepCustomerCount = customerDetailDF.where("last_sender_cdt>=date_sub(now(), 180) and last_sender_cdt<=date_sub(now(), 90)").count
println(sleepCustomerCount)

// 流失用户数(9个月未下单表示已流失)
val loseCustomerCount = customerDetailDF.where("last_sender_cdt>=date_sub(now(), 270)").count
println(loseCustomerCount)

// 客单数
val customerSendInfoDF = customerDetailDF.where("first_sender_id is not null")

val customerBillCountAndAmount: Row = customerSendInfoDF.agg(sum("totalCount").alias("totalCount"), sum("totalAmount").alias("totalAmount")).first()

// 客单价
val customerAvgAmount = customerBillCountAndAmount.get(1).toString.toDouble / customerBillCountAndAmount.get(0).toString.toDouble //总金额/总件数
println(customerAvgAmount)

// 平均客单数
val avgCustomerBillCount = customerSendInfoDF.count / customerDetailDF.count

// 获取昨天时间yyyyMMdd
val cdt = DateHelper.getyesterday("yyyyMMdd")
// 构建要持久化的指标数据
val rowInfo = Row(
  cdt,
  if (customerTotalCount.isNullAt(0)) 0L else customerTotalCount.get(0).asInstanceOf[Number].longValue(),
  addTotalCount,
  lostCustomerTotalCount,
  lostRate,
  activeCount,
  monthOfNewCustomerCount,
  sleepCustomerCount,
  loseCustomerCount,
  if (customerBillCountAndAmount.isNullAt(0)) 0L else customerBillCountAndAmount.get(0).asInstanceOf[Number].longValue(),
  customerAvgAmount,
  avgCustomerBillCount
)
rows.append(rowInfo)

 

2.4、​​​​​​​通过StructType构建指定Schema

import sparkSession.implicits._
val schema = StructType(Array(
  StructField("id", StringType, true, Metadata.empty),
  StructField("customerTotalCount", LongType, true, Metadata.empty),
  StructField("addtionTotalCount", LongType, true, Metadata.empty),
  StructField("lostCustomerTotalCount", LongType, true, Metadata.empty),
  StructField("lostRate", DoubleType, true, Metadata.empty),
  StructField("activeCount", LongType, true, Metadata.empty),
  StructField("monthOfNewCustomerCount", LongType, true, Metadata.empty),
  StructField("sleepCustomerCount", LongType, true, Metadata.empty),
  StructField("loseCustomerCount", LongType, true, Metadata.empty),
  StructField("customerBillCount", LongType, true, Metadata.empty),
  StructField("customerAvgAmount", DoubleType, true, Metadata.empty),
  StructField("avgCustomerBillCount", LongType, true, Metadata.empty)
))

 

2.5、​​​​​​​​​​​​​​持久化指标数据到kudu表

// 5.2:组织要写入到kudu表的数据
val data: RDD[Row] = sparkSession.sparkContext.makeRDD(rows)
val quotaDF: DataFrame = sparkSession.createDataFrame(data, schema)
save(quotaDF, OfflineTableDefine.customerSummery)

2.6、完整代码

package cn.it.logistics.offline.dws

import cn.it.logistics.common.Configure, DateHelper, OfflineTableDefine, SparkUtils
import cn.it.logistics.offline.OfflineApp
import cn.it.logistics.offline.dws.ExpressBillDWS.appName, execute
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.DataFrame, Row, SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.DoubleType, LongType, Metadata, StringType, StructField, StructType

import scala.collection.mutable.ArrayBuffer

/**
 * 客户主题开发
 * 读取客户明细宽表的数据,然后进行指标开发,将结果存储到kudu表中(DWS层)
 */
object ConsumerDWS extends  OfflineApp
  //定义应用的名称
  val appName: String = this.getClass.getSimpleName

  /**
   * 入口函数
   * @param args
   */
  def main(args: Array[String]): Unit = 
    /**
     * 实现步骤:
     * 1)创建sparkConf对象
     * 2)创建SparkSession对象
     * 3)读取客户宽表数据(判断是全量装载还是增量装载),将加载的数据进行缓存
     * 4)对客户明细表的数据进行指标计算
     * 5)将计算好的数写入到kudu表中
     *   5.1)定义写入kudu表的schema结构信息
     *   5.2)将组织好的指标结果集合转换成RDD对象
     *   5.3)创建表,写入数据
     * 6)删除缓存,释放资源
     * 7)停止作业,退出sparkSession
     */

    //TODO 1)创建sparkConf对象
    val sparkConf: SparkConf = SparkUtils.autoSettingEnv(
      SparkUtils.sparkConf(appName),
      SparkUtils.parameterParser(args)
    )

    //TODO 2)创建SparkSession对象
    val sparkSession: SparkSession = SparkUtils.getSparkSession(sparkConf)
    sparkSession.sparkContext.setLogLevel(Configure.LOG_OFF)

    //执行数据处理的逻辑
    execute(sparkSession)
  

  /**
   * 数据处理
   *
   * @param sparkSession
   */
  override def execute(sparkSession: SparkSession): Unit = 
    //TODO 3)读取客户明细宽表的数据(用户主题的数据不需要按照天进行增量更新,而是每天全量运行)
    val customerDetailDF: DataFrame = getKuduSource(sparkSession, OfflineTableDefine.customerDetail, true)

    import sparkSession.implicits._
    val schema = StructType(Array(
      StructField("id", StringType, true, Metadata.empty),
      StructField("customerTotalCount", LongType, true, Metadata.empty),
      StructField("addtionTotalCount", LongType, true, Metadata.empty),
      StructField("lostCustomerTotalCount", LongType, true, Metadata.empty),
      StructField("lostRate", DoubleType, true, Metadata.empty),
      StructField("activeCount", LongType, true, Metadata.empty),
      StructField("monthOfNewCustomerCount", LongType, true, Metadata.empty),
      StructField("sleepCustomerCount", LongType, true, Metadata.empty),
      StructField("loseCustomerCount", LongType, true, Metadata.empty),
      StructField("customerBillCount", LongType, true, Metadata.empty),
      StructField("customerAvgAmount", DoubleType, true, Metadata.empty),
      StructField("avgCustomerBillCount", LongType, true, Metadata.empty),
      StructField("normalCustomerCount", LongType, true, Metadata.empty)
    ))

    //定义数据集合
    val rows: ArrayBuffer[Row] = ArrayBuffer[Row]()

    //TODO 4)对客户明细宽表的数据进行指标的计算
    val customerTotalCount: Row = customerDetailDF.agg(count($"id").alias("total_count")).first()

    //今日新增客户数
    val addTotalCount: Long = customerDetailDF.where(date_format($"regDt", "yyyy-MM-dd").equalTo(DateHelper.getyestday("yyyy-MM-dd"))).agg(count($"id")).first().getLong(0)

    //留存率(超过180天未下单表示已经流失,否则表示留存)
    //留存用户数
    //val lostCustomerTotalCount: Long = customerDetailDF.join(customerSenderInfoDF.where("cdt >= date_sub(now(), 180)"), customerDetailDF("id") === customerSenderInfoDF("ciid")).count()
    val lostCustomerTotalCount: Long = customerDetailDF.where("last_sender_cdt >= date_sub(now(), 180)").count()
    println(lostCustomerTotalCount)

    //留存率,超过180天未下单的用户数/所有的用户数
    val lostRate: Double = (lostCustomerTotalCount / (if (customerTotalCount.isNullAt(0)) 1D else customerTotalCount.getLong(0))).asInstanceOf[Number].doubleValue()
    println(lostRate)

    // 活跃用户数(近10天内有发件的客户表示活跃用户)
    val activeCount = customerDetailDF.where("last_sender_cdt>=date_sub(now(), 10)").count

    // 月度新老用户数(应该是月度新用户!)
    val monthOfNewCustomerCount = customerDetailDF.where($"regDt".between(trunc($"regDt", "MM"), date_format(current_date(), "yyyy-MM-dd"))).count

    // 沉睡用户数(3个月~6个月之间的用户表示已沉睡)
    val sleepCustomerCount = customerDetailDF.where("last_sender_cdt>=date_sub(now(), 180) and last_sender_cdt<=date_sub(now(), 90)").count
    println(sleepCustomerCount)

    // 流失用户数(9个月未下单表示已流失)
    val loseCustomerCount = customerDetailDF.where("last_sender_cdt>=date_sub(now(), 270)").count
    println(loseCustomerCount)

    // 客单数
    val customerSendInfoDF = customerDetailDF.where("first_sender_id is not null")

    val customerBillCountAndAmount: Row = customerSendInfoDF.agg(sum("totalCount").alias("totalCount"), sum("totalAmount").alias("totalAmount")).first()

    // 客单价
    val customerAvgAmount = customerBillCountAndAmount.get(1).toString.toDouble / customerBillCountAndAmount.get(0).toString.toDouble //总金额/总件数
    println(customerAvgAmount)

    // 平均客单数
    val avgCustomerBillCount = customerSendInfoDF.count / customerDetailDF.count

    // 普通用户数
    val normalCustomerRow: Row = customerDetailDF.where("type=1").agg(count($"id").alias("total_count")).first()
    println(normalCustomerRow)
    val normalCustomerCount: Long = if (normalCustomerRow.isNullAt(0)) 0L else normalCustomerRow.get(0).asInstanceOf[Number].longValue()

    // 获取昨天时间yyyyMMdd
    val cdt = DateHelper.getyestday("yyyyMMdd")
    // 构建要持久化的指标数据
    val rowInfo = Row(
      cdt,
      if (customerTotalCount.isNullAt(0)) 0L else customerTotalCount.get(0).asInstanceOf[Number].longValue(),
      addTotalCount,
      lostCustomerTotalCount,
      lostRate,
      activeCount,
      monthOfNewCustomerCount,
      sleepCustomerCount,
      loseCustomerCount,
      if (customerBillCountAndAmount.isNullAt(0)) 0L else customerBillCountAndAmount.get(0).asInstanceOf[Number].longValue(),
      customerAvgAmount,
      avgCustomerBillCount,
      normalCustomerCount
    )
    rows.append(rowInfo)

    // 5.2:组织要写入到kudu表的数据
    val data: RDD[Row] = sparkSession.sparkContext.makeRDD(rows)

    val quotaDF: DataFrame = sparkSession.createDataFrame(data, schema)
    save(quotaDF, OfflineTableDefine.customerSummery)

    //删除缓存,释放资源
    customerDetailDF.unpersist()

    sparkSession.stop()
  

  • 📢博客主页:https://lansonli.blog.csdn.net
  • 📢欢迎点赞 👍 收藏 ⭐留言 📝 如有错误敬请指正!
  • 📢本文由 Lansonli 原创,首发于 CSDN博客🙉
  • 📢停下休息的时候不要忘了别人还在奔跑,希望大家抓紧时间学习,全力奔赴更美好的生活✨

客快物流大数据项目(九十七):clickhouse的sql语法(代码片段)

文章目录ClickHouse的SQL语法一、​​​​​​​常用的SQL命令二、​​​​​​​​​​​​​​select查询语法三、insertinto语法四、​​​​​​​​​​​​​​alter语法ClickHouse的SQL语法一、​​​​​​​常用的SQL命令作用S... 查看详情

客快物流大数据项目(七十七):使用impala对kudu更改表属性操作(代码片段)

文字目录使用Impala对kudu更改表属性操作一、重命名Impala映射表二、重新命名内部表的基础Kudu表三、​​​​​​​将外部表重新映射到不同的Kudu表四、更改KuduMaster地址五、将内部管理的表更改为外部使用Impala对kudu更改表属性... 查看详情

客快物流大数据项目(六十):将消费的kafka数据转换成bean对象(代码片段)

目录将消费的kafka数据转换成bean对象一、将OGG数据转换成bean对象二、​​​​​​​将Canal数据转换成bean对象三、完整代码将消费的kafka数据转换成bean对象一、​​​​​​​将OGG数据转换成bean对象实现步骤:消费kafka的log... 查看详情

客快物流大数据项目(八十八):clickhouse快速入门(代码片段)

文章目录ClickHouse快速入门一、​​​​​​​​​​​​​​安装ClickHouse(单机)1、安装yum-utils工具包2、添加ClickHouse的yum源3、安装ClickHouse的服务端和客户端4、关于安装的说明5、查看ClickHouse的版本信息二、在命令行... 查看详情

客快物流大数据项目(七十二):impalasql语法(代码片段)

文章目录Impalasql语法一、数据库特定语言1、创建数据库2、删除数据库二、​​​​​​​表特定语句1、createtable语句2、insert语句3、select语句4、describe语句5、altertable6、delete、truncatetable7、view视图8、orderby子句9、groupby子句10、ha... 查看详情

客快物流大数据项目(八十九):clickhouse的数据类型支持(代码片段)

文章目录ClickHouse的数据类型支持一、整型二、​​​​​​​浮点型三、​​​​​​​​​​​​​​Decimal四、布尔型五、字符串类型六、​​​​​​​​​​​​​​UUID七、​​​​​​​Date类型八、​​​​​​​Da... 查看详情

客快物流大数据项目(四十二):java代码操作kudu(代码片段)

目录Java代码操作Kudu一、构建maven工程二、导入依赖三、​​​​​​​创建包结构四、​​​​​​​初始化方法五、​​​​​​​创建表六、​​​​​​​插入数据七、​​​​​​​查询数据八、修改数据九、​​​... 查看详情

客快物流大数据项目(四十三):kudu的分区方式(代码片段)

目录kudu的分区方式一、HashPartitioning(哈希分区)二、RangePartitioning(范围分区) 三、​​​​​​​MultilevelPartitioning(多级分区)kudu的分区方式为了提供可扩展性,Kudu表被划分为称为tablets的单元,并分布在许多tabletservers上。... 查看详情

客快物流大数据项目:docker的迁移与备份(代码片段)

Docker的迁移与备份一、容器保存为镜像可以通过以下命令将容器保存为镜像dockercommitmynginxmynginx_image基于新创建的镜像创建容器dockerrun-di--name=mynginx2-p81:80mynginx_image访问81端口二、镜像备份可以通过以下命令将镜像保存为tar文... 查看详情

客快物流大数据项目(九十六):clickhouse的versionedcollapsingmergetree深入了解(代码片段)

文章目录ClickHouse的VersionedCollapsingMergeTree深入了解一、创建VersionedCollapsingMergeTree引擎表的语法二、折叠数据三、使用示例ClickHouse的VersionedCollapsingMergeTree深入了解该引擎继承自 MergeTree 并将折叠行的逻辑添加到合并数据部分的算... 查看详情

客快物流大数据项目(七十九):impala映射kudu表(代码片段)

文章目录Impala映射kudu表一、​​​​​​​​​​​​​​登录Hue页面1、选择Impala2、登录Hue页面二、选择Impala执行引擎1、选择Impala执行引擎2、进入编写执行sql语句窗口三、执行sql语句映射Kudu表Impala映射kudu表一、​​​​​... 查看详情

客快物流大数据项目学习框架

文章目录客快物流大数据项目学习框架前言一、项目简介二、功能介绍三、项目背景四、服务器资源规划五、技术亮点及价值六、智慧物流大数据平台客快物流大数据项目学习框架前言利用框架的力量,看懂游戏规则,... 查看详情

客快物流大数据项目(九十二):clickhouse的mergetree系列引擎介绍和mergetree深入了解(代码片段)

文章目录ClickHouse的MergeTree系列引擎介绍和MergeTree深入了解一、MergeTree系列引擎介绍二、​​​​​​​MergeTree深入了解1、创建MergeTree表的说明2、创建MergeTree引擎的表3、删除MergeTree引擎的表ClickHouse的MergeTree系列引擎介绍和MergeTr... 查看详情

客快物流大数据项目(一百):clickhouse的使用

文章目录ClickHouse的使用一、使用Java操作ClickHouse1、构建maven工程 查看详情

客快物流大数据项目(五十一):数据库表分析

目录数据库表分析一、物流运输管理数据库表1、揽件表(tbl_collect_package)2、客户表(tbl_customer)3、物流系统码表(tbl_codes)4、快递单据表(tbl_express_bill)5、快递包裹表(tbl_express_package& 查看详情

客快物流大数据项目(一百零五):启动elasticsearch

文章目录启动ElasticSearch一、启动ES服务端二、​​​​​​​启动Kibana启动ElasticSearch 查看详情

客快物流大数据项目(一百零五):启动elasticsearch

文章目录启动ElasticSearch一、启动ES服务端二、​​​​​​​启动Kibana启动ElasticSearch 查看详情

客快物流大数据项目(一百零六):实时etl处理

文章目录实时ETL处理一、业务流程二、​​ 查看详情