数据流/apache beam 窗口中字节数的触发窗口

2024-03-13

我有一个简单的工作,将数据从 pub sub 移动到 gcs。 pub sub 主题是一个共享主题,具有许多不同大小的不同消息类型

我希望结果在 GCS 中相应地垂直分区:

架构/版本/年/月/日/

该父键下应该是当天的一组文件,并且文件的大小应该合理,即 10-200 mb

我正在使用 scio,并且能够通过 groupby 操作来创建 [String, Iterable[Event]] 的 PCollection,其中键基于上面的分区方案。

我无法使用默认文本接收器,因为它们不支持垂直分区,它只能将整个 pcollection 写入一个位置。相反,请遵循以下答案中的建议:

如何在 Apache Beam 中写入多个文件? https://stackoverflow.com/questions/43291058/how-do-i-write-to-multiple-files-in-apache-beam/43356323

使用 DoFn 使用 Cloud Dataflow 从 PubSub 写入 Google Cloud Storage https://stackoverflow.com/questions/36509116/writing-to-google-cloud-storage-from-pubsub-using-cloud-dataflow-using-dofn

我创建了一个简单的函数,将我的组写入 gcs。

object GcsWriter {

  private val gcs: storage.Storage = StorageOptions.getDefaultInstance.getService

  val EXTENSION = ".jsonl.gz"

  //todo no idea if this is ok. see org.apache.beam.sdk.io.WriteFiles is a ptransform that writes text files and seems very complex
  //maybe beam is aimed at a different use case
  //this is an output 'transform' that writes text files
  //org.apache.beam.sdk.io.TextIO.write().to("output")


  def gzip(bytes: Array[Byte]): Array[Byte] = {
    val byteOutputStream = new ByteArrayOutputStream()
    val compressedStream = new GZIPOutputStream(byteOutputStream)
    compressedStream.write(bytes)
    compressedStream.close()
    byteOutputStream.toByteArray
  }

  def writeAsTextToGcs(bucketName: String, key: String, items: Iterable[String]): Unit = {
    val bytes = items.mkString(start = "",sep ="\n" ,end = "\n").getBytes("UTF-8")
    val compressed = gzip(bytes)
    val blobInfo = BlobInfo.newBuilder(bucketName, key + System.currentTimeMillis() + EXTENSION).build()
    gcs.create(blobInfo, compressed)
  }

}

这可以工作并按照我喜欢的方式写入文件,我使用以下带有固定窗口的触发规则:

val WINDOW_DURATION: Duration = Duration.standardMinutes(10)
  val WINDOW_ELEMENT_MAX_COUNT = 5000
  val LATE_FIRING_DELAY: Duration = Duration.standardMinutes(10) //this is the time after receiving late data to refiring
  val ALLOWED_LATENESS: Duration = Duration.standardHours(1)


  val WINDOW_OPTIONS = WindowOptions(
    trigger = AfterFirst.of(
      ListBuffer(
        AfterPane.elementCountAtLeast(WINDOW_ELEMENT_MAX_COUNT),
        AfterWatermark.pastEndOfWindow().withLateFirings(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(LATE_FIRING_DELAY)))),
    allowedLateness = ALLOWED_LATENESS,
    accumulationMode = AccumulationMode.DISCARDING_FIRED_PANES
  )

基本上是根据水印在窗口末尾或收到 x 元素时的复合触发。

问题在于源数据可能包含不同大小的消息。因此,如果我选择固定数量的元素来触发,我将:

1)选择太大的数字,对于较大的事件组,它会炸毁worker上的java堆 2)选择一个较小的数字,然后我最终会得到一些用于安静事件的小文件,我希望在文件中积累更多事件。

我没有看到可以传递 lambda 的自定义触发器,该 lambda 可以输出每个元素或类似内容的指标。有没有一种方法可以实现我自己的触发器来触发窗口中的字节数。

其他一些问题

我假设每个组中元素的迭代器位于内存中而不是从存储中流式传输,是否正确?如果不是,我可以以更内存有效的方式从迭代器流式传输到 gcs

对于我的 GCS 作家来说,我只是在地图或 ParDo 中进行操作。它没有实现文件输出接收器,也没有看起来像 TextIo。这个简单的实现会出现问题吗?在文档中,它说如果转换抛出异常,它会简单地重试(对于流应用程序无限期地重试)


None

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

数据流/apache beam 窗口中字节数的触发窗口 的相关文章

随机推荐

  • 如何根据opencv中的某些条件修改Mat的值?

    在Matlab中a a gt 50 0可以替换所有元素a大于 50 到 0 我想对 Mat 做同样的事情OpenCV https www mathworks com matlabcentral fileexchange 47953 comp
  • 从 Excel 导入数据时出现 SQL 错误 [已关闭]

    Closed 这个问题需要调试细节 help minimal reproducible example 目前不接受答案 我正在从 Excel 工作表导入数据 我正在努力解决以下问题 执行 错误 消息错误 0xc020901c 数据流任务 1
  • 递归地检查给定字符串是否是平衡括号字符串

    作为 java 新手 以及编程新手 我在处理分配给我们的作业时遇到了麻烦 作业分为 3 部分 以检查给定字符串是否具有平衡括号 规则 如下 abcdefksdhgs 是平衡的 aaa
  • jquery 数据表固定列未定义

    我正在尝试使用jquery 数据表插件 http www datatables net 和以下固定列示例 http www datatables net extras fixedcolumns 但我收到错误Error ReferenceEr
  • NHibernate 合并问题

    我试图用 NHibernate 表达以下 SQL 查询 DECLARE date DATETIME NULL SELECT ER Id ER DocumentDate FROM ExpenseReport ER WHERE ER Perio
  • 容器模板参数的Value_type

    在他今年的 Going Native 主题演讲中C 的本质 http channel9 msdn com Events GoingNative 2013 Opening Keynote Bjarne Stroustrup 转至 40 30
  • 如何确定 upsert 是否是 PostgreSQL 9.5+ UPSERT 的更新?

    可写 CTE 在 9 5 之前被认为是 UPSERT 的解决方案 如中所述在 PostgreSQL 中重复更新时插入 https stackoverflow com questions 1109061 insert on duplicate
  • 现有 R 图中的子图

    我有一个如下图所示的情节 对于这个图 我想在图中的某处添加类似的线图 右下或左下 我正在使用的子图的命令是 plot 1 121 sample 1 121 type l 它绘制在第一个的顶部 我需要它作为一个小图 位于左下角或右下角 有人可
  • 从外部 html 文件加载handlebars.js 模板没有显示任何内容

    这个问题有点考验我的 JS 技能 所以我可能会像白痴一样解释它 这是我的 JavaScript 用于从服务器获取 json 并尝试将其推送到模板中 Server Interface Start Access the web api for
  • 我可以使用 Xcode 模拟器测试重大变化吗?

    我想知道是否可以在 Xcode Simulator 中测试重大更改位置服务 startMonitoringSignificantLocationChanges 方法 或者它仅适用于实际设备 请注意 我已经在模拟器中尝试过它 但它不起作用 但
  • R 中取消嵌套列表并连接

    我希望取消嵌套 展平 并连接文本中的字符串 逗号分隔 tibble 示例数据 library tidyverse tibble person c Alice Bob Mary score list c Red Green Blue c Or
  • UTC 日期/时间字符串到时区

    如何将 UTC 日期 时间字符串 例如 2011 01 01 15 00 00 转换为 php 支持的任何给定时区 例如 America New York 或 Europe San Marino PHP s DateTime http ph
  • 如何在根据 XML 架构验证 XML 文件时获取错误的行号

    我正在尝试根据 W3C XML 架构验证 XML 以下代码完成该工作并在发生错误时报告 但我无法获取错误的行号 它总是返回 1 有没有简单的方法来获取行号 import java io File import javax xml XMLCo
  • Android 智能手机与其他设备之间的直接 Wifi 通信

    我想在 Android 设备和另一个设备 不是另一个 Android 智能手机 而是使用 C 实现的带 wifi 的设备 之间建立通信 通过 WIFI 我已经发现android提供了直接无线网络 http developer android
  • 获取两个表单以内联方式显示

    这不起作用
  • 在intellij中重新附加源?

    我希望调试到 jaxb impl 的源代码 我下载了版本 2 2 6 附加了源代码 然后意识到我正在调试的应用程序正在使用 jaxb impl 版本 2 2 3 现在我有了正确版本的源代码 2 2 3 我不知道如何从 2 2 6 源代码中删
  • 为什么 PHP 不使用 Internet Explorer 为特定用户保存会话变量?

    我的网站存在问题 其中 PHP 不为使用 Internet Explorer 的特定用户保存会话变量 但对于其他一些使用 Internet Explorer 的用户来说完全没有问题 使用其他浏览器的用户也没有任何问题 我创建了以下三个小脚本
  • 列表框,同步滚动

    我有两个列表框 大小相同 彼此相邻 基本上我用它们来表示链接的项目 类似于 Excel 中的 2 列和多行 其中一个是名字 另一个是姓氏 我想知道是否可以使一个列表框滚动时 另一个列表框与其同步滚动 希望这是有道理的 先谢谢了 None
  • EWS - 如何找到所有未完成的任务?

    我正在使用 Exchange Web 服务尝试获取所有未完成的 Outlook 任务的列表 我有一个 ExchangeService 实例 并尝试查找所有未完成的任务 如下所示 SearchFilter searchFilter new S
  • 数据流/apache beam 窗口中字节数的触发窗口

    我有一个简单的工作 将数据从 pub sub 移动到 gcs pub sub 主题是一个共享主题 具有许多不同大小的不同消息类型 我希望结果在 GCS 中相应地垂直分区 架构 版本 年 月 日 该父键下应该是当天的一组文件 并且文件的大小应