<Coroutine> 11. 코루틴 Flow(플로우)의 함수들
by BFine출처&참고 : https://m.yes24.com/Goods/Detail/123034354
가. 생명주기 함수
a. onEach
- Flow 값을 하나씩 받기 위해 onEach 함수를 사용한다. onEach 람다식은 중단함수이고 순서대로 처리된다.
b. onStart
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*
suspend fun main() {
flowOf("hello", "world", "!")
.onEach { delay(1000) }
.onStart { println("## Start") }
.collect { println(it) }
}
## Start
hello
world
!
- 첫번째 원소를 요청했을 때호출되는 함수로 emit으로 원소를 내보낼 수도 있다.
c. onCompletion
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*
suspend fun main() {
flowOf("hello", "world", "!")
.onEach { delay(1000) }
.onStart { println("## Start") }
.onCompletion { println("## End") }
.collect { println(it) }
}
## Start
hello
world
!
## End
- Flow 가 완료되는 시점이 여러가지가 있는데 잡히지 않은 예외가 발생하거나 코루틴이 취소 되었을때 특히 Flow 빌더가 끝났을때가 있는데
그때 호출되는 리스너를 이 onCompletion 함수를 사용해서 추가할 수 있다.
d. onEmpty 와 catch
import kotlinx.coroutines.flow.*
suspend fun main() {
flowOf<String>()
.onEmpty { emit("EMPTY") }
.collect { println(it) }
}
- Flow는 예기지 않은 이벤트가 발생하면 값을 내보내기 전에 완료될 수 있는데 이렇게 원소를 내보내기 전에 Flow가 완료되면 실행된다.
=> 기본값을 내보내기 위한 목적으로도 사용될수있다.
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking
fun main() = runBlocking {
flow {
for (i in 1..5) {
if (i == 4) throw IllegalArgumentException("예외 발생")
emit(i)
}
}
.onEach { println("Emitting value: $it") }
.catch { e -> println("Caught exception: ${e.message}") }
.collect { println("Collected: $it") }
}
Emitting value: 1
Collected: 1
Emitting value: 2
Collected: 2
Emitting value: 3
Collected: 3
Caught exception: 예외 발생
- Flow를 만들거나 처리하는 도중에 예외가 발생할 수 있는데 이러한 예외를 잡고 관리할 수 있도록 catch 함수가 있다.
e. flowOn
- Flow 함수들이 컨텍스트를 얻어오는 시점은 collect가 호출될때이고 호출한 곳의 컨텍스트를 가져온다. flowOn 함수는 이런 컨텍스트를 변경할수있다.
f. launchIn
- 이 함수를 사용하면 유일한 인자로 scope를 받아서 collect를 새로운 코루틴에서 시작할 수 있게 할 수 있다.
나. 처리 함수
a. map
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun main() = runBlocking {
val flow = flowOf("Hello", "World", "!!")
.map { it.uppercase() }
flow.collect {
println(it)
}
}
HELLO
WORLD
!!
- Flow의 각원소를 변환 함수에 따라 변환하는 함수이다.
b. fliter
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun main() = runBlocking {
val flow = flowOf("Hello", "World", "!!")
.filterNot { it == "!!" }
flow.collect {
println(it)
}
}
Hello
World
- Flow에서 주어진 조건에 맞는 값들만 가진 Flow를 반환 한다.
c. take와 drop
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun main() = runBlocking {
val flow = flowOf("Hello", "World", "!!")
flow.take(2).collect {
println(it)
}
flow.drop(2).collect {
println(it)
}
}
Hello
World
!!
- Flow에서 주어진 원소들 중 특정 수만 통과시키기 위해서는 take, 특정 수의 원소를 무시하고 받기 위해서는 drop을 사용 할 수 있다.
d. merge, zip 그리고 combine
- 코루틴 Flow에서는 두 개의 Flow를 하나의 Flow를 합치는 함수를 제공하고 있다.
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun main() = runBlocking {
val flow1 = flowOf("Apple", "Orange", "Melon")
val flow2 = flowOf(1000, 2000, 3000)
merge(flow1, flow2)
.toList()
.forEach {
println(it)
}
merge(flow1, flow2)
.collect{ println(it) } // 컴파일 오류
}
Apple
Orange
Melon
1000
2000
3000
- 가장 간단한 방법으로는 merge를 사용하는 방법이다. 한 Flow의 원소가 다른 Flow를 기다리지 않는 것이 특징이다.
=> Flow의 원소 생성이 지연된다고 해서 다른 Flow의 원소 생성이 중단되지는 않는다.
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun main() = runBlocking {
flowOf("Apple", "Orange", "Melon")
.zip(flowOf(1000, 2000, 3000)) { fruit, price -> "${fruit}-${price}" }
.collect { println(it)}
}
Apple-1000
Orange-2000
Melon-3000
- 서로 다른 두개의 Flow에서 원소 쌍을 만드는 방법이 zip 함수 이다.
=> 각각의 Flow 원소는 한 쌍의 일부가 되므로 쌍이 될 원소를 기다려야 한다. ( 쌍을 이루지 못하고 남은 원소는 유실된다. )
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun main() = runBlocking {
val flow = flow {
listOf(1000, 2000, 3000)
.forEach {
delay(1000L)
emit(it)
}
}
flowOf("Apple", "Orange", "Melon")
.combine(flow) { fruit, price -> "${fruit}-${price}" }
.collect { println(it)}
}
Melon-1000
Melon-2000
Melon-3000
- combine 함수는 위의 zip 처럼 원소들로 쌍을 형성하기 때문에 첫번째 쌍을 만들기 위해서 더 느린 Flow를 기다려야 한다.
=> 그러나 zip과는 다르게 두 Flow가 모두 닫힐때까지 원소를 내보낸다.
- combine은 두 데이터 소싀의 변화를 능동적으로 감지해야할때 주로 사용된다.
e. fold와 scan
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun main() = runBlocking {
val result = flowOf("Hello", "World", "!!")
.fold("") { acc, value -> if (acc.isNotEmpty()) "$acc $value" else value }
println(result)
}
Hello World !!
- flod는 Flow에서 주워진 원소 각각에 대해 두 개의 값을 하나로 합치는 작성한 연산을 적용하여 Flow의 모든 값을 하나로 합치는 최종 연산 함수이다.
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun main() = runBlocking {
val results = flowOf("Hello", "World", "!!")
.scan("") { acc, value -> if (acc.isNotEmpty()) "$acc $value" else value }
results.collect {
println(it)
}
}
Hello
Hello World
Hello World !!
- scan에 경우에는 누적되는 과정의 모든 값을 생성하는 중간 연산이다.
=> 이전 단계에서 값을 받은 즉시 새로운 값을 만들기 때문에 Flow에서 유용하게 사용된다.
f. flatMapConcat, flatMapMerge, flatMapLatest
- 컬렉션의 flatMap이 평탄화된 컬렉션을 반환하는 것처럼 Flow의 flatMap도 평탄화된 Flow를 반환한다.
- 하지만 Flow의 경우 서로 다른 Flow에서 원소가 나오는 시간이 다르기 때문에 이거에 대한 고려를 위해 여러 flatMap 함수가 존재한다.
import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking
fun main() = runBlocking {
flowOf("Apple")
.map { it.uppercase() }
.flatMapConcat { fruit ->
fruit.asFlow()
}
.collect { letter ->
println(letter)
}
}
fun String.asFlow(): Flow<Char> = flow {
for (char in this@asFlow) {
emit(char)
}
}
A
P
P
L
E
- flatMapConcat 은 생성된 Flow를 하나씩 처리하기때문에 두번째 Flow는 첫번째 Flow가 완료되었을때 시작된다.
import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking
fun main() = runBlocking {
flowOf("APPLE", "orange")
.flatMapMerge(1) { fruit ->
fruit.asFlow()
}
.collect { letter ->
println(letter)
}
}
fun String.asFlow(): Flow<Char> = flow {
for (char in this@asFlow) {
delay(1000)
emit(char)
}
}
A
P
P
L
E
o
r
a
n
g
e
import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking
fun main() = runBlocking {
flowOf("APPLE", "orange")
.flatMapMerge(2) { fruit ->
fruit.asFlow()
}
.collect { letter ->
println(letter)
}
}
fun String.asFlow(): Flow<Char> = flow {
for (char in this@asFlow) {
delay(1000)
emit(char)
}
}
A
o
P
r
P
a
L
n
E
g
e
- flatMapMerge 는 만들어진 Flow를 동시처리한다. concurrency 인자를 사용해 동시에 처리할 수 있는 Flow 수를 설정할 수 있다.
=> 기본값은 16이지만 JVM에서 DEFAULT_CONCURRENCY_PROPERTY_NAME 프로퍼티를 사용해 변경이 가능하다.
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking
@OptIn(ExperimentalCoroutinesApi::class)
fun main() = runBlocking {
flowOf("APPLE", "orange")
.flatMapLatest { fruit ->
fruit.asFlow()
}
.collect { letter ->
println(letter)
}
}
fun String.asFlow(): Flow<Char> = flow {
for (char in this@asFlow) {
delay(1000)
emit(char)
}
}
o
r
a
n
g
e
- flatMapLast는 새로운 Flow가 나타나면 이전에 처리하던 Flow를 잊는 형태이다.
g. retry 와 retryWhen
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun main() = runBlocking {
val fruits = flow {
listOf("Apple", "Orange", "Melon").forEach {
if (it == "Orange") {
throw IllegalStateException()
}
emit(it)
}
}
fruits
.retry(retries = 2) { cause ->
println(cause.message)
true
}
.collect { value ->
println("collect : $value")
}
}
collect : Apple
null
collect : Apple
null
collect : Apple
Exception in thread "main" java.lang.IllegalStateException
at com.example.service.Week9Kt$main$1$fruits$1.invokeSuspend(Week9.kt:11)
- 예외는 Flow를 따라 흐르면서 각 단계를 하나씩 종료하는데 종료된 단계는 비활성화 되어서 예외가 발생한 뒤에 메세지를 보낸는것은 불가능하지만
각 단계가 이전 단계에 대한 참조를 가지고 있어서 Flow를 다시 시작하기 위해 참조를 사용할 수 있다. 이를 이용한 것인 retry와 retryWhen이다.
=> retryWhen은 Flow의 이전 단계에서 예외가 발생할때마다 predicate를 확인하고 재시도할지 여부를 결정한다.
h. distinctUntilChanged
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun main() = runBlocking {
flowOf("Apple", "Apple", "Orange", "Orange", "Melon", "Apple")
.distinctUntilChanged()
.collect {
println(it)
}
}
Apple
Orange
Melon
Apple
- 반복되는 원소가 동일하다고 판단되면 제거하는 함수로 바로 이전의 원소와 동일 원소만 제거한다.
=> 비교할 선택자가 필요한 경우에는 distinctUntilChangedBy 를 사용하면 된다.
i. 최종연산 함수
- 위에 fold와 scan 이외에도 count, first(firstOrNull), reduce 등 다양한 최종연산 함수가 있기 때문에 잘 활용해보면 좋을것 같다.
'공부 > Coroutine' 카테고리의 다른 글
<Coroutine> 12. SharedFlow(공유플로우)와 StateFlow(상태플로우) (1) | 2024.09.17 |
---|---|
<Coroutine> 10. 코루틴 Flow(플로우) 살펴보기 (2) | 2024.09.01 |
<Coroutine> 9. Hot & Cold Data Source (0) | 2024.08.24 |
<Coroutine> 8. 채널(Channel), 셀렉트(Select) (0) | 2024.08.15 |
<Coroutine> 7. 코루틴 단위 테스트 (1) | 2024.07.23 |
블로그의 정보
57개월 BackEnd
BFine