我正在尝试编写一个代码,当温度高于阈值温度(如代码中定义)时创建警报,但键控流正在产生问题。我是 flink 的新手,也是 scala 的中间人。我需要这段代码的帮助
我几乎尝试了一切
def main(args: Array[String]): Unit = {
val TEMPERATURE_THRESHOLD: Double = 50.00
val see: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
val properties = new Properties()
properties.setProperty("bootstrap.servers", "localhost:9092")
properties.setProperty("zookeeper.connect", "localhost:2181")
val src = see.addSource(new FlinkKafkaConsumer010("broadcast",
new JSONKeyValueDeserializationSchema(true), properties))
var data = src.map { v => {
val loc = v.get("locationID").asInstanceOf[String]
val temperature = v.get("temp").asDouble()
//val json = Map("locationID" -> locationID, "temp" -> v.temp)
//val jsonVal = JSONObject(json).toString()
(loc, temperature)
}}
data = data
.keyBy(
v => v._1
)
{"locationID": "ASK", "temp": 35} {"locationID": "BC", "temp": 45} {"locationID": "CHD", "temp": 55} {"locationID": "RAJ", "temp": 65} {"locationID": "EGY", "temp": 55}
我认为我的元组从某处获取空值,我需要帮助忽略该错误
Error:-
Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
...
at flinkBroadcast1$.main(flinkBroadcast1.scala:59)
at flinkBroadcast1.main(flinkBroadcast1.scala)
Caused by: java.lang.Exception: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
...
Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
...
Caused by: java.lang.NullPointerException