我鼓励你使用 Scalacase classes
用于数据建模。
final case class Product(name: String, catalogNumber: String, cas: String, formula: String, weight: Double, mld: String)
现在你可以拥有一个List
of Product
在记忆中:
val inMemoryRecords: List[Product] = List(
Product("Cyclohexanecarboxylic acid", " D19706", "1148027-03-5", "C(11)H(13)Cl(2)NO(5)", 310.131, "MFCD11226417"),
Product("Tacrolimus", "G51159", "104987-11-3", "C(44)H(69)NO(12)", 804.018, "MFCD00869853"),
Product("Methanol", "T57494", "173310-45-7", "C(8)H(8)Cl(2)O", 191.055, "MFCD27756662")
)
The 结构化流API https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html通过使用众所周知的方法可以很容易地推理流处理Dataset[T]
抽象。粗略地说,您只需要担心三件事:
-
Source https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#input-sources:源可以生成输入数据流,我们可以将其表示为
Dataset[Input]
。每个新数据项Input
到达的数据将被附加到这个无界数据集中。您可以根据需要操纵数据(例如Dataset[Input]
=> Dataset[Output]
).
-
流式查询 https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#starting-streaming-queries and Sink https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#output-sinks:查询生成一个结果表,该结果表在每个触发间隔从源更新。更改被写入称为接收器的外部存储中。
-
输出方式 https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#output-modes:可以通过不同的模式将数据写入 Sink:完整模式、追加模式和更新模式。
假设您想了解分子量大于 200 单位的产品。
正如您所说,使用批处理 API 相当简单直接:
// Create an static dataset using the in-memory data
val staticData: Dataset[Product] = spark.createDataset(inMemoryRecords)
// Processing...
val result: Dataset[Product] = staticData.filter(_.weight > 200)
// Print results!
result.show()
使用 Streaming API 时,您只需要定义一个source
and a sink
作为额外的步骤。在这个例子中,我们可以使用MemoryStream
和console
水槽打印结果。
// Create an streaming dataset using the in-memory data (memory source)
val productSource = MemoryStream[Product]
productSource.addData(inMemoryRecords)
val streamingData: Dataset[Product] = productSource.toDS()
// Processing...
val result: Dataset[Product] = streamingData.filter(_.weight > 200)
// Print results by using the console sink.
val query: StreamingQuery = result.writeStream.format("console").start()
// Stop streaming
query.awaitTermination(timeoutMs=5000)
query.stop()
请注意,staticData
和streamingData
具有确切的类型签名(即Dataset[Product]
)。这使得我们无论使用 Batch 还是 Streaming API 都可以应用相同的处理步骤。您还可以考虑实现一个通用方法def processing[In, Out](inputData: Dataset[In]): Dataset[Out] = ???
以避免在这两种方法中重复自己。
完整代码示例:
object ExMemoryStream extends App {
// Boilerplate code...
val spark: SparkSession = SparkSession.builder
.appName("ExMemoryStreaming")
.master("local[*]")
.getOrCreate()
spark.sparkContext.setLogLevel("ERROR")
import spark.implicits._
implicit val sqlContext: SQLContext = spark.sqlContext
// Define your data models
final case class Product(name: String, catalogNumber: String, cas: String, formula: String, weight: Double, mld: String)
// Create some in-memory instances
val inMemoryRecords: List[Product] = List(
Product("Cyclohexanecarboxylic acid", " D19706", "1148027-03-5", "C(11)H(13)Cl(2)NO(5)", 310.131, "MFCD11226417"),
Product("Tacrolimus", "G51159", "104987-11-3", "C(44)H(69)NO(12)", 804.018, "MFCD00869853"),
Product("Methanol", "T57494", "173310-45-7", "C(8)H(8)Cl(2)O", 191.055, "MFCD27756662")
)
// Defining processing step
def processing(inputData: Dataset[Product]): Dataset[Product] =
inputData.filter(_.weight > 200)
// STATIC DATASET
val datasetStatic: Dataset[Product] = spark.createDataset(inMemoryRecords)
println("This is the static dataset:")
processing(datasetStatic).show()
// STREAMING DATASET
val productSource = MemoryStream[Product]
productSource.addData(inMemoryRecords)
val datasetStreaming: Dataset[Product] = productSource.toDS()
println("This is the streaming dataset:")
val query: StreamingQuery = processing(datasetStreaming).writeStream.format("console").start()
query.awaitTermination(timeoutMs=5000)
// Stop query and close Spark
query.stop()
spark.close()
}