通过读取kafka的详细信息动态创建flink窗口

2023-12-05

假设 Kafka 消息包含 flink 窗口大小配置。

我想读取来自 Kafka 的消息并在 flink 中创建一个全局窗口。

问题陈述:

我们可以使用 BroadcastStream 来处理上述场景吗?

Or

还有其他方法可以支持上述情况吗?


Flink 的窗口 API 不支持动态改变窗口大小。

您需要做的是使用处理函数来实现您自己的窗口。在本例中是一个 KeyedBroadcastProcessFunction,其中广播窗口配置。

你可以检查Flink 训练有关如何使用 KeyedProcessFunction 实现时间窗口的示例(复制如下):

public class PseudoWindow extends KeyedProcessFunction<String, KeyedDataPoint<Double>, KeyedDataPoint<Integer>> {
    // Keyed, managed state, with an entry for each window.
    // There is a separate MapState object for each sensor.
    private MapState<Long, Integer> countInWindow;

    boolean eventTimeProcessing;
    int durationMsec;

    /**
     * Create the KeyedProcessFunction.
     * @param eventTime whether or not to use event time processing
     * @param durationMsec window length
     */
    public PseudoWindow(boolean eventTime, int durationMsec) {
        this.eventTimeProcessing = eventTime;
        this.durationMsec = durationMsec;
    }

    @Override
    public void open(Configuration config) {
        MapStateDescriptor<Long, Integer> countDesc =
                new MapStateDescriptor<>("countInWindow", Long.class, Integer.class);
        countInWindow = getRuntimeContext().getMapState(countDesc);
    }

    @Override
    public void processElement(
            KeyedDataPoint<Double> dataPoint,
            Context ctx,
            Collector<KeyedDataPoint<Integer>> out) throws Exception {

        long endOfWindow = setTimer(dataPoint, ctx.timerService());

        Integer count = countInWindow.get(endOfWindow);
        if (count == null) {
            count = 0;
        }
        count += 1;
        countInWindow.put(endOfWindow, count);
    }

    public long setTimer(KeyedDataPoint<Double> dataPoint, TimerService timerService) {
        long time;

        if (eventTimeProcessing) {
            time = dataPoint.getTimeStampMs();
        } else {
            time = System.currentTimeMillis();
        }
        long endOfWindow = (time - (time % durationMsec) + durationMsec - 1);

        if (eventTimeProcessing) {
            timerService.registerEventTimeTimer(endOfWindow);
        } else {
            timerService.registerProcessingTimeTimer(endOfWindow);
        }
        return endOfWindow;
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext context, Collector<KeyedDataPoint<Integer>> out) throws Exception {
        // Get the timestamp for this timer and use it to look up the count for that window
        long ts = context.timestamp();
        KeyedDataPoint<Integer> result = new KeyedDataPoint<>(context.getCurrentKey(), ts, countInWindow.get(ts));
        out.collect(result);
        countInWindow.remove(timestamp);
    }
} 

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

通过读取kafka的详细信息动态创建flink窗口 的相关文章

随机推荐

  • 强制浏览器在显示页面之前加载 CSS

    我已经制作了我的网站的移动版本 然而 在加载页面时 网站首先会在未应用 CSS 的情况下显示 一秒钟后 最多 它会应用 CSS 并正确呈现 此行为在所有浏览器 包括移动浏览器 中都是一致的 你知道我如何强制浏览器首先加载CSS 它的尺寸真的
  • 为什么我必须为 React 组件类中定义的方法添加 .bind(this) ,而不是在常规 ES6 类中定义方法

    让我困惑的是为什么当我定义一个 React 组件类时 包含在this对象在定义的方法中未定义 this在类中的生命周期方法中可用 除非我使用 bind this 或者使用箭头函数定义方法 例如下面的代码this state将是未定义的ren
  • 从具有不同高度的表格行中提取pdf文本(java使用pdfbox库)

    黑色形状是需要提取的文本 到目前为止 我已经从列中提取了文本 但是是手动提取的 因为只有 5 个 对区域使用 Rectangle 类 我的问题是 有没有办法对行执行此操作 因为矩形的大小 高度 不同 并且手动对 50 多行执行此操作将是一种
  • Bootstrap 工具提示 HTML, 锚标记上带有很棒的字体图标,但焦点不起作用

    我有以下 HTML 它按预期工作 当悬停到 符号时 会出现工具提示 但是因为其中一个在工具提示内有链接 所以我希望它表现得像 焦点 并且同时也 悬停 因此当我悬停其中包含链接的工具提示时 它仍然像 悬停 一样 当我没有悬停 HTML 时 工
  • 将 DateInterval 格式设置为 ISO8601

    我目前正在开发一个 php 项目 需要将 DateInterval 格式化为 ISO8601 类似这样 P5D 此格式可用于创建 DateTime 和 DateInterval 对象 但我无法找到将 DateInterval 格式化为此格式
  • C#中的滑动窗口算法

    我正在尝试在 C 3 0 中的二维数组上实现简单的滑动窗口算法 我发现this非常有用 但它只涉及一维数组 The post还包括算法的代码 我完全无法将它用于我的场景 任何人都可以建议我如何继续吗 设想 source googlepage
  • 在 MVC javascript 部分解析@? [复制]

    这个问题在这里已经有答案了 我正在尝试使用电子邮件RegEx在 MVC4 的 javascript 部分 但RegEx有 char 它不允许解析它 error Parser Error Message is not valid at the
  • 如何检查进程是否正在运行

    我正在使用下面的代码启动一个进程 QProcess process new QProcess process gt start Path start 方法将启动第三方应用程序 如果进程已经在运行 我不应该再次调用 process gt st
  • 将变量从批处理传递到 powershell

    我有一个批处理文件 要求用户提供变量行set p asset 我这样调用我的 powershell 脚本 SET ThisScriptsDirectory dp0 SET PowerShellScriptPath ThisScriptsDi
  • 继续使用 MPMoviePlayerController 播放声音并锁定屏幕?

    当您观看视频时MPMoviePlayerController用户按下顶部按钮锁定屏幕 应用程序进入睡眠状态 视频中的声音也进入睡眠状态 有什么办法可以让锁停止声音吗 如果没有 有没有办法拦截锁 创建 自定义锁 以节省一些电池但继续播放视频
  • 有没有办法显示 PowerShell 脚本中的所有函数?

    是否有任何命令可以列出我在脚本中创建的所有函数 就像我创建了函数 doXY 和函数 getABC 或类似的东西 然后我输入命令 它显示 函数 doXY 函数 getABC 这将是一个很酷的功能 感谢你的帮助 您可以让 PowerShell
  • Flash Player 10.1 的全局错误处理程序不起作用

    尝试在我的项目中实现新的 FP 10 1 全局错误处理程序 但无论我做什么 任何未捕获的错误仍然会显示 异常 窗口 在 SWF 的调试版本和发布版本中 我想做的就是阻止这些弹出窗口 而是向我的记录器发送消息 这是我的代码 编辑 我现在简化了
  • 如何在 Mozilla Firefox 中一键复制文本?

    此代码在 Google Chrome Opera IE 11 中运行良好 但在 Mozilla firefox 和 Safari 中不起作用 我在以下字符串中收到错误 var 成功 document execCommand 复制
  • 保存和加载音频

    Unity 当我单击 录制 按钮时 Microphone Start 内置麦克风 true 10 44100 当我单击 暂停 按钮时 SavWav Save Application persistentDataPath Resources
  • 复杂的 SPARQL 查询 - Virtuoso 性能提示?

    我有一个相当复杂的 SPARQL 查询 它在并行线程 400 个线程 中执行数千次 为了提高可读性 这里对查询进行了一定程度的简化 命名空间 属性和变量已减少 但复杂性保持不变 联合 图形数量等 该查询针对 4 个图运行 其中最大的包含 5
  • Google BigQuery 表补丁/更新不起作用

    Google Http Request object batchHeaders gt array 3 Content Type gt string application http Content Transfer Encoding gt
  • 如何在打印时去掉数组括号

    打印数组时如何去掉左右括号 var array 1 2 3 4 println array It prints 1 2 3 4 var arrayWithoutBracketsAndCommas array some code printl
  • 如何本地化 Windows 应用商店应用中的通知和组合框? (C#/XAML,多语言应用程序工具包)

    我在 Windows 应用商店应用程序本地化方面遇到一些问题 我能够本地化 xaml 内容 例如 TextBlock Text 或 Button Content 我正在以与此处所示相同的方式进行操作 但我不知道如何本地化以下内容 1 我的组
  • 我是否使用了太多 jQuery?我什么时候越线?

    最近我发现自己经常使用 jQuery 和 JavaScript 经常做与使用 CSS 之前相同的事情 例如 我使用 JavaScript jQuery 替换表格行颜色或创建按钮和链接悬停效果 这是可以接受的吗 或者我应该继续使用 CSS 来
  • 通过读取kafka的详细信息动态创建flink窗口

    假设 Kafka 消息包含 flink 窗口大小配置 我想读取来自 Kafka 的消息并在 flink 中创建一个全局窗口 问题陈述 我们可以使用 BroadcastStream 来处理上述场景吗 Or 还有其他方法可以支持上述情况吗 Fl