ExecutorService executorService = Executors.newFixedThreadPool(10);

            LOG.info("Setting up the poller for directory {} ", finalDirectory);
            StandardIntegrationFlow standardIntegrationFlow = IntegrationFlows.from(new CustomFileReadingSource(finalDirectory),
                    c -> c.poller(Pollers.fixedDelay(5, TimeUnit.SECONDS, 5)
                            .advice(new LoggerSourceAdvisor(finalDirectory))

                    //move file to processing first processing                    
                    .transform(new FileMoveTransformer("C:/processing", true))

正如所见,我已经设置固定threadpool每次轮询最多 10 条消息,最多 10 条消息。如果我放入 10 个文件,它仍然会一一处理。这里可能出了什么问题?

* 更新 *



setDirectory(new File(path));
        DefaultDirectoryScanner scanner = new DefaultDirectoryScanner();

        scanner.setFilter(new AcceptAllFileListFilter<>());





    public boolean accept(F file) {
        String key = buildKey(file);
        synchronized (this.monitor) {
            String newValue = value(file);
            String oldValue = this.store.putIfAbsent(key, newValue);
            if (oldValue == null) { // not in store
                return true;
            // same value in store
            if (!isEqual(file, oldValue) && this.store.replace(key, oldValue, newValue)) {
                return true;
            return false;

现在,例如,如果我设置了 max per poll 5 并且有两个文件,那么两个线程可能会选取相同的文件。


但另一个线程到达accept method



如果它是 0 那么它应该返回 false,因为该文件不再存在。

当您将任务执行器添加到轮询器时;所做的只是调度程序线程将轮询任务交给线程池中的线程;这maxMessagesPerPoll是轮询任务的一部分。轮询器本身每 5 秒仅运行一次。为了得到你想要的,你应该向流程添加一个执行者通道......

public class So53521593Application {

    private static final Logger logger = LoggerFactory.getLogger(So53521593Application.class);

    public static void main(String[] args) {
        SpringApplication.run(So53521593Application.class, args);

    public IntegrationFlow flow() {
        ExecutorService exec = Executors.newFixedThreadPool(10);
        return IntegrationFlows.from(() -> "foo", e -> e
                    .poller(Pollers.fixedDelay(5, TimeUnit.SECONDS)
                .<String>handle((p, h) -> {
                    try {
                    catch (InterruptedException e1) {
                    return null;



public IntegrationFlow flow() {
    ExecutorService exec = Executors.newFixedThreadPool(10);
    return IntegrationFlows.from(Files.inboundAdapter(new File("/tmp/foo")).filter(
                new FileSystemPersistentAcceptOnceFileListFilter(new SimpleMetadataStore(), "foo")),
                    e -> e.poller(Pollers.fixedDelay(5, TimeUnit.SECONDS)
            .handle((p, h) -> {
                try {
                catch (InterruptedException e1) {
                return null;


2018-11-28 11:46:05.196 信息 57607 --- [pool-1-thread-1] com.example.So53521593Application : /tmp/foo/test1.txt

2018-11-28 11:46:05.197 INFO 57607 --- [pool-1-thread-2] com.example.So53521593Application : /tmp/foo/test2.txt

touch test1.txt

2018-11-28 11:48:00.284 INFO 57607 --- [pool-1-thread-3] com.example.So53521593Application : /tmp/foo/test1.txt


同意 - 转载此...

public IntegrationFlow flow() {
    ExecutorService exec = Executors.newFixedThreadPool(10);
    return IntegrationFlows.from(Files.inboundAdapter(new File("/tmp/foo")).filter(
                new FileSystemPersistentAcceptOnceFileListFilter(new SimpleMetadataStore(), "foo")),
                    e -> e.poller(Pollers.fixedDelay(5, TimeUnit.SECONDS)
            .<File>handle((p, h) -> {
                try {
                catch (InterruptedException e1) {
                return null;


2018-11-28 13:22:23.689 INFO 75681 --- [pool-1-thread-1] com.example.So53521593Application : /tmp/foo/test1.txt

2018-11-28 13:22:23.690 INFO 75681 --- [pool-1-thread-2] com.example.So53521593Application : /tmp/foo/test2.txt

2018-11-28 13:22:23.690 INFO 75681 --- [pool-1-thread-3] com.example.So53521593Application : /tmp/foo/test1.txt

2018-11-28 13:22:23.690 INFO 75681 --- [pool-1-thread-4] com.example.So53521593Application : /tmp/foo/test2.txt


