您可以使用 Google.Cloud.BigQuery.V2 中具有 CreateLoadJob 方法的 BigQueryClient 创建 csv 加载作业,以从 Google Cloud Storage 中的 csv 文件加载数据。
如何保证此 API 的幂等性,以确保在获得响应之前网络断开且您开始重试时,不会导致相同的数据多次加载到 BigQuery 中?
API 使用示例
private void LoadCsv(string sourceUri, string tableId, string timePartitionField)
{
var tableReference = new TableReference()
{
DatasetId = _dataSetId,
ProjectId = _projectId,
TableId = tableId
};
var options = new CreateLoadJobOptions
{
WriteDisposition = WriteDisposition.WriteAppend,
CreateDisposition = CreateDisposition.CreateNever,
SkipLeadingRows = 1,
SourceFormat = FileFormat.Csv,
TimePartitioning = new TimePartitioning
{
Type = _partitionByDayType,
Field = timePartitionField
}
};
BigQueryJob loadJob = _bigQueryClient.CreateLoadJob(sourceUri: sourceUri,
destination: tableReference,
schema: null,
options: options);
loadJob.PollUntilCompletedAsync().Wait();
if (loadJob.Status.Errors == null || !loadJob.Status.Errors.Any())
{
//Log success
return;
}
//Log error
}
您可以通过基于例如生成您自己的 jobid 来实现幂等性您加载的文件位置和目标表。
job_id = 'my_load_job_{}'.format(hashlib.md5(sourceUri+_projectId+_datasetId+tableId).hexdigest())
var options = new CreateLoadJobOptions
{
WriteDisposition = WriteDisposition.WriteAppend,
CreateDisposition = CreateDisposition.CreateNever,
SkipLeadingRows = 1,
JobId = job_id, #add this
SourceFormat = FileFormat.Csv,
TimePartitioning = new TimePartitioning
{
Type = _partitionByDayType,
Field = timePartitionField
}
};
在这种情况下,如果您尝试重新插入相同的 job_id,则会出现错误。
您还可以轻松生成此 job_id 以检查池是否失败。
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)