如何增加 Flink taskmanager.numberOfTaskSlots 以在没有 Flink 服务器的情况下运行它(在 IDE 或 fat jar 中)

2024-01-11

我有一个关于在 IDE 中运行 Flink 流作业或作为 fat jar 运行而不将其部署到 Flink 服务器的问题。

问题是当我的工作中有超过 1 个任务槽时,我无法在 IDE 中运行它。

public class StreamingJob {

public static void main(String[] args) throws Exception {
    // set up the streaming execution environment
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    Properties kafkaProperties = new Properties();
    kafkaProperties.setProperty("bootstrap.servers", "localhost:9092");
    kafkaProperties.setProperty("group.id", "test");
    env.setParallelism(1);

    DataStream<String> kafkaSource = env
        .addSource(new FlinkKafkaConsumer010<>("flink-source", new SimpleStringSchema(), kafkaProperties))
        .name("Kafka-Source")
        .slotSharingGroup("Kafka-Source");

    kafkaSource.print().slotSharingGroup("Print");

    env.execute("Flink Streaming Java API Skeleton");
}

}

我知道该作业需要 2 个插槽,并且我可以在 Flink 集群中拥有两个任务管理器,但是如何在 IDE 中本地运行它。

目前,我必须为本地所有操作员指定相同的 slotSharingGroup 名称才能拥有一个插槽。但它并不灵活。

你如何处理?


这是您所描述的一个已知错误。可以找到对应的JIRA问题here https://issues.apache.org/jira/browse/FLINK-8712.

解决这个问题的方法是手动设置任务槽的数量。TaskExecutor已开始。您可以通过以下方式执行此操作TaskManagerOptions.NUM_TASK_SLOTS配置选项:

final int parallelism = ...;
final Configuration configuration = new Configuration();
configuration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 2);

final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(parallelism, configuration);
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

如何增加 Flink taskmanager.numberOfTaskSlots 以在没有 Flink 服务器的情况下运行它(在 IDE 或 fat jar 中) 的相关文章

  • Gradle 构建错误:无法从 https://repo1.maven.org/maven2/io/fabric/tools/gradle/maven-metadata.xml 加载 Maven 元数据

    我在 Android studio 中遇到 gradle 构建错误 如下所示 Error A problem occurred configuring project MyApp Could not resolve all dependen
  • Java中有没有一种方法可以通过名称实例化一个类?

    我正在寻找问题 从字符串名称实例化一个类 https stackoverflow com questions 9854900 instantiate an class from its string name它描述了如何在有名称的情况下实例
  • 在 Java 中克隆对象 [3 个问题]

    这样做会调用Asub的clone方法吗 或者Asub深度克隆是否正确 如果没有的话 有没有办法通过这种方法对Asub进行深度克隆呢 abstract class Top extends TopMost protected Object cl
  • 序列的排列?

    我有具体数量的数字 现在我想以某种方式显示这个序列的所有可能的排列 例如 如果数字数量为3 我想显示 0 0 0 0 0 1 0 0 2 0 1 0 0 1 1 0 1 2 0 2 0 0 2 1 0 2 2 1 0 0 1 0 1 1 0
  • Junit:如何测试从属性文件读取属性的方法

    嗨 我有课ReadProperty其中有一个方法ReadPropertyFile返回类型的Myclass从属性文件读取参数值并返回Myclass目的 我需要帮助来测试ReadPropertyFile方法与JUnit 如果可能的话使用模拟文件
  • 在内存中使用 byte[] 创建 zip 文件。 Zip 文件总是损坏

    我创建的 zip 文件有问题 我正在使用 Java 7 我尝试从字节数组创建一个 zip 文件 其中包含两个或多个 Excel 文件 应用程序始终完成 没有任何异常 所以 我以为一切都好 当我尝试打开 zip 文件后 Windows 7 出
  • 如何获取之前的URL?

    我需要调用我的网络应用程序的 URL 例如 如果有一个从 stackoverflow com 到我的网站 foo com 的链接 我需要 Web 应用程序 托管 bean 中的 stackoverflow 链接 感谢所有帮助 谢谢 并不总是
  • 谷歌应用程序引擎会话

    什么是java应用程序引擎 默认会话超时 如果我们将会话超时设置为非常非常长的时间 会不会产生不良影响 因为谷歌应用程序引擎会话默认情况下仅存储在数据存储中 就像facebook一样 每次访问该页面时 会话仍然永远存在 默认会话超时设置为
  • 从最终实体获取根证书和中间证书

    作为密码学的菜鸟 我每天都会偶然发现一些简单的事情 今天只是那些日子之一 我想用 bouncy castle 库验证 java 中的 smime 消息 我想我几乎已经弄清楚了 但此时的问题是 PKIXparameters 对象的构建 假设我
  • Eclipse Maven Spring 项目 - 错误

    I need help with an error which make me crazy I started to study Java EE and I am going through tutorial on youtube Ever
  • 像 Java 这样的静态类型语言中动态方法解析背后的原因是什么

    我对 Java 中引用变量的动态 静态类型和动态方法解析的概念有点困惑 考虑 public class Types Override public boolean equals Object obj System out println i
  • 如何在用户输入数据后重新运行java代码

    嘿 我有一个基本的java 应用程序 显示人们是成年人还是青少年等 我从java开始 在用户输入年龄和字符串后我找不到如何制作它它们被归类为 我希望它重新运行整个过程 以便其他人可以尝试 的节目 我一直在考虑做一个循环 但这对我来说没有用
  • 在我的 Spring Boot 示例中无法打开版本 3 中的 Swagger UI

    我在 Spring Boot 示例中打开 swagger ui 时遇到问题 当我访问 localhost 8080 swagger ui 或 localhost 8080 root api name swagger ui 时出现这种错误 S
  • Eclipse 选项卡宽度不变

    我浏览了一些与此相关的帖子 但它们似乎并不能帮助我解决我的问题 我有一个项目 其中 java 文件以 2 个空格的宽度缩进 我想将所有内容更改为 4 空格宽度 我尝试了 正确的缩进 选项 但当我将几行修改为 4 空格缩进时 它只是将所有内容
  • 不接受任何内容也不返回任何内容的函数接口[重复]

    这个问题在这里已经有答案了 JDK中是否有一个标准的函数式接口 不接受也不返回任何内容 我找不到一个 像下面这样 FunctionalInterface interface Action void execute 可运行怎么样 Functi
  • java.io.Serialized 在 C/C++ 中的等价物是什么?

    C C 的等价物是什么java io Serialized https docs oracle com javase 7 docs api java io Serializable html 有对序列化库的引用 用 C 序列化数据结构 ht
  • 专门针对 JSP 的测试驱动开发

    在理解 TDD 到底是什么之前 我就已经开始编写测试驱动的代码了 在没有实现的情况下调用函数和类可以帮助我以更快 更有效的方式理解和构建我的应用程序 所以我非常习惯编写代码 gt 编译它 gt 看到它失败 gt 通过构建其实现来修复它的过程
  • 找不到符号 NOTIFICATION_SERVICE?

    package com test app import android app Notification import android app NotificationManager import android app PendingIn
  • 如何使用mockito模拟构建器

    我有一个建造者 class Builder private String name private String address public Builder setName String name this name name retur
  • Spring Rest 和 Jsonp

    我正在尝试让我的 Spring Rest 控制器返回jsonp但我没有快乐 如果我想返回 json 但我有返回的要求 完全相同的代码可以正常工作jsonp我添加了一个转换器 我在网上找到了用于执行 jsonp 转换的源代码 我正在使用 Sp

随机推荐

  • 聚合和聚合根的混淆

    我被分配了一个非常简单的项目作为考试 我有想法使用领域驱动设计来开发它 你们中的许多人可能会说该应用程序非常简单 使用存储库和 UoW 只是浪费时间 您可能是对的 但我认为这是一个学习更多东西的机会 该应用程序是一个 机票 系统 从下图中您
  • 视差部分初始背景位置与页面滚动时不一致

    我花了一整天的时间尝试创建一个视差部分 无论它放在页面上的哪个位置 它都能正常工作 但我对代码所做的每一次更改都会解决一个问题并产生另一个问题 到目前为止 这是我的代码 function use strict parallax sectio
  • 如何在 antd select 上添加分页?因为从接口获取数据是巨大的。所以我想实现分页

    如何在 Antd 的 Select 上添加分页 因为从接口获取数据是巨大的 所以我想实现分页 但文档api不支持 import Select from antd const Option Select Option let province
  • 使用地理定位和 Google 地图 API [帮助]

    我是移动开发领域的新手 现在我正在构建一个使用 jQuery mobile 和 PhoneGap 的应用程序 这是我的逻辑 我有一个包含用户及其地址的表 我获取用户地址并通过地图 API 传递它以捕获位置 但我对每条记录都这样做 有时 AP
  • Firestore Web 代码示例给出了无效的参数类型

    我正在尝试 Firebase 的新 Firestore 当我运行代码示例时https firebase google com docs firestore manage data add data authuser 0 https fire
  • css在表单中浮动2个输入字段

    我在理解 css 时遇到以下问题 我有一张登记表 在这种形式中 我使用字段集 现在我想在每行旁边放置两个输入字段 上面的每个字段还应该有一个标签 所以我想要实现的是 label 1 label 2 label 3 label 4
  • Intellij 自动构建 - 不存在

    如果我删除一个方法 则在我手动构建项目之前 不会显示错误 由于不存在方法而在代码中的其他位置 有没有办法让它在保存 更改时自动构建 我在某个地方缺少一个选项吗 你可以尝试理念 12 EAP http confluence jetbrains
  • TFLite 的硬刷操作

    我有一个用 Tensorflow Keras 编写的自定义神经网络 并应用 Hard swish 函数作为激活 如 MobileNetV3 论文中使用的那样 执行 def swish x return x tf nn relu6 x 3 6
  • 嵌套 jquery 选择器触发父级和子级特定事件

    我有一个具有以下结构的表 表 主 tbody tr Row td 输入 EditRow 我的 jquery 看起来像这样 table Main gt tbody gt tr Row live click function e RowClic
  • PhantomJS/CasperJS AssertExists() 失败

    我试图检查我的网页中是否存在选择器 但 casperjs 从未找到它 我尝试过两种方法 1 无需等待 casper then function search for casperjs from google form this test a
  • 出站 ChannelHandler 的捕获所有异常处理

    在 Netty 中 您有入站和出站处理程序的概念 只需在管道的末尾 尾部 添加一个通道处理程序并实现一个捕获所有入站异常处理程序即可实现exceptionCaught覆盖 如果未沿途处理 沿入站管道发生的异常将沿着处理程序传播 直到遇到最后
  • Visual Studio 2012 的 Intellisense 不再接受按空格键的建议

    我一直在多台 PC 上使用 Visual Studio 2005 2008 2010 由其他人或我安装 始终对它的智能感知建议是这样工作的 我开始打字 弹出带有建议的窗口 突出显示当前建议 然后我可以按空格键接受建议 然而 由于我在当前的
  • 为 Flink 集群中的插件添加自定义依赖项

    我有一个 Flink 会话集群 作业管理器 任务管理器 版本 1 11 1 配置了 log4j console properties 以包含 Kafka 附加程序 此外 在作业管理器和任务管理器中 我都启用了 flink s3 fs had
  • 设置所有行的一列的值非常慢

    我有一个包含大约 350 000 行的表 最近我从 MyISAM 存储引擎更改为 InnoDB 我运行查询 UPDATE users SET online 0 每次我的服务器启动时 使用 MyISAM 时都没有任何问题 该查询通常只影响几百
  • EF 5-6.1.1 中的 NullReferenceException 具有同一类型的两个导航属性

    我想首先我有一个解决这个问题的方法 但我今天花了几个小时找出异常的原因 所以我想我应该分享 给定域中的两个实体 public class User public int Id get set public string Name get s
  • 反应本机 ios:默认情况下 geoloc 的准确性很差

    我正在使用 navigator geolocation watchPosition 和 getCurrentPosition 实现健身追踪器 它在 android 和 ios 模拟器上运行良好 具有 5 10m 精度 但在 iphone 5
  • JavaFX:鼠标剪贴板在 Unix 中不起作用

    您可能知道 在 Unix 中 通常使用鼠标进行复制粘贴操作 IE 文本选择自动将其复制到剪贴板并点击滚动条将其粘贴到您单击的位置 对于使用 Java 7 和 Swing 的 GUI 应用程序 我没有出现剪贴板问题 但在 JavaFX 和 J
  • 如何检测Android设备的芯片组信息? [关闭]

    Closed 此问题正在寻求书籍 工具 软件库等的推荐 不满足堆栈溢出指南 help closed questions 目前不接受答案 如何检测android设备的芯片组信息 例如 要禁用某些功能MediaTek芯片组 本问题中讨论的 10
  • 无法从站点获取服务器证书

    我无法从我的网站 其他网站 获取证书 我尝试了一些解决方案HttpsURLConnection和方法getServerCertificates但没有任何办法可以解决问题 URL httpsURL new URL https www goog
  • 如何增加 Flink taskmanager.numberOfTaskSlots 以在没有 Flink 服务器的情况下运行它(在 IDE 或 fat jar 中)

    我有一个关于在 IDE 中运行 Flink 流作业或作为 fat jar 运行而不将其部署到 Flink 服务器的问题 问题是当我的工作中有超过 1 个任务槽时 我无法在 IDE 中运行它 public class StreamingJob