如何在 Spark SQL(PySpark) 中实现自增

     2023-04-15     181

关键词:

【中文标题】如何在 Spark SQL(PySpark) 中实现自增【英文标题】:How to implement auto increment in spark SQL(PySpark) 【发布时间】:2016-10-25 04:20:43 【问题描述】:

我需要在我的 spark sql 表中实现一个自动增量列,我该怎么做。请指导我。我正在使用 pyspark 2.0

谢谢 卡利安

【问题讨论】:

查看***.com/questions/31955309/… @MRSrinivas 感谢您的详细回复我会试试的,最近我尝试从 pyspark.sql.functions import monotonically_increasing_id 解决它已经工作的问题。它为从 0 开始索引的每一行提供 id,非常感谢 【参考方案1】:

我会编写/重用 stateful Hive udf 并向 pySpark 注册,因为 Spark SQL 确实对 Hive 有很好的支持。

在下面的代码中检查这一行 @UDFType(deterministic = false, stateful = true) 以确保它是有状态的 UDF。

package org.apache.hadoop.hive.contrib.udf;

import org.apache.hadoop.hive.ql.exec.Description;
import org.apache.hadoop.hive.ql.exec.UDF;
import org.apache.hadoop.hive.ql.udf.UDFType;
import org.apache.hadoop.io.LongWritable;

/**
 * UDFRowSequence.
 */
@Description(name = "row_sequence",
    value = "_FUNC_() - Returns a generated row sequence number starting from 1")
@UDFType(deterministic = false, stateful = true)
public class UDFRowSequence extends UDF

  private LongWritable result = new LongWritable();

  public UDFRowSequence() 
    result.set(0);
  

  public LongWritable evaluate() 
    result.set(result.get() + 1);
    return result;
  


// End UDFRowSequence.java

现在构建 jar 并在 pyspark 启动时添加位置。

$ pyspark --jars your_jar_name.jar

然后注册sqlContext

sqlContext.sql("CREATE TEMPORARY FUNCTION row_seq AS 'org.apache.hadoop.hive.contrib.udf.UDFRowSequence'")

现在在选择查询中使用row_seq()

sqlContext.sql("SELECT row_seq(), col1, col2 FROM table_name")

Project to use Hive UDFs in pySpark

【讨论】:

我已经按照您指定的方式构建了 jar,并且还创建了临时函数。现在我创建了一个表sqlContext.sql("Create table abc(id int, name string)")sqlContext.sql("INSERT into TABLE abc SELECT row_seq(), 'John'")sqlContext.sql("INSERT into TABLE abc SELECT row_seq(), 'Tim'")。当我选择 * 语句时,我同时得到 iD 作为 1 而不是 12 您的代码中是否在标签@UDFType 内设置了stateful = true 我需要这样的东西,但问题是,它是否可以扩展 2 亿的数据。实际上,我想将包含 2 亿行的大文件分解为包含文件的确切 10K 行的较小文件。我想为每一行添加自动递增数,并在这样的帮助下批量读取(id > 10,001 和 id 是否可以在 python 中执行此 UDF?并在 sqlContext 中注册?

如何使用 jupyter notebook 在 pyspark 中的 Hive 上使用 %sql Magic 字符串启用 spark SQL

】如何使用jupyternotebook在pyspark中的Hive上使用%sqlMagic字符串启用sparkSQL【英文标题】:HowtoenablethesparkSQLwith%sqlMagicstringonHiveinpysparkusingjupyternotebook【发布时间】:2019-07-2414:01:55【问题描述】:如何在jupyternotebook上启用%sqlMagicstring,... 查看详情

在 spark 中实现 informatica 逻辑

...该填充该列的值【问题讨论】:我想你的意思是,scala、pyspark或java@theb 查看详情

在 Spark SQL (pyspark) 中将行转置为列

】在SparkSQL(pyspark)中将行转置为列【英文标题】:TransposerowstoColumnsinSparkSQL(pyspark)【发布时间】:2017-10-2507:46:05【问题描述】:我想在Spark中进行以下转换我的目标是获得输出,我希望如果我可以进行中间转换,我可以轻松获得输... 查看详情

如何在 PySpark SQL when() 子句中使用聚合值?

】如何在PySparkSQLwhen()子句中使用聚合值?【英文标题】:HowdoyouuseaggregatedvalueswithinPySparkSQLwhen()clause?【发布时间】:2021-12-0323:24:12【问题描述】:我正在尝试学习PySpark,并尝试学习如何使用SQLwhen()子句更好地对我的数据进行分... 查看详情

如何在字典中使用 pyspark.sql.functions.when() 的多个条件?

】如何在字典中使用pyspark.sql.functions.when()的多个条件?【英文标题】:HowdoIusemultipleconditionswithpyspark.sql.funtions.when()fromadict?【发布时间】:2019-08-1523:34:36【问题描述】:我想根据字典中的值生成一个when子句。它与正在做的事情... 查看详情

pyspark:如何获取 spark 数据帧的 Spark SQLContext?

】pyspark:如何获取spark数据帧的SparkSQLContext?【英文标题】:pyspark:HowtoobtaintheSparkSQLContextofthesparkdataframe?【发布时间】:2020-07-0306:07:30【问题描述】:我有一个接受sparkDataFrame的函数,我想获取DataFrame所在的Spark上下文。原因是... 查看详情

pyspark在spark sql中函数之间的使用范围

】pyspark在sparksql中函数之间的使用范围【英文标题】:pysparkuserangebetweenfunctioninsparksql【发布时间】:2019-07-1115:12:37【问题描述】:当我跑步时spark.sql(\'\'\'selectclient,avg(amount)over(partitionbyclientorderbymy_timestamprangebetweeninterval30dayspr 查看详情

如何删除 Spark 表列中的空格(Pyspark)

】如何删除Spark表列中的空格(Pyspark)【英文标题】:HowtoremoveblankspacesinSparktablecolumn(Pyspark)【发布时间】:2017-12-0316:33:31【问题描述】:我想从特定列(purch_location)中的所有值中删除空格。我使用的是spark表,而不是数据框或SQL... 查看详情

Spark - PySpark sql 错误

】Spark-PySparksql错误【英文标题】:Spark-PySparksqlerror【发布时间】:2016-10-1406:08:36【问题描述】:我有一个简单的pyspark代码,但我无法运行它。我尝试在Ubuntu系统上运行它并使用PyCharmIDE。我想连接到OracleXE数据库并且我想打印我... 查看详情

如何在 PySpark 中将 sql 函数与 UDAF 组合/链接

】如何在PySpark中将sql函数与UDAF组合/链接【英文标题】:Howtocombine/chainsqlfunctionswithUDAFsinPySpark【发布时间】:2019-11-1503:47:09【问题描述】:我正在尝试在PySpark中的Spark数据帧上使用一堆预定义的sql函数以及我自己的UDAF@F.udfdefmode(... 查看详情

如何对 Pyspark spark.sql 数据框中的数据进行同质化

】如何对Pysparkspark.sql数据框中的数据进行同质化【英文标题】:HowtohomogonizedatainaPysparkspark.sqldataframe【发布时间】:2019-04-1100:14:57【问题描述】:我下载了一个1.9GB的csv文件,其中包含AirBnB数据。尽管所有列的数据类型都是“字... 查看详情

使用 pyspark 在数据块中实现 FileNotFound 异常

】使用pyspark在数据块中实现FileNotFound异常【英文标题】:implementFileNotFoundexceptionindatabricksusingpyspark【发布时间】:2020-11-1217:01:39【问题描述】:我正在尝试在数据块中使用pyspark实现异常处理,其中我需要检查文件是否存在于源... 查看详情

在 spark sql--pyspark 中查找特定字符串

】在sparksql--pyspark中查找特定字符串【英文标题】:findspecificstringinsparksql--pyspark【发布时间】:2020-03-2613:29:34【问题描述】:我试图在员工数据框的数据框列中找到完全匹配的字符串Employeedays_presentAlex1,2,11,23,John21,23,25,28需要根... 查看详情

了解如何在 Spark 中执行 Hive SQL

...了解在Spark中查询配置单元表时会发生什么。我正在使用PySpark例如:warehouse_location=\'\\user\\hive\\warehouse\'frompyspark.sql 查看详情

如何在 Pyspark 中启用 Apache Arrow

】如何在Pyspark中启用ApacheArrow【英文标题】:howtoenableApacheArrowinPyspark【发布时间】:2020-02-0417:21:34【问题描述】:我正在尝试启用ApacheArrow以转换为Pandas。我正在使用:pyspark2.4.4pyarrow0.15.0熊猫0.25.1numpy1.17.2这是示例代码spark.conf.s... 查看详情

如何在 pyspark.sql.functions.when() 中使用多个条件?

】如何在pyspark.sql.functions.when()中使用多个条件?【英文标题】:HowdoIusemultipleconditionswithpyspark.sql.functions.when()?【发布时间】:2015-10-1514:56:35【问题描述】:我有一个包含几列的数据框。现在我想从其他2列派生一个新列:frompyspar... 查看详情

PYSPARK:如何将带有多个 case 语句的 SQL 查询转换为 Pyspark/Pyspark-SQL?

】PYSPARK:如何将带有多个case语句的SQL查询转换为Pyspark/Pyspark-SQL?【英文标题】:PYSPARK:HowtocovertSQLquerywithmultiplecasestatementstoPyspark/Pyspark-SQL?【发布时间】:2022-01-1908:14:04【问题描述】:我有两组带有多个case语句的查询。我需要... 查看详情

如何融化 Spark DataFrame?

...rkDataFrame?【发布时间】:2017-05-3022:05:33【问题描述】:在PySpark或至少在Scala中,ApacheSpark中是否有相当于PandasMelt的函数?到目前为止,我一直在Python中运行一个示例数据集,现在我想对整个数据集使用Spark。【问题讨论】:另见u... 查看详情