루피도 코딩한다

[Flow Basics 5] Composing multiple flows and Flattening flows 본문

Coroutine

[Flow Basics 5] Composing multiple flows and Flattening flows

xiaolin219 2024. 1. 30. 22:17

1. Composing multiple flows (여러개 flow 결합하기)

collect 이전에 연산자가 위치하게 된다

  1. Zip : 두개의 flow를 순차적으로 결합

    val flow = flowOf(1, 2, 3).onEach { delay(10) }
    val flow2 = flowOf("a", "b", "c", "d").onEach { delay(15) }
    flow.zip(flow2) { i, s -> i.toString() + s }.collect {
       println(it) // Will print "1a 2b 3c"
    }
image-20240130171026341
  1. Combine : 방출되는게 생길때마다 두개의 flow

    (combine 블록 내부에서 emit하려고 하면 에러남)

    val flow = flowOf(1, 2).onEach { delay(10) }
    val flow2 = flowOf("a", "b", "c").onEach { delay(15) }
    flow.combine(flow2) { i, s -> i.toString() + s }.collect {
        println(it) // Will print "1a 2a 2b 2c"
    }
    image-20240130171009310
  1. CombineTransform : 합쳐서 가공한 결과를 다시 Emit함 ()

    (combineTransform 블록 내부에서 emit 호출 안하면 에러남)

    val flow = requestFlow()
    val flow2 = searchEngineFlow()
    flow.combineTransform(flow2) { request, searchEngine ->
        emit("Downloading in progress")
        val result = download(request, searchEngine)
        emit(result)
    }
image-20240130174300526

2. Flow Flattening

flow내부에서 flow가 있다면 어떻게 처리할까..? 여러개의 flow가 있을때 합쳐서 하나의 flow를 만드는것을 flattening이라고 한다.

val flowInsideFlow : Flow<Flow<String>> = (1..3).asFlow().map { requestFlow(it) }

fun requestFlow(i: Int): Flow<String> = flow {
    emit("$i: First")
    delay(500) // wait 500 ms
    emit("$i: Second")
}

flow내부에서 flow를 호출하는 이 자태를 보면.. 연산자의 필요성을 느끼게 된다.

공식문서에 따르면, Collection과 Sequence에 대해 flatten, flatMap연산자를 지원하긴 하나, flow가 비동기적으로 처리되기에 각각의 flow가 fattening연산자를 호출하게 된다. 따라서 flow의 연산을 처리할 수 있는 flattening 연산자들이 따로 필요한것이라고 한다.

  1. flatMapConcat : 여러 flow들의 발생값을 순서대로 이어붙여(concat) 하나의 flow를 만든다.

    • flowOf(1, 2, 3) -> flowOf(4, 5, 6) -> flowOf(7, 8, 9) 순으로 이어붙이게 되는것

    • delay를 붙여도 1, 2, 3, ... 8, 9 순서가 유지됨

    val flowOfFlows = flowOf(
        flowOf(1, 2, 3),
        flowOf(4, 5, 6),
        flowOf(7, 8, 9)
    )
    
    flowOfFlows
        .flatMapConcat { it }
        .collect { println(it) }
    
    // 출력 결과
    1, 2, 3, 4, ... 8, 9
  2. flatMapMerge : 모든 flow를 동시에 처리하며, 값이 발생하는 순서는 보장되지 않는다

    • flowOf(4, 5, 6)가 flowOf(1, 2, 3) 가 끝나기를 기다리지 않음

    • 동시에 flowOf(1, 2, 3) , flowOf(4, 5, 6) , flowOf(7, 8, 9) 를 합쳐버림

    • 여러개의 flow를 동시에 처리하다 보니 delay의 영향을 받음

    val flowOfFlows = flowOf(
        flowOf(1, 2, 3).onEach { delay(100) },
        flowOf(4, 5, 6).onEach { delay(50) },
        flowOf(7, 8, 9).onEach { delay(75) }
    )
    
    flowOfFlows
        .flatMapMerge { it }
        .collect { println(it) }
    
    // 출력 결과
    4, 7, 1, 5, 8, 6, 2, 9, 3
  3. flatMapLatest : 마지막 flow인 flowOf(7, 8, 9)만 처리하고 앞에 방출된 flow는 취소해버린다

    val flowOfFlows = flowOf(
        flowOf(1, 2, 3).onEach { delay(100) },
        flowOf(4, 5, 6).onEach { delay(50) },
        flowOf(7, 8, 9).onEach { delay(75) }
    )
    
    flowOfFlows
        .flatMapLatest { it }
        .collect { println(it) }
    
    // 출력 결과
    7, 8, 9

이미지 출처

Comments