루피도 코딩한다

[Flow Basics 2] Flow Intermediate and Terminal Operators 본문

Coroutine

[Flow Basics 2] Flow Intermediate and Terminal Operators

xiaolin219 2024. 1. 26. 18:28

Intermediate flow operators

Basic Operators

  1. map : map 내부에서 변경한 데이터를 downStream으로 흘려보냄
  2. filter : 조건에 맞는 것만 남기기 (조건을 술어 혹은 predicate라고 한다)
  3. filterNot : 조건에 맞지 않는것만 남기기

Transform Operator

  1. transform : stream 수정 (임의의 값을 임의의 횟수만큼 emit 가능)

Size-limiting Operators

  1. take(n) : 시작지점부터 n개만큼 data를 다 받아들이면 flow 실행 cancle 시키기
  2. takeWhile : 특정 조건을 만족하는 동안만 값을 가져오게 하기. 조건 안맞으면 즉시 실행 취소
  3. drop(n) : 시작부터 n 개만큼 data 버리기
    • 만약 emit된 개수보다 n이 더 크다면 정상종료 됨
  4. dropWhile : 특정 조건을 만족시키는 동안만 data 버리기.
    • 만약 첫번째 조건이 안맞았다 그러면 아무것도 안버리고 다 collect 하게 됨

Terminal Flow Operators

Terminal Flow(종단 연산자)는 suspend function이며 단일값을 반환함으로써 flow를 끝낸다. (내부적으로 collect 호출함)

image-20240126182149719

  1. toList, toSet을 활용해 flow를 Collection으로 변형할 수 있다.
  2. first : flow의 첫번째 값 가져오기
  3. single : flow의 단일 값 가져오기. 만약 flow가 emit하는 데이터가 두 개 이상일 경우 IllegalArgumentException발생
  4. reduce : collection을 누적해서 계산. 첫번째 데이터가 콜렉션의 첫번째 원소임.
  5. fold : collection을 누적해서 계산. 첫번째 데이터를 개발자가 직접 지정해주어야함.
  6. cf. reducefold는 Kotlin collection 기본 함수이기도 한데, 만약 해당 함수가 낯선 사람이라면 Kotlin 공식문서 예시를 보면 쉽게 이해할 수 있을것이다:)
  7. count : count{ /* here(술어) */} 술어를 만족하는 자료의 개수 세기

Example of Operators

import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking


fun flowSomething(): Flow<Int> = flow {
    for (i in 0..20) {
        emit(i)
        delay(10L)
    }
}

fun map() = runBlocking {
    flowSomething()
        .map { "$it $it" }
        .collect { print("$it -> ") }
}

fun filter() = runBlocking {
    flowSomething()
        .filter { (it % 2) == 0 }
        .collect { print("$it -> ") }
}

fun filterNot() = runBlocking {
    flowSomething()
        .filterNot { (it % 2) == 0 }
        .collect { print("$it -> ") }
}


fun transform() = runBlocking {
    flowSomething()
        .transform {
            emit(it)
            emit(it * 2)
        }.collect { print("$it -> ") }
}


fun take() = runBlocking {
    flowSomething()
        .take(5)
        .collect { print("$it -> ") }
}

fun takeWhile() = runBlocking {
    flowSomething()
        .takeWhile { it == 3 }
        .collect { print("$it -> ") }
}

fun drop() = runBlocking {
    flowSomething()
        .drop(21) // 0, 1, 2 이렇게 3개 버림
        .collect { print("$it -> ") }
}

fun dropWhile() = runBlocking {
    flowSomething()
        .dropWhile { it % 2 == 0 }
        .collect { print("$it -> ") }
}

fun toList() = runBlocking {
    flowSomething()
        .dropWhile { it % 2 == 0 }
        .collect { print("$it -> ") }
}

fun toSet() = runBlocking {
    flowSomething()
        .dropWhile { it % 2 == 0 }
        .collect { print("$it -> ") }
}

fun first() = runBlocking {
    println(flowSomething().first())
}

fun single() = runBlocking {
    println(flowOf("singleValue").single())
//    println(flowOf("multipleValue", "occurException").single())
}

fun reduce() = runBlocking {
    // 0, 1, .. 20 차례로 더하기
    println(
        flowSomething().reduce { accumulator, value ->
            accumulator + value // sum of 0 to 20
        }
    )
}

fun fold() = runBlocking {
    // 100, 0, 1, .. 20 차례로 더하기
    println(
        flowSomething().fold(100) { accumulator, value ->
            accumulator + value // sum of 0 to 20, plus 100
        }
    )
}

fun count() = runBlocking {
    // 홀수 몇개인지 세기
    println(
        flowSomething().count { it % 2 == 0 }
    )
}

fun main(): Unit = runBlocking {
//    map()
//    filter()
//    filterNot()
//    transform()
//    take()
//    takeWhile()
//    drop()
//    dropWhile()
//    first()
//    single()
//    reduce()
//    fold()
    count()
    delay(1000)
}
Comments