我有两个事件流
-
L = (l1、l3、l8、...)- 比较稀疏,表示用户登录某个 IP
-
E = (e2、e4、e5、e9、...)- 是特定IP的日志流
较低的索引代表时间戳...如果我们将两个流连接在一起并按时间排序我们会得到:
-
l1, e2, l3, e4, e5, l8, e9, ...
是否可以实现自定义Window
/ Trigger
将事件分组到会话的函数(不同用户登录之间的时间):
-
l1 - l3 : e2
-
l3 - l8 : e4, e5
-
l8 - l14 : e9, e10, e11, e12, e13
- ...
我看到的问题是两个流不一定是排序的。我考虑过按时间戳对输入流进行排序。那么使用窗口化就很容易实现GlobalWindow
和定制Trigger
——然而,这似乎是不可能的。
我是否遗漏了一些东西,或者在当前的 Flink(v1.3.2)中绝对不可能这样做?
Thanks
问题:E3不应该在L4之前吗?
排序非常简单,使用ProcessFunction
。像这样的事情:
public static class SortFunction extends ProcessFunction<Event, Event> {
private ValueState<PriorityQueue<Event>> queueState = null;
@Override
public void open(Configuration config) {
ValueStateDescriptor<PriorityQueue<Event>> descriptor = new ValueStateDescriptor<>(
// state name
"sorted-events",
// type information of state
TypeInformation.of(new TypeHint<PriorityQueue<Event>>() {
}));
queueState = getRuntimeContext().getState(descriptor);
}
@Override
public void processElement(Event event, Context context, Collector<Event> out) throws Exception {
TimerService timerService = context.timerService();
if (context.timestamp() > timerService.currentWatermark()) {
PriorityQueue<Event> queue = queueState.value();
if (queue == null) {
queue = new PriorityQueue<>(10);
}
queue.add(event);
queueState.update(queue);
timerService.registerEventTimeTimer(event.timestamp);
}
}
@Override
public void onTimer(long timestamp, OnTimerContext context, Collector<Event> out) throws Exception {
PriorityQueue<Event> queue = queueState.value();
Long watermark = context.timerService().currentWatermark();
Event head = queue.peek();
while (head != null && head.timestamp <= watermark) {
out.collect(head);
queue.remove(head);
head = queue.peek();
}
}
}
更新:参见如何使用 Flink 对无序事件时间流进行排序了解一般更好的方法的描述。
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)