<Coroutine> 10. 코루틴 Flow(플로우) 살펴보기
by BFine출처&참고 : https://m.yes24.com/Goods/Detail/123034354
가. Flow 란
a. 다 끝날때까지 기다려야하나..?
import kotlinx.coroutines.*
import kotlin.random.Random
fun main(): Unit = runBlocking {
val jobs :List<Deferred<Long>> = List(10) { hotelId ->
async {
getHotelPriceApi(hotelId = hotelId.toLong())
}
}.toList()
val results = jobs.awaitAll()
print(results)
}
suspend fun getHotelPriceApi(hotelId: Long) : Long {
delay(Random.nextInt(1000, 2000).toLong())
return Random.nextLong(1000 * hotelId, 100000)
}
- 위의 예시는 단순히 호텔의 실시간 가격을 비동기로 조회한다 비동기로 조회하기 때문에 일반적으로는 1초 살짝 넘게 걸릴것으로 예상된다.
- 아쉬운 점은 모두 완료될때까지 기다려야한다는 점이 있다. 후속작업이 있다면 먼저 응답이 온거에 대해서 먼저 처리하면 좀 더 효율적일 것 같다.
=> 물론 async 내에서 처리하면 될 수 있지만 후속작업이 복잡(추가 비동기 처리 등) 하고 다양해진다면 처리가 어려워질수 있다.
b. 무엇인가?
- Flow는 데이터의 흐름을 관리하고 필요할 때 데이터를 하나씩 제공해주는 역할을 한다. 즉 비동기 데이터 스트림을 처리하는 역할을 한다.
=> 데이터를 하나씩 주고 받는 느낌이 아닌 파이프를 연결해서 데이터를 흘려보내는 느낌이 아닐까 라는 생각이 들었다. ex) SSE, Websocket 등
- 위의 이미지에서 볼 수 있듯이 Flow는 단순 일시중단 함수 collect 하나를 가진 인터페이스이다.
- Flow는 어떤 연산을 실행할지 정의한 것이며 일시중단 가능한 람다식에 몇가지 요소를 추가한 것으로 볼 수 있다.
c. 예시바꾸기
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlin.random.Random
fun main(): Unit = runBlocking {
val startTime = System.currentTimeMillis()
(1..10).asFlow().flatMapMerge { hotelId ->
flow {
val price = getHotelPriceApi(hotelId.toLong())
emit(price) // 데이터를 발행하는 함수
}
}.collect {
val endTime = System.currentTimeMillis()
println("${it}원 시간 ${endTime - startTime} ms")
}
}
suspend fun getHotelPriceApi(hotelId: Long) : Long {
delay(Random.nextInt(1000, 2000).toLong())
return Random.nextLong(1000 * hotelId, 100000)
}
// 결과
// 9887원 시간 1094 ms
// 45767원 시간 1222 ms
// 93580원 시간 1222 ms
// 49082원 시간 1448 ms
// 30988원 시간 1518 ms
// 53519원 시간 1556 ms
// 38161원 시간 1606 ms
// 77145원 시간 1750 ms
// 2409원 시간 1776 ms
// 37978원 시간 1966 ms
- Flow를 사용해서 위의 예시를 변경해보니 모든 결과를 기다리지 않고 응답받은대로 결과를 출력하는 것을 볼 수 있다..!
=> 이처럼 원소를 하나씩 계산해야 할때는 원소가 나오자마자 바로 얻을 수 있는 것이 낫다
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlin.random.Random
fun main(): Unit = runBlocking {
withTimeoutOrNull(5000) {
flow {
repeat(10) {
val price = getHotelPriceApi(hotelId = it.toLong())
println("${it+1}번 ${price}원")
emit(price)
}
}.collect()
}
}
suspend fun getHotelPriceApi(hotelId: Long) : Long {
delay(Random.nextInt(1000, 2000).toLong())
return Random.nextLong(1000 * hotelId, 100000)
}
// 결과
// 1번 48347원
// 2번 76426원
// 3번 80341원
- 추가로 코루틴의 취소처리와 동일하게 Flow도 취소가 가능하다.
d. sequnce 와 비교
- sequence는 Flow와 비슷하게 cold stream 특성을 가지고 있지만 내부에 일시중단 지점이 있으면 스레드가 블로킹 된다. (yield, yieldAll외에 사용불가)
=> 위의 이미지에서 볼 수 있듯이 컴파일 오류가 발생하도록 되어있다.
- sequence는 데이터소스의 개수가 많거나 원소가 무거운 경우 원소를 필요할때만 계산하거나 읽는 지연 연산 하게 되는 경우에 적합하다.
e. Flow 특징
- Flow의 최종 연산은 스레드를 블로킹 하는 대신에 코루틴을 중단시킨다. 코루틴 컨텍스를 활용하는 등 코루틴 기능도 제공하고 있다.
=> 최종 연산이 실행 될때 부모 코루틴과의 관계가 정립된다. (corotineScope 함수와 비슷)
- 자바의 stream과 동일하게 시작연산, 중간연산, 최종연산으로 구분 되며 중간연산은 시작연산과 최종 연산 사이에서 Flow를 변경하는 등의 역할을 한다.
=> onEach ~ catch 까지는 중간연산으로 볼 수 있다.
- 위의 예시에서 볼 수 있듯이 flow는 코루틴 스코프(+suspend 함수)가 있어야 사용할 수 있다.
f. Flow 구현원리 따라하기
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking
fun main() {
// 1. 단순 람다식 구현
val f: () -> Unit = {
println("Hello")
println("World")
println("!!")
}
f()
f()
println()
// 2. 일시중단 함수로 변경
runBlocking {
val f: suspend () -> Unit = {
println("Hello")
delay(1000L)
println("World")
delay(1000L)
println("!!")
}
f()
f()
}
// 3. 파라미터를 가진 람다식 형태로 변경
runBlocking {
val f: suspend ((String) -> Unit) -> Unit = {
it("Hello")
it("World")
it("!!")
}
f { println(it)}
f { println(it)}
}
// 4. 이떄 it(전달람다식)은 중단함수가 되어야함
runBlocking {
val f: suspend (FlowCollector) -> Unit = {
it.emit("Hello")
it.emit("World")
it.emit("!!")
}
f { println(it)}
f { println(it)}
}
// 5. it.emit() 하지 않게 FlowCollector를 리시버로 변경
runBlocking {
val f: suspend FlowCollector.() -> Unit = {
emit("Hello")
emit("World")
emit("!!")
}
f { println(it)}
f { println(it)}
}
// 6. 람다식을 전달하는 대신에 인터페이스를 구현한 객체를 만드는 편이 낫다
runBlocking {
val builder : suspend FlowCollector.() -> Unit = {
emit("Hello")
emit("World")
emit("!!")
}
val flow = object : Flow {
override suspend fun collect(collector: FlowCollector) {
collector.builder()
}
}
flow.collect { println(it) }
flow.collect { println(it) }
}
// 7. Flow 생성을 간단하게 변경
runBlocking {
val flow = flow {
emit("Hello")
emit("World")
emit("!!")
}
flow.collect { println(it) }
flow.collect { println(it) }
}
}
fun interface FlowCollector {
suspend fun emit(value: String)
}
fun interface Flow {
suspend fun collect(collector: FlowCollector)
}
fun flow (builder: suspend FlowCollector.() -> Unit) = Flow { collector -> collector.builder() }
- 위의 예시는 실제 구현된 방식과 동일하다고 한다. (책에는 제네릭까지 설정함)
import kotlinx.coroutines.runBlocking
fun main() {
var order = 1
runBlocking {
flowOf("Hello", "World" , "!!")
.map {
it.lowercase()
}
.filter { it.length > 2 }
.onEach { println(order++) }
.collect {
println(it)
}
}
}
suspend fun Flow.map(transformation : suspend (String) -> String ) = flow {
collect {
emit(transformation(it))
}
}
suspend fun Flow.filter(predicate: suspend (String) -> Boolean) = flow {
collect {
if (predicate(it)) {
emit(it)
}
}
}
suspend fun Flow.onEach(action: suspend () -> Unit) = flow {
collect {
action()
emit(it)
}
}
// .. 생략
1
hello
2
world
- flow 함수들도 위 처럼 직접 만들어 볼 수 있는데 Flow 처리 함수들은 Flow를 새로운 연산으로 데코레이트 해야한다.
=> Flow가 시작되면 래핑하고 있는 Flow를 다시 시작하게 되므로 내부에서 collect 메서드를 호출하고 방출한다.
- Flow는 중단 함수처럼 동기로 작동하기 때문에 Flow가 완료될때까지 collect 함수가 중단된다.
=> 즉 Flow는 새로운 코루틴을 시작하지 않는다.(각각의 처리 단계는 동기)
g. Flow의 공유상태
import kotlinx.coroutines.runBlocking
fun main() {
runBlocking {
flowOf("Hello", "World!!")
.count1()
.collect{
println(it)
}
}
println()
runBlocking {
flowOf("Hello", "World!!")
.count2()
.collect{
println(it)
}
}
}
fun Flow.count1() = flow {
collect{
var count = 0
(1..it.length).forEach {
count++
}
emit(count.toString()) // 편의상 String으로 변환
}
}
fun Flow.count2() = flow {
var count = 0
collect{
(1..it.length).forEach {
count++
}
emit(count.toString())
}
}
// .. 생략
5
7
5
12
- Flow 처리를 통해 복잡한 알고리즘을 구현해야할 때는 언제 변수에 대한 접근을 동기화해야하는지 알아야 한다.
- 커스텀한 Flow 를 구현할 때 각 단계가 동기로 작동하기 때문에 동기화 없이도 Flow 내부에 변경 가능한 상태를 정의할 수 있다.
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.launch
import kotlin.random.Random
suspend fun main() {
coroutineScope {
val f1 = List(1000) { "$it" }.asFlow()
val f2 = List(1000) { "$it" }.asFlow().counter1()
launch {
println(f1.counter1().last()) // 1000
println(f1.counter1().last()) // 1000
}
launch { println(f1.counter1().last())} // 1000
launch { println(f2.last()) } // 1000
launch { println(f2.last()) } // 1000
}
coroutineScope {
val f1 = List(1000) { "$it" }.asFlow()
val f2 = List(1000) { "$it" }.asFlow().counter2()
launch {
println(f1.counter2().last()) // 1000
println(f1.counter2().last()) // 1000
}
launch { println(f1.counter2().last())} // 1000
launch { println(f2.last()) } // 실행할때 마다 값이다름
launch { println(f2.last()) } // 실행할때 마다 값이다름
}
}
fun Flow<*>.counter1() = flow {
var counter = 0
collect {
counter++
List(100) { Random.nextLong() }.shuffled().sorted()
emit(counter)
}
}
fun Flow<*>.counter2() : Flow<Int> {
var counter = 0
return this.map {
counter++
List(100) { Random.nextLong() }.shuffled().sorted()
counter
}
}
- 흔히 저지르는 실수 중 하나가 외부 변수를 추출해서 함수에서 사용하는 것이다. 외부변수는 같은 Flow가 모으는 모든 코루틴이 공유하게 되기 때문이다.
=> 이런 경우에는 동기화가 필수이고 Flow 컬렉션이 아닌 Flow 에 종속 되어버린다.
나. Flow 빌더
a. flow
fun main(): Unit = runBlocking {
flow {
var x = BigInteger.ZERO
var y = BigInteger.ONE
while (true) {
delay(100L)
emit(x)
x = y.also {
y += x
}
}
} // 실행되지않음
fibonacci().take(5).collect { println(it) }
}
fun fibonacci(): Flow<BigInteger> = flow {
var x = BigInteger.ZERO
var y = BigInteger.ONE
while (true) {
delay(100L)
emit(x)
x = y.also {
y += x
}
}
}
- 작성한 로직에 대해서 하나의 Flow 타입으로 발행하며 다양한 이벤트나 데이터 변경사항에 대해서 순차처리 하는 함수이다.
=> 내부적으로 코루틴을 사용하여 비동기적으로 데이터를 처리하기 때문에 suspend 키워드를 사용하지 않아도 된다.
- flow 는 대표적인 Cold Stream으로 collect가 될때까지 블록 내부 코드가 실행되지 않는다는 특징이 있다.
- flow 빌더를 호출하면 단지 객체를 만들 뿐이고 collect를 호출해야 FlowCollector 인터페이스의 block 함수를 호출하게 되는 구조이다.
=> 다른 flow 빌더들도 이런 원리에 기초하여 만들어져있다.!
b. asFlow
- 확장함수로 다양한 타입에 대해 Flow 로 변경해주는 함수이다.
import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.flow.*
@OptIn(FlowPreview::class)
suspend fun main() {
::normalFunction
.asFlow()
.collect()
}
suspend fun normalFunction() : String = "Hello"
- 위의 예시처럼 일반함수도 메서드 참조 (::)를 활용하면 결과값을 Flow로 변환할수 있다.
- 리액티브 스트림 Flux, Flowable, Observable 등도 kotlinx-coroutines-reactive 라이브러리의 asFlow를 사용해 Flow로 변호나 가능하다.
c. flowOf
- 고정된 값(원시)들에 대해서 Flow를 생성하며 위의 코드에서 볼수 있듯이 각각의 요소들을 바로 발행한다.
d. channelFlow
- 채널(Channel)을 기반으로 데이터를 발행하는 Flow를 생성하는 함수이다.
=> 여기서 채널은 비동기 처리에서 코루틴 간에 데이터를 전송하기 위한 것이라고 보면 된다.
- channelFlow 는 ProducerScope 에서 작동하며 CoroutineScope를 구현했기 떄문에 빌더에서 새로운 코루틴을 시작할때 사용할수 있다.
=> 다른 코루틴 처럼 channelFlow 도 자식 코루틴이 종료 상태가 될떄까지 끝나지 않는다.
- 주로 여러 개의 값을 독립적으로 계산해야 할 때 channelFlow를 주로 사용한다.
=> 책에는 모든유저를 API로 페이징 조회하고 특정페이지에 찾는 유저가 있는 경우 이때 API조회/찾는 부분을 독립적으로 처리하는 예시가 있다.
e. callbackFlow
- channelFlow랑 비슷하지만 가장 큰 차이점은 callbackFlow가 콜백함수를 래핑하는 방식으로 구현되어있는 부분이다.
다. Flow 주요 기능
a. map, filter
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun main(): Unit = runBlocking {
flowOf(1,2,3)
.map { Pair(it, it * it) }
.filter { it.second < 5 }
.collect {
println(it)
}
}
// 결과
// (1, 1)
// (2, 4)
- 위의 예시에서 볼수 있듯이 Java의 Stream에서 제공하는 함수와 동일 기능을 한다.
c. flatMap 친구들
- Java의 flatMap 과 동일한 느낌으로 Flow들을 연결해서 단일 Flow 로 만드는 기능을 한다.
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlin.system.measureTimeMillis
fun main() = runBlocking {
val elapsedTime = measureTimeMillis {
val resultFlow = flowOf(1, 2, 3).flatMapConcat {
flow {
delay(1000)
emit("$it")
}
}
resultFlow.collect { value ->
println(value)
}
}
println("$elapsedTime ms")
}
// 결과
// 1
// 2
// 3
// 3025 ms
- flatMapConcat 은 각 요소에 대해 순차적으로 처리한다.
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlin.system.measureTimeMillis
fun main() = runBlocking {
val elapsedTime = measureTimeMillis {
val resultFlow = flowOf(1, 2, 3).flatMapMerge {
flow {
delay(1000)
emit("$it")
}
}
resultFlow.collect { value ->
println(value)
}
}
println("$elapsedTime ms")
}
// 결과
// 1
// 2
// 3
// 1033 ms
- flatMapMerge 은 각 요소에 대해 병렬로 처리한다. 내부 파라미터로 concurrency가 있기 때문에 조절가능하다
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlin.system.measureTimeMillis
fun main() = runBlocking {
val elapsedTime = measureTimeMillis {
val resultFlow = flowOf(1, 2, 3).flatMapLatest {
flow {
delay(1000)
emit("$it")
}
}
resultFlow.collect { value ->
println(value)
}
}
println("$elapsedTime ms")
}
// 결과
// 3
// 1031 ms
- flatMapLatest 는 가장 최신의 Flow만 구독하기 때문에 이전 Flow들은 취소된다.
d. take, drop
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun main() = runBlocking {
val resultFlow = flowOf(1, 2, 3)
println("take")
resultFlow.take(2).collect {
println(it)
}
println("drop")
resultFlow.drop(1).collect {
println(it)
}
}
- take는 Flow들 중에 앞에서 몇개를 가져올지이며 drop은 앞에서 몇개를 제외할지에 대한 기능이다.
e. combine
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun main() = runBlocking {
val flow1 = flowOf("Hello", "Good", "Nice")
val flow2 = flowOf("World", "Morning", "To Meet You")
val flow3 = flowOf("!")
val combinedFlow = flow1.combine(flow2) { str1, str2 ->
"$str1 $str2" // 두 문자열을 결합
}.combine(flow3) { prev, str3 ->
"${prev}${str3}"
}
combinedFlow.collect { value ->
println(value)
}
}
// 결과
// Hello World!
// Good Morning!
// Nice To Meet You!
- combine은 Flow들을 결합하는 역할을 한다.
'공부 > Coroutine' 카테고리의 다른 글
<Coroutine> 12. SharedFlow(공유플로우)와 StateFlow(상태플로우) (1) | 2024.09.17 |
---|---|
<Coroutine> 11. 코루틴 Flow(플로우)의 함수들 (2) | 2024.09.07 |
<Coroutine> 9. Hot & Cold Data Source (0) | 2024.08.24 |
<Coroutine> 8. 채널(Channel), 셀렉트(Select) (0) | 2024.08.15 |
<Coroutine> 7. 코루틴 단위 테스트 (1) | 2024.07.23 |
블로그의 정보
57개월 BackEnd
BFine