对流并集进行排序以识别 Apache Flink 中的用户会话

2023-12-07

我有两个事件流

  • L = (l1、l3、l8、...)- 比较稀疏,表示用户登录某个 IP
  • E = (e2、e4、e5、e9、...)- 是特定IP的日志流

较低的索引代表时间戳...如果我们将两个流连接在一起并按时间排序我们会得到:

  • l1, e2, l3, e4, e5, l8, e9, ...

是否可以实现自定义Window / Trigger将事件分组到会话的函数(不同用户登录之间的时间):

  • l1 - l3 : e2
  • l3 - l8 : e4, e5
  • l8 - l14 : e9, e10, e11, e12, e13
  • ...

我看到的问题是两个流不一定是排序的。我考虑过按时间戳对输入流进行排序。那么使用窗口化就很容易实现GlobalWindow和定制Trigger——然而,这似乎是不可能的。

我是否遗漏了一些东西,或者在当前的 Flink(v1.3.2)中绝对不可能这样做?

Thanks


问题:E3不应该在L4之前吗?

排序非常简单,使用ProcessFunction。像这样的事情:

public static class SortFunction extends ProcessFunction<Event, Event> {
  private ValueState<PriorityQueue<Event>> queueState = null;

  @Override
  public void open(Configuration config) {
    ValueStateDescriptor<PriorityQueue<Event>> descriptor = new ValueStateDescriptor<>(
        // state name
        "sorted-events",
        // type information of state
        TypeInformation.of(new TypeHint<PriorityQueue<Event>>() {
        }));
    queueState = getRuntimeContext().getState(descriptor);
  }

  @Override
  public void processElement(Event event, Context context, Collector<Event> out) throws Exception {
    TimerService timerService = context.timerService();

    if (context.timestamp() > timerService.currentWatermark()) {
      PriorityQueue<Event> queue = queueState.value();
      if (queue == null) {
        queue = new PriorityQueue<>(10);
      }
      queue.add(event);
      queueState.update(queue);
      timerService.registerEventTimeTimer(event.timestamp);
    }
  }

  @Override
  public void onTimer(long timestamp, OnTimerContext context, Collector<Event> out) throws Exception {
    PriorityQueue<Event> queue = queueState.value();
    Long watermark = context.timerService().currentWatermark();
    Event head = queue.peek();
    while (head != null && head.timestamp <= watermark) {
      out.collect(head);
      queue.remove(head);
      head = queue.peek();
    }
  }
}

更新:参见如何使用 Flink 对无序事件时间流进行排序了解一般更好的方法的描述。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

对流并集进行排序以识别 Apache Flink 中的用户会话 的相关文章

  • Scala - 如何解决“值不是 Nothing 的成员”错误

    此示例代码基于 Atmosphere 类 但如果有人可以让我了解该错误的一般含义 我想我可以找出任何特定于 Atmosphere 的解决方案 val bc BroadcasterFactory getDefault lookup broad
  • Flink 在 Kubernetes 上的部署和 Native Kubernetes 有什么不同

    黑白的主要区别是什么原生 Kubernetes https ci apache org projects flink flink docs stable ops deployment native kubernetes html and 库
  • 如何使用 apply/unapply 方法重现案例类行为?

    我尝试用普通类和伴生对象替换案例类 但突然出现类型错误 编译良好的代码 综合示例 trait Elem A B def C other Elem C A Elem C B other match case Chain head tail g
  • Scala 集合不一致

    为什么 Scala Collections API 中的集合和列表之间缺乏一致性 例如 有不可变的 Set 但也有可变的 Set 如果我想使用后者 我可以简单地这样做 val set Set A set new A 但是 本身不存在可变列表
  • 使用原始类型模拟案例类

    考虑以下类型结构 trait HasId T def id T case class Entity id Long extends HasId Long 比方说 我们想在一些测试中模拟实体类 val entityMock mock Enti
  • Scala:类似 Option (Some, None) 但具有三种状态:Some、None、Unknown

    我需要返回值 当有人询问值时 告诉他们以下三件事之一 这是值 没有价值 我们没有关于该值的信息 未知 情况 2 与情况 3 略有不同 示例 val radio car radioType 我们知道该值 返回无线电类型 例如 pioneer
  • 如何捕获 Oozie Spark 输出

    有没有办法捕获spark的输出然后将其输入到shell上 我们当前正在使用 scala 创建 jar 文件 并希望我们的 Spark 输出成为 shell 输入 我的想法是使用 wf actionData spark XXXX var 我只
  • 在 Scala 和 SBT 中调试较长的编译时间

    在我的 Scala SBT 项目中 我有一个文件需要 5 分钟才能编译 所有其他的都可以在几秒钟内编译 这使得开发非常痛苦 我确信我滥用了一些 Scala 构造 但我不知道如何调试它 如何在 Scala 中调试较长的编译时间 我正在使用 S
  • Scala:如何在超类上实现克隆方法,并在子类中使用它?

    我可能会以错误的方式处理这个问题 但我想要一个像这样的对象 class MyDataStructure def myClone val clone new MyDataStructure do stuff to make clone the
  • Scala 模式匹配变量绑定

    为什么提取器返回时不能以 样式绑定变量Option
  • 缓存 Slick DBIO 操作

    我正在尝试加快 SELECT FROM WHERE name 的速度Play 中的查询类型 Scala 应用程序 我正在使用 Play 2 4 Scala 2 11 play slick 1 1 1 包 该软件包使用Slick 3 1版本
  • 对 Scala Not Null 特征的库支持

    Notice 从 Scala 2 11 开始 NotNull已弃用 据我了解 如果您希望引用类型不可为空 则必须混合魔法NotNull特征 编译器会自动阻止你输入null 可以值在里面 看到这个邮件列表线程 http www nabble
  • Scala 特性:val/def 和 require

    下面的代码抛出IllegalArgumentException trait T val x Long require x gt 0 object T extends App val y new T val x 42L 而以下情况则不然 tr
  • Scala 解析器组合器的运算符优先级

    我正在研究需要考虑运算符优先级的解析逻辑 我的需求并不太复杂 首先 我需要乘法和除法比加法和减法具有更高的优先级 例如 1 2 3 应视为 1 2 3 这是一个简单的例子 但你明白了 我需要将更多自定义标记添加到优先级逻辑中 我可以根据此处
  • 使用 Apache Spark 读取 JSON - `corrupt_record`

    我有一个json file nodes看起来像这样 toid osgb4000000031043205 point 508180 748 195333 973 index 1 toid osgb4000000031043206 point
  • Spark中如何获取map任务的ID?

    Spark中有没有办法获取map任务的ID 例如 如果每个映射任务都调用用户定义的函数 我可以从该用户定义的函数中获取该映射任务的 ID 吗 我不确定您所说的地图任务 ID 是什么意思 但您可以使用以下方式访问任务信息TaskContext
  • 按字符分割字符串

    scala 有一个标准的分割字符串的方法StringOps split 但它的行为有点让我惊讶 演示一下 使用快捷便利功能 def sp str String str split toList 以下表达式全部计算结果为 true sp Li
  • Scala 交互式解释器 (REPL) - 如何将输出重定向到文本文件?

    是否可能 如果可能 是如何做到的 通常 gt and gt gt 在 Windows 或 Linux 命令行上工作的命令在这种情况下不起作用 您可以从控制台以编程方式执行此操作 import java io FileOutputStream
  • scala中的协变类型参数需要在java接口中保持不变

    我有一个看起来像这样的特征 一些进一步的信息可以在我自己提出了这个相关问题 https stackoverflow com questions 3695990 inheritance and automatic type conversio
  • 关于 scala.math.Integral 的问题

    有什么方法mkNumericOps andmkOrderingOps of scala math Integral http www scala lang org api current scala math Integral html我们

随机推荐

  • 如何使用 python 写回到谷歌文档电子表格中的某个单元格

    所以问题是 我从电子表格中的行的第一列 例如 A2 获取一些信息 然后我将对该信息进行一些检查 之后我想将结果写回该行的下一列 我怎么做 是否有某种功能可以让我指示后面 前面 上面 下面的列 所以我可以在该单元格中写入信息 当然 Pytho
  • python AttributeError:模块“pygame”没有属性“display”

    我开始使用 Python 特别是 pygame 模块 但是当我尝试创建一个窗口时 会发生此错误 gt gt gt import pygame gt gt gt width height 300 200 gt gt gt screen pyg
  • 另一台机器的时间

    在 C 中 当我们使用 DateTime Now 时 属性值是本地计算机的当前日期和时间 如何获取另一台具有IP地址或机器名称的机器的时间 您可以通过编写一个为您提供当前时间的服务来实现吗 或连接到远程计算机并发送一些 wmi 查询 类似问
  • OnDraw() 未触发,surfaceView 中未绘制任何内容 - Android

    你好 我在水平滚动视图中有一个 SurfaceView 我想通过 onDraw 调用来填充图像 然而 什么也没有绘制 我有一个类 其中的绘图是通过线程 CanvasThread 完成的 public class PanelChart ext
  • R read.csv - 带有特定符号(>)的标题

    当我通过 R 读取 csv 文件时 所有特定符号 gt 例如 csv 文件 用户 gt 75 R 显示用户 75 我怎样才能避免这种情况 您可以使用check names FALSE在你的read csv call From read cs
  • 索引如何提高 mongodb 中的查询性能

    我需要了解 mongo 中的索引如何提高查询性能 目前我的数据库没有索引 我如何为现有数据库建立索引 我还需要创建一个仅用于索引的新字段吗 从根本上来说 MongoDB 中的索引与其他数据库系统中的索引类似 MongoDB 支持 Mongo
  • Visual Studio NugetPackageManager 界面中的“版本”列有何意义? (与“已安装”列不同)

    已安装 列已填充 但 版本 列未填充 版本 栏是什么意思 与 已安装 列不同 我熟悉语义版本的概念 所以我确切地知道版本号的概念对于 nuget 包意味着什么 我想知道到底是什么that列于that接口意思 后续关于空白的问题结束了here
  • 蓝鸟承诺范围

    我刚刚开始使用承诺来尝试清理一些 回调地狱 我决定尝试 bluebird 并在浏览器中运行它 但立即遇到了范围界定问题 有没有办法在新的 Promise 中设置 thisArg 下面的示例显示承诺解析器内的 this 值设置为浏览器窗口 但
  • Bitmap.getPixel 始终返回黑色

    我正在创建一个应用程序 其中涉及获取屏幕部分的颜色 为此 我使用 Bitmap getPixel 方法来检索屏幕的指定像素 然后将其转换为 RGB 格式 以便以后更轻松地进行编码 问题是 当我使用 getPixel 方法时 无论屏幕上是什么
  • 使用 cygwin 在 Windows 上安装 GMP

    我是 C 新手 我必须处理大整数 所以我必须通过 Cygwin 安装 GMP 我能找到的有关安装此程序的任何文档都假设您知道自己在说什么 而我确实不知道 无论如何 我有权利 tar或者其他什么 正确提取它 现在我看到的任何网站都说要运行 c
  • 如何修复 java Apache POI 中的 NotOfficeXmlFileException?

    我正在尝试创建一个新的 Excel 文件 其中仅包含 hello 这是我的代码 import java io File import java io FileInputStream import java io FileNotFoundEx
  • 将 html 文本添加到超大 jquery 图像幻灯片

    我只想将 html 文本添加到著名的图像滑块超大的 这是他们的演示页面 http buildinternet com project supersized slideshow 3 2 demo html 该 html 可以正好位于演示中 m
  • 如何在 shell 脚本中使用远程服务器上的带有远程变量的数组?

    这就是我正在尝试做的 bin bash array local 1 2 3 4 5 ssh user server lt lt EOF index remote 1 echo index remote echo array local in
  • 什么有限状态机捕获具有相同数量的“01”和“10”的二进制字符串?

    我需要帮助设计一个有限状态机 该状态机接受包含尽可能多的模式出现的二进制字符串01作为模式的出现10 我有点很难准确理解哪些字符串应该被接受 哪些字符串应该被拒绝 欢迎任何指导 有问题的语言是什么 包含尽可能多的模式出现的二进制字符串01作
  • 如何更新 Azure 通知中心注册中的过期时间?

    我使用 Azure 通知中心已经有一段时间了 然而 我为一个新项目创建了一个新的通知中心 我注意到一些非常奇怪的行为 每当我创建一个注册时ExpirationDate被设定为12 31 9999 7 59 59 因此 对于某些人来说 我认为
  • 导入 .ics 文件时出现 Google 日历错误

    我很难让我的订阅日历与 Google 日历配合使用 以下 URL 可在 Outlook 和 Apple 日历 iCal 中正常工作 但我收到来自 Google 的错误 您提供的地址不包含有效 iCal 或 GData 格式的日历 我的 UR
  • 调试 python 多处理中的错误

    我正在使用Pool的功能multiprocessing模块以便在不同数据上并行运行相同的代码 事实证明 我的代码在某些数据上引发了异常 但没有给出发生这种情况的精确行 Traceback most recent call last File
  • 使用 Apache Spark (Java) 将 CSV 数据加载到 Dataframe 并转换为数组

    我有一个包含以下数据的 CSV 文件 1 2 5 2 4 2 3 我想将它们加载到具有数组字符串模式的数据框中 输出应如下所示 1 2 5 2 4 2 3 这里已经使用 scala 回答了这个问题 Spark 将字符串列转换为数组 我想在
  • Excel VBA 更改 HTML 选择标记中的选项

    最近 我发布了一个问题vbscript 捕获 HTML 选择选项标记中的文本 这对我在 Internet Explorer 平台上的帮助很大 但是 我有一个新项目 其网站稍微复杂一些 该网站有多个嵌套表格 这些表格的格式可容纳form元素
  • 对流并集进行排序以识别 Apache Flink 中的用户会话

    我有两个事件流 L l1 l3 l8 比较稀疏 表示用户登录某个 IP E e2 e4 e5 e9 是特定IP的日志流 较低的索引代表时间戳 如果我们将两个流连接在一起并按时间排序我们会得到 l1 e2 l3 e4 e5 l8 e9 是否可