我正在尝试使用 Spark 制作一个结构化流应用程序,主要思想是从 kafka 源中读取,处理输入,写回到另一个主题。我已经成功地使 Spark 从 kafka 读取和写入,但是我的问题在于处理部分。我已经尝试使用 foreach 函数来捕获每一行并在写回 kafka 之前对其进行处理,但是它始终只执行 foreach 部分,并且从不写回 kafka。然而,如果我从写入流中删除 foreach 部分,它会继续写入,但现在我失去了处理能力。
如果有人能给我一个关于如何做到这一点的例子,我将非常感激。
这是我的代码
spark = SparkSession \
.builder \
.appName("StructuredStreamingTrial") \
.getOrCreate()
df = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "localhost:9092") \
.option("subscribe", "KafkaStreamingSource") \
.load()
ds = df \
.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")\
.writeStream \
.outputMode("update") \
.format("kafka") \
.option("kafka.bootstrap.servers", "localhost:9092") \
.option("topic", "StreamSink") \
.option("checkpointLocation", "./testdir")\
.foreach(foreach_function)
.start().awaitTermination()
and the foreach_function
简单地说就是
def foreach_function(df):
try:
print(df)
except:
print('fail')
pass
在基于 Pyspark 的结构化流 API 中写入 Kafka 接收器之前处理数据,我们可以使用 UDF 函数轻松处理任何类型的复杂转换。
示例代码如下。此代码尝试读取 JSON 格式消息 Kafka 主题并解析该消息,将消息从 JSON 转换为 CSV 格式并重写到另一个主题中。您可以处理任何处理转换来代替“json_formatted”函数。
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.streaming import StreamingContext
from pyspark.sql.column import Column, _to_java_column
from pyspark.sql.functions import col, struct
from pyspark.sql.functions import udf
import json
import csv
import time
import os
# Spark Streaming context :
spark = SparkSession.builder.appName('pda_inst_monitor_status_update').getOrCreate()
sc = spark.sparkContext
ssc = StreamingContext(sc, 20)
# Creating readstream DataFrame :
df = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "localhost:9092") \
.option("subscribe", "KafkaStreamingSource") \
.load()
df1 = df.selectExpr( "CAST(value AS STRING)")
df1.registerTempTable("test")
def json_formatted(s):
val_dict = json.loads(s)
return str([
val_dict["after"]["ID"]
, val_dict["after"]["INST_NAME"]
, val_dict["after"]["DB_UNIQUE_NAME"]
, val_dict["after"]["DBNAME"]
, val_dict["after"]["MON_START_TIME"]
, val_dict["after"]["MON_END_TIME"]
]).strip('[]').replace("'","").replace('"','')
spark.udf.register("JsonformatterWithPython", json_formatted)
squared_udf = udf(json_formatted)
df1 = spark.table("test")
df2 = df1.select(squared_udf("value"))
# Declaring the Readstream Schema DataFrame :
df2.coalesce(1).writeStream \
.writeStream \
.outputMode("update") \
.format("kafka") \
.option("kafka.bootstrap.servers", "localhost:9092") \
.option("topic", "StreamSink") \
.option("checkpointLocation", "./testdir")\
.start()
ssc.awaitTermination()
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)