如何在 flink-kafka 中省略空值异常,任何帮助都可以

2023-12-07

我正在尝试编写一个代码,当温度高于阈值温度(如代码中定义)时创建警报,但键控流正在产生问题。我是 flink 的新手,也是 scala 的中间人。我需要这段代码的帮助

我几乎尝试了一切

 def main(args: Array[String]): Unit = {
    val TEMPERATURE_THRESHOLD: Double = 50.00

    val see: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "localhost:9092")
    properties.setProperty("zookeeper.connect", "localhost:2181")

    val src = see.addSource(new FlinkKafkaConsumer010("broadcast",
      new JSONKeyValueDeserializationSchema(true), properties))


    var data = src.map { v => {
      val loc = v.get("locationID").asInstanceOf[String]
      val temperature = v.get("temp").asDouble()
    //val json = Map("locationID" -> locationID, "temp" -> v.temp)
    //val jsonVal = JSONObject(json).toString()
      (loc, temperature)
    }}
 data = data
      .keyBy(
        v => v._1
      )
{"locationID": "ASK", "temp": 35} {"locationID": "BC", "temp": 45} {"locationID": "CHD", "temp": 55} {"locationID": "RAJ", "temp": 65} {"locationID": "EGY", "temp": 55}

我认为我的元组从某处获取空值,我需要帮助忽略该错误

Error:-

Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
...

    at flinkBroadcast1$.main(flinkBroadcast1.scala:59)
    at flinkBroadcast1.main(flinkBroadcast1.scala)
Caused by: java.lang.Exception: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
...
Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
...
Caused by: java.lang.NullPointerException

None

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

如何在 flink-kafka 中省略空值异常,任何帮助都可以 的相关文章

随机推荐

  • 如果将新分区添加到 Kafka 主题,消费者偏移量会发生什么?

    当添加新分区时 消费者偏移量会发生什么 它保持不变吗 是的 它保持不变 为每个分区单独维护一个偏移量 因此您的新分区的偏移量将从 0 开始 并且不会影响其他偏移量
  • 在 matplotlib 中使用动画的颜色图问题

    I use matplotlib animation对名为 3D 的数组中的数据进行动画处理arr 我使用 h5 文件读取数据h5py图书馆一切都很好 但是 当使用动画时 颜色图卡在数据范围的第一帧中 并且经过一些步骤后 它在绘图时显示非标
  • XY布局JAVA

    Java 有什么类型的 XY 布局吗 所以我可以在 X 和 Y 坐标处设置一个按钮 它应该是那么大等等 因为这个边框布局以及网格和面板的东西让我发疯 它们到处流动并伸展开来 为了使它们变小 您必须将面板放在面板中面板中面板中 将容器的布局设
  • Jplayer Ready功能不触发flash的解决办法

    我正在使用 Jplayer 2 2 0 和 jquery 1 6 在我的应用程序中播放音频文件 音频必须使用 flash 解决方案 但是 当解决方案选项设置为闪烁时 不会触发就绪功能 因此当我播放文件时会出现错误 当谷歌搜索此问题时 推荐的
  • 通过 Windows docker 文件设置 git

    I write Dockerfile这是基于视窗纳米服务器 我需要将 git 添加到该图像中 为了实现它 我做了以下工作 RUN Invoke WebRequest https github com git for windows git
  • 我应该安装哪个版本的 VS 才能在构建服务器上进行单元测试?

    几周后 我们有了第一个 TFS 2013 设置 该设置相当小 并且暂时只有少数 即 10 开发人员需要使用它 它被设置为 测试 看看它是否适合我们的组织 现在 我们还想测试构建服务器功能 我们已经配置了构建服务器并启用了单个代理 MSDN
  • 如何获取android上assets文件夹中的文件属性?

    我想获取android上assets文件夹中的文件属性 但这不像一般操作使用某种方法来获取 任何人都可以帮助我获取文件属性 例如修改日期 我的资产文件有一个 ok txt 请帮我吗 如果我理解正确 我认为这篇博文将回答您的问题 其中代码显示
  • Naudio - 将 32 位 wav 转换为 16 位 wav

    我一直在尝试将 32 位立体声 wav 转换为 16 位单声道 wav 我使用 naudio 来捕获声音 并认为仅使用四个更重要字节中的两个就可以了 这是 DataAvailable 的实现 void waveIn DataAvailabl
  • 使用 Excel VBA 打开 OneDrive 上的文件

    我有一个 Excel 文件 文件 1 我希望在其中使用 VBA 代码打开我的 OneDrive 上的文件 文件 2 很多人会使用文件 1 所以我希望它在后台打开文件 2 我已经为文件 2 创建了一个共享链接 通过此链接 任何人都应该能够访问
  • 多个 FK 关系中仅 1 个约束

    我有一个逻辑表结构 其中Table A与多个其他表有关系 Table B Table C 但从功能上来说Table A can only ever have the FK forBorC 已填充 Valid Table A Record I
  • “错误:无法解析:com.android.support:appcompat-v7:29.0.1”[重复]

    这个问题在这里已经有答案了 我想尝试 Intellij 平台进行 Android 开发 但即使在新鲜之后它也显示 gradle 失败 请帮忙 我尝试过调整线路 implementation com android support appco
  • 通过python将参数传递到批处理文件中

    我正在尝试将参数发送到批处理文件 我有一个批处理文件 HelloWorld bat 它在脚本的各个点总共要求 4 个输入 我尝试使用 subprocess Popen subprocess call 和 os system 但我无法传递参数
  • 如何在 Android 上检查我的互联网访问情况?

    我真正想要的是 当用户没有连接时 我想显示一些对话框 表明他或她没有连接 我尝试将其放在我的 MainActivity 上 但仍然不起作用 public boolean isOnline ConnectivityManager cm Con
  • 在 TC 服务器上部署 Rails 应用程序

    好吧 这是我的问题 我在 TC 服务器上部署了一个 Rails 应用程序 我有一个 WAR 文件 服务器能够呈现通过 WEB INF 读取的数据 我的看法是这样的 现在 仅当 f label 被注释掉并且 html 标签如 用户名 is w
  • 直接在django中使用pymongo

    我正在使用 Django 和 MongoDB 构建一个网站 我们可以使用 2 个流行的 API 框架来连接Django and MongoDB 一个是mongoengine另一个是django mongodb engine 因为最新的mon
  • 使用 Guava 将两个列表压缩到 Java 8 中的不可变多重映射中?

    for 循环看起来像 ImmutableListMultiMap
  • 结构不在内存中

    我创建了一个这样的结构 struct Options double bindableKeys 567 double graphicLocation 150 double textures 300 Options options 在此声明之后
  • 合并 MDI 窗口的菜单条项

    如何合并具有相同菜单名称的父窗体和子窗体的菜单项 将菜单项的 MergeAction 设置为 MatchOnly Added 因为这可能会有点棘手 所以我将添加一个步骤列表来制作一个简单的示例 创建一个新的 Windows 窗体应用程序 添
  • 打开chrome中其他扩展插入的元素的“关闭”shadowRoot

    好吧 chrome中的谷歌翻译扩展 具有弹出功能 它立即显示所选单词的翻译 我想访问弹出窗口显示的那些翻译 但这个弹出元素是shadowRoot 关闭 所以javascript无法访问其内容 我红色一篇关于该主题和作者的文章说 但实际上 没
  • 如何在 flink-kafka 中省略空值异常,任何帮助都可以

    我正在尝试编写一个代码 当温度高于阈值温度 如代码中定义 时创建警报 但键控流正在产生问题 我是 flink 的新手 也是 scala 的中间人 我需要这段代码的帮助 我几乎尝试了一切 def main args Array String