You will be fine

<Coroutine> 10. 코루틴 Flow(플로우) 살펴보기

by BFine
반응형

출처&참고 : https://m.yes24.com/Goods/Detail/123034354

가.  Flow 란 

 a.  다 끝날때까지 기다려야하나..?

import kotlinx.coroutines.*
import kotlin.random.Random

fun main(): Unit = runBlocking {

    val jobs :List<Deferred<Long>> = List(10) { hotelId ->
        async {
            getHotelPriceApi(hotelId = hotelId.toLong())
        }
    }.toList()

    val results = jobs.awaitAll()
    print(results)
}

suspend fun getHotelPriceApi(hotelId: Long) : Long  {
    delay(Random.nextInt(1000, 2000).toLong())
    return Random.nextLong(1000 * hotelId, 100000)
}

  -  위의 예시는 단순히 호텔의 실시간 가격을 비동기로 조회한다 비동기로 조회하기 때문에 일반적으로는 1초 살짝 넘게 걸릴것으로 예상된다.

  -  아쉬운 점은 모두 완료될때까지 기다려야한다는 점이 있다. 후속작업이 있다면 먼저 응답이 온거에 대해서 먼저 처리하면 좀 더 효율적일 것 같다.

      =>  물론 async 내에서 처리하면 될 수 있지만 후속작업이 복잡(추가 비동기 처리 등) 하고 다양해진다면 처리가 어려워질수 있다. 

 

 b. 무엇인가?

  -  Flow는 데이터의 흐름을 관리하고 필요할 때 데이터를 하나씩 제공해주는 역할을 한다. 즉 비동기 데이터 스트림을 처리하는 역할을 한다.

     => 데이터를 하나씩 주고 받는 느낌이 아닌 파이프를 연결해서 데이터를 흘려보내는 느낌이 아닐까 라는 생각이 들었다. ex) SSE, Websocket 등 

  -  위의 이미지에서 볼 수 있듯이 Flow는 단순 일시중단 함수 collect 하나를 가진 인터페이스이다.

  -  Flow는 어떤 연산을 실행할지 정의한 것이며 일시중단 가능한 람다식에 몇가지 요소를 추가한 것으로 볼 수 있다.

 

 c. 예시바꾸기

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlin.random.Random

fun main(): Unit = runBlocking {

    val startTime = System.currentTimeMillis()
    (1..10).asFlow().flatMapMerge { hotelId ->
        flow {
            val price = getHotelPriceApi(hotelId.toLong())
            emit(price) // 데이터를 발행하는 함수
        }
    }.collect {
        val endTime = System.currentTimeMillis()
        println("${it}원 시간 ${endTime - startTime} ms")
    }
}


suspend fun getHotelPriceApi(hotelId: Long) : Long  {
    delay(Random.nextInt(1000, 2000).toLong())
    return Random.nextLong(1000 * hotelId, 100000)
}

// 결과
// 9887원 시간 1094 ms
// 45767원 시간 1222 ms
// 93580원 시간 1222 ms
// 49082원 시간 1448 ms
// 30988원 시간 1518 ms
// 53519원 시간 1556 ms
// 38161원 시간 1606 ms
// 77145원 시간 1750 ms
// 2409원 시간 1776 ms
// 37978원 시간 1966 ms

  -  Flow를 사용해서 위의 예시를 변경해보니 모든 결과를 기다리지 않고 응답받은대로 결과를 출력하는 것을 볼 수 있다..!

      => 이처럼 원소를 하나씩 계산해야 할때는 원소가 나오자마자 바로 얻을 수 있는 것이 낫다

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlin.random.Random

fun main(): Unit = runBlocking {
    withTimeoutOrNull(5000) {
       flow {
           repeat(10) {
              val price = getHotelPriceApi(hotelId = it.toLong())
               println("${it+1}번 ${price}원")
               emit(price)
           }
       }.collect()
    }
}


suspend fun getHotelPriceApi(hotelId: Long) : Long  {
    delay(Random.nextInt(1000, 2000).toLong())
    return Random.nextLong(1000 * hotelId, 100000)
}

// 결과
// 1번 48347원
// 2번 76426원
// 3번 80341원

  -  추가로 코루틴의 취소처리와 동일하게 Flow도 취소가 가능하다.

 

 d. sequnce 와 비교

  -  sequence는 Flow와 비슷하게 cold stream 특성을 가지고 있지만 내부에 일시중단 지점이 있으면 스레드가 블로킹 된다. (yield, yieldAll외에 사용불가)

      =>  위의 이미지에서 볼 수 있듯이 컴파일 오류가 발생하도록 되어있다. 

  -  sequence는 데이터소스의 개수가 많거나 원소가 무거운 경우 원소를 필요할때만 계산하거나 읽는 지연 연산 하게 되는 경우에 적합하다.

 

 e. Flow 특징 

  -  Flow의 최종 연산은 스레드를 블로킹 하는 대신에 코루틴을 중단시킨다. 코루틴 컨텍스를 활용하는 등 코루틴 기능도 제공하고 있다.

      => 최종 연산이 실행 될때 부모 코루틴과의 관계가 정립된다. (corotineScope 함수와 비슷)

  -  자바의 stream과 동일하게 시작연산, 중간연산, 최종연산으로 구분 되며 중간연산은 시작연산과 최종 연산 사이에서 Flow를 변경하는 등의 역할을 한다.

     => onEach ~ catch 까지는 중간연산으로 볼 수 있다.

  -  위의 예시에서 볼 수 있듯이 flow는 코루틴 스코프(+suspend 함수)가 있어야 사용할 수 있다.

 

 f. Flow 구현원리 따라하기

import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking


fun main() {


    // 1. 단순 람다식 구현
    val f: () -> Unit = {
        println("Hello")
        println("World")
        println("!!")
    }
    f()
    f()

    println()

    // 2. 일시중단 함수로 변경
    runBlocking {
        val f: suspend () -> Unit = {
            println("Hello")
            delay(1000L)
            println("World")
            delay(1000L)
            println("!!")
        }
        f()
        f()
    }


    // 3. 파라미터를 가진 람다식 형태로 변경
    runBlocking {
        val f: suspend ((String) -> Unit) -> Unit = {
            it("Hello")
            it("World")
            it("!!")
        }

        f { println(it)}
        f { println(it)}
    }

    // 4. 이떄 it(전달람다식)은 중단함수가 되어야함
    runBlocking {
        val f: suspend (FlowCollector) -> Unit = {
            it.emit("Hello")
            it.emit("World")
            it.emit("!!")
        }

        f { println(it)}
        f { println(it)}
    }

    // 5. it.emit() 하지 않게 FlowCollector를 리시버로 변경
    runBlocking {
        val f: suspend FlowCollector.() -> Unit = {
            emit("Hello")
            emit("World")
            emit("!!")
        }

        f { println(it)}
        f { println(it)}
    }

    // 6. 람다식을 전달하는 대신에 인터페이스를 구현한 객체를 만드는 편이 낫다
    runBlocking {
        val builder : suspend FlowCollector.() -> Unit = {
            emit("Hello")
            emit("World")
            emit("!!")
        }

        val flow = object : Flow {
            override suspend fun collect(collector: FlowCollector) {
                collector.builder()
            }
        }
        flow.collect { println(it) }
        flow.collect { println(it) }
    }

    // 7. Flow 생성을 간단하게 변경
    runBlocking {
        val flow = flow {
            emit("Hello")
            emit("World")
            emit("!!")
        }

        flow.collect { println(it) }
        flow.collect { println(it) }
    }

}

fun interface FlowCollector {
    suspend fun emit(value: String)
}

fun interface Flow {
    suspend fun collect(collector: FlowCollector)
}

fun flow (builder: suspend FlowCollector.() -> Unit) = Flow { collector -> collector.builder() }

  -  위의 예시는 실제 구현된 방식과 동일하다고 한다. (책에는 제네릭까지 설정함)

import kotlinx.coroutines.runBlocking

fun main() {
    var order = 1
    runBlocking {
        flowOf("Hello", "World" , "!!")
            .map {
                it.lowercase()
            }
            .filter { it.length > 2 }
            .onEach { println(order++) }
            .collect {
                println(it)
            }
    }

}


suspend fun Flow.map(transformation : suspend (String) -> String ) = flow {
    collect {
        emit(transformation(it))
    }
}

suspend fun Flow.filter(predicate: suspend (String) -> Boolean) = flow {
    collect {
        if (predicate(it)) {
            emit(it)
        }
    }
}

suspend fun Flow.onEach(action: suspend () -> Unit) = flow {
    collect {
        action()
        emit(it)
    }
}

// .. 생략

1
hello
2
world

  -  flow 함수들도 위 처럼 직접 만들어 볼 수 있는데 Flow 처리 함수들은 Flow를 새로운 연산으로 데코레이트 해야한다.

      => Flow가 시작되면 래핑하고 있는 Flow를 다시 시작하게 되므로 내부에서 collect 메서드를 호출하고 방출한다.

  -  Flow는 중단 함수처럼 동기로 작동하기 때문에 Flow가 완료될때까지 collect 함수가 중단된다.

      => 즉 Flow는 새로운 코루틴을 시작하지 않는다.(각각의 처리 단계는 동기)

 

 g. Flow의 공유상태

import kotlinx.coroutines.runBlocking

fun main() {
    runBlocking {
        flowOf("Hello", "World!!")
            .count1()
            .collect{
                println(it)
            }
    }

    println()

    runBlocking {
        flowOf("Hello", "World!!")
            .count2()
            .collect{
                println(it)
            }
    }
}

fun Flow.count1() = flow {
    collect{
        var count = 0
        (1..it.length).forEach {
            count++
        }
        emit(count.toString()) // 편의상 String으로 변환
    }
}

fun Flow.count2() = flow {
    var count = 0
    collect{
        (1..it.length).forEach {
            count++
        }
        emit(count.toString())
    }
}

// .. 생략
5
7

5
12

  -  Flow 처리를 통해 복잡한 알고리즘을 구현해야할 때는 언제 변수에 대한 접근을 동기화해야하는지 알아야 한다.

  -  커스텀한 Flow 를 구현할 때 각 단계가 동기로 작동하기 때문에 동기화 없이도 Flow 내부에 변경 가능한 상태를 정의할 수 있다.

import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.launch
import kotlin.random.Random

suspend fun main() {
    coroutineScope {
        val f1 = List(1000) { "$it" }.asFlow()
        val f2 = List(1000) { "$it" }.asFlow().counter1()

        launch {
            println(f1.counter1().last()) // 1000
            println(f1.counter1().last()) // 1000
        }
        launch { println(f1.counter1().last())} // 1000
        launch { println(f2.last()) } // 1000
        launch { println(f2.last()) } // 1000
    }

    coroutineScope {
        val f1 = List(1000) { "$it" }.asFlow()
        val f2 = List(1000) { "$it" }.asFlow().counter2()

        launch {
            println(f1.counter2().last())  // 1000
            println(f1.counter2().last())  // 1000
        }
        launch { println(f1.counter2().last())} // 1000
        launch { println(f2.last()) } // 실행할때 마다 값이다름
        launch { println(f2.last()) } // 실행할때 마다 값이다름
    }

}

fun Flow<*>.counter1() = flow {
    var counter = 0
    collect {
        counter++
        List(100) { Random.nextLong() }.shuffled().sorted()
        emit(counter)
    }
}

fun Flow<*>.counter2() : Flow<Int> {
    var counter = 0
    return this.map {
        counter++
        List(100) { Random.nextLong() }.shuffled().sorted()
        counter
    }
}

  -  흔히 저지르는 실수 중 하나가 외부 변수를 추출해서 함수에서 사용하는 것이다. 외부변수는 같은 Flow가 모으는 모든 코루틴이 공유하게 되기 때문이다.

      => 이런 경우에는 동기화가 필수이고 Flow 컬렉션이 아닌 Flow 에 종속 되어버린다.

 

나.  Flow 빌더

다양한 Flow 빌더가 있다..!

 a. flow

fun main(): Unit = runBlocking {
    flow {
        var x = BigInteger.ZERO
        var y = BigInteger.ONE
        while (true) {
            delay(100L)
            emit(x)
            x = y.also {
                y += x
            }
        }
    } // 실행되지않음
    fibonacci().take(5).collect { println(it) }
}

fun fibonacci(): Flow<BigInteger> = flow {
    var x = BigInteger.ZERO
    var y = BigInteger.ONE
    while (true) {
        delay(100L)
        emit(x)
        x = y.also {
            y += x
        }
    }
}

  -  작성한 로직에 대해서 하나의 Flow 타입으로 발행하며 다양한 이벤트나 데이터 변경사항에 대해서 순차처리 하는 함수이다.

     => 내부적으로 코루틴을 사용하여 비동기적으로 데이터를 처리하기 때문에 suspend 키워드를 사용하지 않아도 된다.

  -  flow 는 대표적인 Cold Stream으로 collect가 될때까지 블록 내부 코드가 실행되지 않는다는 특징이 있다.

  -  flow 빌더를 호출하면 단지 객체를 만들 뿐이고 collect를 호출해야 FlowCollector 인터페이스의 block 함수를 호출하게 되는 구조이다.

      => 다른 flow 빌더들도 이런 원리에 기초하여 만들어져있다.!

   

 b. asFlow 

  -  확장함수로 다양한 타입에 대해 Flow 로 변경해주는 함수이다.

import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.flow.*

@OptIn(FlowPreview::class)
suspend fun main() {
    ::normalFunction
        .asFlow()
        .collect()
}

suspend fun normalFunction() : String = "Hello"

  -  위의 예시처럼 일반함수도 메서드 참조 (::)를 활용하면 결과값을 Flow로 변환할수 있다.

  리액티브 스트림 Flux, Flowable, Observable 등도 kotlinx-coroutines-reactive 라이브러리의 asFlow를 사용해 Flow로 변호나 가능하다.

 

 c. flowOf 

  -  고정된 값(원시)들에 대해서 Flow를 생성하며 위의 코드에서 볼수 있듯이 각각의 요소들을 바로 발행한다. 

 

 d. channelFlow 

  -  채널(Channel)을 기반으로 데이터를 발행하는 Flow를 생성하는 함수이다.

       =>  여기서 채널은 비동기 처리에서 코루틴 간에 데이터를 전송하기 위한 것이라고 보면 된다.

  -  channelFlow 는 ProducerScope 에서 작동하며 CoroutineScope를 구현했기 떄문에 빌더에서 새로운 코루틴을 시작할때 사용할수 있다.

     => 다른 코루틴 처럼 channelFlow 도 자식 코루틴이 종료 상태가 될떄까지 끝나지 않는다.

  -  주로 여러 개의 값을 독립적으로 계산해야 할 때 channelFlow를 주로 사용한다.

     => 책에는 모든유저를 API로 페이징 조회하고 특정페이지에 찾는 유저가 있는 경우 이때 API조회/찾는 부분을 독립적으로 처리하는 예시가 있다.

 

 e. callbackFlow 

  -  channelFlow랑 비슷하지만 가장 큰 차이점은 callbackFlow가 콜백함수를 래핑하는 방식으로 구현되어있는 부분이다.  

 

다.  Flow 주요 기능 

 a.  map, filter

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main(): Unit = runBlocking {
    flowOf(1,2,3)
        .map { Pair(it, it * it) }
        .filter { it.second < 5 }
        .collect {
            println(it)
        }
}
// 결과
// (1, 1)
// (2, 4)

  -  위의 예시에서 볼수 있듯이 Java의 Stream에서 제공하는 함수와 동일 기능을 한다.

 

 c. flatMap 친구들

  - Java의 flatMap 과 동일한 느낌으로 Flow들을 연결해서 단일 Flow 로 만드는 기능을 한다.

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlin.system.measureTimeMillis

fun main() = runBlocking {

    val elapsedTime = measureTimeMillis {
        val resultFlow = flowOf(1, 2, 3).flatMapConcat {
            flow {
                delay(1000)
                emit("$it")
            }
        }

        resultFlow.collect { value ->
            println(value)
        }
    }

    println("$elapsedTime ms")
}

// 결과
// 1
// 2
// 3
// 3025 ms

  - flatMapConcat 은 각 요소에 대해 순차적으로 처리한다.

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlin.system.measureTimeMillis

fun main() = runBlocking {

    val elapsedTime = measureTimeMillis {
        val resultFlow = flowOf(1, 2, 3).flatMapMerge {
            flow {
                delay(1000)
                emit("$it")
            }
        }

        resultFlow.collect { value ->
            println(value)
        }
    }

    println("$elapsedTime ms")
}

// 결과
// 1
// 2
// 3
// 1033 ms

  - flatMapMerge 은 각 요소에 대해 병렬로 처리한다. 내부 파라미터로 concurrency가 있기 때문에 조절가능하다

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlin.system.measureTimeMillis

fun main() = runBlocking {

    val elapsedTime = measureTimeMillis {
        val resultFlow = flowOf(1, 2, 3).flatMapLatest {
            flow {
                delay(1000)
                emit("$it")
            }
        }

        resultFlow.collect { value ->
            println(value)
        }
    }

    println("$elapsedTime ms")
}

// 결과
// 3
// 1031 ms

  - flatMapLatest 는 가장 최신의 Flow만 구독하기 때문에 이전 Flow들은 취소된다.

 

 d. take, drop 

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking {

    val resultFlow = flowOf(1, 2, 3)

    println("take")
    resultFlow.take(2).collect {
        println(it)
    }

    println("drop")
    resultFlow.drop(1).collect {
        println(it)
    }

}

  -  take는 Flow들 중에 앞에서 몇개를 가져올지이며 drop은 앞에서 몇개를 제외할지에 대한 기능이다.

 

 e. combine

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking {
    val flow1 = flowOf("Hello", "Good", "Nice")

    val flow2 = flowOf("World", "Morning", "To Meet You")

    val flow3 = flowOf("!")

    val combinedFlow = flow1.combine(flow2) { str1, str2 ->
        "$str1 $str2" // 두 문자열을 결합
    }.combine(flow3) { prev, str3 ->
        "${prev}${str3}"
    }

    combinedFlow.collect { value ->
        println(value)
    }
}

// 결과
// Hello World!
// Good Morning!
// Nice To Meet You!

  -  combine은 Flow들을 결합하는 역할을 한다. 

 

반응형

블로그의 정보

57개월 BackEnd

BFine

활동하기