将数据帧从 spark 集群写入 cassandra 集群:分区和性能调优

     2023-03-17     71

关键词:

【中文标题】将数据帧从 spark 集群写入 cassandra 集群:分区和性能调优【英文标题】:Write dataframe from spark cluster to cassandra cluster: Partitioning and Performance Tuning 【发布时间】:2020-09-27 03:55:38 【问题描述】:

我有两个集群 - 1. Cloudera Hadoop-Spark 作业在这里运行 2. Cloud - Cassandra集群,多个DC

在将数据帧从我的 spark 作业写入 cassandra 集群时,我在写入前在 spark 中进行了重新分区 (repartionCount=10)。见下文:

import org.apache.spark.sql.cassandra._
records.repartition(repartitionCount).write.cassandraFormat(table, keySpace)
  .mode(SaveMode.Append)
  .options(options)
  .option(CassandraConnectorConf.LocalDCParam.name, cassandraDC.name)
  .option(CassandraConnectorConf.ConnectionHostParam.name, cassandraDC.hosts)
  .save()

在我的多租户 Spark 集群中,对于具有 20M 条记录的 Spark 批处理加载,以及低于配置,我看到很多任务失败、资源抢占和运行失败。

spark.cassandra.output.batch.grouping.buffer.size=1000
spark.cassandra.output.batch.grouping.key=partition
spark.cassandra.output.concurrent.writes=20 
spark.cassandra.connection.compression=LZ4

我应该如何调整这个?重新分区是罪魁祸首吗?

PS:我一开始的理解是:对于 20M 行的负载,“重新分区”应该将负载平均分配给 executors(每个分区有 2M 行),并且批处理将在这些分区级别(在 2M 行)。但是现在,如果 spark-cassandra-connector 在整个数据帧级别(整个 20M 行)上进行批处理,我怀疑这是否会导致不必要的洗牌。

更新:删除“重新分区”大大降低了我的 cloudera spark 集群的性能(在 spark 级别设置的默认分区是 -spark.sql.shuffle.partitions: 200),所以我深入挖掘了一下,发现我最初的理解是正确的。请注意我的 spark 和 cassandra 集群是不同的。 Datastax spark-cassandra-connector 使用 cassandra 协调器节点为每个分区打开一个连接,所以我决定让它保持不变。正如亚历克斯建议的那样,我已经减少了并发写入,我相信这应该会有所帮助。

【问题讨论】:

【参考方案1】:

您不需要在 Spark 中进行重新分区 - 只需将数据从 Spark 写入 Cassandra,不要尝试更改 Spark Cassandra 连接器的默认值 - 它们在大多数情况下都可以正常工作。你需要看看发生了什么样的阶段失败——很可能你只是因为spark.cassandra.output.concurrent.writes=20而重载了Cassandra(使用默认值(5))——有时更少的写入器有助于更快地写入数据,因为你不会过载Cassandra,并且作业没有重新启动。

附: partitionspark.cassandra.output.batch.grouping.key - 它不是 Spark 分区,它是 Cassandra 分区,它取决于分区键列的值。

【讨论】:

非常感谢。随着越来越多的批处理作业并行写入 cassandra 集群,我的集群中出现了很多问题。每个批处理作业尽管很大,但具有很高的基数,其中单个分区 (cassandra) 通常可能少于 1000 行,因此批处理在运行时可能会更小,从而导致对 cassandra 的写入请求更多。此外,我在极少数情况下观察到 cassandra 读取在大量写入期间会下降,尽管写入本身总是非常快。在我的场景中,replica_set 是一个不错的选择,以及上述建议。 如果您的 Cassandra 节点在读取期间经常出现故障,您可能需要调整输入参数。例如,使用LOCAL_ONE 读取通常会使节点过载,而使用LOCAL_QUORUM 读取会减少单个节点的负载,并且因为它不会崩溃,所以它读取速度更快,尽管LOCAL_QUORUMLOCAL_ONE 我们使用 EACH_QUORUM 写入,使用 LOCAL_QUORUM 读取。 Cassandra 并没有停机,但在大量写入期间读取延迟增加了许多倍。 这在意料之中,但主要是你的节点没有关闭

将数据帧从 Spark 转换为 DL4j 使用的类型

】将数据帧从Spark转换为DL4j使用的类型【英文标题】:ConvertingDataframefromSparktothetypeusedbyDL4j【发布时间】:2018-09-1815:13:20【问题描述】:有没有什么方便的方法可以将Spark中的Dataframe转换为DL4j使用的类型?目前在DL4j算法中使用Dar... 查看详情

将 spark 数据帧写入 postgres 数据库

】将spark数据帧写入postgres数据库【英文标题】:WritesparkdataframetopostgresDatabase【发布时间】:2016-08-0809:40:13【问题描述】:spark集群设置如下:conf[\'SparkConfiguration\']=SparkConf()\\.setMaster(\'yarn-client\')\\.setAppName("test")\\.set("spark.execu 查看详情

将 Spark Dataframe 直接写入 HIVE 需要太多时间

...】:2017-11-1419:43:05【问题描述】:我正在使用PySpark将2个数据帧从Spark直接写入Hive。第一个df只有一行7列。第二个df有20M行和20列。写1个df(1row)需要10分钟,而在第二个DF中写1M行需要大约30分 查看详情

无法使用 jdbc 将 spark 数据集写入数据库

】无法使用jdbc将spark数据集写入数据库【英文标题】:Notabletowritesparkdatasettodatabaseusingjdbc【发布时间】:2019-05-1514:08:33【问题描述】:我需要将我的spark数据集写入oracle数据库表。我正在使用带有附加模式的数据集写入方法。但... 查看详情

如何将pyspark数据帧写入不同的hadoop集群

】如何将pyspark数据帧写入不同的hadoop集群【英文标题】:Howtowritepysparkdataframetodifferenthadoopcluster【发布时间】:2020-06-0313:33:16【问题描述】:我正在尝试备份从pyspark程序写入hadoop集群的数据。我可以这样做吗?现在,我正在采... 查看详情

如何将使用 Pandas 在 Spark 集群上编写的文件移动到 HDFS?

】如何将使用Pandas在Spark集群上编写的文件移动到HDFS?【英文标题】:HowtomovefileswrittenwithPandasonSparkclustertoHDFS?【发布时间】:2021-02-1618:10:42【问题描述】:我正在使用集群模式运行Spark作业并使用Pandas写入一些文件,我认为它正... 查看详情

YARN 如何在集群模式下了解 Apache Spark 中的数据局部性

】YARN如何在集群模式下了解ApacheSpark中的数据局部性【英文标题】:HowYARNknowsdatalocalityinApachesparkinclustermode【发布时间】:2018-04-2014:51:29【问题描述】:假设有一个Spark作业将从HDFS读取一个名为records.txt的文件,并进行一些转换... 查看详情

Spark的S3角色授权?

...的Cloudera上使用pyspark2(版本2.0.0.cloudera1)我正在尝试将数据帧从Spark写入S3存储,但由于身份验证而失败:pyspark.sql.utils.IllegalArgumentException:u\'AWS访问密钥ID和Secre 查看详情

如何将 Spark SQL 批处理作业结果写入 Apache Druid?

...】:2019-11-1108:34:35【问题描述】:我想将Spark批处理结果数据写入ApacheDruid。我知道Druid有本地批量摄取,例如index_parallel。Druid在同一个集群中运行Map-Reduce作业。但我只想用Drui 查看详情

如何将 Spark 数据帧写入 impala 数据库

】如何将Spark数据帧写入impala数据库【英文标题】:Howtowritesparkdataframetoimpaladatabase【发布时间】:2016-08-1913:30:29【问题描述】:我使用以下代码通过JDBC连接将spark数据帧写入impala。df.write.mode("append").jdbc(url="jdbc:impala://10.61.1.101:210... 查看详情

将 Spark 数据帧写入带分区的 CSV

】将Spark数据帧写入带分区的CSV【英文标题】:WriteSparkdataframeasCSVwithpartitions【发布时间】:2016-05-2912:30:45【问题描述】:我正在尝试将Spark中的数据帧写入HDFS位置,我希望如果我添加partitionBy符号Spark将创建分区(类似于Parquet格... 查看详情

将 Spark 数据帧写入带分区的 CSV

】将Spark数据帧写入带分区的CSV【英文标题】:WriteSparkdataframeasCSVwithpartitions【发布时间】:2016-05-2912:30:45【问题描述】:我正在尝试将Spark中的数据帧写入HDFS位置,我希望如果我添加partitionBy符号Spark将创建分区(类似于Parquet格... 查看详情

将 Spark 流数据帧写入 MongoDB

】将Spark流数据帧写入MongoDB【英文标题】:WriteaSparkstreamingDataframeintoMongoDB【发布时间】:2018-06-1110:51:12【问题描述】:我在Spark中有一个StreamingDataset具有特定模式。当我想对其进行查询时,我会调用:StreamingQueryquery=querydf.writeSt... 查看详情

将 Spark 数据帧写入 postgres db 时出错

】将Spark数据帧写入postgresdb时出错【英文标题】:ErrorwritingSparkdataframetopostgresdb【发布时间】:2018-03-1322:31:45【问题描述】:我正在尝试将一些元数据从我的Spark作业写入postgres数据库。不断出现以下错误:2018-03-1315:23:40,599rootERRO... 查看详情

将 Spark 结构化流数据写入 Cassandra

】将Spark结构化流数据写入Cassandra【英文标题】:WritingSparkStructureStreamingdataintoCassandra【发布时间】:2018-10-0619:36:01【问题描述】:我想使用PysparkAPI将结构流数据写入Cassandra。我的数据流如下:Nifi->Kafka->SparkStructureStreaming->... 查看详情

spark将数据写入mysql的共享表

】spark将数据写入mysql的共享表【英文标题】:sparkwritingdataintosharetableofmysql【发布时间】:2020-04-0711:41:11【问题描述】:我有一个周期性的spark-scala任务,用于将数据从Hive传输到MySQL。table的结构可以简单的看成:+------+------+|id|na... 查看详情

Spark:如何有效地将数据帧写入 S3

】Spark:如何有效地将数据帧写入S3【英文标题】:Spark:howtowritedataframetoS3efficiently【发布时间】:2020-10-2816:23:46【问题描述】:我正在尝试找出使用(Py)Spark将数据写入S3的最佳方式。我从S3存储桶读取似乎没有问题,但是当我需要... 查看详情

Spark 将数据帧直接从 Hive 写入本地文件系统

】Spark将数据帧直接从Hive写入本地文件系统【英文标题】:SparkwriteDataframesdirectlyfromHivetolocalfilesystem【发布时间】:2019-05-1402:13:17【问题描述】:这个问题几乎是这里要求的复制品:WritingfilestolocalsystemwithSparkinClustermode但我的查... 查看详情