Skip to content

Avoid Combine FlatMap

Compare
Choose a tag to compare
@luizmb luizmb released this 25 Jan 17:49
· 23 commits to master since this release
21010b4

Apparently, Combine FlatMap leaks memory, never cancelling the upstream when, from inside its block, you return a Fail.
Other operators like tryMap and flatMapLatest (map+switchToLatest) work as expected.
Let's check the following example:

cancellable = Timer
    .publish(every: 2, on: .main, in: .default)
    .autoconnect()
    .print("🤷‍♂️ point A")
    .scan(0, { result, _ in
        result + 1
    })
    .print("🤷‍♂️ point B")
    .setFailureType(to: Error.self)
    .flatMap { int -> AnyPublisher<Int, Error> in
        if int == 4 {
            return Fail<Int, Error>(error: BlaError()).eraseToAnyPublisher()
        }
        return Just<Int>(int).setFailureType(to: Error.self).eraseToAnyPublisher()
    }
    .print("🤷‍♂️ point C")
    .sink { (completion: Subscribers.Completion<Error>) in
        print("🤷‍♂️ completion: \(completion)")
    } receiveValue: { (value: Int) in
        print("🤷‍♂️", value)
    }

DispatchQueue.main.asyncAfter(deadline: .now() + .seconds(15)) {
    print("🤷‍♂️ Cancelling")
    cancellable = nil
}

The example above shows a Timer that starts feeding a flatMap, which on the 4th event sends a Fail. As the Fail reaches the main pipeline, we expect it to complete the whole pipeline and tell the Timer to stop (cancel). This is what happens with all the publishers upon failure, it propagates downstream with the failure but even before that it tells all upstreams to cancel.

But not flatMap. It propagates the error to the downstream but forgets to tell upstream to cancel, which causes the timer to keep rolling forever and ever. To make it worse, calling .cancel on the subscription also doesn't stop the timer. Even if everything goes out of scope now, the Timer will remain alive.

If it's not a timer, but something like a LongPolling, WebSocket or whatever side-effect upstream does, the requests will continue forever, wasting bandwidth, battery and memory, and eventually also causing bugs.

This is the output for the example above:

🤷‍♂️ point C: receive subscription: (FlatMap)
🤷‍♂️ point C: request unlimited
🤷‍♂️ point A: receive subscription: ((extension in Foundation):__C.NSTimer.TimerPublisher.Inner<Combine.Publishers.Autoconnect<(extension in Foundation):__C.NSTimer.TimerPublisher>.(unknown context at $7fff4edb7680).Inner<Combine.Publishers.Print<Combine.Publishers.Autoconnect<(extension in Foundation):__C.NSTimer.TimerPublisher>>.(unknown context at $7fff4edb7a18).Inner<Combine.Publishers.Scan<Combine.Publishers.Print<Combine.Publishers.Autoconnect<(extension in Foundation):__C.NSTimer.TimerPublisher>>, Swift.Int>.(unknown context at $7fff4edbdad0).Inner<Combine.Publishers.Print<Combine.Publishers.Scan<Combine.Publishers.Print<Combine.Publishers.Autoconnect<(extension in Foundation):__C.NSTimer.TimerPublisher>>, Swift.Int>>.(unknown context at $7fff4edb7a18).Inner<Combine.Publishers.SetFailureType<Combine.Publishers.Print<Combine.Publishers.Scan<Combine.Publishers.Print<Combine.Publishers.Autoconnect<(extension in Foundation):__C.NSTimer.TimerPublisher>>, Swift.Int>>, Swift.Error>.(unknown context at $7fff4edb8a80).Inner<Combine.Publishers.FlatMap<Combine.AnyPublisher<Swift.Int, Swift.Error>, Combine.Publishers.SetFailureType<Combine.Publishers.Print<Combine.Publishers.Scan<Combine.Publishers.Print<Combine.Publishers.Autoconnect<(extension in Foundation):__C.NSTimer.TimerPublisher>>, Swift.Int>>, Swift.Error>>.(unknown context at $7fff4edc4580).Outer<Combine.Publishers.Print<Combine.Publishers.FlatMap<Combine.AnyPublisher<Swift.Int, Swift.Error>, Combine.Publishers.SetFailureType<Combine.Publishers.Print<Combine.Publishers.Scan<Combine.Publishers.Print<Combine.Publishers.Autoconnect<(extension in Foundation):__C.NSTimer.TimerPublisher>>, Swift.Int>>, Swift.Error>>>.(unknown context at $7fff4edb7a18).Inner<Combine.Subscribers.Sink<Swift.Int, Swift.Error>>>, Swift.Error>>>>>>)
🤷‍♂️ point B: receive subscription: (Print)
🤷‍♂️ point B: request unlimited
🤷‍♂️ point A: request unlimited
🤷‍♂️ point A: receive value: (2022-01-25 17:38:19 +0000)
🤷‍♂️ point B: receive value: (1)
🤷‍♂️ point C: receive value: (1)
🤷‍♂️ 1
🤷‍♂️ point A: receive value: (2022-01-25 17:38:21 +0000)
🤷‍♂️ point B: receive value: (2)
🤷‍♂️ point C: receive value: (2)
🤷‍♂️ 2
🤷‍♂️ point A: receive value: (2022-01-25 17:38:23 +0000)
🤷‍♂️ point B: receive value: (3)
🤷‍♂️ point C: receive value: (3)
🤷‍♂️ 3
🤷‍♂️ point A: receive value: (2022-01-25 17:38:25 +0000)
🤷‍♂️ point B: receive value: (4)
🤷‍♂️ point C: receive error: (BlaError())
🤷‍♂️ completion: failure(TeufelStreaming_iOS.BlaError())
🤷‍♂️ point A: receive value: (2022-01-25 17:38:27 +0000)
🤷‍♂️ point B: receive value: (5)
🤷‍♂️ point A: receive value: (2022-01-25 17:38:29 +0000)
🤷‍♂️ point B: receive value: (6)
🤷‍♂️ point A: receive value: (2022-01-25 17:38:31 +0000)
🤷‍♂️ point B: receive value: (7)
🤷‍♂️ Cancelling
🤷‍♂️ point A: receive value: (2022-01-25 17:38:33 +0000)
🤷‍♂️ point B: receive value: (8)
🤷‍♂️ point A: receive value: (2022-01-25 17:38:35 +0000)
🤷‍♂️ point B: receive value: (9)
🤷‍♂️ point A: receive value: (2022-01-25 17:38:37 +0000)
🤷‍♂️ point B: receive value: (10)
🤷‍♂️ point A: receive value: (2022-01-25 17:38:39 +0000)
🤷‍♂️ point B: receive value: (11)
🤷‍♂️ point A: receive value: (2022-01-25 17:38:41 +0000)
🤷‍♂️ point B: receive value: (12)
🤷‍♂️ point A: receive value: (2022-01-25 17:38:43 +0000)
🤷‍♂️ point B: receive value: (13)
🤷‍♂️ point A: receive value: (2022-01-25 17:38:45 +0000)
🤷‍♂️ point B: receive value: (14)
🤷‍♂️ point A: receive value: (2022-01-25 17:38:47 +0000)
🤷‍♂️ point B: receive value: (15)
🤷‍♂️ point A: receive value: (2022-01-25 17:38:49 +0000)
🤷‍♂️ point B: receive value: (16)
🤷‍♂️ point A: receive value: (2022-01-25 17:38:51 +0000)
🤷‍♂️ point B: receive value: (17)
🤷‍♂️ point A: receive value: (2022-01-25 17:38:53 +0000)
🤷‍♂️ point B: receive value: (18)
🤷‍♂️ point A: receive value: (2022-01-25 17:38:55 +0000)
🤷‍♂️ point B: receive value: (19)
🤷‍♂️ point A: receive value: (2022-01-25 17:38:57 +0000)
🤷‍♂️ point B: receive value: (20)

Points A and B keep sending values even after cancellation. And FlatMap, in a faulty state, ignores both, the cancelation from the downstream, and the events from the upstream.

Moving to map+switchToLatest fixes the problem.