如何使用 Flink SQL 按事件时间对流进行排序

2023-12-26

我有一个故障DataStream<Event>我想要排序,以便事件按事件时间时间戳排序。我将我的用例简化为我的 Event 类只有一个字段 -timestamp field:

public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    env.setParallelism(1);

    DataStream<Event> eventStream = env.addSource(new OutOfOrderEventSource())
            .assignTimestampsAndWatermarks(new TimestampsAndWatermarks());

    Table events = tableEnv.fromDataStream(eventStream, "timestamp.rowtime");
    tableEnv.registerTable("events", events);
    Table sorted = tableEnv.sqlQuery("SELECT timestamp FROM events ORDER BY eventTime ASC");
    DataStream<Row> sortedEventStream = tableEnv.toAppendStream(sorted, Row.class);

    sortedEventStream.print();

    env.execute();
}

我收到此错误:

线程“main”中的异常 org.apache.flink.table.api.SqlParserException:SQL 解析失败。 在第 1 行第 8 列遇到“timestamp FROM”。

似乎我没有以正确的方式指定事件时间属性,但不清楚出了什么问题。


问题原来是使用timestamp作为我的 Event 类中的字段名称。将其更改为eventTime足以让一切正常运转:

public class Sort {
    public static final int OUT_OF_ORDERNESS = 1000;

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.setParallelism(1);

        DataStream<Event> eventStream = env.addSource(new OutOfOrderEventSource())
                .assignTimestampsAndWatermarks(new TimestampsAndWatermarks());

        Table events = tableEnv.fromDataStream(eventStream, "eventTime.rowtime");
        tableEnv.registerTable("events", events);
        Table sorted = tableEnv.sqlQuery("SELECT eventTime FROM events ORDER BY eventTime ASC");
        DataStream<Row> sortedEventStream = tableEnv.toAppendStream(sorted, Row.class);

        sortedEventStream.print();

        env.execute();
    }

    public static class Event {
        public Long eventTime;

        Event() {
            this.eventTime = Instant.now().toEpochMilli() + (new Random().nextInt(OUT_OF_ORDERNESS));
        }
    }

    private static class OutOfOrderEventSource implements SourceFunction<Event> {
        private volatile boolean running = true;

        @Override
        public void run(SourceContext<Event> ctx) throws Exception {
            while(running) {
                ctx.collect(new Event());
                Thread.sleep(1);
            }
        }

        @Override
        public void cancel() {
            running = false;
        }
    }

    private static class TimestampsAndWatermarks extends BoundedOutOfOrdernessTimestampExtractor<Event> {
        public TimestampsAndWatermarks() {
            super(Time.milliseconds(OUT_OF_ORDERNESS));
        }

        @Override
        public long extractTimestamp(Event event) {
            return event.eventTime;
        }
    }
}
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

如何使用 Flink SQL 按事件时间对流进行排序 的相关文章

  • 谁能分享一下 Scala 中的 Flink Kafka 示例吗?

    谁能分享一下Scala中Flink Kafka 主要是从Kafka接收消息 的工作示例吗 我知道有一个Kafka字数统计 https github com apache spark blob master examples src main
  • 使用 Flink LocalEnvironment 进行生产

    我想了解本地执行环境的局限性以及它是否可以用于在生产中运行 感谢任何帮助 见解 谢谢 LocalExecutionEnvironment 启动一个 Flink MiniCluster 它在单个 JVM 中运行整个 Flink 系统 JobM
  • Apache Flink:KeyedStream 上的数据分布不均匀

    我在 Flink 中有这样的 Java 代码 env setParallelism 6 Read from Kafka topic with 12 partitions DataStream
  • Flink时间特性和AutoWatermarkInterval

    在 Apache Flink 中 setAutoWatermarkInterval interval 向下游操作员生成水印 以便他们提前事件时间 如果水印在指定的时间间隔内没有更改 没有事件到达 运行时将不会发出任何水印 另一方面 如果在下
  • 如何在其他流的基础上过滤Apache flink流?

    我有两个流 一个是 Int 另一个是 json 在 json Schema 中 有一个键是一些 int 所以我需要通过与另一个整数流的键比较来过滤 json 流 那么在 Flink 中是否可能 是的 您可以使用 Flink 进行这种流处理
  • 在任务管理器之间均匀分配 Flink 运算符

    我正在 15 台机器的裸机集群上构建 Flink 流应用程序原型 我使用带有 90 个任务槽 15x6 的纱线模式 该应用程序从单个 Kafka 主题读取数据 Kafka主题有15个分区 所以我也将源算子的并行度设置为15 但是 我发现 F
  • flink kafka生产者在检查点恢复时以一次模式发送重复消息

    我正在写一个案例来测试 flink 两步提交 下面是概述 sink kafka曾经是kafka生产者 sink stepmysql接收器是否扩展two step commit sink comparemysql接收器是否扩展two step
  • Apache Flink 检查点卡住

    我们正在运行一个 ListState 介于 300GB 到 400GB 之间的作业 并且有时该列表可能会增加到数千 在我们的用例中 每个项目都必须有自己的 TTL 因此我们使用 S3 上的 RocksDB 后端为此 ListState 的每
  • 在 Flink 中,我可以在同一个槽中拥有一个算子的多个子任务吗?

    探索Apache Flink几天了 对Task Slot的概念有些疑惑 虽然有人问了几个问题 但有一点我不明白 我正在使用一个玩具应用程序进行测试 运行本地集群 我已禁用运算符链接 我从文档中知道插槽允许内存隔离而不是 CPU 隔离 阅读文
  • 在 Flink 流中使用静态 DataSet 丰富 DataStream

    我正在编写一个 Flink 流程序 其中我需要使用一些静态数据集 信息库 IB 来丰富用户事件的数据流 对于例如假设我们有一个买家的静态数据集 并且有一个传入的事件点击流 对于每个事件 我们希望添加一个布尔标志来指示事件的执行者是否是买家
  • Apache Flink - 作业内部无法识别自定义 java 选项

    我已将以下行添加到 flink conf yaml 中 env java opts Ddy props path PATH TO PROPS FILE 启动 jobmanager jobmanager sh start cluster 时
  • Flink 的简单 hello world 示例

    我正在寻找 Apache flink 的 hello world 体验的最简单的示例 假设我刚刚在一个干净的盒子上安装了 flink 那么为了 让它做某事 我需要做的最低限度是什么 我意识到这很模糊 这里有一些例子 来自终端的三个 pyth
  • Apache Flink 上的 zipWithIndex

    我想为我的输入的每一行分配一个id 这应该是一个数字0 to N 1 where N是输入中的行数 粗略地说 我希望能够执行以下操作 val data sc textFile textFilePath numPartitions val r
  • Flink 使用 Ceph 作为持久存储

    Flink 文档建议 Ceph 可以用作状态的持久存储 https ci apache org projects flink flink docs release 1 3 dev stream checkpointing html http
  • 我可以将 flink RocksDB 状态后端与本地文件系统一起使用吗?

    我正在探索使用 FlinkrocksDb 状态后端 文档似乎暗示我可以使用常规文件系统 例如 file data flink checkpoints 但代码 javadoc 仅在此处提到 hdfs 或 s3 选项 我想知道是否可以将本地文件
  • Flink 窗口:聚合并输出到接收器

    我们有一个数据流 其中每个元素都是这种类型 id String type Type amount Integer 我们想要聚合这个流并输出总和amount每周一次 目前的解决方案 Flink 管道示例如下所示 stream keyBy ty
  • 当我重新运行 Flink 消费者时,Kafka 再次消费最新消息

    我在用 Scala 编写的 Apache Flink API 中创建了一个 Kafka 消费者 每当我从某个主题传递一些消息时 它就会及时接收它们 但是 当我重新启动使用者时 它不会接收新的或未使用的消息 而是使用发送到该主题的最新消息 这
  • 2022年Flink可以支持什么Java版本?

    假设我开始一个新的 Flink Java 项目 如果我寻找 稳定的 Flink Java 生产体验 我应该使用哪个版本 官方docs https nightlies apache org flink flink docs master do
  • Flink任务管理器内存不足和内存配置

    我们使用 Flink 流在单个集群上运行一些作业 我们的工作是使用rocksDB 来保存状态 该集群配置为在 3 个独立的 VM 上使用单个 Jobmanager 和 3 个 Taskmanager 运行 每个 TM 均配置为运行 14GB
  • Flink:Jobmanager UI 中设置的并行度与任务槽有何关系?

    假设我有 8 个任务管理器和 16 个任务槽 如果我使用 Jobmanager UI 提交作业并将并行度设置为 8 我是否只使用 8 个任务槽 如果我有 8 个具有 8 个槽位的任务管理器 并以并行度 8 提交相同的作业 该怎么办 是完全一

随机推荐