(
df
.join_asof(
df
.filter(pl.col('value').is_not_null())
.with_columns(
gap_time=(pl.col('timestamp')-pl.col('timestamp').shift().over('id'))
.dt.seconds(),
prev_good_time=pl.col('timestamp').shift().over('id'),
prev_good_value=pl.col('value').shift().over('id')
)
.drop('value'),
on='timestamp', by='id', strategy='forward'
)
.with_columns(
gap_value=pl.when(pl.col('value').is_null())
.then((pl.col('value')-((pl.col('value')
.forward_fill().shift()
).over('id'))).backward_fill()),
gap_time=pl.when(pl.col('value').is_null())
.then(pl.col('gap_time')),
gap_proportion=pl.when(pl.col('value').is_null())
.then((pl.col('timestamp')-pl.col('prev_good_time')).dt.seconds()),
)
.with_columns(
portion=pl.col('gap_value')*(pl.col('gap_proportion')/pl.col('gap_time'))
)
.with_columns(
fill_value=pl.col('prev_good_value')+pl.col('portion')
)
.select(
'id','timestamp',
value=pl.when(pl.col('value').is_null())
.then(pl.col('fill_value'))
.otherwise(
pl.col('value')
)
)
)
我们做的第一件事是对原始版本的过滤版本执行 join_asof 。这使我们能够计算有效值之间的时间,并留出与非空值和值本身关联的最近时间。 join 的 asof 部分表示将加入on基于时间但滚动直到找到下一个(或上一个)匹配时间,然后by其他一些等式列。
您可以嵌套大部分其余的计算,而无需重复自己或使用如此多的上下文,但我让它非常冗长,因此很容易解构。之所以有这么多的电话with_columns
的问题是您无法在同一上下文中设置和使用列,因此每当您创建要再次使用的列时,您都必须链接另一个上下文。
输出(不包括中间列)
shape: (8, 3)
┌─────┬─────────────────────┬───────────┐
│ id ┆ timestamp ┆ value │
│ --- ┆ --- ┆ --- │
│ str ┆ datetime[μs] ┆ f64 │
╞═════╪═════════════════════╪═══════════╡
│ a ┆ 2023-09-13 14:05:34 ┆ 10.0 │
│ a ┆ 2023-09-13 14:15:04 ┆ 17.770961 │
│ a ┆ 2023-09-13 14:30:01 ┆ 30.0 │
│ b ┆ 2023-09-13 12:12:02 ┆ 5.0 │
│ b ┆ 2023-09-13 12:15:02 ┆ 10.0 │
│ b ┆ 2023-09-13 12:30:07 ┆ 20.055556 │
│ b ┆ 2023-09-13 12:45:01 ┆ 29.988889 │
│ b ┆ 2023-09-13 13:00:02 ┆ 40.0 │
└─────┴─────────────────────┴───────────┘
numpy 可以做到
这是一种让 numpy 完成这项工作的 hacky(就好像上面的内容不是 hacky 一样)的方法。
finaldf=[]
df=df.with_columns(pl.col('value').cast(pl.Float64))
for little_df in df.partition_by('id'):
x=little_df.filter(pl.col('value').is_null()).select(pl.col('timestamp').to_physical()).to_numpy()
xp,fp = little_df.filter(pl.col('value').is_not_null()).select('timestamp','value').to_numpy().transpose()
finaldf.append(
pl.concat([
little_df.filter(pl.col('value').is_not_null()).lazy(),
little_df.filter(pl.col('value').is_null()).with_columns(value=pl.Series(np.interp(x, xp, fp).transpose()[0])).lazy()
])
)
finaldf=pl.concat(finaldf).sort(['id','timestamp']).collect()
finaldf
shape: (8, 3)
┌─────┬─────────────────────┬───────────┐
│ id ┆ timestamp ┆ value │
│ --- ┆ --- ┆ --- │
│ str ┆ datetime[μs] ┆ f64 │
╞═════╪═════════════════════╪═══════════╡
│ a ┆ 2023-09-13 14:05:34 ┆ 10.0 │
│ a ┆ 2023-09-13 14:15:04 ┆ 17.770961 │
│ a ┆ 2023-09-13 14:30:01 ┆ 30.0 │
│ b ┆ 2023-09-13 12:12:02 ┆ 5.0 │
│ b ┆ 2023-09-13 12:15:02 ┆ 10.0 │
│ b ┆ 2023-09-13 12:30:07 ┆ 20.055556 │
│ b ┆ 2023-09-13 12:45:01 ┆ 29.988889 │
│ b ┆ 2023-09-13 13:00:02 ┆ 40.0 │
└─────┴─────────────────────┴───────────┘
另一种更简洁的极坐标方式
在第一轮中,我专注于复制所有相同的中间列,但如果我直接寻找答案,我们可以做到这一点......
(
df.join_asof(
df.filter(pl.col('value').is_not_null())
.with_columns(
value_slope=(pl.col('value')-pl.col('value').shift().over('id'))/(pl.col('timestamp')-pl.col('timestamp').shift().over('id')),
value_slope_since=pl.col('timestamp').shift(),
value_base=pl.col('value').shift()
)
.drop('value'),
on='timestamp', by='id', strategy='forward'
)
.select('id','timestamp',value=pl.coalesce(pl.col('value'), pl.col('value_base')+pl.col('value_slope')*(pl.col('timestamp')-pl.col('value_slope_since'))))
)
可扩展的功能
def interp(df, y_col, id_cols=None):
if not isinstance(y_col, str):
raise ValueError("y_col should be string")
if isinstance(id_cols, str):
id_cols=[id_cols]
if id_cols is None:
id_cols=['__dummyid']
df=df.with_columns(__dummyid=0)
lf=df.select(id_cols + [y_col]).lazy()
value_cols=[x for x in df.columns if x not in id_cols and x!=y_col]
for value_col in value_cols:
lf=lf.join(
df.join_asof(
df.filter(pl.col(value_col).is_not_null())
.select(
*id_cols, y_col,
__value_slope=(pl.col(value_col)-pl.col(value_col).shift().over(id_cols))/(pl.col(y_col)-pl.col(y_col).shift().over(id_cols)),
__value_slope_since=pl.col(y_col).shift(),
__value_base=pl.col(value_col).shift()
),
on=y_col, by=id_cols, strategy='forward'
)
.select(
id_cols+ [y_col] + [pl.coalesce(pl.col(value_col),
pl.coalesce(pl.col('__value_base'), pl.col('__value_base').shift(-1))+
pl.coalesce(pl.col('__value_slope'), pl.col('__value_slope').shift(-1))*(pl.col(y_col)-
pl.coalesce(pl.col('__value_slope_since'), pl.col('__value_slope_since').shift(-1)))).alias(value_col)]
)
.lazy(),
on=[y_col]+id_cols
)
if id_cols[0]=='__dummyid':
lf=lf.select(pl.exclude('__dummyid'))
return lf.collect()
有了这个功能你就可以做
interp(df, "timestamp", "id")
其中第一个参数是 df,第二个参数是您的时间或 y 列。第三个可选参数是如果您有 id 列(它可以采用列表或单个字符串)。它将推断 df 中未作为时间或 id 列提供给它的任何列都是值,并且它将对它们进行插值。
如果你能将它修补到pl.DataFrame
您可以将它用作数据框方法,如下所示
pl.DataFrame.interp=interp
df.interp('timestamp','id')