You will be fine

<Coroutine> 11. 코루틴 Flow(플로우)의 함수들

by BFine
반응형

출처&참고 : https://m.yes24.com/Goods/Detail/123034354

 

코틀린 코루틴 - 예스24

코틀린 전문 강사가 알려 주는 코틀린 코루틴에 대한 모든 것!코틀린 코루틴은 효율적이고 신뢰할 수 있는 멀티스레드 프로그램을 쉽게 구현할 수 있게 해 주어 자바 가상 머신(JVM), 특히 안드로

m.yes24.com

 

가. 생명주기 함수

 a. onEach

  -  Flow 값을 하나씩 받기 위해 onEach 함수를 사용한다. onEach 람다식은 중단함수이고 순서대로 처리된다.

 

 b. onStart 

import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*

suspend fun main() {
    flowOf("hello", "world", "!")
        .onEach { delay(1000) }
        .onStart { println("## Start") }
        .collect { println(it) }
}

## Start
hello
world
!

  -  첫번째 원소를 요청했을 때호출되는 함수로 emit으로 원소를 내보낼 수도 있다.

 

 c. onCompletion

import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*

suspend fun main() {
    flowOf("hello", "world", "!")
        .onEach { delay(1000) }
        .onStart { println("## Start") }
        .onCompletion { println("## End") }
        .collect { println(it) }
}

## Start
hello
world
!
## End

  - Flow 가 완료되는 시점이 여러가지가 있는데 잡히지 않은 예외가 발생하거나 코루틴이 취소 되었을때 특히 Flow 빌더가 끝났을때가 있는데

     그때 호출되는 리스너를 이 onCompletion 함수를 사용해서 추가할 수 있다.

 

 d. onEmpty 와 catch

import kotlinx.coroutines.flow.*

suspend fun main() {
    flowOf<String>()
        .onEmpty { emit("EMPTY") }
        .collect { println(it) }
}

  -  Flow는 예기지 않은 이벤트가 발생하면 값을 내보내기 전에 완료될 수 있는데 이렇게 원소를 내보내기 전에 Flow가 완료되면 실행된다.

      => 기본값을 내보내기 위한 목적으로도 사용될수있다.

 

import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

fun main() = runBlocking {
    flow {
        for (i in 1..5) {
            if (i == 4) throw IllegalArgumentException("예외 발생")
            emit(i)
        }
    }
        .onEach { println("Emitting value: $it") }
        .catch { e -> println("Caught exception: ${e.message}") }
        .collect { println("Collected: $it") }
}

Emitting value: 1
Collected: 1
Emitting value: 2
Collected: 2
Emitting value: 3
Collected: 3
Caught exception: 예외 발생

  -  Flow를 만들거나 처리하는 도중에 예외가 발생할 수 있는데 이러한 예외를 잡고 관리할 수 있도록 catch 함수가 있다.

 

 e. flowOn

  -  Flow 함수들이 컨텍스트를 얻어오는 시점은 collect가 호출될때이고 호출한 곳의 컨텍스트를 가져온다. flowOn 함수는 이런 컨텍스트를 변경할수있다.

 

 f. launchIn

  -  이 함수를 사용하면 유일한 인자로 scope를 받아서 collect를 새로운 코루틴에서 시작할 수 있게 할 수 있다.

 

나. 처리 함수

 a. map

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking {
    val flow = flowOf("Hello", "World", "!!")
        .map { it.uppercase() }

    flow.collect {
        println(it)
    }
}

HELLO
WORLD
!!

  -  Flow의 각원소를 변환 함수에 따라 변환하는 함수이다. 

 

 b. fliter

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking {
    val flow = flowOf("Hello", "World", "!!")
        .filterNot { it  == "!!" }

    flow.collect {
        println(it)
    }
}

Hello
World

  -  Flow에서 주어진 조건에 맞는 값들만 가진 Flow를 반환 한다.

 

 c.  take와 drop

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking {
    val flow = flowOf("Hello", "World", "!!")


    flow.take(2).collect {
        println(it)
    }
    flow.drop(2).collect {
        println(it)
    }
}

Hello
World
!!

  -  Flow에서 주어진 원소들 중 특정 수만 통과시키기 위해서는 take, 특정 수의 원소를 무시하고 받기 위해서는 drop을 사용 할 수 있다.

 

 d. merge, zip 그리고 combine 

  -  코루틴 Flow에서는  두 개의 Flow를 하나의 Flow를 합치는 함수를 제공하고 있다.

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking {
    val flow1 = flowOf("Apple", "Orange", "Melon")
    val flow2 = flowOf(1000, 2000, 3000)

    merge(flow1, flow2)
       .toList()
        .forEach {
            println(it)
        }
    
    merge(flow1, flow2)
        .collect{ println(it) } // 컴파일 오류
}

Apple
Orange
Melon
1000
2000
3000

  -  가장 간단한 방법으로는 merge를 사용하는 방법이다. 한 Flow의 원소가 다른 Flow를 기다리지 않는 것이 특징이다.

      => Flow의 원소 생성이 지연된다고 해서 다른 Flow의 원소 생성이 중단되지는 않는다.

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking {

    flowOf("Apple", "Orange", "Melon")
        .zip(flowOf(1000, 2000, 3000)) { fruit, price -> "${fruit}-${price}" }
        .collect { println(it)}

}

Apple-1000
Orange-2000
Melon-3000

  -  서로 다른 두개의 Flow에서 원소 쌍을 만드는 방법이 zip 함수 이다.

      =>  각각의 Flow 원소는 한 쌍의 일부가 되므로 쌍이 될 원소를 기다려야 한다. ( 쌍을 이루지 못하고 남은 원소는 유실된다. )

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking {

    val flow = flow {
        listOf(1000, 2000, 3000)
            .forEach {
                delay(1000L)
                emit(it)
            }
    }

    flowOf("Apple", "Orange", "Melon")
        .combine(flow) { fruit, price -> "${fruit}-${price}" }
        .collect { println(it)}
}

Melon-1000
Melon-2000
Melon-3000

  -  combine 함수는 위의 zip 처럼 원소들로 쌍을 형성하기 때문에 첫번째 쌍을 만들기 위해서 더 느린 Flow를 기다려야 한다.

      =>  그러나 zip과는 다르게 두 Flow가 모두 닫힐때까지 원소를 내보낸다.

  -  combine은 두 데이터 소싀의 변화를 능동적으로 감지해야할때 주로 사용된다.

 

 e. fold와 scan 

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking {

    val result = flowOf("Hello", "World", "!!")
        .fold("") { acc, value -> if (acc.isNotEmpty()) "$acc $value" else value }

    println(result)
}

Hello World !!

  - flod는 Flow에서 주워진 원소 각각에 대해 두 개의 값을 하나로 합치는 작성한 연산을 적용하여 Flow의 모든 값을 하나로 합치는 최종 연산 함수이다.

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking {

    val results = flowOf("Hello", "World", "!!")
        .scan("") { acc, value -> if (acc.isNotEmpty()) "$acc $value" else value }

    results.collect {
        println(it)
    }
}


Hello
Hello World
Hello World !!

  - scan에 경우에는 누적되는 과정의 모든 값을 생성하는 중간 연산이다. 

     => 이전 단계에서 값을 받은 즉시 새로운 값을 만들기 때문에 Flow에서 유용하게 사용된다.

 

 f. flatMapConcat, flatMapMerge, flatMapLatest 

  -  컬렉션의 flatMap이 평탄화된 컬렉션을 반환하는 것처럼 Flow의 flatMap도 평탄화된 Flow를 반환한다.  

  -  하지만 Flow의 경우 서로 다른 Flow에서 원소가 나오는 시간이 다르기 때문에 이거에 대한 고려를 위해 여러 flatMap 함수가 존재한다.

import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

fun main() = runBlocking {

    flowOf("Apple")
        .map { it.uppercase() }
        .flatMapConcat { fruit ->
            fruit.asFlow()
        }
        .collect { letter ->
            println(letter)
        }
}

fun String.asFlow(): Flow<Char> = flow {
    for (char in this@asFlow) {
        emit(char)
    }
}


A
P
P
L
E

   -  flatMapConcat 은 생성된 Flow를 하나씩 처리하기때문에 두번째 Flow는 첫번째 Flow가 완료되었을때 시작된다.

import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

fun main() = runBlocking {

    flowOf("APPLE", "orange")
        .flatMapMerge(1) { fruit ->
            fruit.asFlow()
        }
        .collect { letter ->
            println(letter)
        }
}

fun String.asFlow(): Flow<Char> = flow {
    for (char in this@asFlow) {
        delay(1000)
        emit(char)
    }
}

A
P
P
L
E
o
r
a
n
g
e
import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

fun main() = runBlocking {

    flowOf("APPLE", "orange")
        .flatMapMerge(2) { fruit ->
            fruit.asFlow()
        }
        .collect { letter ->
            println(letter)
        }
}

fun String.asFlow(): Flow<Char> = flow {
    for (char in this@asFlow) {
        delay(1000)
        emit(char)
    }
}

A
o
P
r
P
a
L
n
E
g
e

   -  flatMapMerge 는 만들어진 Flow를 동시처리한다. concurrency 인자를 사용해 동시에 처리할 수 있는 Flow 수를 설정할 수 있다.

       => 기본값은 16이지만 JVM에서 DEFAULT_CONCURRENCY_PROPERTY_NAME 프로퍼티를 사용해 변경이 가능하다.

import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

@OptIn(ExperimentalCoroutinesApi::class)
fun main() = runBlocking {


    flowOf("APPLE", "orange")
        .flatMapLatest { fruit ->
            fruit.asFlow()
        }
        .collect { letter ->
            println(letter)
        }
}

fun String.asFlow(): Flow<Char> = flow {
    for (char in this@asFlow) {
        delay(1000) 
        emit(char)
    }
}

o
r
a
n
g
e

   -  flatMapLast는 새로운 Flow가 나타나면 이전에 처리하던 Flow를 잊는 형태이다.

 

 g. retry 와 retryWhen

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking {

    val fruits = flow {
        listOf("Apple", "Orange", "Melon").forEach {
            if (it == "Orange") {
                throw IllegalStateException()
            }
            emit(it)
        }
    }

    fruits
        .retry(retries = 2) { cause ->
            println(cause.message)
            true
        }
        .collect { value ->
            println("collect : $value")
        }
}

collect : Apple
null
collect : Apple
null
collect : Apple
Exception in thread "main" java.lang.IllegalStateException
at com.example.service.Week9Kt$main$1$fruits$1.invokeSuspend(Week9.kt:11)

 -  예외는 Flow를 따라 흐르면서 각 단계를 하나씩 종료하는데 종료된 단계는 비활성화 되어서 예외가 발생한 뒤에 메세지를 보낸는것은 불가능하지만

       각 단계가 이전 단계에 대한 참조를 가지고 있어서 Flow를 다시 시작하기 위해 참조를 사용할 수 있다. 이를 이용한 것인 retry와 retryWhen이다.

     => retryWhen은 Flow의 이전 단계에서 예외가 발생할때마다 predicate를 확인하고 재시도할지 여부를 결정한다.

 

 h. distinctUntilChanged

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking {

    flowOf("Apple", "Apple", "Orange", "Orange", "Melon", "Apple")
        .distinctUntilChanged()
        .collect {
            println(it)
        }
}

Apple
Orange
Melon
Apple

  -  반복되는 원소가 동일하다고 판단되면 제거하는 함수로 바로 이전의 원소와 동일 원소만 제거한다.

      => 비교할 선택자가 필요한 경우에는 distinctUntilChangedBy 를 사용하면 된다. 

 

 i.  최종연산 함수

  -  위에 fold와 scan 이외에도 count, first(firstOrNull), reduce 등 다양한 최종연산 함수가 있기 때문에 잘 활용해보면 좋을것 같다. 

반응형

블로그의 정보

57개월 BackEnd

BFine

활동하기