如何处理来自 S3 的大文件并在 Spring Batch 中使用它

2024-04-10

我有一个 CSV 文件,其中包含数百万条记录,大小约为 2GB。我的用例是从 S3 读取 CSV 文件并对其进行处理。请在下面找到我的代码:

在下面的代码中,我从 S3 存储桶读取文件并使用inputStream直接在 Spring 批处理中平面文件项读取器 reader.setResource(new InputStreamResource(inputStream));

根据此实现,我在内存中保存 2GB 内容并对其进行处理,这不是一种有效的方法 - 有人可以建议从 S3 存储桶中读取大文件并在 S3 存储桶中处理它的有效方法是什么吗?春季批次。

提前感谢您的帮助!谢谢。

@Component
public class GetFileFromS3 {

    public S3ObjectInputStream dowloadFile(String keyName, String bucketName, String region) {
        try {
            AmazonS3 s3Client = AmazonS3ClientBuilder.standard().withClientConfiguration(new ClientConfiguration())
                    .withRegion(region).build();

            S3Object s3object = s3Client.getObject(bucketName, keyName);
            return s3object.getObjectContent();
        } catch (AmazonServiceException e) {
            e.printStackTrace();
        }
        return null;
    }

}




public class SpringBatch {

    @Autowired
    private GetFileFromS3 getFileFromS3;


 @Bean(name = "csvFile")
    public Step step1() {
        return stepBuilderFactory.get("step1").<Employee, Employee>chunk(10)
                .reader(reader())
                .processor(processor())
                .writer(writer())
                .build();
    }

    @Bean
    public FlatFileItemReader<Employee> reader() {
        S3ObjectInputStream inputStream = getFileFromS3.dowloadFile("employee.csv", "testBucket", "us-east-1");
        FlatFileItemReader<Employee> reader = new FlatFileItemReader<Employee>();
        reader.setResource(new InputStreamResource(inputStream));
        reader.setLinesToSkip(1);
        reader.setLineMapper(new DefaultLineMapper() {
            {
                setLineTokenizer(new DelimitedLineTokenizer() {
                    {
                        setNames(Employee.fields());
                    }
                });
                setFieldSetMapper(new BeanWrapperFieldSetMapper<Employee>() {
                    {
                        setTargetType(Employee.class);
                    }
                });
            }
        });
        return reader;
    }

    @Bean
    public ItemProcessor<Employee, Employee> processor() {
        return new ItemProcessor();
    }

    @Bean
    public ItemWriter<Employee> writer() {
        return new ItemWriter<Event>();
    }

    }

利用ResourceLoader,我们可以像其他资源一样在ItemReader中读取S3中的文件。这将有助于以块的形式读取 S3 中的文件,而不是将整个文件加载到内存中。

随着依赖注入ResourceLoader and AmazonS3 client,已更改阅读器配置如下:

替换值sourceBucket and sourceObjectPrefix如所须。

@Autowired
private ResourceLoader resourceLoader;

@Autowired
private AmazonS3 amazonS3Client;

// READER
@Bean(destroyMethod="")
@StepScope
public SynchronizedItemStreamReader<Employee> employeeDataReader() {
    SynchronizedItemStreamReader synchronizedItemStreamReader = new SynchronizedItemStreamReader();
    List<Resource> resourceList = new ArrayList<>();
    String sourceBucket = yourBucketName;
    String sourceObjectPrefix = yourSourceObjectPrefix;
    log.info("sourceObjectPrefix::"+sourceObjectPrefix);
    ListObjectsRequest listObjectsRequest = new ListObjectsRequest()
            .withBucketName(sourceBucket)
            .withPrefix(sourceObjectPrefix);
    ObjectListing sourceObjectsListing;
    do{
        sourceObjectsListing = amazonS3Client.listObjects(listObjectsRequest);
        for (S3ObjectSummary sourceFile : sourceObjectsListing.getObjectSummaries()){

            if(!(sourceFile.getSize() > 0)
                    || (!sourceFile.getKey().endsWith(DOT.concat("csv")))
            ){
                // Skip if file is empty (or) file extension is not "csv"
                continue;
            }
            log.info("Reading "+sourceFile.getKey());
            resourceList.add(resourceLoader.getResource("s3://".concat(sourceBucket).concat("/")
                    .concat(sourceFile.getKey())));
        }
        listObjectsRequest.setMarker(sourceObjectsListing.getNextMarker());
    }while(sourceObjectsListing.isTruncated());

    Resource[] resources = resourceList.toArray(new Resource[resourceList.size()]);
    MultiResourceItemReader<Employee> multiResourceItemReader = new MultiResourceItemReader<>();
    multiResourceItemReader.setName("employee-multiResource-Reader");
    multiResourceItemReader.setResources(resources);
    multiResourceItemReader.setDelegate(employeeFileItemReader());
    synchronizedItemStreamReader.setDelegate(multiResourceItemReader);
    return synchronizedItemStreamReader;
}

@Bean
@StepScope
public FlatFileItemReader<Employee> employeeFileItemReader()
{
    FlatFileItemReader<Employee> reader = new FlatFileItemReader<Employee>();
    reader.setLinesToSkip(1);
    reader.setLineMapper(new DefaultLineMapper() {
        {
            setLineTokenizer(new DelimitedLineTokenizer() {
                {
                    setNames(Employee.fields());
                }
            });
            setFieldSetMapper(new BeanWrapperFieldSetMapper<Employee>() {
                {
                    setTargetType(Employee.class);
                }
            });
        }
    });
    return reader;
}

以 MultiResourceItemReader 为例。即使您正在查找的特定 S3 路径中有多个 CSV 文件,这也可以工作。

如果只处理某个位置的一个 CSV 文件,它也可以隐式地使用Resources[] resources包含一个条目。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

如何处理来自 S3 的大文件并在 Spring Batch 中使用它 的相关文章

随机推荐