如何重新采样(下采样)时间序列大数据,从 10 Hz(毫秒)想要使用 pyspark 转换为 1 Hz(秒)

     2023-03-23     106

关键词:

【中文标题】如何重新采样(下采样)时间序列大数据,从 10 Hz(毫秒)想要使用 pyspark 转换为 1 Hz(秒)【英文标题】:How to resample (Downsample) the time series big data, from 10 Hz (miliseconds) wants to convert to 1 Hz (seconds) using pyspark 【发布时间】:2021-12-28 13:04:10 【问题描述】:

我正在使用 pyspark 处理时间序列大数据,我有 GB(100 GB 或更多)的数据,行数以百万或十亿为单位。我是使用 pyspark 的大数据新手。想要重新采样(下采样)数据原始数据以毫秒为单位的时间戳为 10 Hz 我想以秒为单位将此数据转换为 1 Hz。如果您能给我一些想法,那将非常有帮助。如果您可以向我推荐任何我可以用来使用 spark 处理(大数据)大数据的文档/解决方案,那也很棒。以下是样本数据。 DF=

start_timestamp end_timestamp value
2020-11-05 03:25:02.088 2020-11-05 04:10:19.288 0.0
2020-11-05 04:24:25.288 2020-11-05 04:24:25.218 0.4375
2020-11-05 04:24:25.218 2020-11-05 04:24:25.318 1.0625
2020-11-05 04:24:25.318 2020-11-05 04:24:25.418 1.21875
2020-11-05 04:24:25.418 2020-11-05 04:24:25.518 1.234375
2020-11-05 04:24:25.518 2020-11-05 04:24:25.618 1.265625
2020-11-05 04:24:25.618 2020-11-05 04:24:25.718 1.28125

我尝试了我得到的代码:PySpark: how to resample frequencies

这是我的示例代码:

day = 1   #60 * 60 * 24
epoch = (col("start_timestamp").cast("bigint") / day).cast("bigint") * day

with_epoch = distinctDF.withColumn("epoch", epoch)

min_epoch, max_epoch = with_epoch.select(min_("epoch"), max_("epoch")).first()


ref = spark.range(
    min_epoch, max_epoch + 1, day
).toDF("epoch")  
(ref
    .join(with_epoch, "epoch", "left")
    .orderBy("epoch")
    .withColumn("start_timestamp_resampled", timestamp_seconds("epoch"))
    .show(15, False))

代码正在运行,但我不确定它是否正确:输出如下所示。但它是否在列中显示空值。

epoch start_timestamp end_timestamp value start_timestamp_resampled
1604546702 2020-11-05 03:25:02.088 2020-11-05 04:10:19.288 0.0 2020-11-05 03:25:02
1604546703 null null null 2020-11-05 03:25:03
1604546704 null null null 2020-11-05 03:25:04
1604546705 null null null 2020-11-05 03:25:05
1604546706 null null null 2020-11-05 03:25:06
1604546707 null null null 2020-11-05 03:25:07

【问题讨论】:

欢迎来到Stack Overflow.!要求有关问题方法的一般指导的问题通常过于宽泛,不适合本网站。人们有自己解决问题的方法,因此不可能有正确的答案。仔细阅读 Where to Start 和 Minimal Reproducible Example,然后编辑您的帖子。 10Hz的频率不是毫秒周期,所以你的问题标题已经自相矛盾了。 @UlrichEckhardt 我的时间戳看起来像这样“2020-11-05 03:25:02.088”所以 0.088 以毫秒为单位,所有数据都以 10 Hz 记录。所以想将其下采样到 1Hz(即毫秒到秒)。 Hold on:有单位(如1.52s中的秒)、分辨率(如1/50秒或0.02s)和采样率(如20Hz频率或0.05s周期)。请注意,您的时间戳没有单位,而是不同单位的混合形式(不确定正确的术语)。然后,您可以下采样但仍保持分辨率。您还可以转换单位,例如从那种混合形式秒或分钟。此外,当然,您可以放弃精度并舍入到更小的分辨率。你到底想要什么?也许,对于一组示例输入,您可以提供预期的输出。 【参考方案1】:

在进行下采样时,您必须考虑如何处理丢失的数据。

使用连接,您只会在时间戳匹配时获取数据。但您也可以决定使用以下方法聚合数据点:均值、最大值、最小值、总和...

我会怎么做:

import pyspark.sql.functions as F
df = df.withColumn("Timestamp_resampled", F.date_trunc(timestamp, format='yyyy-MM-dd HH:mm:ss'))
df = df.groupby("Timestamp_resampled").agg(<function of your choice>)

然后,一旦重新采样,如果您缺少时间戳,您可以使用带有 joinepoch_range 的方法来填充缺少的时间戳,并确保每秒都有一个。

【讨论】:

谢谢,因为我的主 DF 列“start_timestamp”以毫秒为单位(格式='yyyy-MM-dd HH:mm:ss.sss)。我有点困惑并得到错误:“语法错误:位置参数遵循关键字参数”。根据您的解决方案,此代码应该进行下采样,对吗? ''' DF1 = DF.withColumn("start_timestamp", F.date_trunc(format='yyyy-MM-dd HH:mm:ss', timestamp)) df = DF1.groupby("start_timestamp").agg(mean) ''' @SSS,尝试传递“时间戳”参数拳头,然后传递“格式”,这就是错误的含义。 感谢解决方案,它运行良好。 :) @SSS,我已经更新了答案,如果可行,请随时接受!

如何将 pandas Dataframe 时间序列数据从 8hz 重新采样到 16hz?

】如何将pandasDataframe时间序列数据从8hz重新采样到16hz?【英文标题】:HowtoresamplepandasDataframetimeseriesdatafrom8hzto16hz?【发布时间】:2021-11-0802:30:08【问题描述】:我在pandasDataframe中有时间序列数据,采样率为8hz,即每秒8个样本。... 查看详情

pyspark:在日期和时间上重新采样 pyspark 数据帧

...meondateandtime【发布时间】:2020-06-2813:27:27【问题描述】:如何重新采样pyspark数据帧,就像在pandas中我们有pd.grouper和pd.resample一样,我可以在h、2h、3h、week上重新采样。我有以下示例pyspark数据框,我如何在列ind和d 查看详情

如何在不更改特定列的情况下对数据框中的数据进行重新采样?

】如何在不更改特定列的情况下对数据框中的数据进行重新采样?【英文标题】:Howtoresampledataindataframewithoutchangingonespecificcolumn?【发布时间】:2019-11-1814:59:51【问题描述】:例如,我有几行想要重新采样到1秒的时间范围,但我... 查看详情

如何在不更改特定列的情况下对数据框中的数据进行重新采样?

】如何在不更改特定列的情况下对数据框中的数据进行重新采样?【英文标题】:Howtoresampledataindataframewithoutchangingonespecificcolumn?【发布时间】:2019-11-1814:59:51【问题描述】:例如,我有几行想要重新采样到1秒的时间范围,但我... 查看详情

如何从最后一行开始对时间序列数据进行反向重采样?

】如何从最后一行开始对时间序列数据进行反向重采样?【英文标题】:Howtodobackwardresamplingontimeseriesdatastartingfromthelastrow?【发布时间】:2019-11-2708:20:39【问题描述】:我有几行数据(每秒),我曾经以两个小时为单位重新采样... 查看详情

如何在管道中重新采样文本(不平衡组)?

】如何在管道中重新采样文本(不平衡组)?【英文标题】:Howtoresampletext(imbalancedgroups)inapipeline?【发布时间】:2019-06-0416:43:15【问题描述】:我正在尝试使用MultinomialNB进行一些文本分类,但我遇到了问题,因为我的数据不平衡... 查看详情

如何对具有多列的df重新采样

】如何对具有多列的df重新采样【英文标题】:Howtoresampleadfwithmultiplecolumns【发布时间】:2022-01-0507:33:24【问题描述】:我有多个请求的分钟数据。我想将其重新采样为每小时并按请求分组,以便我可以获得每小时的请求总数这... 查看详情

从大型数据集中采样

...1:14:48【问题描述】:我有一个包含112k行和2列的数据集。如何从这个数据集中平均采样以获得一个大约10k行的小数据集?我的意思是相等,因为这个数据集有56k行,列名为True=1,56k行,列名为``´True=0```。所以我想用True=1列采样10... 查看详情

无法从 1 分钟到 5 分钟的数据重新采样 pandas 时间序列

】无法从1分钟到5分钟的数据重新采样pandas时间序列【英文标题】:Troubleresamplingpandastimeseriesfrom1minto5mindata【发布时间】:2022-01-0918:51:13【问题描述】:我有一个1分钟间隔的盘中股票数据,如下所示:importyfinanceasyfimportpandasaspdn=... 查看详情

在 PyQtGraph 中绘制大时间序列时使用预下采样数据

】在PyQtGraph中绘制大时间序列时使用预下采样数据【英文标题】:Usingpre-downsampleddatawhenplottinglargetimeseriesinPyQtGraph【发布时间】:2015-08-1010:56:33【问题描述】:我需要在PyQtGraph中绘制一个大的时间序列(数百万个点)。按原样绘... 查看详情

熊猫数据框每天重新采样,没有日期时间索引

】熊猫数据框每天重新采样,没有日期时间索引【英文标题】:pandasdataframeresampleperdaywithoutdatetimeindex【发布时间】:2016-10-1623:18:54【问题描述】:我有一个如下形式的熊猫数据框:timestampslight72004-02-2800:58:45150.88262004-02-2800:59:4514... 查看详情

将音频缓冲区从 44100 重新采样到 16000

】将音频缓冲区从44100重新采样到16000【英文标题】:resampleaudiobufferfrom44100to16000【发布时间】:2014-12-2207:24:53【问题描述】:我有data-uri格式的音频数据,然后我将此data-uri转换为缓冲区,现在我需要此缓冲区数据以新的采样率... 查看详情

如何使不同长度的不同数据帧长度相等(下采样和上采样)

...述】:我有许多长度在28到179之间的不同长度的数据帧(时间序列)。我需要将它们全部设为104。(对低于104的数据进行上采样,对 查看详情

如何在 Pandas/Numpy 中使用 dateOffset 对日内时间序列数据进行重新采样?

】如何在Pandas/Numpy中使用dateOffset对日内时间序列数据进行重新采样?【英文标题】:HowtodoresampleofintradaytimeseriesdatawithdateOffsetinPandas/Numpy?【发布时间】:2014-11-1203:00:32【问题描述】:我正在处理期货数据,即当天在00:00:00之前开... 查看详情

如何将不规则的时间序列重新采样为每日频率并跨越到今天?

】如何将不规则的时间序列重新采样为每日频率并跨越到今天?【英文标题】:Howtoresampleirregulartimeseriestodailyfrequencyandhaveitspantotoday?【发布时间】:2019-04-1308:42:49【问题描述】:我有一个不规则间隔(相对于时间频率)的熊猫数... 查看详情

如何计算熊猫中重新采样的多索引数据帧

】如何计算熊猫中重新采样的多索引数据帧【英文标题】:HowcanIcountaresampledmulti-indexeddataframeinpandas【发布时间】:2014-07-0411:56:58【问题描述】:我找到了有关如何重新采样多索引的说明:ResamplingWithinaPandasMultiIndex但是,一旦我... 查看详情

如何在不使用 ffmpeg 保持视频持续时间的情况下重新采样 FPS?

】如何在不使用ffmpeg保持视频持续时间的情况下重新采样FPS?【英文标题】:HowtoresampleFPSwithoutkeepingdurationofthevideousingffmpeg?【发布时间】:2014-08-3112:26:27【问题描述】:我有一个50fps的avi视频,但相机以25fps的速度拍摄。因此,... 查看详情

如何使用 NAudio 将原始音频从 WasapiCapture 重新采样到 g711 mulaw?

】如何使用NAudio将原始音频从WasapiCapture重新采样到g711mulaw?【英文标题】:HowtoresamplerawaudiofromWasapiCapturetog711mulawwithNAudio?【发布时间】:2019-07-1022:19:54【问题描述】:我正在尝试使用NAudio使客户端模拟软件电话,通过捕获本地... 查看详情