使用异步收集 Publisher 值

2024-05-01

我一直在为我们拥有的一些组合代码编写一些单元测试。我遇到了一些问题。我想我已经简化了这个测试中的各个部分。注意:这不是一个测试 - 这是我试图理解为什么其中一个测试不起作用!

func test_collectingPassthroughValues() async throws {
    // In the real test this is injected in to the unit under test.
    let subject = PassthroughSubject<Int, Never>()

    // I'm expecting this to only complete once `subject` finishes. I've used
    // `async let` so I can poke some data through `subject` and then later on
    // `await collectValues` to hopefully get back the stuff published by 
    // `subject`. In the real test this is a property from the unit under test
    // which runs various operators on `subject`.
    async let collectValues = await subject.values.reduce(into: []) { $0.append($1) }

    // Send some data through `subject` and then `.finish` it.
    subject.send(10)
    subject.send(20)
    subject.send(completion: .finished)

    // Await the values so we can check we got what's expected.
    let values = await collectValues

    // This fails…
    XCTAssertEqual(values, [10, 20])
}

断言失败并显示:

est_collectingPassthroughValues(): XCTAssertTrue failed - Found difference for 
Different count:
 |  Received: (0) []
 |  Expected: (2) [10, 20]

So subject.values似乎什么也没得到;我不知道为什么?

Thanks!


发生的事情相当简单。如何正确编写它还不太清楚,我的建议是“不要这样做”。

首先,一个不是问题的小问题:

async let collectValues = await subject.values.reduce(into: []) { $0.append($1) }

你不应该使用await这里。如果没有其他问题的话,这可能会成为一个问题。

根本问题是,如果没有订阅者,PassthroughSubject 就会丢弃消息。在您当前的代码中,这绝对会发生,但也很难修复。

// Taking out the extra `await`
async let collectValues = subject.values.reduce(into: []) { $0.append($1) }

// That line is pretty close to:
    let collectValues = Task {
        var values: [Int] = []
        for await value in subject.values {
            values.append(value)
        }
        return values
    }

问题是这会启动一项可能不会立即启动的任务。所以你的下一行代码,subject.send(10)没有订阅者(它甚至还没有到达for-await线),然后就被扔掉了。

你可以通过添加一个来修复它try await Task.sleep(for: .seconds(1))创建任务后,但没有多大帮助。 PassthroughSubject 上没有缓冲。当你打电话时append,没有什么在听。该值将被丢弃,并且您将删除 20。

你可以通过缓冲来改善事情,但你仍然需要睡觉(我认为这是不可接受的)。尽管如此,以下内容对我来说非常可靠:

func test_collectingPassthroughValues() async throws {
    // In the real test this is injected in to the unit under test.
    let subject = PassthroughSubject<Int, Never>()
    let readSubject = subject.buffer(size: 10, prefetch: .keepFull, whenFull: .dropOldest)

    async let collectValues = readSubject.values.reduce(into: []) { $0.append($1) }

    try await Task.sleep(for: .seconds(1))
    subject.send(10)
    subject.send(20)
    subject.send(completion: .finished)

    // Await the values so we can check we got what's expected.
    let values = await collectValues

    XCTAssertEqual(values, [10, 20])
}

但在我看来,这是一种完全失败的方法。

我不会尝试将 PassthroughSubject 与.values。我只是看不出有什么方法可以让它变得健壮。更广泛地说,我建议非常小心地混合组合和结构化并发。他们对于事情应该如何运作往往有非常不同的想法。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

使用异步收集 Publisher 值 的相关文章

随机推荐