我有一个关于在 IDE 中运行 Flink 流作业或作为 fat jar 运行而不将其部署到 Flink 服务器的问题。
问题是当我的工作中有超过 1 个任务槽时,我无法在 IDE 中运行它。
public class StreamingJob {
public static void main(String[] args) throws Exception {
// set up the streaming execution environment
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Properties kafkaProperties = new Properties();
kafkaProperties.setProperty("bootstrap.servers", "localhost:9092");
kafkaProperties.setProperty("group.id", "test");
env.setParallelism(1);
DataStream<String> kafkaSource = env
.addSource(new FlinkKafkaConsumer010<>("flink-source", new SimpleStringSchema(), kafkaProperties))
.name("Kafka-Source")
.slotSharingGroup("Kafka-Source");
kafkaSource.print().slotSharingGroup("Print");
env.execute("Flink Streaming Java API Skeleton");
}
}
我知道该作业需要 2 个插槽,并且我可以在 Flink 集群中拥有两个任务管理器,但是如何在 IDE 中本地运行它。
目前,我必须为本地所有操作员指定相同的 slotSharingGroup 名称才能拥有一个插槽。但它并不灵活。
你如何处理?
这是您所描述的一个已知错误。可以找到对应的JIRA问题here https://issues.apache.org/jira/browse/FLINK-8712.
解决这个问题的方法是手动设置任务槽的数量。TaskExecutor
已开始。您可以通过以下方式执行此操作TaskManagerOptions.NUM_TASK_SLOTS
配置选项:
final int parallelism = ...;
final Configuration configuration = new Configuration();
configuration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 2);
final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(parallelism, configuration);
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)