<Coroutine> 12. SharedFlow(공유플로우)와 StateFlow(상태플로우)
by BFine
출처&참고 : https://m.yes24.com/Goods/Detail/123034354
가. SharedFlow
a. 무엇인가
- 일반적으로 플로우는 콜드데이터라서 요청할때마다 값이 계산된다. 여러 개의 수신자가 하나의 데이터를 변경되는지 감지하는 경우도 종종 있다.
=> 이때 메일링 리스트와 비슷한 개념인 공유플로우(SharedFlow)를 사용한다.
b. MutableSharedFlow
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.launch
suspend fun main() = coroutineScope {
val sharedFlow = MutableSharedFlow<String>()
launch {
sharedFlow.collect {
println("1번 $it")
}
}
launch {
sharedFlow.collect {
println("2번 $it")
}
}
delay(100L)
sharedFlow.emit("Hello")
sharedFlow.emit("World")
sharedFlow.emit("!!")
}
2번 Hello
1번 Hello
2번 World
1번 World
2번 !!
1번 !!
- 브로드캐스트 채널과 비슷한 MutableSharedFlow를 이용하면 대기하고 있는 여러 코루틴에서 메세지를 전달할 수 있다.
=> 주의할점은 자식코루틴이 MutableSharedFlow를 감지하고 있는 상태이므로 프로세스가 종료되지 않는다.
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.launch
suspend fun main() = coroutineScope {
val sharedFlow = MutableSharedFlow<String>(
replay = 3
)
launch {
sharedFlow.collect {
println("1번 $it")
}
}
launch {
sharedFlow.collect {
println("2번 $it")
}
}
delay(100L)
sharedFlow.emit("Hello")
sharedFlow.emit("World")
sharedFlow.emit("!!")
delay(1000L)
println("## ${sharedFlow.replayCache}")
sharedFlow.resetReplayCache()
println("## ${sharedFlow.replayCache}")
}
1번 Hello
2번 Hello
2번 World
1번 World
2번 !!
1번 !!
## [Hello, World, !!]
## []
- replay 인자를 설정하면 마지막으로 전송한 값들이 정해진 수 만큼 저장된다.
- 코틀린에서는 감지만하는 인터페이스와 변경하는 인터페이스를 구분하는 것이 관행이라고 한다.
=> MutableSharedFlow도 역시 살펴보면 ShardFlow 인터페이스와 FlowCollector 인터페이스를 상속하고 있는것을 볼 수 있다.
c. shareIn
- Flow는 사용자 액션, 데이터베이스 변경, 새로운 메세지와 같은 변화를 감지할때 주로 사용된다.
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.launch
suspend fun main(): Unit = coroutineScope {
val flow = flowOf("Hello", "World", "!!")
.onEach {
delay(1000L)
}
val sharedFlow = flow.shareIn(
scope = this,
started = SharingStarted.Eagerly
)
launch {
sharedFlow.collect {
println("1번 $it")
}
}
delay(2000)
launch {
sharedFlow.collect {
println("2번 $it")
}
}
}
1번 Hello
2번 World
1번 World
2번 !!
1번 !!
- 다양한 클래스가 변화를 감지하는 상황에서 하나의 Flow로 여러 개의 Flow를 만들고 싶을때는 SharedFlow를 사용하면 된다.
=> 이때 Flow를 SharedFlow로 바꾸는 함수가 shareIn 이다.
- 두번째 인자인 stared 의 값에 따라서 값을 언제부터 감지할지 결정한다.
- SharingStarted.Eagerly : 즉시 값을 감지하며 시작하기 전에 위의 예제처럼 값이 나오면 유실될 수 도 있다.
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.launch
suspend fun main(): Unit = coroutineScope {
val flow = flowOf("Hello", "World", "!!")
.onEach {
println("onEach")
delay(1000L)
}
val sharedFlow = flow.shareIn(
scope = this,
started = SharingStarted.Lazily
)
launch {
println("1번 코루틴 시작")
sharedFlow.collect {
println("1번 $it")
}
}
delay(2000)
launch {
sharedFlow.collect {
println("2번 $it")
}
}
}
1번 코루틴 시작
onEach
onEach
1번 Hello
2번 World
onEach
1번 World
2번 !!
1번 !!
- SharingStarted.Lazily : 첫번째 구독자가 나올때 감지를 시작하고 첫번째 구독자는 모든 값을 수신하는 것이 보장된다.
=> 구독자가 없으면 replay 인자에 설정한 수만큼 가장 최근값들만 캐싱
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.launch
suspend fun main(): Unit = coroutineScope {
val flow = flowOf("Hello", "World", "!!")
.onStart { println("시작") }
.onEach { delay(1000L) }
.onCompletion { println("완료!") }
val sharedFlow = flow.shareIn(
scope = this,
started = SharingStarted.WhileSubscribed()
)
delay(1000L)
launch {
println("1번 코루틴 시작")
println("1번 : ${sharedFlow.first()}")
}
launch {
println("2번 : ${sharedFlow.first()}")
}
delay(3000)
launch {
sharedFlow.collect {
println("3번 $it")
}
}
}
1번 코루틴 시작
시작
1번 : Hello
2번 : Hello
완료!
시작
3번 Hello
3번 World
완료!
3번 !!
- SharingStarted.WhileSubscribed() : 첫번째 구독자가 나올때 감지를 시작하고 마지막 구독자가 사라지면 Flow도 멈추며
새로운 구독자가 나오면 다시 시작된다.
나. StateFlow
a. 무엇인가?
- StateFlow 는 SharedFlow 의 개념을 확장시킨것으로 replay 인자 값이 1인 SharedFlow 와 비슷하게 작동한다.
=> value 프로퍼티로 접근 가능한 값 하나를 항상 가지고 있다.
- 번외) 코틀린에서 open val 프로퍼티는 var 프로퍼티로 오버라이드 할 수 있다. (open val은 get, var는 set 만 지원한다.)
b. MutableShareFlow
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.launch
suspend fun main(): Unit = coroutineScope {
val stateFlow = MutableStateFlow("Hello")
launch {
stateFlow.collect {
println("1번 : $it")
}
}
delay(1000L)
stateFlow.value = "World"
launch {
stateFlow.collect {
println("2번 : $it")
}
}
}
1번 : Hello
1번 : World
2번 : World
- StateFlow는 위의 예시에서 볼 수 있듯이 상태를 나타내고 변화를 감지해야할때 주로 사용되며 초기값은 필수로 설정해주어야한다.
import com.example.dispatcher
import com.typesafe.config.ConfigException
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.launch
suspend fun main(): Unit = coroutineScope {
val stateFlow = MutableStateFlow("Hello")
launch {
listOf("Hello","World", "!!").forEach { word ->
println("${Thread.currentThread().name} 1번 : $word 로 변경")
stateFlow.value = word
}
}
launch {
stateFlow.collect {
println("${Thread.currentThread().name} 2번 : $it")
}
}
}
DefaultDispatcher-worker-2 2번 : Hello
DefaultDispatcher-worker-1 1번 : Hello 로 변경
DefaultDispatcher-worker-1 1번 : World 로 변경
DefaultDispatcher-worker-1 1번 : !! 로 변경
DefaultDispatcher-worker-2 2번 : !!
- StateFlow는 데이터가 덮어쓰기때문에 감지가 느린경우 중간 변화를 받을 수도 있다. 모든 이벤트를 다받으려면 sharedFlow 를 사용해야한다.
=> 현재상태만 가지고 있는 것이 StateFlow의 특징이므로 이전 상태에는 관심을 가지지 않는다.
c. stateIn
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.launch
suspend fun main(): Unit = coroutineScope {
val stateFlow : StateFlow<String> = flowOf("Hello","World", "!!")
.onEach {
delay(1000L)
println("${Thread.currentThread().name} onEach")
}
.stateIn(this)
launch {
stateFlow.collect {
println("${Thread.currentThread().name} 1번 : $it")
}
}
launch {
stateFlow.collect {
println("${Thread.currentThread().name} 2번 : $it")
}
}
}
DefaultDispatcher-worker-1 onEach
DefaultDispatcher-worker-3 2번 : Hello
DefaultDispatcher-worker-2 1번 : Hello
DefaultDispatcher-worker-3 onEach
DefaultDispatcher-worker-2 2번 : World
DefaultDispatcher-worker-3 1번 : World
DefaultDispatcher-worker-2 onEach
DefaultDispatcher-worker-3 1번 : !!
DefaultDispatcher-worker-2 2번 : !!
- stateIn은 Flow를 StateFlow로 변환하는 함수로 scope 내에서만 호출 가능한 일시중단함수 이다.
- 위의 예시에서 알수 있듯이 StateFlow는 항상 값을 가져야하기때문에 명시하지 않았을때는 첫 번째 값이 계산될 때까지 기다려야한다.
- shareIn과 동일하게 인자로 started가 있어 언제부터 값을 감지할것인지 정할 수 있다.
'공부 > Coroutine' 카테고리의 다른 글
<Coroutine> 11. 코루틴 Flow(플로우)의 함수들 (2) | 2024.09.07 |
---|---|
<Coroutine> 10. 코루틴 Flow(플로우) 살펴보기 (2) | 2024.09.01 |
<Coroutine> 9. Hot & Cold Data Source (0) | 2024.08.24 |
<Coroutine> 8. 채널(Channel), 셀렉트(Select) (0) | 2024.08.15 |
<Coroutine> 7. 코루틴 단위 테스트 (1) | 2024.07.23 |
블로그의 정보
57개월 BackEnd
BFine