I want to use Firestore in a Dataflow template with Python.
I have done something like this:
with beam.Pipeline(options=options) as p:
    (p
     | 'Read from PubSub' >> beam.io.ReadFromPubSub(sub).with_output_types(bytes)
     | 'String to dictionary' >> beam.Map(firestore_update_multiple)
    )
Is this the correct way of using it?
Additional information
def firestore_update_multiple(row):
    from google.cloud import firestore
    db = firestore.Client()
    doc_ref = db.collection(u'data').document(u'one')
    doc_ref.update({
        u'arrayExample': u'DataflowRunner',
        u'booleanExample': True
    })
The general idea is right, but you should consider allocating the Firestore connection less often, and making batched calls. Here is a sample ParDo that should do that:
import apache_beam as beam
from google.cloud import firestore

class FirestoreUpdateDoFn(beam.DoFn):
    def __init__(self, max_batch_size=500):
        self.element_batch = []
        self.max_batch_size = max_batch_size

    def start_bundle(self):
        # Create the client once per bundle instead of once per element
        self.db = firestore.Client()
        self.batch = self.db.batch()
        self.some_ref = self.db.collection(...)

    def process(self, row):
        self.element_batch.append(row)
        if len(self.element_batch) >= self.max_batch_size:
            self._flush_updates()

    def finish_bundle(self):
        # Flush whatever is left over at the end of the bundle
        self._flush_updates()
        self.db.close()

    def _flush_updates(self):
        for elm in self.element_batch:
            self.batch.update(...)
        self.batch.commit()
        # Start a fresh batch and buffer for the next round of updates
        self.batch = self.db.batch()
        self.element_batch = []
This should let you make fewer round trips to Firestore, and make the pipeline faster. You would then do something like this:
with beam.Pipeline(options=options) as p:
    (p
     | 'Read from PubSub' >> beam.io.ReadFromPubSub(sub).with_output_types(bytes)
     | 'String to dictionary' >> beam.ParDo(FirestoreUpdateDoFn())
    )
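The accumulate-and-flush logic in the DoFn above can be sketched in plain Python, with a stand-in batch object so the behavior is easy to verify locally without a Firestore connection (FakeBatch and flush_in_batches are illustrative names, not part of the Firestore or Beam APIs):

```python
class FakeBatch:
    """Records updates and commits instead of talking to Firestore."""
    def __init__(self):
        self.updates = []
        self.commits = 0

    def update(self, row):
        self.updates.append(row)

    def commit(self):
        self.commits += 1


def flush_in_batches(rows, batch, max_batch_size=500):
    """Buffer rows, committing whenever the buffer reaches max_batch_size."""
    buffered = []
    for row in rows:
        buffered.append(row)
        if len(buffered) >= max_batch_size:
            for elm in buffered:
                batch.update(elm)
            batch.commit()
            buffered = []
    # Mirrors finish_bundle: flush whatever is left at the end
    if buffered:
        for elm in buffered:
            batch.update(elm)
        batch.commit()


batch = FakeBatch()
flush_in_batches(range(1200), batch, max_batch_size=500)
print(batch.commits)       # 3 commits: 500 + 500 + 200
print(len(batch.updates))  # all 1200 rows written
```

The key point is the same as in the DoFn: 1200 elements cost only 3 commits to the external service, instead of 1200 individual writes.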
See:
- The Firestore documentation on batched writes https://firebase.google.com/docs/firestore/manage-data/transactions#batched-writes
- The PyDoc for the batching API https://googleapis.github.io/google-cloud-python/latest/firestore/batch.html
- If you feel like it, the code of PubsubUnboundedSink https://github.com/apache/beam/blob/master/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java#L237, which does the same thing you are trying to do: efficiently write to an external service when running in streaming mode.