You will be fine

<Coroutine> 12. SharedFlow(공유플로우)와 StateFlow(상태플로우)

by BFine
반응형


출처&참고 : 
https://m.yes24.com/Goods/Detail/123034354

 

코틀린 코루틴 - 예스24

코틀린 전문 강사가 알려 주는 코틀린 코루틴에 대한 모든 것!코틀린 코루틴은 효율적이고 신뢰할 수 있는 멀티스레드 프로그램을 쉽게 구현할 수 있게 해 주어 자바 가상 머신(JVM), 특히 안드로

m.yes24.com

 

가.  SharedFlow

 a. 무엇인가

  -  일반적으로 플로우는 콜드데이터라서 요청할때마다 값이 계산된다. 여러 개의 수신자가 하나의 데이터를 변경되는지 감지하는 경우도 종종 있다.

      => 이때 메일링 리스트와 비슷한 개념인 공유플로우(SharedFlow)를 사용한다.

 

 b. MutableSharedFlow

import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.launch

suspend fun main() = coroutineScope {
    val sharedFlow = MutableSharedFlow<String>()

    launch {
        sharedFlow.collect {
            println("1번 $it")
        }
    }

    launch {
        sharedFlow.collect {
            println("2번 $it")
        }
    }

    delay(100L)
    sharedFlow.emit("Hello")
    sharedFlow.emit("World")
    sharedFlow.emit("!!")
}

2번 Hello
1번 Hello
2번 World
1번 World
2번 !!
1번 !!

  -  브로드캐스트 채널과 비슷한 MutableSharedFlow를 이용하면 대기하고 있는 여러 코루틴에서 메세지를 전달할 수 있다.

      => 주의할점은 자식코루틴이 MutableSharedFlow를 감지하고 있는 상태이므로 프로세스가 종료되지 않는다.

import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.launch

suspend fun main() = coroutineScope {
    val sharedFlow = MutableSharedFlow<String>(
        replay = 3
    )

    launch {
        sharedFlow.collect {
            println("1번 $it")
        }
    }

    launch {
        sharedFlow.collect {
            println("2번 $it")
        }
    }

    delay(100L)
    sharedFlow.emit("Hello")
    sharedFlow.emit("World")
    sharedFlow.emit("!!")

    delay(1000L)
    println("## ${sharedFlow.replayCache}")
    sharedFlow.resetReplayCache()
    println("## ${sharedFlow.replayCache}")
}

1번 Hello
2번 Hello
2번 World
1번 World
2번 !!
1번 !!
## [Hello, World, !!]
## []

  -  replay 인자를 설정하면 마지막으로 전송한 값들이 정해진 수 만큼 저장된다. 

  -  코틀린에서는 감지만하는 인터페이스와 변경하는 인터페이스를 구분하는 것이 관행이라고 한다.

      => MutableSharedFlow도 역시 살펴보면 ShardFlow 인터페이스와 FlowCollector 인터페이스를 상속하고 있는것을 볼 수 있다. 

 

 c. shareIn 

  -  Flow는 사용자 액션, 데이터베이스 변경, 새로운 메세지와 같은 변화를 감지할때 주로 사용된다.

import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.launch

suspend fun main(): Unit = coroutineScope {
    val flow = flowOf("Hello", "World", "!!")
        .onEach {
            delay(1000L)
        }
    val sharedFlow = flow.shareIn(
        scope = this,
        started = SharingStarted.Eagerly
    )

    launch {
        sharedFlow.collect {
            println("1번 $it")
        }
    }

    delay(2000)

    launch {
        sharedFlow.collect {
            println("2번 $it")
        }
    }
}

1번 Hello
2번 World
1번 World
2번 !!
1번 !!

  -  다양한 클래스가 변화를 감지하는 상황에서 하나의 Flow로 여러 개의 Flow를 만들고 싶을때는 SharedFlow를 사용하면 된다.

      => 이때 Flow를 SharedFlow로 바꾸는 함수가 shareIn 이다.

  -  두번째 인자인 stared 의 값에 따라서 값을 언제부터 감지할지 결정한다.

  -  SharingStarted.Eagerly : 즉시 값을 감지하며 시작하기 전에 위의 예제처럼 값이 나오면 유실될 수 도 있다.


import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.launch

suspend fun main(): Unit = coroutineScope {
    val flow = flowOf("Hello", "World", "!!")
        .onEach {
            println("onEach")
            delay(1000L)
        }
    val sharedFlow = flow.shareIn(
        scope = this,
        started = SharingStarted.Lazily
    )

    launch {
        println("1번 코루틴 시작")
        sharedFlow.collect {
            println("1번 $it")
        }
    }

    delay(2000)

    launch {
        sharedFlow.collect {
            println("2번 $it")
        }
    }
}

1번 코루틴 시작
onEach
onEach
1번 Hello
2번 World
onEach
1번 World
2번 !!
1번 !!

  -  SharingStarted.Lazily : 첫번째 구독자가 나올때 감지를 시작하고 첫번째 구독자는 모든 값을 수신하는 것이 보장된다. 

                                         => 구독자가 없으면 replay 인자에 설정한 수만큼 가장 최근값들만 캐싱 


import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.launch

suspend fun main(): Unit = coroutineScope {
    val flow = flowOf("Hello", "World", "!!")
        .onStart { println("시작") }
        .onEach { delay(1000L) }
        .onCompletion { println("완료!") }
    val sharedFlow = flow.shareIn(
        scope = this,
        started = SharingStarted.WhileSubscribed()
    )

    delay(1000L)
    launch {
        println("1번 코루틴 시작")
        println("1번 : ${sharedFlow.first()}")
    }

    launch {

        println("2번 : ${sharedFlow.first()}")
    }

    delay(3000)

    launch {
        sharedFlow.collect {
            println("3번 $it")
        }
    }
}

1번 코루틴 시작
시작
1번 : Hello
2번 : Hello
완료!
시작
3번 Hello
3번 World
완료!
3번 !!

  -  SharingStarted.WhileSubscribed() : 첫번째 구독자가 나올때 감지를 시작하고 마지막 구독자가 사라지면 Flow도 멈추며

                                                           새로운 구독자가 나오면 다시 시작된다.

      

나.  StateFlow

 a. 무엇인가?

  -  StateFlow 는 SharedFlow 의 개념을 확장시킨것으로 replay 인자 값이 1인 SharedFlow 와 비슷하게 작동한다.

      => value 프로퍼티로 접근 가능한 값 하나를 항상 가지고 있다.

  -  번외) 코틀린에서 open val 프로퍼티는 var 프로퍼티로 오버라이드 할 수 있다. (open val은 get, var는 set 만 지원한다.)

 

 b.  MutableShareFlow


import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.launch

suspend fun main(): Unit = coroutineScope {
    val stateFlow = MutableStateFlow("Hello")

    launch {
        stateFlow.collect {
            println("1번 : $it")

        }
    }
    
    delay(1000L)

    stateFlow.value = "World"
    launch {
        stateFlow.collect {
            println("2번 : $it")
        }
    }
}

1번 : Hello
1번 : World
2번 : World

  -  StateFlow는 위의 예시에서 볼 수 있듯이 상태를 나타내고 변화를 감지해야할때 주로 사용되며 초기값은 필수로 설정해주어야한다.


import com.example.dispatcher
import com.typesafe.config.ConfigException
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.launch

suspend fun main(): Unit = coroutineScope {
    val stateFlow = MutableStateFlow("Hello")

    launch {
        listOf("Hello","World", "!!").forEach { word ->
            println("${Thread.currentThread().name} 1번 : $word 로 변경")
            stateFlow.value = word
        }
    }

    launch {
        stateFlow.collect {
            println("${Thread.currentThread().name} 2번 : $it")
        }
    }
}

DefaultDispatcher-worker-2 2번 : Hello
DefaultDispatcher-worker-1 1번 : Hello 로 변경
DefaultDispatcher-worker-1 1번 : World 로 변경
DefaultDispatcher-worker-1 1번 : !! 로 변경
DefaultDispatcher-worker-2 2번 : !!

  -  StateFlow는 데이터가 덮어쓰기때문에 감지가 느린경우 중간 변화를 받을 수도 있다. 모든 이벤트를 다받으려면 sharedFlow 를 사용해야한다.

      => 현재상태만 가지고 있는 것이 StateFlow의 특징이므로 이전 상태에는 관심을 가지지 않는다.

 

 c. stateIn 

import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.launch

suspend fun main(): Unit = coroutineScope {

    val stateFlow : StateFlow<String> = flowOf("Hello","World", "!!")
        .onEach {
            delay(1000L)
            println("${Thread.currentThread().name} onEach")
        }
        .stateIn(this)

    launch {
        stateFlow.collect {
            println("${Thread.currentThread().name} 1번 : $it")
        }
    }

    launch {
        stateFlow.collect {
            println("${Thread.currentThread().name} 2번 : $it")
        }
    }
}

DefaultDispatcher-worker-1 onEach
DefaultDispatcher-worker-3 2번 : Hello
DefaultDispatcher-worker-2 1번 : Hello
DefaultDispatcher-worker-3 onEach
DefaultDispatcher-worker-2 2번 : World
DefaultDispatcher-worker-3 1번 : World
DefaultDispatcher-worker-2 onEach
DefaultDispatcher-worker-3 1번 : !!
DefaultDispatcher-worker-2 2번 : !!

  -  stateIn은 Flow를 StateFlow로 변환하는 함수로 scope 내에서만 호출 가능한 일시중단함수 이다.

  -  위의 예시에서 알수 있듯이 StateFlow는 항상 값을 가져야하기때문에 명시하지 않았을때는 첫 번째 값이 계산될 때까지 기다려야한다.

  -  shareIn과 동일하게 인자로 started가 있어 언제부터 값을 감지할것인지 정할 수 있다.

반응형

블로그의 정보

57개월 BackEnd

BFine

활동하기