Flink:行时间属性不得位于常规联接的输入行中

2023-12-12

使用 flink SQL API,我想将多个表连接在一起并在时间窗口内进行一些计算。 我有 3 个来自 CSV 文件的表,一个来自 Kafka。 在卡夫卡表中,我有一个字段timestampMs,我想将其用于我的时间窗口操作。

为此,我执行了以下代码:

reamExecutionEnvironment env = ... ;
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

TableSource table1 = CsvTableSource.builder()
        .path("path/to/file1.csv")
        .ignoreFirstLine()
        .fieldDelimiter(",")
        .field("id1", Types.STRING)
        .field("someInfo1", Types.FLOAT)
        .build();

TableSource table2 = CsvTableSource.builder()
        .path("path/to/file2.csv")
        .ignoreFirstLine()
        .fieldDelimiter(",")
        .field("id2", Types.STRING)
        .field("someInfo2", Types.STRING)
        .build();

TableSource table3 = CsvTableSource.builder()
        .path("path/to/file3.csv")
        .ignoreFirstLine()
        .fieldDelimiter(",")
        .field("id2", Types.STRING)
        .field("id1", Types.STRING)
        .field("someInfo3", Types.FLOAT)
        .build();

tableEnv.registerTableSource("Table1",table1);
tableEnv.registerTableSource("Table2",table2);
tableEnv.registerTableSource("Table3",table3);


Schema schemaExt = new Schema().schema(SOME_SCHEMA);
schemaExt = schemaExt.field("rowtime", Types.SQL_TIMESTAMP).rowtime(new Rowtime().timestampsFromField("timestampMs").watermarksPeriodicBounded(40000));

tableEnv.connect(new Kafka()
        .version("universal")
        .topic(MY_TOPIC)
        .properties(MY_PROPERTIES)
        .sinkPartitionerRoundRobin()
)
            .withFormat(...)
            .withSchema(schemaExt)
            .inAppendMode()
            .registerTableSource("KafkaInput");

Table joined = tableEnv.sqlQuery("SELECT * FROM table1 " +
        "join table3 on table1.id2 = table3.id2 " +
        "join table2 on table3.id1 = table2.id1 " +
        "join KafkaInput on table3.id2 = KafkaInput.id2");

tableEnv.registerTable("Joined", joined);

int windowWidth = 5;
int frequency = 2;
Table processed = tableEnv.sqlQuery("SELECT id1 FROM Joined " +
        "GROUP BY id1, HOP(rowtime, INTERVAL '10' SECOND, INTERVAL '30' SECOND)");



Sink s = createSink(this.esEndpoint, this.esPattern, this.schemaHandler.getSchemaStr());


tableEnv.registerTableSink("MySink", ...);

processed.insertInto("MySink");

env.execute();

但是当我运行它时,出现以下错误:

Exception in thread "main" org.apache.flink.table.api.TableException: Cannot generate a valid execution plan for the given query: 
Rowtime attributes must not be in the input rows of a regular join. As a workaround you can cast the time attributes of input tables to TIMESTAMP before.

但我不明白解决方法提示部分。如何创建时间属性并在加入表后进行一些窗口计算。

- - 编辑 - -

在上面的代码中,我替换了以下几行:

Table joined = tableEnv.sqlQuery("SELECT * FROM table1 " +
        "join table3 on table1.id2 = table3.id2 " +
        "join table2 on table3.id1 = table2.id1 " +
        "join KafkaInput on table3.id2 = KafkaInput.id2");

tableEnv.registerTable("Joined", joined);

By :

Table staticJoined = tableEnv.sqlQuery("SELECT *, TIMESTAMP('1970-01-01 00:00:00') as rowtime FROM table1 " +
        "join table3 on table1.id2 = table3.id2 " +
        "join table2 on table3.id1 = table2.id1 ");

TemporalTableFunction temporalFunction = staticJoined.createTemporalTableFunction( "rowtime" , "id2");
tableEnv.registerFunction("CSVData", temporalFunction);

tableEnv.registerTable("Joined",
    tableEnv.sqlQuery("SELECT * FROM KafkaInput, LATERAL TABLE (CSVData(KafkaInput.rowtime)) as Statics WHERE Statics.id2 = KafkaInput.id2")
);

但我收到 TemporalTableFunction 的错误:

Exception in thread "main" java.lang.AssertionError: Cannot add expression of different type to set:
set type is RecordType(BIGINT genTimestampMs, BIGINT timestampMs, VARCHAR(65536) CHARACTER SET "UTF-16LE" COLLATE "ISO-8859-1$en_US$primary" streamId, VARCHAR(65536) CHARACTER SET "UTF-16LE" COLLATE "ISO-8859-1$en_US$primary" sdkConfId, VARCHAR(65536) CHARACTER SET "UTF-16LE" COLLATE "ISO-8859-1$en_US$primary" sdkId, FLOAT density, FLOAT count, FLOAT surface, TIMESTAMP(3) NOT NULL rowtime, VARCHAR(65536) CHARACTER SET "UTF-16LE" COLLATE "ISO-8859-1$en_US$primary" streamId0, VARCHAR(65536) CHARACTER SET "UTF-16LE" COLLATE "ISO-8859-1$en_US$primary" cameraName, VARCHAR(65536) CHARACTER SET "UTF-16LE" COLLATE "ISO-8859-1$en_US$primary" streamId00, VARCHAR(65536) CHARACTER SET "UTF-16LE" COLLATE "ISO-8859-1$en_US$primary" areaId, FLOAT coefficient, VARCHAR(65536) CHARACTER SET "UTF-16LE" COLLATE "ISO-8859-1$en_US$primary" areaId0, FLOAT thresholdLow, FLOAT thresholdMedium, VARCHAR(65536) CHARACTER SET "UTF-16LE" COLLATE "ISO-8859-1$en_US$primary" areaId1, VARCHAR(65536) CHARACTER SET "UTF-16LE" COLLATE "ISO-8859-1$en_US$primary" name, TIMESTAMP(3) rowtime0, VARCHAR(65536) CHARACTER SET "UTF-16LE" COLLATE "ISO-8859-1$en_US$primary" StationName) NOT NULL
expression type is RecordType(BIGINT genTimestampMs, BIGINT timestampMs, VARCHAR(65536) CHARACTER SET "UTF-16LE" COLLATE "ISO-8859-1$en_US$primary" streamId, VARCHAR(65536) CHARACTER SET "UTF-16LE" COLLATE "ISO-8859-1$en_US$primary" sdkConfId, VARCHAR(65536) CHARACTER SET "UTF-16LE" COLLATE "ISO-8859-1$en_US$primary" sdkId, FLOAT density, FLOAT count, FLOAT surface, TIMESTAMP(3) NOT NULL rowtime, VARCHAR(65536) CHARACTER SET "UTF-16LE" COLLATE "ISO-8859-1$en_US$primary" streamId0, VARCHAR(65536) CHARACTER SET "UTF-16LE" COLLATE "ISO-8859-1$en_US$primary" cameraName, VARCHAR(65536) CHARACTER SET "UTF-16LE" COLLATE "ISO-8859-1$en_US$primary" streamId00, VARCHAR(65536) CHARACTER SET "UTF-16LE" COLLATE "ISO-8859-1$en_US$primary" areaId, FLOAT coefficient, VARCHAR(65536) CHARACTER SET "UTF-16LE" COLLATE "ISO-8859-1$en_US$primary" areaId0, FLOAT thresholdLow, FLOAT thresholdMedium, VARCHAR(65536) CHARACTER SET "UTF-16LE" COLLATE "ISO-8859-1$en_US$primary" areaId1, VARCHAR(65536) CHARACTER SET "UTF-16LE" COLLATE "ISO-8859-1$en_US$primary" name, TIMESTAMP(0) NOT NULL rowtime0, VARCHAR(65536) CHARACTER SET "UTF-16LE" COLLATE "ISO-8859-1$en_US$primary" StationName) NOT NULL
set is rel#26:LogicalCorrelate.NONE(left=HepRelVertex#24,right=HepRelVertex#25,correlation=$cor0,joinType=inner,requiredColumns={8})
expression is LogicalTemporalTableJoin#32

“集合类型”和“表达式类型”之间的两个字段不匹配。TIMESTAMP(3) rowtime0 and TIMESTAMP(0) NOT NULL rowtime0

问题是我没有名为的字段rowtime0。看起来它是一个内部字段。我真的不明白这里发生了什么


您的查询定义常规联接,即没有时间联接约束的联接。由于 Flink 将所有表视为动态的(即,假设它们将来可能会发生变化),因此没有时间限制的常规联接不能保证按时间戳顺序(大致)发出行。但是,时间属性需要时间戳顺序,以确保可以在不完全具体化流的情况下执行后续操作(例如窗口聚合)​​。因此,Flink 不允许时间属性作为不保留时间顺序的常规连接的输入(以及输出)。

如果 Flink 知道 CSV 文件中的表是固定的而不是动态的,那么这个问题就不会存在。然而,这一推理尚未得到支持。

作为解决方法,您可以将 CSV 表建模为时态表(没有改变)和加入他们与卡夫卡表。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Flink:行时间属性不得位于常规联接的输入行中 的相关文章

随机推荐

  • Google Apps 脚本 - 获取 doPost 的 IP 并在电子表格中进行跟踪

    有没有一种简单的方法来捕获d的IPoPost e 请求并将其与 POST 的传入值一起存储以跟踪请求的来源 它似乎无法通过e parameter 我找不到其他属性或方法的完整列表e 目前无法获取传入请求的 IP 地址或用户代理doGet o
  • 如何使用 Selenium 和 Xpath 通过忽略大小写的文本查找元素

    我使用的是java版本 1 8 0 191 和selenium 3 141 59 我试图找出页面是否包含 error 或 erreur 一词 另外 我希望它不区分大小写 查找文本很容易 List
  • Try-catch 可以加速我的代码吗?

    我编写了一些代码来测试 try catch 的影响 但看到了一些令人惊讶的结果 static void Main string args Thread CurrentThread Priority ThreadPriority Highes
  • UISearchBar:更改外观 - 形状、背景、覆盖图像

    我想更改默认 UISearchBar 的外观 例如 您将如何在 Google iPhone 应用程序中重新创建搜索框 如下所示 如何叠加图像来产生这种效果 source isedb com 经过对自定义搜索栏可能性的一些调查 我倾向于说这是
  • jQuery UI 弹跳效果对齐 Firefox 和 IE8 中左侧的元素

    在 Firefox 和 IE8 或更低版本中 JQuery UI 的反弹效果存在问题 IE9 Chrome Safari都能正常渲染反弹效果 任何想法是什么导致了这个 该问题在 Firefox 和 Chrome 中都有体现 弹出窗口询问您是
  • Matlab 中的无效对象句柄错误

    我有以下代码 它是卫星绕地球运动的图形渲染 function ex global state fh figure Menu none Toolbar none Units characters hPanAni uipanel parent
  • Flutter:任务“:app:packageDebug”执行失败

    您好 我在使用 flutter 编译时遇到问题 我已经尝试使用 flutter run 进行调试 debug 但没有成功 我认为来自 gradle 但我不知道如何解决这个问题 Resolving dependencies 3 5s FAIL
  • JavaScript 倒计时

    我已经在网上搜索过 但所有可用的都是您指定日期的地方 并且它会倒计时到该日期 我需要的是从 27分43秒 以这种格式 一直倒数到0的东西 无论何时他们登陆页面 有人得到任何可用的片段吗 像这样的事情应该可以解决问题 我很无聊 决定自己做而不
  • 我可以在 CDH 中安装多个 Spark 版本吗?

    我使用的是cdh5 1 0 它已经安装了默认的spark 但是 我想使用 Spark 1 3 我也可以将此版本安装到cdh5 1 0吗 如何才能设置这些呢 新版本的spark也会通过Cloudera manager监控吗 是的 您可以运行您
  • 检测应用程序的首次运行

    我正在创建一个应用程序 当应用程序第一次启动时 我必须在其中创建一个 plist 稍后我将使用 plist 来存储用户稍后输入的详细信息 如何检测应用程序的首次启动 我正在尝试 NSUserDefaults 但我认为我做错了什么 您可以使用
  • 如何在 Android 上的 MapView 中添加地图比例尺?

    我正在努力添加一个地图比例 根据当前的缩放级别在屏幕上显示当前的长度 我有一种感觉 它可能存在一些预定义的类可供使用 但我不知道 我已经搜索了很多但找不到任何东西 我非常感谢任何帮助 Alex 好吧 我现在明白了 Luis 的回答对我帮助很
  • 如何将值传递给构造函数?

    很抱歉我的问题有点理论化 我是 OOP 新手 正在研究以下代码 public interface IShape double getArea public class Rectangle IShape int lenght int widt
  • 如何从已经有 main 的 gradle 项目中运行 Kotlin 脚本?

    我有 Ktor 项目 其主要名称为Application kt 布局是这样的 com myProject Application kt Testing kt api Routes kt routes NewRoutes kt OpenApi
  • Boost ASIO HTTP 客户端 POST

    我正在尝试让 boost ASIO 库发送帖子 但变量从未发送到服务器 我知道服务器工作正常 用curl测试 此代码不起作用 变量 msg 未发布到服务器 但当我使用curl 时它确实起作用 tcp resolver resolver io
  • 在外部区域达到阈值后允许 BottomSheet 向上滑动

    我正在尝试复制当前 Google 地图的行为 该行为允许从底部栏向上滑动时显示底部工作表 请注意 在下面的录音中 我首先点击底部栏上的一个按钮 然后向上滑动 这又会显示其后面的工作表 我找不到任何地方解释如何实现这样的事情 我尝试探索 Bo
  • Android sha512 示例

    有人可以提供一个关于如何使用 java android 哈希密码的示例吗PW HASH ITERATION COUNTsha512 salt 的迭代 在伪代码中 hash sha512 concat pw salt for i 1 i
  • 在 django-apps 中查找静态文件和模板的顺序

    例如 我的 django 项目中有 2 个应用程序 它们的模板和静态文件具有相同的子路径 app1 static style css templates index html app2 static style css templates
  • Dymola 标志列表

    Dymola 中的一些设置可以通过在 命令 窗口中设置标志来更改 标志的一些示例是 Advanced AutoFormatting Advanced PedanticModelica Advanced LogStartValuesForIt
  • 以编程方式将 RDLC 报告另存为 PDF

    我有一份报告需要运行多次并保存为 PDF 我目前正在以编程方式生成 PDF 格式的报告 但希望保存报告 而无需用户每次都手动选择保存选项 我用来将单个报告呈现为 PDF 的代码是 Dim warnings As Microsoft Repo
  • Flink:行时间属性不得位于常规联接的输入行中

    使用 flink SQL API 我想将多个表连接在一起并在时间窗口内进行一些计算 我有 3 个来自 CSV 文件的表 一个来自 Kafka 在卡夫卡表中 我有一个字段timestampMs 我想将其用于我的时间窗口操作 为此 我执行了以下