如何在火花聚合函数中实现scala类型安全

     2023-02-16     156

关键词:

【中文标题】如何在火花聚合函数中实现scala类型安全【英文标题】:How to implement scala type safety inside spark aggregation function 【发布时间】:2021-09-13 22:34:45 【问题描述】:

如何对 agg 函数中聚合的值实现类型安全?我的目标是在运行前进行类型检查。 $"event.zeroToSixty",例如在编译时不会被检查,所以我想实现一些可以做的事情。

SomeMetadataExtracted case 类包含event 列内的所有类型仅供参考

val currentDay = jobParameters.date.format(DateUtils.PartitionDateFormat)

val plusCar =
  MPHEventWrapper
    .loadMPHTable[SomeMetadataExtracted](
      plusTable,
      Seq(currentDay))

plusCar
  .groupByKey(row =>
    ( row.date,
      row.locale,
      row.carCode))
  .agg(
    count(when($"event.zeroToSixty" === Within5Seconds, 1 ) ).as[Long], <= need type checking here
    count(when( $"event.carLaunched" =!= Unknown, 1 ) ).as[Long], <= need type checking here
    count(when($"event.successfulLaunch" === true, 1)).as[Long])) <= need type checking here
  .map(
    case (
          (date, locale, carType),
          total_quick_cars,
          total_known_launches,
          total_successful_launches,

        ) =>
      carSpeedAggregate(
        date,
        carType,
        locale,
        total_quick_cars,
        total_known_launches,
        total_successful_launches,

      )
  )

更新代码(感谢 Artem !!!) - 新问题,代码非常占用内存

plusCar
    .groupByKey(row =>  (row.date,
        row.locale,
        row.carCode,
        ))
    .mapGroups 
        case ((date: String, locale:String, carCode: String), events: Iterator[EventWithCommonDimensions[EventCombiner[SomeMetadataExtracted, ANStep]]]) => 
          val newEvent = events.toTraversable

          val zeroToSixty = newEvent.count(e =>  e.event.exists(_. e.zeroToSixty.getOrElse("UNKNOWN") =!= FollowUpUnknown ))
          val carLaunched = newEvent.count(e =>  e.event.exists(_.carLaunched.get === Within5Seconds ))
          val successfulLaunch = newEvent.count(e => e.event.exists(_.successfulLaunch == isTrue) )

          carSpeedAggregate(
            date,
            locale,
            deviceCode,
            taskName.get,
            total_quick_hangups.toLong,
            total_followup_calls.toLong,
            total_phone_calls.toLong,
            total_call_attempts.toLong
          )
        

【问题讨论】:

您检查过数据集吗?数据集在编译时提供类型安全。 【参考方案1】:

agg 是一个用于无类型操作的函数。相反,您可以使用 groupByKey 和 mapGroups 的组合。

# Suppose SomeMetadataExtracted has following fields
case class SomeMetadataExtracted(
    date: Timestamp, 
    locale: String, 
    carCode: String, 
    zeroToSixty: String,
    carLaunched: String,
    successfulLaunch: Boolean
    )

plusCar
  .as[SomeMetadataExtracted] //you have to make following import to do like this: import spark_session.implicits._
  .groupByKey((event: SomeMetadataExtracted) =>
    ( event.date,
      event.locale,
      event.carCode))
  .mapGroups
      case ((date, locale, carCode), events: Iterator[SomeMetadataExtracted]) =>
    carSpeedAggregate(
        date,
        locale,
        carCode,
        events.count(e => e.zeroToSixty == Within5Seconds),
        events.count(e => e.carLaunched != Unknown),
        events.count(e => e.successfulLaunch)
    )

【讨论】:

一件事是datelocalecarCode 来自同一行中的不同案例类。如果我在第一个示例中按事件分组,则为 row.event 从您的示例中不清楚。你能分享一下plusCar的类型吗?如果它是 DataFrame 共享,请使用它的架构。 实际上我得到了它的工作!我将发布更改,但在此之前,似乎只有第一个 count 函数被调用。有什么原因吗? 啊,看起来像是一个 TraversableOnce 函数。我怎样才能使这个 Traversable? 我现在不明白你的问题。您可以分享更新的代码吗?在我的示例中,事件是一个 Iterator[SomeMetadataExtracted],因此您可以使用像 regular scala iterator 这样的事件

您将如何在 Scala 中实现缓存方面

】您将如何在Scala中实现缓存方面【英文标题】:HowwouldyouimplementacachingaspectinScala【发布时间】:2014-05-0710:16:20【问题描述】:头脑风暴:我正在开发一个Scala项目,我们在该项目中进行服务调用,并且需要使用memcache缓存返回值... 查看详情

如何在 spark scala 中实现 uniqueConcatenate、uniqueCount [关闭]

】如何在sparkscala中实现uniqueConcatenate、uniqueCount[关闭]【英文标题】:howtoimplementuniqueConcatenate,uniqueCountinsparkscala[closed]【发布时间】:2022-01-1017:01:58【问题描述】:我正在尝试转换数据,旧代码在Tibco中并使用uniqueConcatenate、uniqueC... 查看详情

在mongodb中实现聚合函数

随着组织产生的数据爆炸性增长,从GB到TB,从TB到PB,传统的数据库已经无法通过垂直扩展来管理如此之大数据。传统方法存储和处理数据的成本将会随着数据量增长而显著增加。这使得很多组织都在寻找一种经济的解决方案,... 查看详情

如何在 Scala 中实现 DAO?

】如何在Scala中实现DAO?【英文标题】:HowtoimplementDAOinScala?【发布时间】:2011-07-1819:24:50【问题描述】:我想在Scala中实现DAO,如下所示:traitDAO[PK,-T,-Q]//Tisa"valueobject",PKisaprimarykey,andQisqueryparameters.defcreate(t:T):Unitdefupdate(t:T):Unitdefr... 查看详情

如何在 scala 运行时知道不同的连接类型火花

】如何在scala运行时知道不同的连接类型火花【英文标题】:Howtoknowinscalaruntimethedifferentjointypesspark【发布时间】:2019-01-0408:46:25【问题描述】:我想根据可用的Spark连接类型白名单测试用户输入。有没有办法通过内置的spark了解... 查看详情

如何在 Scala 中实现 Kafka Consumer

】如何在Scala中实现KafkaConsumer【英文标题】:HowdoIimplementKafkaConsumerinScala【发布时间】:2016-12-1000:04:35【问题描述】:我正在尝试在scala中实现一个kafka消费者。我已经看过一百万篇关于如何用Java进行操作的教程,甚至有些(likethi... 查看详情

如何在 Scala 中实现真正的 Singleton

】如何在Scala中实现真正的Singleton【英文标题】:HowtoimplementarealSingletoninScala【发布时间】:2016-01-1006:09:21【问题描述】:我有混合Java/Scala项目。有一些用Java实现并使用一些Scala类的Quartz作业。这些类应该使用相同的SparkContext实... 查看详情

如何在 SQL 中实现聚合? (这与 GroupBy 无关)

】如何在SQL中实现聚合?(这与GroupBy无关)【英文标题】:HowtoimplementanAggregationinSQL?(ThisisnotaboutGroupBy)【发布时间】:2018-06-2903:51:22【问题描述】:在大学项目的范围内,我应该实现我的数据库的聚合。我得到了一个类似于这个... 查看详情

如何在scala中实现一个真正的singleton(代码片段)

我有混合Java/Scala项目。有一些Quartz作业用Java实现,并使用一些Scala类。这些类应该使用相同的SparkContext实例,所以我实现了一些应该是singleton的东西,如下所示:objectSparkContextLoadervarhasSC=falsevarsc:Any=0defgetSC(workers):SparkContext=if(!ha... 查看详情

如何在 Apache Spark Java 或 Scala 中实现这一点?

】如何在ApacheSparkJava或Scala中实现这一点?【英文标题】:HowdoIachievethisinApacheSparkJavaorScala?【发布时间】:2018-08-0102:14:44【问题描述】:汽车上的设备在行程开始时不会发送TRIPID,但会在行程结束时发送一个。如何将对应的TRIPIDS... 查看详情

在Scala spark中实现动态字符串插值?

...我有一个字符串,其中包含需要进入我预期数据帧的.agg函数的函数。我的数据数据框看起来像valclient=Seq((1,"A","D",10),(2,"A","D",5),(3,"B","C",56),(5,"B","D", 查看详情

如何要求泛型类型在泛型函数中实现 Add、Sub、Mul 或 Div 之类的操作?

】如何要求泛型类型在泛型函数中实现Add、Sub、Mul或Div之类的操作?【英文标题】:HowdoIrequireagenerictypeimplementanoperationlikeAdd,Sub,Mul,orDivinagenericfunction?【发布时间】:2015-03-2115:18:36【问题描述】:我正在尝试在Rust中实现一个通用... 查看详情

我将如何在播放框架 2.4.3 (Scala) 中实现拦截器/过滤器

】我将如何在播放框架2.4.3(Scala)中实现拦截器/过滤器【英文标题】:HowwouldIimplementanInterceptor/Filterinplayframework2.4.3(Scala)【发布时间】:2015-11-1723:33:59【问题描述】:我是PlayFramework的新手,我想实现一个拦截器或过滤器,在请求... 查看详情

根据scala中的条件对列进行火花数据框聚合

】根据scala中的条件对列进行火花数据框聚合【英文标题】:sparkdataframeaggregationofcolumnbasedonconditioninscala【发布时间】:2020-02-0309:44:15【问题描述】:我有以下格式的csv数据。我需要找到2017年营业额超过100的前2供应商。Turnover=Sum... 查看详情

如何在 Javascript 中实现 Haskell 的 FRP Behavior 类型?

】如何在Javascript中实现Haskell的FRPBehavior类型?【英文标题】:HowtoimplementHaskell\'sFRPBehaviortypeinJavascript?【发布时间】:2017-07-1111:10:15【问题描述】:我想了解Haskell中函数式反应式编程的原意,以及它与FRP在Javascript中的实际应用... 查看详情

在 XSLT 中实现 product()

...2014-10-2700:33:27【问题描述】:XPath2.0(和3.0)具有方便的聚合函数sum()和avg(),但没有返回数值原子值序列的乘积。虽然在允许赋值语句的语言中实现这样的函数是微不足道的,但在XSLT中似乎并不那么容易。我发现获得聚合产品... 查看详情

如何在 C# 中实现 Base64 URL 安全编码?

】如何在C#中实现Base64URL安全编码?【英文标题】:HowtoachieveBase64URLsafeencodinginC#?【发布时间】:2014-12-0819:52:09【问题描述】:我想在C#中实现Base64URL安全编码。在Java中,我们有一个通用的Codec库,它为我提供了一个URL安全编码字... 查看详情

如何将不同的聚合函数应用于同一列为啥对火花数据框进行分组? [复制]

】如何将不同的聚合函数应用于同一列为啥对火花数据框进行分组?[复制]【英文标题】:Howtoapplydifferentaggregationfunctionstothesamecolumnwhygroupingsparkdataframe?[duplicate]如何将不同的聚合函数应用于同一列为什么对火花数据框进行分组?... 查看详情