Spark 结构化流内存流 + 行 + 编码器问题

2024-01-19

我正在尝试使用 Spark 结构化流在本地计算机上运行一些测试。

在批处理模式下,这是我正在处理的行:

val recordSchema = StructType(List(StructField("Record", MapType(StringType, StringType), false)))
val rows         = List(
    Row(
      Map("ID" -> "1",
        "STRUCTUREID" -> "MFCD00869853",
        "MOLFILE" -> "The MOL Data",
        "MOLWEIGHT" -> "803.482",
        "FORMULA" -> "C44H69NO12",
        "NAME" -> "Tacrolimus",
        "HASH" -> "52b966c551cfe0fa7d526bac16abcb7be8b8867d",
        "SMILES" -> """[H][C@]12O[C@](O)([C@H](C)C[C@@H]1OC)""",
        "METABOLISM" -> "The metabolism 500"
       )),
    Row(
      Map("ID" -> "2",
        "STRUCTUREID" -> "MFCD00869854",
        "MOLFILE" -> "The MOL Data",
        "MOLWEIGHT" -> "603.482",
        "FORMULA" -> "",
        "NAME" -> "Tacrolimus2",
        "HASH" -> "52b966c551cfe0fa7d526bac16abcb7be8b8867d",
        "SMILES" -> """[H][C@]12O[C@](O)([C@H](C)C[C@@H]1OC)""",
        "METABOLISM" -> "The metabolism 500"
      ))
  )
val df  = spark.createDataFrame(spark.sparkContext.parallelize(rows), recordSchema)

在 Batch more 中使用它是一种魅力,没有问题。

现在我尝试使用 MemoryStream 进入流模式进行测试。我添加了以下内容:

implicit val ctx = spark.sqlContext
val intsInput = MemoryStream[Row]

但编译器抱怨如下:

未找到参数证据$1的隐式:编码器[行]

因此,我的问题是:我应该在这里做什么才能使其正常工作

我还看到,如果添加以下导入,错误就会消失:

导入spark.implicits._

实际上,我现在收到以下警告而不是错误

参数证据 $1 的不明确隐式:编码器 [行]

我不太了解编码器机制,如果有人可以向我解释如何不使用这些隐式,我将不胜感激。原因是当涉及到从 Rows 创建 DataFrame 时,我在一本书中红色了以下内容。

推荐方法:

val myManualSchema = new StructType(Array(
  new StructField("some", StringType, true),
  new StructField("col", StringType, true),
  new StructField("names", LongType, false)))
val myRows = Seq(Row("Hello", null, 1L))
val myRDD = spark.sparkContext.parallelize(myRows)
val myDf = spark.createDataFrame(myRDD, myManualSchema)
myDf.show()

然后作者继续这样说:

在Scala中,我们还可以利用Spark的隐含功能 控制台(如果您将它们导入 JAR 代码中),方法是在 序列类型。这不适用于空类型,所以它不是 必须推荐用于生产用例。

val myDF = Seq(("Hello", 2, 1L)).toDF("col1", "col2", "col3")

如果有人可以花时间解释当我使用隐式时在我的场景中发生了什么,并且这样做是否相当安全,或者是否有一种方法可以更明确地做到这一点而不导入隐式。

最后,如果有人能给我指点关于编码器和 Spark 类型映射的好文档,那就太好了。

EDIT1

我终于可以使用它了

  implicit val ctx = spark.sqlContext
  import spark.implicits._
  val rows = MemoryStream[Map[String,String]]
  val df = rows.toDF()

尽管我的问题是我对自己所做的事情没有信心。在我看来,就像在某些情况下我需要创建一个 DataSet 才能将其转换为 DF[ROW] 并进行 toDF 转换。我知道使用 DS 是类型安全的,但比使用 DF 慢。那么为什么要使用 DataSet 这个中介呢?这不是我第一次在 Spark 结构化流中看到这一点。再说一次,如果有人能帮助我解决这些问题,那就太好了。


我鼓励你使用 Scalacase classes用于数据建模。

final case class Product(name: String, catalogNumber: String, cas: String, formula: String, weight: Double, mld: String)

现在你可以拥有一个List of Product在记忆中:

  val inMemoryRecords: List[Product] = List(
    Product("Cyclohexanecarboxylic acid", " D19706", "1148027-03-5", "C(11)H(13)Cl(2)NO(5)", 310.131, "MFCD11226417"),
    Product("Tacrolimus", "G51159", "104987-11-3", "C(44)H(69)NO(12)", 804.018, "MFCD00869853"),
    Product("Methanol", "T57494", "173310-45-7", "C(8)H(8)Cl(2)O", 191.055, "MFCD27756662")
  )

The 结构化流API https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html通过使用众所周知的方法可以很容易地推理流处理Dataset[T]抽象。粗略地说,您只需要担心三件事:

  • Source https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#input-sources:源可以生成输入数据流,我们可以将其表示为Dataset[Input]。每个新数据项Input到达的数据将被附加到这个无界数据集中。您可以根据需要操纵数据(例如Dataset[Input] => Dataset[Output]).
  • 流式查询 https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#starting-streaming-queries and Sink https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#output-sinks:查询生成一个结果表,该结果表在每个触发间隔从源更新。更改被写入称为接收器的外部存储中。
  • 输出方式 https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#output-modes:可以通过不同的模式将数据写入 Sink:完整模式、追加模式和更新模式。

假设您想了解分子量大于 200 单位的产品。

正如您所说,使用批处理 API 相当简单直接:

// Create an static dataset using the in-memory data
val staticData: Dataset[Product] = spark.createDataset(inMemoryRecords)

// Processing...
val result: Dataset[Product] = staticData.filter(_.weight > 200)

// Print results!
result.show()

使用 Streaming API 时,您只需要定义一个source and a sink作为额外的步骤。在这个例子中,我们可以使用MemoryStreamconsole水槽打印结果。

// Create an streaming dataset using the in-memory data (memory source)
val productSource = MemoryStream[Product]
productSource.addData(inMemoryRecords)

val streamingData: Dataset[Product] = productSource.toDS()

// Processing...
val result: Dataset[Product] = streamingData.filter(_.weight > 200)

// Print results by using the console sink. 
val query: StreamingQuery = result.writeStream.format("console").start()

// Stop streaming
query.awaitTermination(timeoutMs=5000)
query.stop()

请注意,staticDatastreamingData具有确切的类型签名(即Dataset[Product])。这使得我们无论使用 Batch 还是 Streaming API 都可以应用相同的处理步骤。您还可以考虑实现一个通用方法def processing[In, Out](inputData: Dataset[In]): Dataset[Out] = ???以避免在这两种方法中重复自己。

完整代码示例:

object ExMemoryStream extends App {

  // Boilerplate code...
  val spark: SparkSession = SparkSession.builder
    .appName("ExMemoryStreaming")
    .master("local[*]")
    .getOrCreate()

  spark.sparkContext.setLogLevel("ERROR")

  import spark.implicits._
  implicit val sqlContext: SQLContext = spark.sqlContext

  // Define your data models 
  final case class Product(name: String, catalogNumber: String, cas: String, formula: String, weight: Double, mld: String)

  // Create some in-memory instances
  val inMemoryRecords: List[Product] = List(
    Product("Cyclohexanecarboxylic acid", " D19706", "1148027-03-5", "C(11)H(13)Cl(2)NO(5)", 310.131, "MFCD11226417"),
    Product("Tacrolimus", "G51159", "104987-11-3", "C(44)H(69)NO(12)", 804.018, "MFCD00869853"),
    Product("Methanol", "T57494", "173310-45-7", "C(8)H(8)Cl(2)O", 191.055, "MFCD27756662")
  )

  // Defining processing step
  def processing(inputData: Dataset[Product]): Dataset[Product] =
    inputData.filter(_.weight > 200)

  // STATIC DATASET
  val datasetStatic: Dataset[Product] = spark.createDataset(inMemoryRecords)

  println("This is the static dataset:")
  processing(datasetStatic).show()

  // STREAMING DATASET
  val productSource = MemoryStream[Product]
  productSource.addData(inMemoryRecords)

  val datasetStreaming: Dataset[Product] = productSource.toDS()

  println("This is the streaming dataset:")
  val query: StreamingQuery = processing(datasetStreaming).writeStream.format("console").start()
  query.awaitTermination(timeoutMs=5000)
  
  // Stop query and close Spark
  query.stop()
  spark.close()

}
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Spark 结构化流内存流 + 行 + 编码器问题 的相关文章

随机推荐

  • 将服务器端事件添加到扩展器控件

    我有一个扩展控件 可以提升文本框的OnTextChanged用户完成输入后 500 毫秒发生事件 问题在于OnTextChanged当文本框失去焦点时引发 这会导致问题 因为回发 我想做的是给扩展器控件它自己的服务器端事件 比如说 OnDe
  • 如何使用 ggplot2 剪切、裁剪或白色填充紧紧包围多边形外部的矩形

    我只是想用白色填充简单多边形之外的区域 出于某种原因 它在中心画了一根奇怪的木桩 就像它认为这是一个吸血鬼杀手或其他什么东西一样 搞砸了 我尝试跟随这个帖子 https stackoverflow com questions 2128664
  • Vue.js 路由器:历史模式和 AWS S3 (RoutingRules)

    我有一个使用 Amazon S3 和 Cloudflare 启动并运行的 Vue js 应用程序 当我打开索引并浏览到 dashboard 时 一切正常 但是 当我直接在新选项卡中打开仪表板之类的路线或刷新页面时 我从 S3 收到以下错误
  • RoR 设计:sign_in 总是返回无效的电子邮件/密码

    每次登录时 我都会收到错误消息 表明电子邮件 密码无效 routes devise for users devise scope users do get users sign out gt devise sessions destroy
  • 最好的异常处理策略应该是什么

    我正在开发用户从 UI 调用方法的应用程序 在此我从业务类调用一个方法 该方法调用另一个方法 用户界面 gt 方法1 gt 方法2 gt 方法3 如果任何方法中发生任何异常 我想向用户显示错误消息 我应该直接向调用者方法抛出异常吗 在 UI
  • Discord JS - 如何对同一个嵌入多次做出反应?

    我只拿到了第一个 钱袋子 表情符号对频道中的最新消息做出反应 这是机器人发送的嵌入 但是 我希望机器人对新嵌入做出反应 钱袋子 and ticket 表情符号 到目前为止它会与 钱袋子 表情符号 但是 当它尝试与 ticket 表情符号 如
  • Angular 6 Firebase 快照返回未定义

    我正在将对象上传到我的数据库 然后尝试检索所有项目 在第二步中我遇到错误 我的对象类 export class Data key string name string address string address2 string pscod
  • Sublime Text 2:如何在不移动光标的情况下向上/向下翻页

    我使用的是 OS X 10 8 4 ST2 当我使用 Home 和 End 键时 视口移动并且光标保持不变 这是标准的 Mac 行为 也是我所期望的 但是 当我使用 Page Up pageup pgup 和 Page Down paged
  • Python3 shebang 线未按预期工作

    我在 Solaris 环境中运行 Python 脚本时遇到以下问题 看来我在 shebang 线上做了一些不正确的事情 但我无法判断这是 Python 3 问题还是命令行问题 但我怀疑它与 shebang 行有某种关系 因为当我在命令行上显
  • “砰”或“!”是什么意思?在 git 命令之前?

    正如您从这段摘录中看到的 有一个 在 git 命令之前 重点是什么 alias commitx git add git commit https stackoverflow com a 8956546 1354543 https stack
  • 如何每 10 秒发出一次 Ajax 请求(长轮询除外)?

    我尝试使用以下命令每 10 秒从服务器请求一个 json 对象 setInterval function ajax url success function data do stuff with data 10000 但这不是很有效 我了解
  • 使用 WordNet 确定两个文本之间的语义相似度?

    如何使用 WordNet 确定 python 中两个文本之间的语义相似度 明显的预处理是删除停用词和词干 但是然后呢 我能想到的唯一方法是计算两个文本中每个单词之间的 WordNet 路径距离 这是一元语法的标准 但这些都是大型 400 个
  • 无法解析符号“FusedLocationProviderClient”

    我有一个错误无法解析符号 FusedLocationProviderClient 声明时 private FusedLocationProviderClient mFusedLocationClient 这里也问同样的问题无法解析符号 Fu
  • sqlalchemy 按计数列过滤

    我有一个用户查询 它按每个用户拥有的订单数量 ordersCount 进行过滤 User query filter ordersCount gt 2 如果我运行它 它会显示 where 子句 中的未知列 ordersCount 根据我的经验
  • 捕获文本框滚动事件?

    Textbox or richtextbox 我唯一想要的就是当滚动条移动时触发一个函数 我已经找到了GetScrollPos and SetScrollPos 我想过定期检查滚动条位置 但必须有更好的方法 那么 使用 WinForms 更
  • Spring MVC 和 Velocity:模板结构

    我想实现这种模板功能 有一个模板 定义完整 x html 网页的页眉 页脚和公共部分 当返回字符串时 Controller它将定义包含到模板特定部分的视图 像这样 控制器 RequestMapping value method Reques
  • EF Core 迁移错误:“无法创建‘ApplicationContext’类型的对象”

    我尝试使用 EF Core 进行迁移 但收到错误 如何修复此错误 PM gt add migration ini 无法创建 ApplicationContext 类型的对象 添加一个 IDesignTimeDbContextFactory
  • 在iPhone中使用带有userid参数的base64 haxcode将图像上传到服务器

    我正在使用这段代码 但问题是它在 nsdata 转换块中对 Haxcode 进行编码 我想发送相同的代码 我用 userid 得到的代码是固定整数 请帮忙 NSData imageData NSData dataWithData UIIma
  • 表达式树 - 不必要的转换为 int32

    在处理字节和短整型时 表达式树似乎构建了不必要的转换 它们将两侧 例如在二进制表达式中 转换为 int32 这是我见过的一些 Linq 提供程序中的一个问题 每个提供程序都必须剥离这个冗余层才能得到原始表达式 NHibernate 不会删除
  • Spark 结构化流内存流 + 行 + 编码器问题

    我正在尝试使用 Spark 结构化流在本地计算机上运行一些测试 在批处理模式下 这是我正在处理的行 val recordSchema StructType List StructField Record MapType StringType