我有一个简单的工作,将数据从 pub sub 移动到 gcs。 pub sub 主题是一个共享主题,具有许多不同大小的不同消息类型
我希望结果在 GCS 中相应地垂直分区:
架构/版本/年/月/日/
该父键下应该是当天的一组文件,并且文件的大小应该合理,即 10-200 mb
我正在使用 scio,并且能够通过 groupby 操作来创建 [String, Iterable[Event]] 的 PCollection,其中键基于上面的分区方案。
我无法使用默认文本接收器,因为它们不支持垂直分区,它只能将整个 pcollection 写入一个位置。相反,请遵循以下答案中的建议:
如何在 Apache Beam 中写入多个文件? https://stackoverflow.com/questions/43291058/how-do-i-write-to-multiple-files-in-apache-beam/43356323
使用 DoFn 使用 Cloud Dataflow 从 PubSub 写入 Google Cloud Storage https://stackoverflow.com/questions/36509116/writing-to-google-cloud-storage-from-pubsub-using-cloud-dataflow-using-dofn
我创建了一个简单的函数,将我的组写入 gcs。
object GcsWriter {
private val gcs: storage.Storage = StorageOptions.getDefaultInstance.getService
val EXTENSION = ".jsonl.gz"
//todo no idea if this is ok. see org.apache.beam.sdk.io.WriteFiles is a ptransform that writes text files and seems very complex
//maybe beam is aimed at a different use case
//this is an output 'transform' that writes text files
//org.apache.beam.sdk.io.TextIO.write().to("output")
def gzip(bytes: Array[Byte]): Array[Byte] = {
val byteOutputStream = new ByteArrayOutputStream()
val compressedStream = new GZIPOutputStream(byteOutputStream)
compressedStream.write(bytes)
compressedStream.close()
byteOutputStream.toByteArray
}
def writeAsTextToGcs(bucketName: String, key: String, items: Iterable[String]): Unit = {
val bytes = items.mkString(start = "",sep ="\n" ,end = "\n").getBytes("UTF-8")
val compressed = gzip(bytes)
val blobInfo = BlobInfo.newBuilder(bucketName, key + System.currentTimeMillis() + EXTENSION).build()
gcs.create(blobInfo, compressed)
}
}
这可以工作并按照我喜欢的方式写入文件,我使用以下带有固定窗口的触发规则:
val WINDOW_DURATION: Duration = Duration.standardMinutes(10)
val WINDOW_ELEMENT_MAX_COUNT = 5000
val LATE_FIRING_DELAY: Duration = Duration.standardMinutes(10) //this is the time after receiving late data to refiring
val ALLOWED_LATENESS: Duration = Duration.standardHours(1)
val WINDOW_OPTIONS = WindowOptions(
trigger = AfterFirst.of(
ListBuffer(
AfterPane.elementCountAtLeast(WINDOW_ELEMENT_MAX_COUNT),
AfterWatermark.pastEndOfWindow().withLateFirings(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(LATE_FIRING_DELAY)))),
allowedLateness = ALLOWED_LATENESS,
accumulationMode = AccumulationMode.DISCARDING_FIRED_PANES
)
基本上是根据水印在窗口末尾或收到 x 元素时的复合触发。
问题在于源数据可能包含不同大小的消息。因此,如果我选择固定数量的元素来触发,我将:
1)选择太大的数字,对于较大的事件组,它会炸毁worker上的java堆
2)选择一个较小的数字,然后我最终会得到一些用于安静事件的小文件,我希望在文件中积累更多事件。
我没有看到可以传递 lambda 的自定义触发器,该 lambda 可以输出每个元素或类似内容的指标。有没有一种方法可以实现我自己的触发器来触发窗口中的字节数。
其他一些问题
我假设每个组中元素的迭代器位于内存中而不是从存储中流式传输,是否正确?如果不是,我可以以更内存有效的方式从迭代器流式传输到 gcs
对于我的 GCS 作家来说,我只是在地图或 ParDo 中进行操作。它没有实现文件输出接收器,也没有看起来像 TextIo。这个简单的实现会出现问题吗?在文档中,它说如果转换抛出异常,它会简单地重试(对于流应用程序无限期地重试)