[Flow Basics 4] Flow Buffering
지난 포스트에서는 Flow를 방출하는 쪽에서 flowOn연산자를 사용해 Context를 변경하는 방법에 대해 살펴보았다.
이렇게 Producer의 Context를 변경하게 될 경우 오래 걸리는 작업들을 비동기적으로 처리할 수 있어서 도움이 된다.
그러나 만약 Collect 하는 쪽의 작업이 오래 걸리면 어떻게 해야 할까?
방출되는 속도에 비해 처리되는 속도가 현저히 느리며 데이터가 계속 쌓이게 되는 문제를 Back Pressure라고 한다.
이번 포스트는 이런 문제를 해결할 수 있는 Buffering에 대한 내용을 다룬다.
아래 예시를 통해 Collector의 시간이 긴 상황을 살펴보자.
- emit : 100ms의 delay 걸림
- collect : 300ms의 delay 걸림
- 둘 다 thread를 지정하지 않았으므로 main에서 실행
아래 코드는 완료되기까지 1235 ms정도의 시간이 필요하다.
fun simple(): Flow<Int> = flow {
for (i in 1..3) {
delay(100) // pretend we are asynchronously waiting 100 ms
emit(i) // emit next value
}
}
fun main() = runBlocking<Unit> {
val time = measureTimeMillis {
simple().collect { value ->
delay(300) // pretend we are processing it for 300 ms
println(value)
}
}
println("Collected in $time ms")
}
개선 방법 1. Buffer 사용
아래는 코틀린 공식 문서인데, 이미지가 직관적으로 잘 나와있길래 한번 가져와봤다.
이렇게 중간에 buffer()를 사용해서 서로 다른 코루틴에서 작업을 시행하게 지정할 수 있다.
flowOn 연산자의 경우에도 Buffer를 사용한 구조를 활용하긴 하지만, flowOn은 dispatcher자체를 변경하는 거고, buffer() 함수의 경우에는 context를 변경하지 않는다는 차이점이 있다.
개선 방법 2. Conflate 사용
만약에 collecter가 연산을 너무 느리게 처리하게 될 경우 && 중간 연산 값이 중요하지 않은 경우에 conflate를 사용할 수 있다. emission은 confated channel로 처리하고, collector의 경우 각기 다른 coroutine으로 처리하는 방법이다. emitter의 입장에서는 suspend 없이 계속 데이터를 방출하고, Consumer가 가장 최신의 값을 전달받게 된다.
val flow = flow {
for (i in 1..30) {
delay(100)
emit(i)
}
}
val result = flow.conflate().onEach { delay(1000) }.toList()
assertEquals(listOf(1, 10, 20, 30), result)
- cf. 참고로 conflate 연산자는 buffer(onBufferOverflow = BufferOverflow.DROP_OLDEST) 와 동일하다
개선 방법 3. collectLatest 사용
만약에 collecter의 작업이 너무 느리다는 것을 가정해 보자.
가장 최신의 데이터가 필요한 상황이라면 collect내부 로직을 수행하던 도중에, 새로운 값이 도착을 했다.
이럴 경우 내부 로직을 하나하나 충실하게 수행할 필요가 없을 것이다.
collectLatest 연산자는 이전 collect 블록 내부 로직을 취소하고, 새로운 값에 대한 연산을 시작한다.
conflate와 유사해 보이지만, 최신의 데이터를 받고 collect 과정에서 로직을 취소하냐 안 하냐의 여부가 가장 큰 차이인 듯하다.
val time = measureTimeMillis {
simple()
.collectLatest { value -> // cancel & restart on the latest value
println("Collecting $value")
delay(300) // pretend we are processing it for 300 ms
println("Done $value")
}
}
println("Collected in $time ms")
// 🖥️ 출력 결과
Collecting 1
Collecting 2
Collecting 3
Done 3
Collected in 741 ms
결론 : Buffer, Conflate, collectLatest를 활용해서 BackPressure 이슈를 처리할 수 있다.