我在 C# 项目中使用 Reactive 库根据配置的策略对数据进行分组。所有这些数据都实现了以下接口
public interface IPoint
{
object Value { get; }
DateTimeOffset Timestamp { get; }
}
我必须实施的分组策略之一是创建固定时间大小的不可重叠的组,查看反应性文档可以使用以下方法来实现Buffer(TimeSpan)
功能。这正是我所需要的,但是我不需要使用运行时计算的时间戳,而是使用中定义的时间戳Timestamp
我的对象的财产。
I found 这个解决方案 https://stackoverflow.com/questions/27501887/can-an-observable-stream-be-partitioned-by-a-timestamp-similar-to-buffer-or-win,这似乎很有效:
public void Subscribe(Action<IEnumerable<IPoint>> callback)
{
long windowSizeTicks = TimeRange.Ticks; // TimeRange is my TimeSpan "buffer" size
dataPoints.GroupByUntil(x => x.Timestamp.Ticks / windowSizeTicks,
g => dataPoints.Where(x => x.Timestamp.Ticks / windowSizeTicks != g.Key))
.SelectMany(x => x.ToList())
.Subscribe(callback);
// dataPoints is ISubject<IPoint>
}
该解决方案只是根据 Ticks 整除创建组TimeRange
,如果第一项不能被它整除,它将无法正常工作。
举个例子来解释我的意思:考虑以下几点
Value: 1, Timestamp: "2021-04-26T00:00:01"
Value: 2, Timestamp: "2021-04-26T00:00:02"
Value: 3, Timestamp: "2021-04-26T00:00:03"
Value: 4, Timestamp: "2021-04-26T00:00:04"
和 2 秒的“缓冲区大小”,我希望它们被分组为[1, 2], [3, 4]
,但我却收到了[1], [2, 3], [4]
。发生这种情况是因为创建分组键时考虑的是绝对时间,而不是与数据列表开头的差异。
我可以保存第一项的时间戳并以这种方式更改分组功能,但我认为(或者至少我希望)可能有更好的解决方案:
public void Subscribe(Action<IEnumerable<IPoint>> callback)
{
long windowSizeTicks = TimeRange.Ticks; // TimeRange is my TimeSpan "buffer" size
dataPoints.GroupByUntil(x => (x.Timestamp.Ticks - firstPoint.Timestamp.Ticks) / windowSizeTicks,
g => dataPoints.Where(x => (x.Timestamp.Ticks - firstPoint.Timestamp.Ticks) / windowSizeTicks != g.Key))
.SelectMany(x => x.ToList())
.Subscribe(callback);
// dataPoints is ISubject<IPoint>
}
我是 Reactive 的新手,所以欢迎任何有用的评论。
谢谢。